From e119442403df319fc1f9dddc67a8c3df3c47d733 Mon Sep 17 00:00:00 2001 From: Marcel van der Veldt Date: Thu, 18 Dec 2025 02:48:31 +0100 Subject: [PATCH] Fix SSL configuration (#2836) - allow to specify a path - support both RSA and ECDSA --- .../controllers/webserver/controller.py | 107 +++--- .../controllers/webserver/helpers/ssl.py | 363 ++++++++++++++++++ 2 files changed, 414 insertions(+), 56 deletions(-) create mode 100644 music_assistant/controllers/webserver/helpers/ssl.py diff --git a/music_assistant/controllers/webserver/controller.py b/music_assistant/controllers/webserver/controller.py index 9cb62263..d7deffec 100644 --- a/music_assistant/controllers/webserver/controller.py +++ b/music_assistant/controllers/webserver/controller.py @@ -11,13 +11,10 @@ import asyncio import hashlib import html import os -import ssl -import tempfile import urllib.parse from collections.abc import Awaitable, Callable from concurrent import futures from functools import partial -from pathlib import Path from typing import TYPE_CHECKING, Any, Final, cast from urllib.parse import quote @@ -37,6 +34,11 @@ from music_assistant.constants import ( RESOURCES_DIR, VERBOSE_LOG_LEVEL, ) +from music_assistant.controllers.webserver.helpers.ssl import ( + create_server_ssl_context, + format_certificate_info, + verify_ssl_certificate, +) from music_assistant.helpers.api import parse_arguments from music_assistant.helpers.audio import get_preview_stream from music_assistant.helpers.json import json_dumps, json_loads @@ -67,6 +69,7 @@ CONF_BASE_URL = "base_url" CONF_ENABLE_SSL = "enable_ssl" CONF_SSL_CERTIFICATE = "ssl_certificate" CONF_SSL_PRIVATE_KEY = "ssl_private_key" +CONF_ACTION_VERIFY_SSL = "verify_ssl" MAX_PENDING_MSG = 512 CANCELLATION_ERRORS: Final = (asyncio.CancelledError, futures.CancelledError) @@ -105,6 +108,15 @@ class WebserverController(CoreController): ip_addresses = await get_ip_addresses() default_publish_ip = ip_addresses[0] + # Handle verify SSL action + ssl_verify_result = "" + if action == CONF_ACTION_VERIFY_SSL and values: + cert_info = await verify_ssl_certificate( + str(values.get(CONF_SSL_CERTIFICATE, "")), + str(values.get(CONF_SSL_PRIVATE_KEY, "")), + ) + ssl_verify_result = format_certificate_info(cert_info) + # Determine if SSL is enabled from values ssl_enabled = values.get(CONF_ENABLE_SSL, False) if values else False protocol = "https" if ssl_enabled else "http" @@ -115,8 +127,7 @@ class WebserverController(CoreController): type=ConfigEntryType.BOOLEAN, default_value=True, label="Allow User Self-Registration", - description="Allow users to create accounts via Home Assistant OAuth. \n" - "New users will have USER role by default.", + description="Allow users to create accounts via Home Assistant OAuth.", hidden=not any(provider.domain == "hass" for provider in self.mass.providers), ), ConfigEntry( @@ -138,9 +149,9 @@ class WebserverController(CoreController): ConfigEntry( key="webserver_warn", type=ConfigEntryType.ALERT, - label="Please note that the webserver is unencrypted. " + label="Please note that the webserver is by default unencrypted. " "Never ever expose the webserver directly to the internet! \n\n" - "Use a reverse proxy or VPN to secure access, or enable SSL below. \n\n" + "Enable SSL below or use a reverse proxy or VPN to secure access. \n\n" "As an alternative, consider using the Remote Access feature which " "secures access to your Music Assistant instance without the need to " "expose your webserver directly.", @@ -160,8 +171,11 @@ class WebserverController(CoreController): key=CONF_SSL_CERTIFICATE, type=ConfigEntryType.STRING, label="SSL Certificate", - description="Paste the contents of your SSL certificate file (PEM format). \n" - "This should include the full certificate chain if applicable.", + description="Provide your SSL certificate in PEM format. You can either:\n" + "- Paste the full contents of your certificate file, or\n" + "- Enter an absolute file path (e.g., /ssl/fullchain.pem)\n\n" + "This should include the full certificate chain if applicable.\n" + "Both RSA and ECDSA certificates are supported.", required=False, depends_on=CONF_ENABLE_SSL, ), @@ -169,11 +183,33 @@ class WebserverController(CoreController): key=CONF_SSL_PRIVATE_KEY, type=ConfigEntryType.SECURE_STRING, label="SSL Private Key", - description="Paste the contents of your SSL private key file (PEM format). \n" + description="Provide your SSL private key in PEM format. You can either:\n" + "- Paste the full contents of your private key file, or\n" + "- Enter an absolute file path (e.g., /ssl/privkey.pem)\n\n" + "Both RSA and ECDSA keys are supported. The key must be unencrypted.\n" "This is securely encrypted and stored.", required=False, depends_on=CONF_ENABLE_SSL, ), + ConfigEntry( + key=CONF_ACTION_VERIFY_SSL, + type=ConfigEntryType.ACTION, + label="Verify SSL Certificate", + description="Test your certificate and private key to verify they are valid " + "and match each other.", + action=CONF_ACTION_VERIFY_SSL, + action_label="Verify", + depends_on=CONF_ENABLE_SSL, + required=False, + ), + ConfigEntry( + key="ssl_verify_result", + type=ConfigEntryType.LABEL, + label=ssl_verify_result, + hidden=not ssl_verify_result, + depends_on=CONF_ENABLE_SSL, + required=False, + ), ConfigEntry( key=CONF_BIND_IP, type=ConfigEntryType.STRING, @@ -304,52 +340,11 @@ class WebserverController(CoreController): ssl_context = None ssl_enabled = config.get_value(CONF_ENABLE_SSL, False) if ssl_enabled: - ssl_certificate = config.get_value(CONF_SSL_CERTIFICATE) - ssl_private_key = config.get_value(CONF_SSL_PRIVATE_KEY) - - if not ssl_certificate or not ssl_private_key: - self.logger.error( - "SSL is enabled but certificate or private key is missing. " - "Webserver will start without SSL." - ) - else: - try: - # Create SSL context - ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) - - # Write certificate and key to temporary files - # This is necessary because ssl.SSLContext.load_cert_chain requires file paths - with tempfile.NamedTemporaryFile( - mode="w", suffix=".pem", delete=False - ) as cert_file: - cert_file.write(str(ssl_certificate)) - cert_path = cert_file.name - - with tempfile.NamedTemporaryFile( - mode="w", suffix=".pem", delete=False - ) as key_file: - key_file.write(str(ssl_private_key)) - key_path = key_file.name - - try: - # Load certificate and private key - ssl_context.load_cert_chain(cert_path, key_path) - self.logger.info("SSL/TLS enabled for webserver") - finally: - # Clean up temporary files - try: - Path(cert_path).unlink() - Path(key_path).unlink() - except Exception as cleanup_err: - self.logger.debug( - "Failed to cleanup temporary SSL files: %s", cleanup_err - ) - - except Exception as e: - self.logger.exception( - "Failed to create SSL context: %s. Webserver will start without SSL.", e - ) - ssl_context = None + ssl_context = await create_server_ssl_context( + str(config.get_value(CONF_SSL_CERTIFICATE) or ""), + str(config.get_value(CONF_SSL_PRIVATE_KEY) or ""), + logger=self.logger, + ) await self._server.setup( bind_ip=bind_ip, diff --git a/music_assistant/controllers/webserver/helpers/ssl.py b/music_assistant/controllers/webserver/helpers/ssl.py new file mode 100644 index 00000000..393fb895 --- /dev/null +++ b/music_assistant/controllers/webserver/helpers/ssl.py @@ -0,0 +1,363 @@ +"""SSL helpers for the webserver controller.""" + +from __future__ import annotations + +import asyncio +import contextlib +import logging +import ssl +import subprocess +import tempfile +from dataclasses import dataclass +from pathlib import Path + +import aiofiles + +LOGGER = logging.getLogger(__name__) + + +@dataclass +class SSLCertificateInfo: + """Information about an SSL certificate.""" + + is_valid: bool + key_type: str # "RSA", "ECDSA", or "Unknown" + subject: str + expiry: str + is_expired: bool + is_expiring_soon: bool # Within 30 days + error_message: str | None = None + + +async def get_ssl_content(value: str) -> str: + """Get SSL content from either a file path or the raw PEM content. + + :param value: Either an absolute file path or the raw PEM content. + :return: The PEM content as a string. + :raises FileNotFoundError: If the file path doesn't exist. + :raises ValueError: If the path is not a file. + """ + value = value.strip() + # Check if this looks like a file path (absolute path starting with /) + # PEM content always starts with "-----BEGIN" + if value.startswith("/") and not value.startswith("-----BEGIN"): + # This looks like a file path + path = Path(value) + if not path.exists(): + raise FileNotFoundError(f"SSL file not found: {value}") + if not path.is_file(): + raise ValueError(f"SSL path is not a file: {value}") + async with aiofiles.open(path) as f: + content: str = await f.read() + return content + # Otherwise, treat as raw PEM content + return value + + +def _run_openssl_command(args: list[str]) -> subprocess.CompletedProcess[str]: + """Run an openssl command synchronously. + + :param args: List of arguments for the openssl command (excluding 'openssl'). + :return: CompletedProcess result. + """ + return subprocess.run( # noqa: S603 + ["openssl", *args], # noqa: S607 + capture_output=True, + text=True, + timeout=10, + check=False, + ) + + +async def create_server_ssl_context( + certificate: str, + private_key: str, + logger: logging.Logger | None = None, +) -> ssl.SSLContext | None: + """Create an SSL context for a server from certificate and private key. + + :param certificate: The SSL certificate (file path or PEM content). + :param private_key: The SSL private key (file path or PEM content). + :param logger: Optional logger for error messages. + :return: SSL context if successful, None otherwise. + """ + log = logger or LOGGER + if not certificate or not private_key: + log.error( + "SSL is enabled but certificate or private key is missing. " + "Server will start without SSL." + ) + return None + + cert_path = None + key_path = None + try: + # Load certificate and key content (supports both file paths and raw content) + cert_content = await get_ssl_content(certificate) + key_content = await get_ssl_content(private_key) + + # Create SSL context + ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) + + # Write certificate and key to temporary files + # This is necessary because ssl.SSLContext.load_cert_chain requires file paths + with tempfile.NamedTemporaryFile(mode="w", suffix=".pem", delete=False) as cert_file: + cert_file.write(cert_content) + cert_path = cert_file.name + + with tempfile.NamedTemporaryFile(mode="w", suffix=".pem", delete=False) as key_file: + key_file.write(key_content) + key_path = key_file.name + + # Load certificate and private key + ssl_context.load_cert_chain(cert_path, key_path) + log.info("SSL/TLS enabled for server") + return ssl_context + + except Exception as e: + log.exception("Failed to create SSL context: %s. Server will start without SSL.", e) + return None + finally: + # Clean up temporary files + if cert_path: + with contextlib.suppress(Exception): + Path(cert_path).unlink() + if key_path: + with contextlib.suppress(Exception): + Path(key_path).unlink() + + +async def verify_ssl_certificate(certificate: str, private_key: str) -> SSLCertificateInfo: + """Verify SSL certificate and private key are valid and match. + + :param certificate: The SSL certificate (file path or PEM content). + :param private_key: The SSL private key (file path or PEM content). + :return: SSLCertificateInfo with verification results. + """ + if not certificate or not private_key: + return SSLCertificateInfo( + is_valid=False, + key_type="Unknown", + subject="", + expiry="", + is_expired=False, + is_expiring_soon=False, + error_message="Both certificate and private key are required.", + ) + + # Load certificate and key content + try: + cert_content = await get_ssl_content(certificate) + except FileNotFoundError as e: + return SSLCertificateInfo( + is_valid=False, + key_type="Unknown", + subject="", + expiry="", + is_expired=False, + is_expiring_soon=False, + error_message=f"Certificate file not found: {e}", + ) + except Exception as e: + return SSLCertificateInfo( + is_valid=False, + key_type="Unknown", + subject="", + expiry="", + is_expired=False, + is_expiring_soon=False, + error_message=f"Error loading certificate: {e}", + ) + + try: + key_content = await get_ssl_content(private_key) + except FileNotFoundError as e: + return SSLCertificateInfo( + is_valid=False, + key_type="Unknown", + subject="", + expiry="", + is_expired=False, + is_expiring_soon=False, + error_message=f"Private key file not found: {e}", + ) + except Exception as e: + return SSLCertificateInfo( + is_valid=False, + key_type="Unknown", + subject="", + expiry="", + is_expired=False, + is_expiring_soon=False, + error_message=f"Error loading private key: {e}", + ) + + # Verify with temp files + try: + return await _verify_ssl_with_temp_files(cert_content, key_content) + except ssl.SSLError as e: + return SSLCertificateInfo( + is_valid=False, + key_type="Unknown", + subject="", + expiry="", + is_expired=False, + is_expiring_soon=False, + error_message=_format_ssl_error(e), + ) + except Exception as e: + return SSLCertificateInfo( + is_valid=False, + key_type="Unknown", + subject="", + expiry="", + is_expired=False, + is_expiring_soon=False, + error_message=f"Verification failed: {e}", + ) + + +async def _verify_ssl_with_temp_files(cert_content: str, key_content: str) -> SSLCertificateInfo: + """Verify SSL using temporary files. + + :param cert_content: Certificate PEM content. + :param key_content: Private key PEM content. + :return: SSLCertificateInfo with verification results. + """ + cert_path = None + key_path = None + try: + with tempfile.NamedTemporaryFile(mode="w", suffix=".pem", delete=False) as cert_file: + cert_file.write(cert_content) + cert_path = cert_file.name + + with tempfile.NamedTemporaryFile(mode="w", suffix=".pem", delete=False) as key_file: + key_file.write(key_content) + key_path = key_file.name + + # Test loading into SSL context + test_ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) + test_ctx.load_cert_chain(cert_path, key_path) + + # Get certificate details using openssl + return await _get_certificate_details(cert_path) + finally: + # Clean up temp files + if cert_path: + with contextlib.suppress(Exception): + Path(cert_path).unlink() + if key_path: + with contextlib.suppress(Exception): + Path(key_path).unlink() + + +async def _get_certificate_details(cert_path: str) -> SSLCertificateInfo: + """Get certificate details using openssl. + + :param cert_path: Path to the certificate file. + :return: SSLCertificateInfo with certificate details. + """ + # Get certificate info + result = await asyncio.to_thread( + _run_openssl_command, + ["x509", "-in", cert_path, "-noout", "-subject", "-dates", "-issuer"], + ) + + if result.returncode != 0: + return SSLCertificateInfo( + is_valid=True, + key_type="Unknown", + subject="", + expiry="", + is_expired=False, + is_expiring_soon=False, + ) + + # Parse certificate info + expiry = "" + subject = "" + for line in result.stdout.strip().split("\n"): + if line.startswith("notAfter="): + expiry = line.replace("notAfter=", "") + elif line.startswith("subject="): + subject = line.replace("subject=", "") + + # Check expiry status + expiry_check = await asyncio.to_thread( + _run_openssl_command, + ["x509", "-in", cert_path, "-noout", "-checkend", "0"], + ) + is_expired = expiry_check.returncode != 0 + + expiring_soon_check = await asyncio.to_thread( + _run_openssl_command, + ["x509", "-in", cert_path, "-noout", "-checkend", str(30 * 24 * 60 * 60)], + ) + is_expiring_soon = expiring_soon_check.returncode != 0 + + # Detect key type + key_type_result = await asyncio.to_thread( + _run_openssl_command, + ["x509", "-in", cert_path, "-noout", "-text"], + ) + key_type = "Unknown" + if "rsaEncryption" in key_type_result.stdout: + key_type = "RSA" + elif "id-ecPublicKey" in key_type_result.stdout: + key_type = "ECDSA" + + return SSLCertificateInfo( + is_valid=True, + key_type=key_type, + subject=subject, + expiry=expiry, + is_expired=is_expired, + is_expiring_soon=is_expiring_soon, + ) + + +def _format_ssl_error(e: ssl.SSLError) -> str: + """Format an SSL error into a user-friendly message. + + :param e: The SSL error. + :return: User-friendly error message. + """ + error_msg = str(e) + if "PEM lib" in error_msg: + return ( + "Invalid certificate or key format. " + "Make sure both are valid PEM format and the key is not encrypted." + ) + if "key values mismatch" in error_msg.lower(): + return ( + "Certificate and private key do not match. " + "Please verify you're using the correct key for this certificate." + ) + return f"SSL Error: {error_msg}" + + +def format_certificate_info(info: SSLCertificateInfo) -> str: + """Format SSLCertificateInfo into a human-readable string. + + :param info: The certificate info to format. + :return: Human-readable string. + """ + if not info.is_valid: + return f"Error: {info.error_message}" + + status = "VALID" + warning = "" + if info.is_expired: + status = "EXPIRED" + warning = " (Certificate has expired!)" + elif info.is_expiring_soon: + status = "EXPIRING SOON" + warning = " (Certificate expires within 30 days)" + + lines = [f"Certificate verification: {status}{warning}", f"Key type: {info.key_type}"] + if info.subject: + lines.append(f"Subject: {info.subject}") + if info.expiry: + lines.append(f"Expires: {info.expiry}") + + return "\n".join(lines) -- 2.34.1