from typing import TYPE_CHECKING, Any
import aiohttp
-from aiohttp import web
+from aiohttp import BasicAuth, web
from alexapy import AlexaAPI, AlexaLogin, AlexaProxy
from music_assistant_models.config_entries import ConfigEntry
from music_assistant_models.enums import (
CONF_URL = "url"
CONF_ACTION_AUTH = "auth"
-CONF_AUTH_TOKEN = "token"
+CONF_AUTH_SECRET = "secret"
+CONF_API_BASIC_AUTH_USERNAME = "api_username"
+CONF_API_BASIC_AUTH_PASSWORD = "api_password"
+CONF_API_URL = "api_url"
SUPPORTED_FEATURES: set[ProviderFeature] = set()
url=str(values[CONF_URL]),
email=str(values[CONF_USERNAME]),
password=str(values[CONF_PASSWORD]),
+ otp_secret=str(values.get(CONF_AUTH_SECRET, "")),
outputpath=lambda x: x,
)
# --- Proxy authentication logic using AlexaProxy ---
# Build the proxy path and URL
proxy_path = "/alexa/auth/proxy/"
- post_path = "/alexa/auth/proxy/ap/signin"
+ post_path = "/alexa/auth/proxy/ap/signin/*"
base_url = mass.webserver.base_url.rstrip("/")
proxy_url = f"{base_url}{proxy_path}"
# Notify the callback URL
async with aiohttp.ClientSession() as session:
await session.get(auth_helper.callback_url)
+ _LOGGER.info("Alexa Callback URL: %s", auth_helper.callback_url)
return web.Response(
text="""
<html>
mass.webserver.register_dynamic_route(post_path, proxy_handler, "POST")
try:
- await auth_helper.authenticate(proxy_url, timeout=300)
- await save_cookie(login, str(values[CONF_USERNAME]), mass)
+ await auth_helper.authenticate(proxy_url)
+ if await login.test_loggedin():
+ await save_cookie(login, str(values[CONF_USERNAME]), mass)
+ else:
+ raise LoginFailed(
+ "Authentication login failed, please provide logs to the discussion #431."
+ )
except KeyError:
# no URL param was found so user probably cancelled the auth
pass
except Exception as error:
raise LoginFailed(f"Failed to authenticate with Amazon '{error}'.")
finally:
- mass.webserver.unregister_dynamic_route(proxy_path)
- mass.webserver.unregister_dynamic_route(post_path)
+ mass.webserver.unregister_dynamic_route(proxy_path, "GET")
+ mass.webserver.unregister_dynamic_route(post_path, "POST")
return (
ConfigEntry(
type=ConfigEntryType.STRING,
label="E-Mail",
required=True,
+ value=values.get(CONF_USERNAME) if values else None,
),
ConfigEntry(
key=CONF_PASSWORD,
type=ConfigEntryType.SECURE_STRING,
label="Password",
required=True,
+ value=values.get(CONF_PASSWORD) if values else None,
+ ),
+ ConfigEntry(
+ key=CONF_AUTH_SECRET,
+ type=ConfigEntryType.SECURE_STRING,
+ label="OTP Secret",
+ required=False,
+ value=values.get(CONF_AUTH_SECRET) if values else None,
),
ConfigEntry(
key=CONF_ACTION_AUTH,
action=CONF_ACTION_AUTH,
depends_on=CONF_URL,
),
+ ConfigEntry(
+ key=CONF_API_URL,
+ type=ConfigEntryType.STRING,
+ label="API Url",
+ default_value="http://localhost:3000",
+ required=True,
+ value=values.get(CONF_API_URL) if values else None,
+ ),
+ ConfigEntry(
+ key=CONF_API_BASIC_AUTH_USERNAME,
+ type=ConfigEntryType.STRING,
+ label="API Basic Auth Username",
+ required=False,
+ value=values.get(CONF_API_BASIC_AUTH_USERNAME) if values else None,
+ ),
+ ConfigEntry(
+ key=CONF_API_BASIC_AUTH_PASSWORD,
+ type=ConfigEntryType.SECURE_STRING,
+ label="API Basic Auth Password",
+ required=False,
+ value=values.get(CONF_API_BASIC_AUTH_PASSWORD) if values else None,
+ ),
)
if not (player := self.mass.players.get(player_id)):
return
+ username = self.config.get_value(CONF_API_BASIC_AUTH_USERNAME)
+ password = self.config.get_value(CONF_API_BASIC_AUTH_PASSWORD)
+
+ auth = None
+ if username is not None and password is not None:
+ auth = BasicAuth(str(username), str(password))
+
async with aiohttp.ClientSession() as session:
try:
async with session.post(
- "http://localhost:3000/ma/push-url",
+ f"{self.config.get_value(CONF_API_URL)}/ma/push-url",
json={"streamUrl": media.uri},
timeout=aiohttp.ClientTimeout(total=10),
+ auth=auth,
) as resp:
await resp.text()
except Exception as exc:
api = AlexaAPI(device_object, self.login)
await api.run_custom("Ask music assistant to play audio")
- player.current_media = media
- player.elapsed_time = 0
- player.elapsed_time_last_updated = time.time()
- player.state = PlayerState.PLAYING
+ state = await api.get_state()
+ if state:
+ state = state.get("playerInfo", None)
+
+ if state:
+ device_media = state.get("infoText")
+ if device_media:
+ media.title = device_media.get("title")
+ media.artist = device_media.get("subText1")
+ player.current_media = media
+ player.elapsed_time = 0
+ player.elapsed_time_last_updated = time.time()
+ if state.get("playbackState") == "PLAYING":
+ player.state = PlayerState.PLAYING
self.mass.players.update(player_id)