from __future__ import annotations
import asyncio
-import base64
import hashlib
-import json
import logging
import secrets
from abc import ABC, abstractmethod
LOGGER = logging.getLogger(f"{MASS_LOGGER_NAME}.auth")
+async def get_ha_user_role(mass: MusicAssistant, ha_user_id: str) -> UserRole:
+ """
+ Get user role based on Home Assistant admin status.
+
+ :param mass: MusicAssistant instance.
+ :param ha_user_id: The Home Assistant user ID to check.
+ """
+ try:
+ hass_prov = mass.get_provider("hass")
+ if hass_prov is None or not hass_prov.available:
+ LOGGER.debug("HA provider not available, returning USER role")
+ return UserRole.USER
+
+ hass_prov = cast("HomeAssistantProvider", hass_prov)
+ # Query HA for user list to check admin status
+ result = await hass_prov.hass.send_command("config/auth/list")
+ for ha_user in result:
+ if ha_user.get("id") == ha_user_id:
+ # User is admin if they have "system-admin" in their group_ids
+ group_ids = ha_user.get("group_ids", [])
+ if "system-admin" in group_ids:
+ LOGGER.debug("HA user %s is admin, granting ADMIN role", ha_user_id)
+ return UserRole.ADMIN
+ break
+ except Exception as err:
+ LOGGER.debug("Failed to check HA admin status: %s", err)
+
+ return UserRole.USER
+
+
class LoginRateLimiter:
"""Rate limiter for login attempts to prevent brute force attacks."""
),
)
- def _decode_ha_jwt_token(self, access_token: str) -> tuple[str | None, str | None]:
- """
- Decode Home Assistant JWT token to extract user ID and name.
-
- :param access_token: The JWT access token from Home Assistant.
- :return: Tuple of (user_id, username) or (None, None) if decoding fails.
- """
- try:
- # JWT tokens have 3 parts separated by dots: header.payload.signature
- parts = access_token.split(".")
- if len(parts) >= 2:
- # Decode the payload (second part)
- # Add padding if needed (JWT base64 may not be padded)
- payload = parts[1]
- payload += "=" * (4 - len(payload) % 4)
- decoded = base64.urlsafe_b64decode(payload)
- token_data = json.loads(decoded)
-
- # Home Assistant JWT tokens use 'iss' as the user ID
- ha_user_id: str | None = token_data.get("iss")
-
- if not ha_user_id:
- # Fallback to 'sub' if 'iss' is not present
- ha_user_id = token_data.get("sub")
-
- # Try to extract username from token (name, username, or other fields)
- username = token_data.get("name") or token_data.get("username")
-
- if ha_user_id:
- return str(ha_user_id), username
- return None, None
- except Exception as decode_error:
- self.logger.error("Failed to decode HA JWT token: %s", decode_error)
-
- return None, None
-
async def _fetch_ha_user_via_websocket(
self, ha_url: str, access_token: str
- ) -> tuple[str | None, str | None]:
+ ) -> tuple[str | None, str | None, str | None]:
"""
Fetch user information from Home Assistant via WebSocket.
:param ha_url: Home Assistant URL.
:param access_token: Access token for WebSocket authentication.
- :return: Tuple of (username, display_name) or (None, None) if fetch fails.
+ :return: Tuple of (user_id, username, display_name) or (None, None, None) if fetch fails.
"""
ws_url = get_websocket_url(ha_url)
result = await client.send_command("auth/current_user")
if result:
- # Extract username and display name from response
+ # Extract user_id, username and display name from response
+ user_id = result.get("id")
username = result.get("name") or result.get("username")
display_name = result.get("name")
- if username:
- return username, display_name
+ if user_id and username:
+ return user_id, username, display_name
self.logger.warning("auth/current_user returned no user data")
- return None, None
+ return None, None, None
except BaseHassClientError as ws_error:
self.logger.error("Failed to fetch HA user via WebSocket: %s", ws_error)
- return None, None
+ return None, None, None
async def _get_or_create_user(
self, username: str, display_name: str | None, ha_user_id: str
await self.auth_manager.link_user_to_provider(
existing_user, AuthProviderType.HOME_ASSISTANT, ha_user_id
)
-
- self.logger.debug("Linked existing user '%s' to Home Assistant provider", username)
return existing_user
# New HA user - check if self-registration allowed
if not self.allow_self_registration:
return None
- # Create new user with USER role
+ # Determine role based on HA admin status
+ role = await get_ha_user_role(self.mass, ha_user_id)
+
+ # Create new user
user = await self.auth_manager.create_user(
username=username,
- role=UserRole.USER,
+ role=role,
display_name=display_name or username,
)
if not access_token:
return AuthResult(success=False, error="No access token received from HA")
- # Decode JWT token to get HA user ID
- ha_user_id, _ = self._decode_ha_jwt_token(access_token)
- if not ha_user_id:
- return AuthResult(success=False, error="Failed to decode token")
-
- # Fetch user information from HA via WebSocket
- username, display_name = await self._fetch_ha_user_via_websocket(ha_url, access_token)
+ # Fetch user information from HA via WebSocket (includes the real user ID)
+ ha_user_id, username, display_name = await self._fetch_ha_user_via_websocket(
+ ha_url, access_token
+ )
- # If we couldn't get username from WebSocket, fail authentication
- if not username:
+ # If we couldn't get user info from WebSocket, fail authentication
+ if not ha_user_id or not username:
return AuthResult(
success=False,
- error="Failed to get username from Home Assistant",
+ error="Failed to get user info from Home Assistant",
)
# Get or create user