from __future__ import annotations
import asyncio
+import json
import logging
import os
import time
from music_assistant.constants import CONF_PASSWORD, CONF_USERNAME
from music_assistant.helpers.auth import AuthenticationHelper
+from music_assistant.helpers.util import lock
from music_assistant.models.player import Player
from music_assistant.models.player_provider import PlayerProvider
key=CONF_API_URL,
type=ConfigEntryType.STRING,
label="API Url",
- default_value="http://localhost:3000",
+ default_value="http://localhost:5000",
required=True,
value=values.get(CONF_API_URL) if values else None,
),
_LOGGER.debug("Cookie file %s does not exist, nothing to delete.", cookiefile)
+async def _request_with_session(
+ session: aiohttp.ClientSession,
+ method: str,
+ url: str,
+ json_data: dict[str, Any] | None,
+ timeout: int,
+ auth: BasicAuth | None,
+) -> str:
+ """Handle an API request with a provided aiohttp session.
+
+ :param session: The aiohttp session to use.
+ :param method: HTTP method to use for the request.
+ :param url: Full URL for the request.
+ :param json_data: Optional JSON payload or query params.
+ :param timeout: Timeout in seconds for the request.
+ :param auth: Optional basic auth credentials.
+ """
+ request_timeout = aiohttp.ClientTimeout(total=timeout)
+ if method.upper() == "GET":
+ async with session.get(url, params=json_data, timeout=request_timeout, auth=auth) as resp:
+ resp_text = await resp.text()
+ if resp.status < 200 or resp.status >= 300:
+ msg = (
+ f"Failed API request to {url}: Status code: {resp.status}, "
+ f"Response: {resp_text}"
+ )
+ _LOGGER.error(msg)
+ raise ActionUnavailable(msg)
+ return resp_text
+
+ async with session.request(
+ method.upper(),
+ url,
+ json=json_data,
+ timeout=request_timeout,
+ auth=auth,
+ ) as resp:
+ resp_text = await resp.text()
+ if resp.status < 200 or resp.status >= 300:
+ msg = f"Failed API request to {url}: Status code: {resp.status}, Response: {resp_text}"
+ _LOGGER.error(msg)
+ raise ActionUnavailable(msg)
+ return resp_text
+
+
+async def api_request(
+ provider: PlayerProvider,
+ endpoint: str,
+ method: str = "POST",
+ json_data: dict[str, Any] | None = None,
+ timeout: int = 10,
+) -> str:
+ """Send a request to the configured Music Assistant / Alexa API.
+
+ Returns the response text on success or raises `ActionUnavailable` on failure.
+ """
+ username = provider.config.get_value(CONF_API_BASIC_AUTH_USERNAME)
+ password = provider.config.get_value(CONF_API_BASIC_AUTH_PASSWORD)
+
+ auth = None
+ if username is not None and password is not None:
+ auth = BasicAuth(str(username), str(password))
+
+ api_url = str(provider.config.get_value(CONF_API_URL) or "")
+ url = f"{api_url.rstrip('/')}/{endpoint.lstrip('/')}"
+
+ return await _request_with_session(
+ provider.mass.http_session, method, url, json_data, timeout, auth
+ )
+
+
class AlexaDevice:
"""Representation of an Alexa Device."""
self._attr_device_info = DeviceInfo()
self._attr_powered = False
self._attr_available = True
+ # Keep track of the last metadata we pushed to avoid unnecessary uploads
+ self._last_meta_checksum: str | None = None
+ # Keep last stream url pushed (set in play_media)
+ self._last_stream_url: str | None = None
@property
def requires_flow_mode(self) -> bool:
async def stop(self) -> None:
"""Handle STOP command on the player."""
- await self.api.stop()
+ provider = cast("AlexaProvider", self.provider)
+
+ utter = await provider.get_intent_utterance("AMAZON.StopIntent", "stop")
+ await self.api.run_custom(utter)
+
self._attr_current_media = None
self._attr_playback_state = PlaybackState.IDLE
self.update_state()
async def play(self) -> None:
"""Handle PLAY command on the player."""
- await self.api.play()
+ provider = cast("AlexaProvider", self.provider)
+
+ utter = await provider.get_intent_utterance("AMAZON.ResumeIntent", "resume")
+ await self.api.run_custom(utter)
+
self._attr_playback_state = PlaybackState.PLAYING
self.update_state()
async def pause(self) -> None:
"""Handle PAUSE command on the player."""
- await self.api.pause()
+ provider = cast("AlexaProvider", self.provider)
+
+ utter = await provider.get_intent_utterance("AMAZON.PauseIntent", "pause")
+ await self.api.run_custom(utter)
+
self._attr_playback_state = PlaybackState.PAUSED
self.update_state()
async def play_media(self, media: PlayerMedia) -> None:
"""Handle PLAY MEDIA on the player."""
- username = self.provider.config.get_value(CONF_API_BASIC_AUTH_USERNAME)
- password = self.provider.config.get_value(CONF_API_BASIC_AUTH_PASSWORD)
+ stream_url = await self.provider.mass.streams.resolve_stream_url(self.player_id, media)
- auth = None
- if username is not None and password is not None:
- auth = BasicAuth(str(username), str(password))
+ payload = {
+ "streamUrl": stream_url,
+ }
- stream_url = await self.provider.mass.streams.resolve_stream_url(self.player_id, media)
+ await api_request(
+ self.provider,
+ "/ma/push-url",
+ method="POST",
+ json_data=payload,
+ timeout=10,
+ )
- async with aiohttp.ClientSession() as session:
- try:
- async with session.post(
- f"{self.provider.config.get_value(CONF_API_URL)}/ma/push-url",
- json={
- "streamUrl": stream_url,
- "title": media.title,
- "artist": media.artist,
- "album": media.album,
- "imageUrl": media.image_url,
- },
- timeout=aiohttp.ClientTimeout(total=10),
- auth=auth,
- ) as resp:
- resp_text = await resp.text()
- if resp.status < 200 or resp.status >= 300:
- msg = (
- f"Failed to push URL to MA Alexa API: "
- f"Status code: {resp.status}, Response: {resp_text}. "
- "Please verify your API connection and configuration"
- )
- _LOGGER.error(msg)
- raise ActionUnavailable(msg)
- except ActionUnavailable:
- raise
- except Exception as exc:
- msg = (
- "Failed to push URL to MA Alexa API: "
- "Please verify your API connection and configuration"
- )
- _LOGGER.error("Failed to push URL to MA Alexa API: %s", exc)
- raise ActionUnavailable(msg)
+ # Save last pushed stream url so metadata updates can reuse it
+ self._last_stream_url = stream_url
alexa_locale = self.provider.config.get_value(CONF_ALEXA_LANGUAGE)
self._attr_current_media = media
self.update_state()
+ def _on_player_media_updated(self) -> None:
+ """Handle callback when the current media of the player is updated.
+
+ Upload the stream URL and media metadata (title/artist/album/imageUrl)
+ to the configured Music Assistant / Alexa API so the Alexa side can
+ display/update the playing item.
+ """
+ media = self.state.current_media
+
+ async def _upload_metadata() -> None:
+ stream_url = self._last_stream_url
+ if media is not None:
+ title = media.title
+ artist = media.artist
+ album = media.album
+ image_url = media.image_url
+ else:
+ return
+
+ meta_checksum = f"{stream_url}-{album}-{artist}-{title}-{image_url}"
+ if meta_checksum == self._last_meta_checksum:
+ return
+
+ payload = {
+ "streamUrl": stream_url,
+ "title": title,
+ "artist": artist,
+ "album": album,
+ "imageUrl": image_url,
+ }
+
+ await api_request(
+ self.provider, "/ma/push-url", method="POST", json_data=payload, timeout=10
+ )
+
+ # store last pushed values
+ self._last_meta_checksum = meta_checksum
+
+ self.mass.create_task(_upload_metadata())
+
class AlexaProvider(PlayerProvider):
"""Implementation of an Alexa Device Provider."""
async def handle_async_init(self) -> None:
"""Handle async initialization of the provider."""
self.devices = {}
+ self._intents: list[dict[str, Any]] | None = None
+ self._invocation_name: str | None = None
async def loaded_in_mass(self) -> None:
"""Call after the provider has been loaded."""
# Create AlexaPlayer instance
player = AlexaPlayer(self, player_id, device_object)
await self.mass.players.register_or_update(player)
+
+ await self._load_intents()
+
+ @lock
+ async def _load_intents(self) -> None:
+ """Load intents from the configured API and cache them on the provider."""
+ resp = await api_request(self, "/alexa/intents", method="GET", timeout=5)
+ data = json.loads(resp)
+ if isinstance(data, dict):
+ # cache invocationName if present
+ self._invocation_name = data.get("invocationName")
+ self._intents = data.get("intents", []) or []
+ else:
+ self._intents = []
+
+ async def get_intent_utterance(self, intent_name: str, default: str) -> str:
+ """Return the first utterance for the given intent name (cached).
+
+ If intents are not yet cached, attempt to load them.
+ """
+ if self._intents is None:
+ await self._load_intents()
+
+ for intent in self._intents or []:
+ if intent.get("intent") == intent_name:
+ utts = cast("list[str]", intent.get("utterances") or [])
+ if utts:
+ utter = utts[0]
+ if self._invocation_name:
+ inv = self._invocation_name.strip()
+ return f"{inv} {utter}".strip()
+ return utter
+ return default