From: Marcel van der Veldt Date: Mon, 8 Dec 2025 22:21:20 +0000 (+0100) Subject: Fix getting ha user role X-Git-Url: https://git.kitaultman.com/?a=commitdiff_plain;h=4bce59e14f3670ef62e016b78b2c8504c7e7558a;p=music-assistant-server.git Fix getting ha user role --- diff --git a/music_assistant/controllers/webserver/helpers/auth_providers.py b/music_assistant/controllers/webserver/helpers/auth_providers.py index 8ce51671..51442527 100644 --- a/music_assistant/controllers/webserver/helpers/auth_providers.py +++ b/music_assistant/controllers/webserver/helpers/auth_providers.py @@ -3,9 +3,7 @@ from __future__ import annotations import asyncio -import base64 import hashlib -import json import logging import secrets from abc import ABC, abstractmethod @@ -41,6 +39,36 @@ def normalize_username(username: str) -> str: LOGGER = logging.getLogger(f"{MASS_LOGGER_NAME}.auth") +async def get_ha_user_role(mass: MusicAssistant, ha_user_id: str) -> UserRole: + """ + Get user role based on Home Assistant admin status. + + :param mass: MusicAssistant instance. + :param ha_user_id: The Home Assistant user ID to check. + """ + try: + hass_prov = mass.get_provider("hass") + if hass_prov is None or not hass_prov.available: + LOGGER.debug("HA provider not available, returning USER role") + return UserRole.USER + + hass_prov = cast("HomeAssistantProvider", hass_prov) + # Query HA for user list to check admin status + result = await hass_prov.hass.send_command("config/auth/list") + for ha_user in result: + if ha_user.get("id") == ha_user_id: + # User is admin if they have "system-admin" in their group_ids + group_ids = ha_user.get("group_ids", []) + if "system-admin" in group_ids: + LOGGER.debug("HA user %s is admin, granting ADMIN role", ha_user_id) + return UserRole.ADMIN + break + except Exception as err: + LOGGER.debug("Failed to check HA admin status: %s", err) + + return UserRole.USER + + class LoginRateLimiter: """Rate limiter for login attempts to prevent brute force attacks.""" @@ -571,51 +599,15 @@ class HomeAssistantOAuthProvider(LoginProvider): ), ) - def _decode_ha_jwt_token(self, access_token: str) -> tuple[str | None, str | None]: - """ - Decode Home Assistant JWT token to extract user ID and name. - - :param access_token: The JWT access token from Home Assistant. - :return: Tuple of (user_id, username) or (None, None) if decoding fails. - """ - try: - # JWT tokens have 3 parts separated by dots: header.payload.signature - parts = access_token.split(".") - if len(parts) >= 2: - # Decode the payload (second part) - # Add padding if needed (JWT base64 may not be padded) - payload = parts[1] - payload += "=" * (4 - len(payload) % 4) - decoded = base64.urlsafe_b64decode(payload) - token_data = json.loads(decoded) - - # Home Assistant JWT tokens use 'iss' as the user ID - ha_user_id: str | None = token_data.get("iss") - - if not ha_user_id: - # Fallback to 'sub' if 'iss' is not present - ha_user_id = token_data.get("sub") - - # Try to extract username from token (name, username, or other fields) - username = token_data.get("name") or token_data.get("username") - - if ha_user_id: - return str(ha_user_id), username - return None, None - except Exception as decode_error: - self.logger.error("Failed to decode HA JWT token: %s", decode_error) - - return None, None - async def _fetch_ha_user_via_websocket( self, ha_url: str, access_token: str - ) -> tuple[str | None, str | None]: + ) -> tuple[str | None, str | None, str | None]: """ Fetch user information from Home Assistant via WebSocket. :param ha_url: Home Assistant URL. :param access_token: Access token for WebSocket authentication. - :return: Tuple of (username, display_name) or (None, None) if fetch fails. + :return: Tuple of (user_id, username, display_name) or (None, None, None) if fetch fails. """ ws_url = get_websocket_url(ha_url) @@ -626,18 +618,19 @@ class HomeAssistantOAuthProvider(LoginProvider): result = await client.send_command("auth/current_user") if result: - # Extract username and display name from response + # Extract user_id, username and display name from response + user_id = result.get("id") username = result.get("name") or result.get("username") display_name = result.get("name") - if username: - return username, display_name + if user_id and username: + return user_id, username, display_name self.logger.warning("auth/current_user returned no user data") - return None, None + return None, None, None except BaseHassClientError as ws_error: self.logger.error("Failed to fetch HA user via WebSocket: %s", ws_error) - return None, None + return None, None, None async def _get_or_create_user( self, username: str, display_name: str | None, ha_user_id: str @@ -678,18 +671,19 @@ class HomeAssistantOAuthProvider(LoginProvider): await self.auth_manager.link_user_to_provider( existing_user, AuthProviderType.HOME_ASSISTANT, ha_user_id ) - - self.logger.debug("Linked existing user '%s' to Home Assistant provider", username) return existing_user # New HA user - check if self-registration allowed if not self.allow_self_registration: return None - # Create new user with USER role + # Determine role based on HA admin status + role = await get_ha_user_role(self.mass, ha_user_id) + + # Create new user user = await self.auth_manager.create_user( username=username, - role=UserRole.USER, + role=role, display_name=display_name or username, ) @@ -743,19 +737,16 @@ class HomeAssistantOAuthProvider(LoginProvider): if not access_token: return AuthResult(success=False, error="No access token received from HA") - # Decode JWT token to get HA user ID - ha_user_id, _ = self._decode_ha_jwt_token(access_token) - if not ha_user_id: - return AuthResult(success=False, error="Failed to decode token") - - # Fetch user information from HA via WebSocket - username, display_name = await self._fetch_ha_user_via_websocket(ha_url, access_token) + # Fetch user information from HA via WebSocket (includes the real user ID) + ha_user_id, username, display_name = await self._fetch_ha_user_via_websocket( + ha_url, access_token + ) - # If we couldn't get username from WebSocket, fail authentication - if not username: + # If we couldn't get user info from WebSocket, fail authentication + if not ha_user_id or not username: return AuthResult( success=False, - error="Failed to get username from Home Assistant", + error="Failed to get user info from Home Assistant", ) # Get or create user diff --git a/music_assistant/controllers/webserver/websocket_client.py b/music_assistant/controllers/webserver/websocket_client.py index ceea99dd..72e6ac39 100644 --- a/music_assistant/controllers/webserver/websocket_client.py +++ b/music_assistant/controllers/webserver/websocket_client.py @@ -28,6 +28,7 @@ from music_assistant.constants import HOMEASSISTANT_SYSTEM_USER, VERBOSE_LOG_LEV from music_assistant.helpers.api import APICommandHandler, parse_arguments from .helpers.auth_middleware import is_request_from_ingress, set_current_token, set_current_user +from .helpers.auth_providers import get_ha_user_role if TYPE_CHECKING: from music_assistant_models.event import MassEvent @@ -377,10 +378,11 @@ class WebsocketClientHandler: if not user: # Auto-create user for Ingress (they're already authenticated by HA) - # Always create with USER role (admin is created during setup) + # Determine role based on HA admin status + role = await get_ha_user_role(self.mass, ingress_user_id) user = await self.webserver.auth.create_user( username=ingress_username, - role=UserRole.USER, + role=role, display_name=ingress_display_name, )