from __future__ import annotations
-import asyncio
import contextlib
-import json
import os
import platform
-import shutil
import time
-from json.decoder import JSONDecodeError
-from tempfile import gettempdir
-from typing import TYPE_CHECKING, Any
+from typing import TYPE_CHECKING, Any, cast
+from urllib.parse import urlencode
from music_assistant.common.helpers.json import json_loads
from music_assistant.common.helpers.util import parse_title_and_version
from music_assistant.common.models.streamdetails import StreamDetails
# pylint: disable=no-name-in-module
+from music_assistant.constants import VERBOSE_LOG_LEVEL
from music_assistant.server.helpers.app_vars import app_var
# pylint: enable=no-name-in-module
from music_assistant.server.helpers.audio import get_chunksize
+from music_assistant.server.helpers.auth import AuthenticationHelper
from music_assistant.server.helpers.process import AsyncProcess, check_output
from music_assistant.server.helpers.throttle_retry import ThrottlerManager, throttle_with_retries
from music_assistant.server.models.music_provider import MusicProvider
CONF_CLIENT_ID = "client_id"
CONF_ACTION_AUTH = "auth"
-
-CACHE_DIR = gettempdir()
+CONF_ACCESS_TOKEN = "access_token"
+CONF_REFRESH_TOKEN = "refresh_token"
+CONF_AUTH_EXPIRES_AT = "expires_at"
+CONF_ACTION_CLEAR_AUTH = "clear_auth"
+SCOPE = [
+ "playlist-read",
+ "playlist-read-private",
+ "playlist-read-collaborative",
+ "playlist-modify-public",
+ "playlist-modify-private",
+ "user-follow-modify",
+ "user-follow-read",
+ "user-library-read",
+ "user-library-modify",
+ "user-read-private",
+ "user-read-email",
+ "user-top-read",
+ "app-remote-control",
+ "streaming",
+ "user-read-playback-state",
+ "user-modify-playback-state",
+ "user-read-currently-playing",
+ "user-modify-private",
+ "user-modify",
+ "user-read-playback-position",
+ "user-read-recently-played",
+]
+
+CALLBACK_REDIRECT_URL = "https://music-assistant.io/callback"
+
+CACHE_DIR = "/tmp/spotify_cache" # noqa: S108
LIKED_SONGS_FAKE_PLAYLIST_ID_PREFIX = "liked_songs"
-SETUP_STORAGE_PATH = "spotify-setup"
SUPPORTED_FEATURES = (
ProviderFeature.LIBRARY_ARTISTS,
ProviderFeature.LIBRARY_ALBUMS,
values: the (intermediate) raw values for config entries sent with the action.
"""
# ruff: noqa: ARG001
- data_dir = os.path.join(mass.storage_path, instance_id or SETUP_STORAGE_PATH)
- data_dir_exists = await asyncio.to_thread(os.path.isdir, data_dir)
if action == CONF_ACTION_AUTH:
- # authenticate with spotify using spotify connect
- # NOTE: we like to switch to Spotify PKCE auth (and even have a branch
- # where this is implemented), but librespot is not yet supporting this
- # once Librespot supports using a Bearer token, we can switch to PKCE
- # and keep the oauth flow in MA itself.
- if not data_dir_exists:
- await asyncio.to_thread(os.makedirs, data_dir)
- # use librespot to perform auth using spotify connect
- librespot_bin = await get_librespot_binary()
- args = [
- librespot_bin,
- "-c",
- data_dir,
- "-a",
- "-n",
- "MUSIC_ASSISTANT",
- ]
- async with asyncio.timeout(300):
- _returncode, output = await check_output(*args)
- if _returncode == 0 and output.decode().strip() != "authorized":
- raise LoginFailed("Authentication failed")
-
- elif not instance_id or not data_dir_exists:
- # authentication required
- return (
- ConfigEntry(
- key="warn",
- type=ConfigEntryType.ALERT,
- label="Spotify needs to be authenticated with your account.\n\n"
- "Click the authenticate button below to start the authentication process.\n\n"
- "Then open the Spotify app on your device and select MUSIC_ASSISTANT "
- "as playback device to authenticate.",
- required=True,
- ),
- ConfigEntry(
- key=CONF_ACTION_AUTH,
- type=ConfigEntryType.ACTION,
- label="Authenticate Spotify",
- required=True,
- ),
- ConfigEntry(
- key=CONF_CLIENT_ID,
- type=ConfigEntryType.SECURE_STRING,
- label=CONF_CLIENT_ID,
- hidden=True,
- required=False,
- ),
+ # spotify PKCE auth flow
+ # https://developer.spotify.com/documentation/web-api/tutorials/code-pkce-flow
+ import pkce
+
+ code_verifier, code_challenge = pkce.generate_pkce_pair()
+ async with AuthenticationHelper(mass, cast(str, values["session_id"])) as auth_helper:
+ params = {
+ "response_type": "code",
+ "client_id": values.get(CONF_CLIENT_ID) or app_var(2),
+ "scope": " ".join(SCOPE),
+ "code_challenge_method": "S256",
+ "code_challenge": code_challenge,
+ "redirect_uri": CALLBACK_REDIRECT_URL,
+ "state": auth_helper.callback_url,
+ }
+ query_string = urlencode(params)
+ url = f"https://accounts.spotify.com/authorize?{query_string}"
+ result = await auth_helper.authenticate(url)
+ authorization_code = result["code"]
+ # now get the access token
+ params = {
+ "grant_type": "authorization_code",
+ "code": authorization_code,
+ "redirect_uri": CALLBACK_REDIRECT_URL,
+ "client_id": values.get(CONF_CLIENT_ID) or app_var(2),
+ "code_verifier": code_verifier,
+ }
+ async with mass.http_session.post(
+ "https://accounts.spotify.com/api/token", data=params
+ ) as response:
+ result = await response.json()
+ values[CONF_ACCESS_TOKEN] = result["access_token"]
+ values[CONF_REFRESH_TOKEN] = result["refresh_token"]
+ values[CONF_AUTH_EXPIRES_AT] = int(time.time() + result["expires_in"])
+
+ auth_required = values.get(CONF_REFRESH_TOKEN) is None
+
+ if auth_required:
+ label_text = (
+ "You need to authenticate to Spotify. Click the authenticate button below "
+ "to start the authentication process which will open in a new (popup window), "
+ "so make sure to disable any popup blockers.\n\n"
)
+ elif action == CONF_ACTION_AUTH:
+ label_text = "Authenticated to Spotify. Press save to complete setup."
+ else:
+ label_text = "Authenticated to Spotify. No further action required."
- # return the default config entries
return (
ConfigEntry(
- key="label_authenticated",
+ key="label_text",
type=ConfigEntryType.LABEL,
- label="Authenticated to Spotify",
+ label=label_text,
),
ConfigEntry(
- key="label_whitespace",
- type=ConfigEntryType.LABEL,
- label=" ",
+ key=CONF_ACCESS_TOKEN,
+ type=ConfigEntryType.SECURE_STRING,
+ label=CONF_ACCESS_TOKEN,
+ hidden=True,
+ value=values.get(CONF_ACCESS_TOKEN) if values else None,
+ ),
+ ConfigEntry(
+ key=CONF_REFRESH_TOKEN,
+ type=ConfigEntryType.SECURE_STRING,
+ label=CONF_REFRESH_TOKEN,
+ hidden=True,
+ required=True,
+ value=values.get(CONF_REFRESH_TOKEN) if values else None,
+ ),
+ ConfigEntry(
+ key=CONF_AUTH_EXPIRES_AT,
+ type=ConfigEntryType.INTEGER,
+ label=CONF_AUTH_EXPIRES_AT,
+ hidden=True,
+ default_value=0,
+ value=values.get(CONF_AUTH_EXPIRES_AT) if values else None,
),
ConfigEntry(
key=CONF_CLIENT_ID,
label="Client ID (optional)",
description="By default, a generic client ID is used which is heavy rate limited. "
"It is advised that you create your own Spotify Developer account and use "
- "that client ID here to speedup performance.",
+ "that client ID here to speedup performance. \n\n"
+ f"Use {CALLBACK_REDIRECT_URL} as callback URL.",
required=False,
+ value=values.get(CONF_CLIENT_ID) if values else None,
+ hidden=not auth_required,
+ ),
+ ConfigEntry(
+ key=CONF_ACTION_AUTH,
+ type=ConfigEntryType.ACTION,
+ label="Authenticate with Spotify",
+ description="This button will redirect you to Spotify to authenticate.",
+ action=CONF_ACTION_AUTH,
+ hidden=not auth_required,
),
)
class SpotifyProvider(MusicProvider):
"""Implementation of a Spotify MusicProvider."""
- _auth_token: str | None = None
+ _auth_info: str | None = None
_sp_user: dict[str, Any] | None = None
_librespot_bin: str | None = None
# rate limiter needs to be specified on provider-level,
async def handle_async_init(self) -> None:
"""Handle async initialization of the provider."""
- self._librespot_bin = await get_librespot_binary()
- self._ap_workaround = False
- data_dir = os.path.join(self.mass.storage_path, self.instance_id)
- setup_data_dir = os.path.join(self.mass.storage_path, SETUP_STORAGE_PATH)
- data_dir_exists = await asyncio.to_thread(os.path.isdir, data_dir)
- setup_dir_exists = await asyncio.to_thread(os.path.isdir, setup_data_dir)
- if not data_dir_exists and setup_dir_exists:
- # complete setup: move setup data to data dir
- await asyncio.to_thread(os.rename, setup_data_dir, data_dir)
- elif not data_dir_exists:
- raise LoginFailed("Spotify is not authenticated")
- self._data_dir = data_dir
if self.config.get_value(CONF_CLIENT_ID):
# loosen the throttler a bit when a custom client id is used
self.throttler.rate_limit = 45
self.throttler.period = 30
+ # check if we have a librespot binary for this arch
+ await self.get_librespot_binary()
# try login which will raise if it fails
- await self.get_token()
+ await self.login()
@property
def supported_features(self) -> tuple[ProviderFeature, ...]:
ProviderFeature.SIMILAR_TRACKS,
)
+ @property
+ def name(self) -> str:
+ """Return (custom) friendly name for this provider instance."""
+ if self._sp_user:
+ postfix = self._sp_user["display_name"]
+ return f"{self.manifest.name}: {postfix}"
+ return super().name
+
async def search(
self, search_query: str, media_types=list[MediaType], limit: int = 5
) -> SearchResults:
MediaItemImage(
type=ImageType.THUMB,
path="https://misc.scdn.co/liked-songs/liked-songs-64.png",
- provider=self.lookup_key,
+ provider=self.domain,
remotely_accessible=True,
)
]
) -> AsyncGenerator[bytes, None]:
"""Return the audio stream for the provider item."""
# make sure that the token is still valid by just requesting it
- await self.get_token()
+ auth_info = await self.login()
+ librespot = await self.get_librespot_binary()
args = [
- self._librespot_bin,
+ librespot,
"-c",
- self._data_dir,
- "--pass-through",
+ CACHE_DIR,
+ "-M",
+ "256M",
+ "--passthrough",
"-b",
"320",
+ "--backend",
+ "pipe",
"--single-track",
f"spotify://track:{streamdetails.item_id}",
+ "--token",
+ auth_info["access_token"],
]
if seek_position:
args += ["--start-position", str(int(seek_position))]
- if self._ap_workaround:
- args += ["--ap-port", "12345"]
chunk_size = get_chunksize(streamdetails.audio_format)
- async with AsyncProcess(args, stdout=True, name="librespot") as librespot_proc:
+ stderr = None if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL) else False
+ async with AsyncProcess(
+ args,
+ stdout=True,
+ stderr=stderr,
+ name="librespot",
+ ) as librespot_proc:
async for chunk in librespot_proc.iter_any(chunk_size):
yield chunk
MediaItemImage(
type=ImageType.THUMB,
path=img_url,
- provider=self.lookup_key,
+ provider=self.instance_id,
remotely_accessible=True,
)
]
MediaItemImage(
type=ImageType.THUMB,
path=album_obj["images"][0]["url"],
- provider=self.lookup_key,
+ provider=self.instance_id,
remotely_accessible=True,
)
]
MediaItemImage(
type=ImageType.THUMB,
path=track_obj["album"]["images"][0]["url"],
- provider=self.lookup_key,
+ provider=self.instance_id,
remotely_accessible=True,
)
]
MediaItemImage(
type=ImageType.THUMB,
path=playlist_obj["images"][0]["url"],
- provider=self.lookup_key,
+ provider=self.instance_id,
remotely_accessible=True,
)
]
playlist.cache_checksum = str(playlist_obj["snapshot_id"])
return playlist
- async def get_token(self) -> dict:
- """Log-in Spotify and return tokeninfo."""
+ async def login(self) -> dict:
+ """Log-in Spotify and return Auth/token info."""
# return existing token if we have one in memory
- if (
- self._auth_token
- and await asyncio.to_thread(os.path.isdir, self._data_dir)
- and (self._auth_token["expiresAt"] > int(time.time()))
- ):
- return self._auth_token
- tokeninfo, userinfo = None, self._sp_user
- # retrieve token with librespot
- retries = 0
- while retries < 5:
- try:
- retries += 1
- if not tokeninfo:
- async with asyncio.timeout(10):
- tokeninfo = await self._get_token()
- if tokeninfo and not userinfo:
- async with asyncio.timeout(10):
- userinfo = await self._get_data("me", tokeninfo=tokeninfo)
- if tokeninfo and userinfo:
- # we have all info we need!
- break
- if retries > 2:
- # switch to ap workaround after 2 retries
- self._ap_workaround = True
- except TimeoutError:
- await asyncio.sleep(2)
- if tokeninfo and userinfo:
- self._auth_token = tokeninfo
- self._sp_user = userinfo
- self.mass.metadata.set_default_preferred_language(userinfo["country"])
- self.logger.debug("Auth token refreshed")
- self._auth_token = tokeninfo
- return tokeninfo
- if tokeninfo and not userinfo:
- msg = (
- "Unable to retrieve userdetails from Spotify API - "
- "probably just a temporary error"
- )
- raise LoginFailed(msg)
- await asyncio.to_thread(shutil.rmtree, self._data_dir)
- msg = "Retrieving token failed, note that only Spotify Premium accounts are supported"
- raise LoginFailed(msg)
-
- async def _get_token(self):
- """Get spotify auth token with librespot bin."""
- time_start = time.time()
- # get token with (pre-authorized) librespot
- scopes = [
- "user-read-playback-state",
- "user-read-currently-playing",
- "user-modify-playback-state",
- "playlist-read-private",
- "playlist-read-collaborative",
- "playlist-modify-public",
- "playlist-modify-private",
- "user-follow-modify",
- "user-follow-read",
- "user-library-read",
- "user-library-modify",
- "user-read-private",
- "user-read-email",
- "user-top-read",
- ]
- scope = ",".join(scopes)
- args = [
- self._librespot_bin,
- "-O",
- "-t",
- "--client-id",
- self.config.get_value(CONF_CLIENT_ID) or app_var(2),
- "--scope",
- scope,
- "-c",
- self._data_dir,
- ]
- if self._ap_workaround:
- args += ["--ap-port", "12345"]
- _returncode, output = await check_output(*args)
- duration = round(time.time() - time_start, 2)
- try:
- result = json.loads(output)
- except JSONDecodeError:
- self.logger.warning(
- "Error while retrieving Spotify token after %s seconds, details: %s",
- duration,
- output.decode(),
- )
- return None
- self.logger.debug(
- "Retrieved Spotify token using librespot in %s seconds",
- duration,
+ if self._auth_info and (self._auth_info["expires_at"] > (time.time() - 60)):
+ return self._auth_info
+
+ if not (refresh_token := self.config.get_value(CONF_REFRESH_TOKEN)):
+ raise LoginFailed("Authentication required")
+
+ expires_at = self.config.get_value(CONF_AUTH_EXPIRES_AT) or 0
+ access_token = self.config.get_value(CONF_ACCESS_TOKEN)
+
+ if expires_at < (time.time() - 300):
+ # refresh token
+ client_id = self.config.get_value(CONF_CLIENT_ID) or app_var(2)
+ params = {
+ "grant_type": "refresh_token",
+ "refresh_token": refresh_token,
+ "client_id": client_id,
+ }
+ async with self.mass.http_session.post(
+ "https://accounts.spotify.com/api/token", data=params
+ ) as response:
+ if response.status != 200:
+ err = await response.text()
+ self.mass.config.set_raw_provider_config_value(
+ self.instance_id, CONF_REFRESH_TOKEN, None
+ )
+ raise LoginFailed(f"Failed to refresh access token: {err}")
+ data = await response.json()
+ access_token = data["access_token"]
+ refresh_token = data["refresh_token"]
+ expires_at = int(data["expires_in"] + time.time())
+ self.logger.debug("Successfully refreshed access token")
+
+ self._auth_info = auth_info = {
+ "access_token": access_token,
+ "refresh_token": refresh_token,
+ "expires_at": expires_at,
+ }
+
+ # make sure that our updated creds get stored in config
+ await self.mass.config.set_provider_config_value(
+ self.instance_id, CONF_REFRESH_TOKEN, refresh_token
+ )
+ await self.mass.config.set_provider_config_value(
+ self.instance_id, CONF_ACCESS_TOKEN, access_token
)
- # transform token info to spotipy compatible format
- if result and "accessToken" in result:
- tokeninfo = result
- tokeninfo["expiresAt"] = tokeninfo["expiresIn"] + int(time.time() - 120)
- return tokeninfo
- return None
+ await self.mass.config.set_provider_config_value(
+ self.instance_id, CONF_AUTH_EXPIRES_AT, expires_at
+ )
+
+ # get logged-in user info
+ if not self._sp_user:
+ self._sp_user = userinfo = await self._get_data("me", auth_info=auth_info)
+ self.mass.metadata.set_default_preferred_language(userinfo["country"])
+ self.logger.info("Successfully logged in to Spotify as %s", userinfo["display_name"])
+
+ return auth_info
async def _get_all_items(self, endpoint, key="items", **kwargs) -> list[dict]:
"""Get all items from a paged list."""
url = f"https://api.spotify.com/v1/{endpoint}"
kwargs["market"] = "from_token"
kwargs["country"] = "from_token"
- tokeninfo = kwargs.pop("tokeninfo", None)
- if tokeninfo is None:
- tokeninfo = await self.get_token()
- headers = {"Authorization": f'Bearer {tokeninfo["accessToken"]}'}
+ auth_info = kwargs.pop("auth_info", await self.login())
+ headers = {"Authorization": f'Bearer {auth_info["access_token"]}'}
locale = self.mass.metadata.locale.replace("_", "-")
language = locale.split("-")[0]
headers["Accept-Language"] = f"{locale}, {language};q=0.9, *;q=0.5"
async def _delete_data(self, endpoint, data=None, **kwargs) -> None:
"""Delete data from api."""
url = f"https://api.spotify.com/v1/{endpoint}"
- token = await self.get_token()
- headers = {"Authorization": f'Bearer {token["accessToken"]}'}
+ auth_info = kwargs.pop("auth_info", await self.login())
+ headers = {"Authorization": f'Bearer {auth_info["access_token"]}'}
async with self.mass.http_session.delete(
url, headers=headers, params=kwargs, json=data, ssl=False
) as response:
async def _put_data(self, endpoint, data=None, **kwargs) -> None:
"""Put data on api."""
url = f"https://api.spotify.com/v1/{endpoint}"
- token = await self.get_token()
- headers = {"Authorization": f'Bearer {token["accessToken"]}'}
+ auth_info = kwargs.pop("auth_info", await self.login())
+ headers = {"Authorization": f'Bearer {auth_info["access_token"]}'}
async with self.mass.http_session.put(
url, headers=headers, params=kwargs, json=data, ssl=False
) as response:
async def _post_data(self, endpoint, data=None, **kwargs) -> dict[str, Any]:
"""Post data on api."""
url = f"https://api.spotify.com/v1/{endpoint}"
- token = await self.get_token()
- headers = {"Authorization": f'Bearer {token["accessToken"]}'}
+ auth_info = kwargs.pop("auth_info", await self.login())
+ headers = {"Authorization": f'Bearer {auth_info["access_token"]}'}
async with self.mass.http_session.post(
url, headers=headers, params=kwargs, json=data, ssl=False
) as response:
response.raise_for_status()
return await response.json(loads=json_loads)
+ async def get_librespot_binary(self):
+ """Find the correct librespot binary belonging to the platform."""
+ # ruff: noqa: SIM102
+ if self._librespot_bin is not None:
+ return self._librespot_bin
+
+ async def check_librespot(librespot_path: str) -> str | None:
+ try:
+ returncode, output = await check_output(librespot_path, "--version")
+ if returncode == 0 and b"librespot" in output:
+ self._librespot_bin = librespot_path
+ return librespot_path
+ except OSError:
+ return None
+
+ base_path = os.path.join(os.path.dirname(__file__), "bin")
+ system = platform.system().lower().replace("darwin", "macos")
+ architecture = platform.machine().lower()
+
+ if bridge_binary := await check_librespot(
+ os.path.join(base_path, f"librespot-{system}-{architecture}")
+ ):
+ return bridge_binary
+
+ msg = f"Unable to locate Librespot for {system}/{architecture}"
+ raise RuntimeError(msg)
+
def _fix_create_playlist_api_bug(self, playlist_obj: dict[str, Any]) -> None:
"""Fix spotify API bug where incorrect owner id is returned from Create Playlist."""
if playlist_obj["owner"]["id"] != self._sp_user["id"]:
self.logger.warning(
"FIXME: Spotify have fixed their Create Playlist API, this fix can be removed."
)
-
-
-async def get_librespot_binary():
- """Find the correct librespot binary belonging to the platform."""
- # ruff: noqa: SIM102
-
- async def check_librespot(librespot_path: str) -> str | None:
- try:
- returncode, output = await check_output(librespot_path, "--check")
- if returncode == 0 and b"ok spotty" in output and b"using librespot" in output:
- return librespot_path
- except OSError:
- return None
-
- base_path = os.path.join(os.path.dirname(__file__), "bin")
- system = platform.system().lower()
- architecture = platform.machine().lower()
-
- if bridge_binary := await check_librespot(
- os.path.join(base_path, f"librespot-{system}-{architecture}")
- ):
- return bridge_binary
-
- msg = f"Unable to locate Librespot for {system}/{architecture}"
- raise RuntimeError(msg)