from __future__ import annotations
+import contextlib
import hashlib
import logging
import secrets
from datetime import datetime, timedelta
+from sqlite3 import OperationalError
from typing import TYPE_CHECKING, Any
from music_assistant_models.auth import (
UserRole,
)
from music_assistant_models.errors import (
- AuthenticationFailed,
AuthenticationRequired,
InsufficientPermissions,
InvalidDataError,
LOGGER = logging.getLogger(f"{MASS_LOGGER_NAME}.auth")
# Database schema version
-DB_SCHEMA_VERSION = 2
+DB_SCHEMA_VERSION = 3
# Token expiration constants (in days)
TOKEN_SHORT_LIVED_EXPIRATION = 30 # Short-lived tokens (auto-renewing on use)
user_id TEXT PRIMARY KEY,
username TEXT NOT NULL UNIQUE,
role TEXT NOT NULL,
- enabled INTEGER DEFAULT 1,
+ enabled INTEGER NOT NULL DEFAULT 1,
created_at TEXT NOT NULL,
display_name TEXT,
avatar_url TEXT,
- preferences TEXT DEFAULT '{}'
+ preferences json NOT NULL DEFAULT '{}',
+ player_filter json NOT NULL DEFAULT '[]',
+ provider_filter json NOT NULL DEFAULT '[]'
)
"""
)
created_at TEXT NOT NULL,
expires_at TEXT,
last_used_at TEXT,
- is_long_lived INTEGER DEFAULT 0,
+ is_long_lived INTEGER NOT NULL DEFAULT 0,
FOREIGN KEY (user_id) REFERENCES users(user_id) ON DELETE CASCADE
)
"""
# Recreate tables with current schema
await self._create_database_tables()
+ # Migration to version 3: Add player_filter and provider_filter columns
+ if from_version < 3:
+ with contextlib.suppress(OperationalError):
+ # Column(s) may already exist
+ await self.database.execute(
+ "ALTER TABLE users ADD COLUMN player_filter json NOT NULL DEFAULT '[]'"
+ )
+ await self.database.execute(
+ "ALTER TABLE users ADD COLUMN provider_filter json NOT NULL DEFAULT '[]'"
+ )
+ await self.database.commit()
+
async def _setup_login_providers(self, allow_self_registration: bool) -> None:
"""
Set up available login providers based on configuration.
if not user_row or not user_row["enabled"]:
return None
- # Convert Row to dict for easier handling of optional fields
- user_dict = dict(user_row)
-
- # Parse preferences from JSON
- preferences = {}
- if prefs_json := user_dict.get("preferences"):
- try:
- preferences = json_loads(prefs_json)
- except Exception:
- self.logger.warning("Failed to parse preferences for user %s", user_id)
-
return User(
- user_id=user_dict["user_id"],
- username=user_dict["username"],
- role=UserRole(user_dict["role"]),
- enabled=bool(user_dict["enabled"]),
- created_at=datetime.fromisoformat(user_dict["created_at"]),
- display_name=user_dict.get("display_name"),
- avatar_url=user_dict.get("avatar_url"),
- preferences=preferences,
+ user_id=user_row["user_id"],
+ username=user_row["username"],
+ role=UserRole(user_row["role"]),
+ enabled=bool(user_row["enabled"]),
+ created_at=datetime.fromisoformat(user_row["created_at"]),
+ display_name=user_row["display_name"],
+ avatar_url=user_row["avatar_url"],
+ preferences=json_loads(user_row["preferences"]),
+ player_filter=json_loads(user_row["player_filter"]),
+ provider_filter=json_loads(user_row["provider_filter"]),
)
async def get_user_by_provider_link(
display_name: str | None = None,
avatar_url: str | None = None,
preferences: dict[str, Any] | None = None,
+ player_filter: list[str] | None = None,
+ provider_filter: list[str] | None = None,
) -> User:
"""
Create a new user.
:param display_name: Optional display name.
:param avatar_url: Optional avatar URL.
:param preferences: Optional user preferences dict.
+ :param player_filter: Optional list of player IDs user has access to.
+ :param provider_filter: Optional list of provider instance IDs user has access to.
"""
user_id = secrets.token_urlsafe(32)
created_at = utc()
if preferences is None:
preferences = {}
+ if player_filter is None:
+ player_filter = []
+ if provider_filter is None:
+ provider_filter = []
user_data = {
"user_id": user_id,
"display_name": display_name,
"avatar_url": avatar_url,
"preferences": json_dumps(preferences),
+ "player_filter": json_dumps(player_filter),
+ "provider_filter": json_dumps(provider_filter),
}
await self.database.insert("users", user_data)
display_name=display_name,
avatar_url=avatar_url,
preferences=preferences,
+ player_filter=player_filter,
+ provider_filter=provider_filter,
)
async def get_homeassistant_system_user(self) -> User:
user_rows = await self.database.get_rows("users", limit=1000)
users = []
for row in user_rows:
- row_dict = dict(row)
-
# Skip system users
- if row_dict["username"] == HOMEASSISTANT_SYSTEM_USER:
+ if row["username"] == HOMEASSISTANT_SYSTEM_USER:
continue
-
- # Parse preferences
- preferences = {}
- if prefs_json := row_dict.get("preferences"):
- try:
- preferences = json_loads(prefs_json)
- except Exception:
- self.logger.warning(
- "Failed to parse preferences for user %s", row_dict["user_id"]
- )
-
users.append(
User(
- user_id=row_dict["user_id"],
- username=row_dict["username"],
- role=UserRole(row_dict["role"]),
- enabled=bool(row_dict["enabled"]),
- created_at=datetime.fromisoformat(row_dict["created_at"]),
- display_name=row_dict.get("display_name"),
- avatar_url=row_dict.get("avatar_url"),
- preferences=preferences,
+ user_id=row["user_id"],
+ username=row["username"],
+ role=UserRole(row["role"]),
+ enabled=bool(row["enabled"]),
+ created_at=datetime.fromisoformat(row["created_at"]),
+ display_name=row["display_name"],
+ avatar_url=row["avatar_url"],
+ preferences=json_loads(row["preferences"]),
+ player_filter=json_loads(row["player_filter"]),
+ provider_filter=json_loads(row["provider_filter"]),
)
)
return users
role: str = "user",
display_name: str | None = None,
avatar_url: str | None = None,
+ player_filter: list[str] | None = None,
+ provider_filter: list[str] | None = None,
) -> User:
"""
Create a new user with built-in authentication (admin only).
:param role: User role - "admin" or "user" (default: "user").
:param display_name: Optional display name.
:param avatar_url: Optional avatar URL.
+ :param player_filter: Optional list of player IDs user has access to.
+ :param provider_filter: Optional list of provider instance IDs user has access to.
:return: Created user object.
"""
# Validation
raise InvalidDataError("Built-in auth provider not available")
# Create user with password
- user = await builtin_provider.create_user_with_password(username, password, role=user_role)
+ user = await builtin_provider.create_user_with_password(
+ username,
+ password,
+ role=user_role,
+ player_filter=player_filter,
+ provider_filter=provider_filter,
+ )
# Update optional fields if provided
if display_name or avatar_url:
self,
target_user: User,
password: str,
- old_password: str | None,
is_admin_update: bool,
current_user: User,
) -> None:
if not builtin_provider or not isinstance(builtin_provider, BuiltinLoginProvider):
raise InvalidDataError("Built-in auth not available")
+ # Update password (used for both admin resets and user password changes)
+ await builtin_provider.reset_password(target_user, password)
+
if is_admin_update:
- # Admin can reset password without old password
- await builtin_provider.reset_password(target_user, password)
self.logger.info(
"Password reset for user %s by admin %s",
target_user.username,
current_user.username,
)
else:
- # User updating own password - requires old password verification
- if not old_password:
- raise InvalidDataError("old_password is required to change your own password")
+ self.logger.info("Password changed for user %s", target_user.username)
- # Verify old password and change to new one
- success = await builtin_provider.change_password(target_user, old_password, password)
- if not success:
- raise AuthenticationFailed("Invalid current password")
+ async def _update_user_filters(
+ self,
+ target_user: User,
+ player_filter: list[str] | None,
+ provider_filter: list[str] | None,
+ ) -> User:
+ """Update user player and provider filters (helper method)."""
+ updates = {}
+ if player_filter is not None:
+ updates["player_filter"] = json_dumps(player_filter)
+ if provider_filter is not None:
+ updates["provider_filter"] = json_dumps(provider_filter)
- self.logger.info("Password changed for user %s", target_user.username)
+ if updates:
+ await self.database.update("users", {"user_id": target_user.user_id}, updates)
+ # Refresh target user to get updated filters
+ refreshed_user = await self.get_user(target_user.user_id)
+ if not refreshed_user:
+ raise InvalidDataError("Failed to refresh user after filter update")
+ return refreshed_user
+ return target_user
@api_command("auth/user/update")
async def update_user_profile(
display_name: str | None = None,
avatar_url: str | None = None,
password: str | None = None,
- old_password: str | None = None,
role: str | None = None,
preferences: dict[str, Any] | None = None,
+ player_filter: list[str] | None = None,
+ provider_filter: list[str] | None = None,
) -> User:
"""
Update user profile information.
:param display_name: New display name (optional).
:param avatar_url: New avatar URL (optional).
:param password: New password (optional, minimum 8 characters).
- :param old_password: Current password (required when user updates own password).
- :param role: New role - "admin" or "user" (optional, admin only).
+ :param role: New role - "admin" or "user" (optional, set by admin only).
:param preferences: User preferences dict (completely replaces existing, optional).
+ :param player_filter: List of player IDs user has access to (set by admin only, optional).
+ :param provider_filter: List of provider instance IDs user has access to (set by admin only, optional).
:return: Updated user object.
- """
+ """ # noqa: E501
current_user_obj = get_current_user()
if not current_user_obj:
raise AuthenticationRequired("Not authenticated")
if preferences is not None:
target_user = await self.update_user_preferences(target_user, preferences)
+ # Update player_filter and provider_filter (admin only)
+ if player_filter is not None or provider_filter is not None:
+ if not is_admin_update:
+ raise InsufficientPermissions("Only admins can update player/provider filters")
+ target_user = await self._update_user_filters(
+ target_user, player_filter, provider_filter
+ )
+
# Update password if provided
if password:
await self._update_profile_password(
- target_user, password, old_password, is_admin_update, current_user_obj
+ target_user, password, is_admin_update, current_user_obj
)
return target_user
from __future__ import annotations
+import asyncio
import base64
import hashlib
import json
import secrets
from abc import ABC, abstractmethod
from dataclasses import dataclass
-from datetime import datetime
+from datetime import datetime, timedelta
from typing import TYPE_CHECKING, Any, TypedDict, cast
from urllib.parse import urlparse
from music_assistant_models.auth import AuthProviderType, User, UserRole
from music_assistant.constants import MASS_LOGGER_NAME
+from music_assistant.helpers.datetime import utc
if TYPE_CHECKING:
from music_assistant import MusicAssistant
LOGGER = logging.getLogger(f"{MASS_LOGGER_NAME}.auth")
+class LoginRateLimiter:
+ """Rate limiter for login attempts to prevent brute force attacks."""
+
+ def __init__(self) -> None:
+ """Initialize the rate limiter."""
+ # Track failed attempts per username: {username: [timestamp1, timestamp2, ...]}
+ self._failed_attempts: dict[str, list[datetime]] = {}
+ # Time window for tracking attempts (30 minutes)
+ self._tracking_window = timedelta(minutes=30)
+ # Lock for thread-safe access to _failed_attempts
+ self._lock = asyncio.Lock()
+
+ def _cleanup_old_attempts(self, username: str) -> None:
+ """
+ Remove failed attempts outside the tracking window.
+
+ :param username: The username to clean up.
+ """
+ if username not in self._failed_attempts:
+ return
+
+ cutoff_time = utc() - self._tracking_window
+ self._failed_attempts[username] = [
+ timestamp for timestamp in self._failed_attempts[username] if timestamp > cutoff_time
+ ]
+
+ # Remove username if no attempts left
+ if not self._failed_attempts[username]:
+ del self._failed_attempts[username]
+
+ def get_delay(self, username: str) -> int:
+ """
+ Get the delay in seconds before next login attempt is allowed.
+
+ Progressive delays based on failed attempts:
+ - 1-2 attempts: no delay
+ - 3-5 attempts: 30 seconds
+ - 6-9 attempts: 60 seconds
+ - 10-14 attempts: 120 seconds
+ - 15+ attempts: 300 seconds (5 minutes)
+
+ :param username: The username attempting to log in.
+ :return: Delay in seconds (0 if no delay needed).
+ """
+ self._cleanup_old_attempts(username)
+
+ if username not in self._failed_attempts:
+ return 0
+
+ attempt_count = len(self._failed_attempts[username])
+
+ if attempt_count < 3:
+ return 0
+ if attempt_count < 6:
+ return 30
+ if attempt_count < 10:
+ return 60
+ if attempt_count < 15:
+ return 120
+ return 300 # 5 minutes max delay
+
+ async def check_rate_limit(self, username: str) -> tuple[bool, int]:
+ """
+ Check if login attempt is allowed and apply delay if needed.
+
+ :param username: The username attempting to log in.
+ :return: Tuple of (allowed, delay_seconds). If not allowed, includes remaining delay.
+ """
+ async with self._lock:
+ self._cleanup_old_attempts(username)
+
+ if username not in self._failed_attempts or not self._failed_attempts[username]:
+ return True, 0
+
+ # Get the most recent failed attempt
+ last_attempt = self._failed_attempts[username][-1]
+ required_delay = self.get_delay(username)
+
+ if required_delay == 0:
+ return True, 0
+
+ # Calculate how much time has passed since last attempt
+ time_since_last = (utc() - last_attempt).total_seconds()
+
+ if time_since_last < required_delay:
+ # Still in cooldown period
+ remaining_delay = int(required_delay - time_since_last)
+ return False, remaining_delay
+
+ return True, 0
+
+ async def record_failed_attempt(self, username: str) -> None:
+ """
+ Record a failed login attempt.
+
+ :param username: The username that failed to log in.
+ """
+ async with self._lock:
+ self._cleanup_old_attempts(username)
+
+ if username not in self._failed_attempts:
+ self._failed_attempts[username] = []
+
+ self._failed_attempts[username].append(utc())
+
+ # Log warning for suspicious activity
+ attempt_count = len(self._failed_attempts[username])
+ if attempt_count == 10:
+ LOGGER.warning(
+ "Suspicious login activity: 10 failed attempts for username '%s'", username
+ )
+ elif attempt_count == 20:
+ LOGGER.warning(
+ "High suspicious login activity: 20 failed attempts for username '%s'. "
+ "Consider manually disabling this account.",
+ username,
+ )
+
+ async def clear_attempts(self, username: str) -> None:
+ """
+ Clear failed attempts for a username (called after successful login).
+
+ :param username: The username to clear.
+ """
+ async with self._lock:
+ if username in self._failed_attempts:
+ del self._failed_attempts[username]
+
+
class LoginProviderConfig(TypedDict, total=False):
"""Base configuration for login providers."""
class BuiltinLoginProvider(LoginProvider):
"""Built-in username/password login provider."""
+ def __init__(self, mass: MusicAssistant, provider_id: str, config: LoginProviderConfig) -> None:
+ """
+ Initialize built-in login provider.
+
+ :param mass: MusicAssistant instance.
+ :param provider_id: Unique identifier for this provider instance.
+ :param config: Provider-specific configuration.
+ """
+ super().__init__(mass, provider_id, config)
+ self._rate_limiter = LoginRateLimiter()
+
@property
def provider_type(self) -> AuthProviderType:
"""Return the provider type."""
if not username or not password:
return AuthResult(success=False, error="Username and password required")
+ # Check rate limit before attempting authentication
+ allowed, remaining_delay = await self._rate_limiter.check_rate_limit(username)
+ if not allowed:
+ self.logger.warning(
+ "Rate limit exceeded for username '%s'. %d seconds remaining.",
+ username,
+ remaining_delay,
+ )
+ return AuthResult(
+ success=False,
+ error=f"Too many failed attempts. Please try again in {remaining_delay} seconds.",
+ )
+
# First, look up user by username to get user_id
# This is needed to create the password hash with user_id in the salt
user_row = await self.auth_manager.database.get_row("users", {"username": username})
if not user_row:
+ # Record failed attempt even if username doesn't exist
+ # This prevents username enumeration timing attacks
+ await self._rate_limiter.record_failed_attempt(username)
return AuthResult(success=False, error="Invalid username or password")
user_id = user_row["user_id"]
)
if not user:
+ # Record failed attempt
+ await self._rate_limiter.record_failed_attempt(username)
return AuthResult(success=False, error="Invalid username or password")
# Check if user is enabled
if not user.enabled:
+ # Record failed attempt for disabled accounts too
+ await self._rate_limiter.record_failed_attempt(username)
return AuthResult(success=False, error="User account is disabled")
+ # Successful login - clear any failed attempts
+ await self._rate_limiter.clear_attempts(username)
return AuthResult(success=True, user=user)
async def create_user_with_password(
password: str,
role: UserRole = UserRole.USER,
display_name: str | None = None,
+ player_filter: list[str] | None = None,
+ provider_filter: list[str] | None = None,
) -> User:
"""
Create a new built-in user with password.
:param password: The password (will be hashed).
:param role: The user role (default: USER).
:param display_name: Optional display name.
+ :param player_filter: Optional list of player IDs user has access to.
+ :param provider_filter: Optional list of provider instance IDs user has access to.
"""
# Create the user
user = await self.auth_manager.create_user(
username=username,
role=role,
display_name=display_name,
+ player_filter=player_filter,
+ provider_filter=provider_filter,
)
# Hash password using user_id for enhanced security