import os
import time
from collections.abc import AsyncGenerator
-from typing import Any
+from typing import TYPE_CHECKING, Any
import aiohttp
from music_assistant_models.enums import (
from music_assistant_models.errors import (
LoginFailed,
MediaNotFoundError,
+ ProviderUnavailableError,
ResourceTemporarilyUnavailable,
+ UnsupportedFeaturedException,
)
from music_assistant_models.media_items import (
Album,
Artist,
+ Audiobook,
AudioFormat,
MediaItemImage,
MediaItemType,
Track,
UniqueList,
)
+from music_assistant_models.media_items.metadata import MediaItemChapter
from music_assistant_models.streamdetails import StreamDetails
from music_assistant.controllers.cache import use_cache
from .constants import (
CONF_CLIENT_ID,
- CONF_PLAYED_THRESHOLD,
CONF_REFRESH_TOKEN,
- CONF_SYNC_PLAYED_STATUS,
+ CONF_SYNC_AUDIOBOOK_PROGRESS,
+ CONF_SYNC_PODCAST_PROGRESS,
LIKED_SONGS_FAKE_PLAYLIST_ID_PREFIX,
)
from .helpers import get_librespot_binary
from .parsers import (
parse_album,
parse_artist,
+ parse_audiobook,
parse_playlist,
parse_podcast,
parse_podcast_episode,
)
from .streaming import LibrespotStreamer
+if TYPE_CHECKING:
+ from music_assistant_models.config_entries import ProviderConfig
+ from music_assistant_models.provider import ProviderManifest
+
+ from music_assistant import MusicAssistant
+
class SpotifyProvider(MusicProvider):
"""Implementation of a Spotify MusicProvider."""
custom_client_id_active: bool = False
throttler: ThrottlerManager
+ def __init__(
+ self,
+ mass: MusicAssistant,
+ manifest: ProviderManifest,
+ config: ProviderConfig,
+ supported_features: set[ProviderFeature],
+ ) -> None:
+ """Initialize the provider."""
+ super().__init__(mass, manifest, config)
+ self._base_supported_features = supported_features
+
async def handle_async_init(self) -> None:
"""Handle async initialization of the provider."""
self.cache_dir = os.path.join(self.mass.cache_path, self.instance_id)
self._librespot_bin = await get_librespot_binary()
# try login which will raise if it fails
await self.login()
+ self._audiobooks_supported = await self._test_audiobook_support()
+ if not self._audiobooks_supported:
+ self.logger.info(
+ "Audiobook support disabled: Audiobooks are not available in your region. "
+ "See https://support.spotify.com/us/authors/article/audiobooks-availability/ "
+ "for supported countries."
+ )
+
+ async def _test_audiobook_support(self) -> bool:
+ """Test if audiobooks are supported in user's region."""
+ try:
+ await self._get_data("me/audiobooks", limit=1)
+ return True
+ except aiohttp.ClientResponseError as e:
+ if e.status == 403:
+ return False # Not available
+ raise # Re-raise other HTTP errors
+ except (MediaNotFoundError, ProviderUnavailableError):
+ return False
@property
- def sync_played_status_enabled(self) -> bool:
- """Check if played status sync is enabled."""
- value = self.config.get_value(CONF_SYNC_PLAYED_STATUS, True)
- return bool(value) if value is not None else True
+ def audiobooks_supported(self) -> bool:
+ """Check if audiobooks are supported for this user/region."""
+ return getattr(self, "_audiobooks_supported", False)
@property
- def played_threshold(self) -> float:
- """Get the played threshold percentage."""
- value = self.config.get_value(CONF_PLAYED_THRESHOLD, 90)
- if isinstance(value, (int, float)):
- # Convert from 1-100 percentage to 0.0-1.0 decimal
- return float(value) / 100.0
- elif isinstance(value, str):
- try:
- return float(value) / 100.0
- except ValueError:
- return 0.9 # fallback to default (90%)
- else:
- return 0.9 # fallback to default for any other type
+ def audiobook_progress_sync_enabled(self) -> bool:
+ """Check if audiobook progress sync is enabled."""
+ return bool(self.config.get_value(CONF_SYNC_AUDIOBOOK_PROGRESS, False))
+
+ @property
+ def podcast_progress_sync_enabled(self) -> bool:
+ """Check if played status sync is enabled."""
+ value = self.config.get_value(CONF_SYNC_PODCAST_PROGRESS, True)
+ return bool(value) if value is not None else True
@property
def supported_features(self) -> set[ProviderFeature]:
"""Return the features supported by this Provider."""
- features = self._supported_features
+ features = self._base_supported_features.copy()
+ # Add audiobook features if enabled
+ if self.audiobooks_supported:
+ features.add(ProviderFeature.LIBRARY_AUDIOBOOKS)
+ features.add(ProviderFeature.LIBRARY_AUDIOBOOKS_EDIT)
+
if not self.custom_client_id_active:
# Spotify has killed the similar tracks api for developers
# https://developer.spotify.com/blog/2024-11-27-changes-to-the-web-api
return {*features, ProviderFeature.SIMILAR_TRACKS}
+
return features
@property
searchtypes.append("playlist")
if MediaType.PODCAST in media_types:
searchtypes.append("show")
+ if MediaType.AUDIOBOOK in media_types and self.audiobooks_supported:
+ searchtypes.append("audiobook")
if not searchtypes:
return searchresult
searchtype = ",".join(searchtypes)
podcasts.append(parse_podcast(item, self))
searchresult.podcasts = [*searchresult.podcasts, *podcasts]
items_received += len(api_result["shows"]["items"])
+ if "audiobooks" in api_result and self.audiobooks_supported:
+ audiobooks = [
+ parse_audiobook(item, self)
+ for item in api_result["audiobooks"]["items"]
+ if (item and item["id"])
+ ]
+ searchresult.audiobooks = [*searchresult.audiobooks, *audiobooks]
+ items_received += len(api_result["audiobooks"]["items"])
offset += page_limit
if offset >= limit:
break
continue
yield parse_podcast(show_obj, self)
+ async def get_library_audiobooks(self) -> AsyncGenerator[Audiobook, None]:
+ """Retrieve library audiobooks from spotify."""
+ if not self.audiobooks_supported:
+ return
+ async for item in self._get_all_items("me/audiobooks"):
+ if item and item["id"]:
+ # Parse the basic audiobook
+ audiobook = parse_audiobook(item, self)
+ # Add chapters from Spotify API data
+ await self._add_audiobook_chapters(audiobook)
+ yield audiobook
+
def _get_liked_songs_playlist_id(self) -> str:
return f"{LIKED_SONGS_FAKE_PLAYLIST_ID_PREFIX}-{self.instance_id}"
if liked_songs.metadata.images is None:
liked_songs.metadata.images = UniqueList([image])
else:
- liked_songs.metadata.images.append(image)
+ liked_songs.metadata.add_image(image)
liked_songs.cache_checksum = str(time.time())
raise MediaNotFoundError(f"Podcast not found: {prov_podcast_id}")
return parse_podcast(podcast_obj, self)
+ @use_cache(86400) # 24 hours
+ async def get_audiobook(self, prov_audiobook_id: str) -> Audiobook:
+ """Get full audiobook details by id."""
+ if not self.audiobooks_supported:
+ raise UnsupportedFeaturedException("Audiobooks are not supported with this account")
+
+ audiobook_obj = await self._get_data(f"audiobooks/{prov_audiobook_id}")
+ if not audiobook_obj:
+ raise MediaNotFoundError(f"Audiobook not found: {prov_audiobook_id}")
+
+ # Parse basic audiobook without chapters first
+ audiobook = parse_audiobook(audiobook_obj, self)
+
+ # Add chapters from Spotify API data
+ await self._add_audiobook_chapters(audiobook)
+
+ # Note: Resume position will be handled by MA's internal system
+ # which calls get_resume_position() when needed
+
+ return audiobook
+
+ async def _add_audiobook_chapters(self, audiobook: Audiobook) -> None:
+ """Add chapter metadata to an audiobook from Spotify API data."""
+ try:
+ chapters_data = await self._get_audiobook_chapters_data(audiobook.item_id)
+ if chapters_data:
+ chapters = []
+ total_duration_seconds = 0.0
+
+ for idx, chapter in enumerate(chapters_data):
+ duration_ms = chapter.get("duration_ms", 0)
+ duration_seconds = duration_ms / 1000.0
+
+ chapter_obj = MediaItemChapter(
+ position=idx + 1,
+ name=chapter.get("name", f"Chapter {idx + 1}"),
+ start=total_duration_seconds,
+ end=total_duration_seconds + duration_seconds,
+ )
+ chapters.append(chapter_obj)
+ total_duration_seconds += duration_seconds
+
+ audiobook.metadata.chapters = chapters
+ audiobook.duration = int(total_duration_seconds)
+
+ except (MediaNotFoundError, ResourceTemporarilyUnavailable, ProviderUnavailableError) as e:
+ self.logger.warning(f"Failed to get chapters for audiobook {audiobook.item_id}: {e}")
+
@use_cache(43200) # 12 hours - balances freshness with performance
async def _get_podcast_episodes_data(self, prov_podcast_id: str) -> list[dict[str, Any]]:
"""Get raw episode data from Spotify API (cached).
return episodes_data
+ @use_cache(7200) # 2 hours - shorter cache for resume point data
+ async def _get_audiobook_chapters_data(self, prov_audiobook_id: str) -> list[dict[str, Any]]:
+ """Get raw chapter data from Spotify API (cached).
+
+ Args:
+ prov_audiobook_id: Spotify audiobook ID
+
+ Returns:
+ List of chapter data dictionaries
+ """
+ chapters_data: list[dict[str, Any]] = []
+
+ try:
+ async for item in self._get_all_items(
+ f"audiobooks/{prov_audiobook_id}/chapters", market="from_token"
+ ):
+ if item and item.get("id"):
+ chapters_data.append(item)
+ except MediaNotFoundError:
+ self.logger.warning("Audiobook %s not found", prov_audiobook_id)
+ return []
+ except ResourceTemporarilyUnavailable as err:
+ self.logger.warning(
+ "Temporary error fetching chapters for %s: %s", prov_audiobook_id, err
+ )
+ raise
+
+ return chapters_data
+
async def get_podcast_episodes(
self, prov_podcast_id: str
) -> AsyncGenerator[PodcastEpisode, None]:
"""Get all podcast episodes."""
- # Get podcast object for context if available - this should be cached from previous calls
+ # Get podcast object for context if available
podcast: Podcast | None = None
-
try:
podcast = await self.mass.music.podcasts.get_provider_item(
prov_podcast_id, self.instance_id
)
except MediaNotFoundError:
- self.logger.debug("Podcast %s not found in Music Assistant library", prov_podcast_id)
-
- # If we don't have the podcast from MA context, get it via the API
- if not podcast:
+ # If not in MA library, get it via API (this is cached)
try:
- podcast = await self.get_podcast(prov_podcast_id) # This is cached
+ podcast = await self.get_podcast(prov_podcast_id)
except MediaNotFoundError:
self.logger.warning(
"Podcast with ID %s is no longer available on Spotify", prov_podcast_id
episode.position = idx + 1
# Set played status if sync is enabled and resume data exists
- if self.sync_played_status_enabled and "resume_point" in episode_data:
+ if self.podcast_progress_sync_enabled and "resume_point" in episode_data:
resume_point = episode_data["resume_point"]
fully_played = resume_point.get("fully_played", False)
position_ms = resume_point.get("resume_position_ms", 0)
- # Apply threshold logic
- if not fully_played and episode_data.get("duration_ms", 0) > 0:
- completion_ratio = position_ms / episode_data["duration_ms"]
- if completion_ratio >= self.played_threshold:
- fully_played = True
-
episode.fully_played = fully_played if fully_played else None
episode.resume_position_ms = position_ms if position_ms > 0 else None
return parse_podcast_episode(episode_obj, self)
async def get_resume_position(self, item_id: str, media_type: MediaType) -> tuple[bool, int]:
- """Get resume position for episode from Spotify."""
- if media_type != MediaType.PODCAST_EPISODE:
- raise NotImplementedError("Resume position only supported for podcast episodes")
-
- if not self.sync_played_status_enabled:
- raise NotImplementedError("Spotify resume sync disabled in settings")
+ """Get resume position for episode/audiobook from Spotify."""
+ if media_type == MediaType.PODCAST_EPISODE:
+ if not self.podcast_progress_sync_enabled:
+ raise NotImplementedError("Spotify podcast resume sync disabled in settings")
- episode_obj = await self._get_data(f"episodes/{item_id}", market="from_token")
-
- if not episode_obj:
- raise NotImplementedError("No episode data from Spotify")
-
- if "resume_point" not in episode_obj or not episode_obj["resume_point"]:
- raise NotImplementedError("No resume point data from Spotify")
+ try:
+ episode_obj = await self._get_data(f"episodes/{item_id}", market="from_token")
+ except MediaNotFoundError:
+ raise NotImplementedError("Episode not found on Spotify")
+ except (ResourceTemporarilyUnavailable, aiohttp.ClientError) as e:
+ self.logger.debug(f"Error fetching episode {item_id}: {e}")
+ raise NotImplementedError("Unable to fetch episode data from Spotify")
+
+ if (
+ not episode_obj
+ or "resume_point" not in episode_obj
+ or not episode_obj["resume_point"]
+ ):
+ raise NotImplementedError("No resume point data from Spotify")
- try:
resume_point = episode_obj["resume_point"]
fully_played = resume_point.get("fully_played", False)
position_ms = resume_point.get("resume_position_ms", 0)
+ return fully_played, position_ms
- # Apply played threshold logic
- if not fully_played and episode_obj.get("duration_ms", 0) > 0:
- completion_ratio = position_ms / episode_obj["duration_ms"]
- if completion_ratio >= self.played_threshold:
- fully_played = True
+ elif media_type == MediaType.AUDIOBOOK:
+ if not self.audiobooks_supported:
+ raise NotImplementedError("Audiobook support is disabled")
+ if not self.audiobook_progress_sync_enabled:
+ raise NotImplementedError("Spotify audiobook resume sync disabled in settings")
- return fully_played, position_ms
- except (KeyError, TypeError, AttributeError) as e:
- self.logger.debug(f"Invalid resume point data structure for {item_id}: {e}")
- raise NotImplementedError("Invalid resume point data from Spotify")
+ try:
+ chapters_data = await self._get_audiobook_chapters_data(item_id)
+ if not chapters_data:
+ raise NotImplementedError("No chapters data available")
+
+ total_position_ms = 0
+ fully_played = True
+
+ for chapter in chapters_data:
+ resume_point = chapter.get("resume_point", {})
+ chapter_fully_played = resume_point.get("fully_played", False)
+ chapter_position_ms = resume_point.get("resume_position_ms", 0)
+
+ if chapter_fully_played:
+ total_position_ms += chapter.get("duration_ms", 0)
+ elif chapter_position_ms > 0:
+ total_position_ms += chapter_position_ms
+ fully_played = False
+ break
+ else:
+ fully_played = False
+ break
+
+ return fully_played, total_position_ms
+
+ except (MediaNotFoundError, ResourceTemporarilyUnavailable, aiohttp.ClientError) as e:
+ self.logger.debug(f"Failed to get audiobook resume position for {item_id}: {e}")
+ raise NotImplementedError("Unable to get audiobook resume position from Spotify")
+
+ else:
+ raise NotImplementedError(f"Resume position not supported for {media_type}")
async def on_played(
self,
is_playing: bool = False,
) -> None:
"""
- Call when an episode is played in MA.
+ Call when an episode/audiobook is played in MA.
- Note: This CANNOT sync back to Spotify as there's no API for it.
- This is just for logging/monitoring purposes.
+ MA automatically handles internal position tracking - this method is for
+ provider-specific actions like syncing to external services.
"""
- if media_type != MediaType.PODCAST_EPISODE:
- return
+ if media_type == MediaType.PODCAST_EPISODE:
+ if not isinstance(media_item, PodcastEpisode):
+ return
+
+ # Log the playback for monitoring/debugging
+ safe_position = position or 0
+ if media_item.duration > 0:
+ completion_percentage = (safe_position / media_item.duration) * 100
+ else:
+ completion_percentage = 0
- if not isinstance(media_item, PodcastEpisode):
- return
+ self.logger.debug(
+ f"Episode played: {prov_item_id} at {safe_position}s "
+ f"({completion_percentage:.1f}%, fully_played: {fully_played})"
+ )
- # Handle case where position might be None (e.g., when marked as played in UI)
- safe_position = position or 0
- if media_item.duration > 0:
- completion_percentage = (safe_position / media_item.duration) * 100
- else:
- completion_percentage = 0
+ # Note: No API exists to sync playback position back to Spotify for episodes
+ # MA handles all internal position tracking automatically
- self.logger.debug(
- f"Episode played in MA: {prov_item_id} "
- f"({completion_percentage:.1f}%, fully_played: {fully_played}) "
- f"- Cannot sync back to Spotify due to API limitations"
- )
+ elif media_type == MediaType.AUDIOBOOK:
+ if not isinstance(media_item, Audiobook):
+ return
+
+ # Log the playback for monitoring/debugging
+ safe_position = position or 0
+ if media_item.duration > 0:
+ completion_percentage = (safe_position / media_item.duration) * 100
+ else:
+ completion_percentage = 0
+
+ self.logger.debug(
+ f"Audiobook played: {prov_item_id} at {safe_position}s "
+ f"({completion_percentage:.1f}%, fully_played: {fully_played})"
+ )
+
+ # Note: No API exists to sync playback position back to Spotify for audiobooks
+ # MA handles all internal position tracking automatically
+
+ # The resume position will be automatically updated by MA's internal tracking
+ # and will be retrieved via get_audiobook() which combines MA + Spotify positions
async def get_album_tracks(self, prov_album_id: str) -> list[Track]:
"""Get all album tracks for given album id."""
await self._put_data(f"playlists/{item.item_id}/followers", data={"public": False})
elif item.media_type == MediaType.PODCAST:
await self._put_data("me/shows", ids=item.item_id)
+ elif item.media_type == MediaType.AUDIOBOOK and self.audiobooks_supported:
+ # For audiobooks, we need special handling to ensure chapter metadata is included
+ self.logger.info(f"Adding audiobook {item.item_id} to library with chapter metadata")
+
+ # First add to Spotify library
+ await self._put_data("me/audiobooks", ids=item.item_id)
+
+ # Then get the full audiobook metadata with chapters by calling our get_audiobook method
+ try:
+ full_audiobook = await self.get_audiobook(item.item_id)
+
+ # Update the audiobook in MA's database with the full metadata including chapters
+ # This ensures when the user plays it from library, it has chapter information
+ await self.mass.music.audiobooks.add_item_to_library(full_audiobook)
+ self.logger.info(
+ f"Updated audiobook {item.item_id} in MA database with chapter metadata"
+ )
+
+ except MediaNotFoundError as e:
+ self.logger.warning(
+ f"Audiobook {item.item_id} not found when fetching chapter metadata: {e}"
+ )
+ except ResourceTemporarilyUnavailable as e:
+ self.logger.warning(
+ "Spotify temporarily unavailable when "
+ f"fetching audiobook {item.item_id} metadata: {e}"
+ )
+ except ProviderUnavailableError as e:
+ self.logger.warning(
+ f"Provider unavailable when fetching audiobook {item.item_id} metadata: {e}"
+ )
+ except Exception as e:
+ # Catch any other unexpected errors
+ self.logger.warning(
+ f"Unexpected error fetching audiobook {item.item_id} metadata: {e}"
+ )
+ self.logger.debug(f"Full error details: {e}", exc_info=True)
return True
async def library_remove(self, prov_item_id: str, media_type: MediaType) -> bool:
await self._delete_data(f"playlists/{prov_item_id}/followers")
elif media_type == MediaType.PODCAST:
await self._delete_data("me/shows", ids=prov_item_id)
+ elif media_type == MediaType.AUDIOBOOK and self.audiobooks_supported:
+ await self._delete_data("me/audiobooks", ids=prov_item_id)
return True
async def add_playlist_tracks(self, prov_playlist_id: str, prov_track_ids: list[str]) -> None:
return [parse_track(item, self) for item in items["tracks"] if (item and item["id"])]
async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails:
- """Return the content details for the given track/episode when it will be streamed."""
+ """Return content details for the given track/episode/audiobook when it will be streamed."""
+ if media_type == MediaType.AUDIOBOOK and self.audiobooks_supported:
+ chapters_data = await self._get_audiobook_chapters_data(item_id)
+ if not chapters_data:
+ raise MediaNotFoundError(f"No chapters found for audiobook {item_id}")
+
+ # Calculate total duration and convert to seconds for StreamDetails
+ total_duration_ms = sum(chapter.get("duration_ms", 0) for chapter in chapters_data)
+ duration_seconds = total_duration_ms // 1000
+
+ # Create chapter URIs for streaming
+ chapter_uris = []
+ for chapter in chapters_data:
+ chapter_id = chapter["id"]
+ chapter_uri = f"spotify://episode:{chapter_id}"
+ chapter_uris.append(chapter_uri)
+
+ return StreamDetails(
+ item_id=item_id,
+ provider=self.lookup_key,
+ media_type=MediaType.AUDIOBOOK,
+ audio_format=AudioFormat(content_type=ContentType.OGG, bit_rate=320),
+ stream_type=StreamType.CUSTOM,
+ allow_seek=True,
+ can_seek=True,
+ duration=duration_seconds,
+ data={"chapters": chapter_uris, "chapters_data": chapters_data},
+ )
+
+ # For all other media types (tracks, podcast episodes)
return StreamDetails(
item_id=item_id,
provider=self.lookup_key,
media_type=media_type,
- audio_format=AudioFormat(
- content_type=ContentType.OGG,
- bit_rate=320,
- ),
+ audio_format=AudioFormat(content_type=ContentType.OGG, bit_rate=320),
stream_type=StreamType.CUSTOM,
allow_seek=True,
can_seek=True,
async def get_audio_stream(
self, streamdetails: StreamDetails, seek_position: int = 0
) -> AsyncGenerator[bytes, None]:
- """Return the audio stream for the provider item."""
- async for chunk in self.streamer.get_audio_stream(streamdetails, seek_position):
- yield chunk
+ """Get audio stream from Spotify via librespot."""
+ if streamdetails.media_type == MediaType.AUDIOBOOK and isinstance(streamdetails.data, dict):
+ chapter_uris = streamdetails.data.get("chapters", [])
+ chapters_data = streamdetails.data.get("chapters_data", [])
+
+ # Calculate which chapter to start from based on seek_position
+ seek_position_ms = seek_position * 1000
+ current_seek_ms = seek_position_ms
+ start_chapter = 0
+
+ if seek_position > 0 and chapters_data:
+ accumulated_duration_ms = 0
+
+ for i, chapter_data in enumerate(chapters_data):
+ chapter_duration_ms = chapter_data.get("duration_ms", 0)
+
+ if accumulated_duration_ms + chapter_duration_ms > seek_position_ms:
+ start_chapter = i
+ current_seek_ms = seek_position_ms - accumulated_duration_ms
+ break
+ accumulated_duration_ms += chapter_duration_ms
+ else:
+ start_chapter = len(chapter_uris) - 1
+ current_seek_ms = 0
+
+ # Convert back to seconds for librespot
+ current_seek_seconds = int(current_seek_ms // 1000)
+
+ # Stream chapters starting from the calculated position
+ for i in range(start_chapter, len(chapter_uris)):
+ chapter_uri = chapter_uris[i]
+ chapter_seek = current_seek_seconds if i == start_chapter else 0
+
+ try:
+ async for chunk in self.streamer.stream_spotify_uri(chapter_uri, chapter_seek):
+ yield chunk
+ except Exception as e:
+ self.logger.error(f"Chapter {i + 1} streaming failed: {e}")
+ continue
+ else:
+ # Handle normal tracks and podcast episodes
+ async for chunk in self.streamer.get_audio_stream(streamdetails, seek_position):
+ yield chunk
@lock
async def login(self, force_refresh: bool = False) -> dict[str, Any]: