Fix getting ha user role
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Mon, 8 Dec 2025 22:21:20 +0000 (23:21 +0100)
committerMarcel van der Veldt <m.vanderveldt@outlook.com>
Mon, 8 Dec 2025 22:21:20 +0000 (23:21 +0100)
music_assistant/controllers/webserver/helpers/auth_providers.py
music_assistant/controllers/webserver/websocket_client.py

index 8ce516715ddf286fe5d490f018aea75d39a70799..51442527006cae694a33ff695767cea23ca6cd51 100644 (file)
@@ -3,9 +3,7 @@
 from __future__ import annotations
 
 import asyncio
-import base64
 import hashlib
-import json
 import logging
 import secrets
 from abc import ABC, abstractmethod
@@ -41,6 +39,36 @@ def normalize_username(username: str) -> str:
 LOGGER = logging.getLogger(f"{MASS_LOGGER_NAME}.auth")
 
 
+async def get_ha_user_role(mass: MusicAssistant, ha_user_id: str) -> UserRole:
+    """
+    Get user role based on Home Assistant admin status.
+
+    :param mass: MusicAssistant instance.
+    :param ha_user_id: The Home Assistant user ID to check.
+    """
+    try:
+        hass_prov = mass.get_provider("hass")
+        if hass_prov is None or not hass_prov.available:
+            LOGGER.debug("HA provider not available, returning USER role")
+            return UserRole.USER
+
+        hass_prov = cast("HomeAssistantProvider", hass_prov)
+        # Query HA for user list to check admin status
+        result = await hass_prov.hass.send_command("config/auth/list")
+        for ha_user in result:
+            if ha_user.get("id") == ha_user_id:
+                # User is admin if they have "system-admin" in their group_ids
+                group_ids = ha_user.get("group_ids", [])
+                if "system-admin" in group_ids:
+                    LOGGER.debug("HA user %s is admin, granting ADMIN role", ha_user_id)
+                    return UserRole.ADMIN
+                break
+    except Exception as err:
+        LOGGER.debug("Failed to check HA admin status: %s", err)
+
+    return UserRole.USER
+
+
 class LoginRateLimiter:
     """Rate limiter for login attempts to prevent brute force attacks."""
 
@@ -571,51 +599,15 @@ class HomeAssistantOAuthProvider(LoginProvider):
             ),
         )
 
-    def _decode_ha_jwt_token(self, access_token: str) -> tuple[str | None, str | None]:
-        """
-        Decode Home Assistant JWT token to extract user ID and name.
-
-        :param access_token: The JWT access token from Home Assistant.
-        :return: Tuple of (user_id, username) or (None, None) if decoding fails.
-        """
-        try:
-            # JWT tokens have 3 parts separated by dots: header.payload.signature
-            parts = access_token.split(".")
-            if len(parts) >= 2:
-                # Decode the payload (second part)
-                # Add padding if needed (JWT base64 may not be padded)
-                payload = parts[1]
-                payload += "=" * (4 - len(payload) % 4)
-                decoded = base64.urlsafe_b64decode(payload)
-                token_data = json.loads(decoded)
-
-                # Home Assistant JWT tokens use 'iss' as the user ID
-                ha_user_id: str | None = token_data.get("iss")
-
-                if not ha_user_id:
-                    # Fallback to 'sub' if 'iss' is not present
-                    ha_user_id = token_data.get("sub")
-
-                # Try to extract username from token (name, username, or other fields)
-                username = token_data.get("name") or token_data.get("username")
-
-                if ha_user_id:
-                    return str(ha_user_id), username
-                return None, None
-        except Exception as decode_error:
-            self.logger.error("Failed to decode HA JWT token: %s", decode_error)
-
-        return None, None
-
     async def _fetch_ha_user_via_websocket(
         self, ha_url: str, access_token: str
-    ) -> tuple[str | None, str | None]:
+    ) -> tuple[str | None, str | None, str | None]:
         """
         Fetch user information from Home Assistant via WebSocket.
 
         :param ha_url: Home Assistant URL.
         :param access_token: Access token for WebSocket authentication.
-        :return: Tuple of (username, display_name) or (None, None) if fetch fails.
+        :return: Tuple of (user_id, username, display_name) or (None, None, None) if fetch fails.
         """
         ws_url = get_websocket_url(ha_url)
 
@@ -626,18 +618,19 @@ class HomeAssistantOAuthProvider(LoginProvider):
                 result = await client.send_command("auth/current_user")
 
                 if result:
-                    # Extract username and display name from response
+                    # Extract user_id, username and display name from response
+                    user_id = result.get("id")
                     username = result.get("name") or result.get("username")
                     display_name = result.get("name")
-                    if username:
-                        return username, display_name
+                    if user_id and username:
+                        return user_id, username, display_name
 
                 self.logger.warning("auth/current_user returned no user data")
-                return None, None
+                return None, None, None
 
         except BaseHassClientError as ws_error:
             self.logger.error("Failed to fetch HA user via WebSocket: %s", ws_error)
-            return None, None
+            return None, None, None
 
     async def _get_or_create_user(
         self, username: str, display_name: str | None, ha_user_id: str
@@ -678,18 +671,19 @@ class HomeAssistantOAuthProvider(LoginProvider):
             await self.auth_manager.link_user_to_provider(
                 existing_user, AuthProviderType.HOME_ASSISTANT, ha_user_id
             )
-
-            self.logger.debug("Linked existing user '%s' to Home Assistant provider", username)
             return existing_user
 
         # New HA user - check if self-registration allowed
         if not self.allow_self_registration:
             return None
 
-        # Create new user with USER role
+        # Determine role based on HA admin status
+        role = await get_ha_user_role(self.mass, ha_user_id)
+
+        # Create new user
         user = await self.auth_manager.create_user(
             username=username,
-            role=UserRole.USER,
+            role=role,
             display_name=display_name or username,
         )
 
@@ -743,19 +737,16 @@ class HomeAssistantOAuthProvider(LoginProvider):
             if not access_token:
                 return AuthResult(success=False, error="No access token received from HA")
 
-            # Decode JWT token to get HA user ID
-            ha_user_id, _ = self._decode_ha_jwt_token(access_token)
-            if not ha_user_id:
-                return AuthResult(success=False, error="Failed to decode token")
-
-            # Fetch user information from HA via WebSocket
-            username, display_name = await self._fetch_ha_user_via_websocket(ha_url, access_token)
+            # Fetch user information from HA via WebSocket (includes the real user ID)
+            ha_user_id, username, display_name = await self._fetch_ha_user_via_websocket(
+                ha_url, access_token
+            )
 
-            # If we couldn't get username from WebSocket, fail authentication
-            if not username:
+            # If we couldn't get user info from WebSocket, fail authentication
+            if not ha_user_id or not username:
                 return AuthResult(
                     success=False,
-                    error="Failed to get username from Home Assistant",
+                    error="Failed to get user info from Home Assistant",
                 )
 
             # Get or create user
index ceea99dd49e5d98d19c9e23991a2065a7025d52d..72e6ac396f0f3148d8b587adac2ced786eee1ed0 100644 (file)
@@ -28,6 +28,7 @@ from music_assistant.constants import HOMEASSISTANT_SYSTEM_USER, VERBOSE_LOG_LEV
 from music_assistant.helpers.api import APICommandHandler, parse_arguments
 
 from .helpers.auth_middleware import is_request_from_ingress, set_current_token, set_current_user
+from .helpers.auth_providers import get_ha_user_role
 
 if TYPE_CHECKING:
     from music_assistant_models.event import MassEvent
@@ -377,10 +378,11 @@ class WebsocketClientHandler:
 
                 if not user:
                     # Auto-create user for Ingress (they're already authenticated by HA)
-                    # Always create with USER role (admin is created during setup)
+                    # Determine role based on HA admin status
+                    role = await get_ha_user_role(self.mass, ingress_user_id)
                     user = await self.webserver.auth.create_user(
                         username=ingress_username,
-                        role=UserRole.USER,
+                        role=role,
                         display_name=ingress_display_name,
                     )