from mashumaro import DataClassDictMixin
from music_assistant.common.models.enums import ProviderType
+from music_assistant.common.models.provider import ProviderManifest
from music_assistant.constants import (
CONF_CROSSFADE_DURATION,
CONF_EQ_BASS,
for entry in config_entries:
# create a copy of the entry
conf.values[entry.key] = ConfigEntry.from_dict(entry.to_dict())
- conf.values[entry.key].parse_value(raw["values"].get(entry.key), allow_none=True)
+ conf.values[entry.key].parse_value(
+ raw.get("values", {}).get(entry.key), allow_none=True
+ )
return conf
def to_raw(self) -> dict[str, Any]:
return ENCRYPT_CALLBACK(value.value)
return value.value
- return {
- **self.to_dict(),
- "values": {
- x.key: _handle_value(x)
- for x in self.values.values()
- if (x.value != x.default_value and x.type not in UI_ONLY)
- },
+ res = self.to_dict()
+ res.pop("manifest", None) # filter out from storage
+ res["values"] = {
+ x.key: _handle_value(x)
+ for x in self.values.values()
+ if (x.value != x.default_value and x.type not in UI_ONLY)
}
+ return res
def __post_serialize__(self, d: dict[str, Any]) -> dict[str, Any]:
"""Adjust dict object after it has been serialized."""
# root values (enabled, name)
root_values = ("enabled", "name")
for key in root_values:
- cur_val = getattr(self, key)
if key not in update:
continue
+ cur_val = getattr(self, key)
new_val = update[key]
if new_val == cur_val:
continue
type: ProviderType
domain: str
instance_id: str
+ manifest: ProviderManifest | None = None # copied here for UI convenience only
# enabled: boolean to indicate if the provider is enabled
enabled: bool = True
# name: an (optional) custom name for this provider instance/config
class CoreConfig(Config):
"""CoreController Configuration."""
- module: str # name of the core module
- friendly_name: str # friendly name of the core module
+ domain: str # domain/name of the core module
+ manifest: ProviderManifest | None = None # copied here for UI convenience only
+ # last_error: an optional error message if the module could not be setup with this config
last_error: str | None = None
PLAYER = "player"
METADATA = "metadata"
PLUGIN = "plugin"
+ CORE = "core"
class ConfigEntryType(StrEnum):
class CacheController(CoreController):
"""Basic cache controller using both memory and database."""
- name: str = "cache"
- friendly_name: str = "Cache controller"
+ domain: str = "cache"
def __init__(self, *args, **kwargs) -> None:
"""Initialize core controller."""
super().__init__(*args, **kwargs)
self.database: DatabaseConnection | None = None
self._mem_cache = MemoryCache(500)
+ self.manifest.name = "Cache controller"
+ self.manifest.description = (
+ "Music Assistant's core controller for caching data throughout the application."
+ )
+ self.manifest.icon = "mdi-memory"
async def setup(self) -> None:
"""Async initialize of cache module."""
remove = wrap(os.remove)
rename = wrap(os.rename)
+CONFIGURABLE_CORE_CONTROLLERS = ("streams", "webserver", "players", "metadata", "cache")
+
class ConfigController:
"""Controller that handles storage of persistent configuration settings."""
config_entries = await self.get_provider_config_entries(
raw_conf["domain"], instance_id=instance_id, values=raw_conf.get("values")
)
- return ProviderConfig.parse(config_entries, raw_conf)
+ for prov in self.mass.get_available_providers():
+ if prov.domain == raw_conf["domain"]:
+ manifest = prov
+ break
+ else:
+ raise KeyError(f'Unknown provider domain: {raw_conf["domain"]}')
+ conf: ProviderConfig = ProviderConfig.parse(config_entries, raw_conf)
+ # always copy the manifest to help the UI a bit
+ conf.manifest = manifest
+ return conf
raise KeyError(f"No config found for provider id {instance_id}")
@api_command("config/providers/get_value")
"""Return all core controllers config options."""
return [
await self.get_core_config(core_controller)
- for core_controller in ("streams", "webserver")
+ for core_controller in CONFIGURABLE_CORE_CONTROLLERS
]
@api_command("config/core/get")
- async def get_core_config(self, core_controller: str) -> CoreConfig:
+ async def get_core_config(self, domain: str) -> CoreConfig:
"""Return configuration for a single core controller."""
- raw_conf = self.get(f"{CONF_CORE}/{core_controller}", {})
- config_entries = await self.get_core_config_entries(core_controller)
- return CoreConfig.parse(config_entries, raw_conf)
+ core_controller: CoreController = getattr(self.mass, domain)
+ raw_conf = self.get(f"{CONF_CORE}/{domain}", {"domain": domain})
+ config_entries = await self.get_core_config_entries(domain)
+ conf: CoreConfig = CoreConfig.parse(config_entries, raw_conf)
+ # always copy the manifest to help the UI a bit
+ conf.manifest = core_controller.manifest
+ return conf
+
+ @api_command("config/core/get_value")
+ async def get_core_config_value(self, domain: str, key: str) -> ConfigValueType:
+ """Return single configentry value for a core controller."""
+ conf = await self.get_core_config(domain)
+ return (
+ conf.values[key].value
+ if conf.values[key].value is not None
+ else conf.values[key].default_value
+ )
@api_command("config/core/get_entries")
async def get_core_config_entries(
self,
- core_controller: str,
+ domain: str,
action: str | None = None,
values: dict[str, ConfigValueType] | None = None,
) -> tuple[ConfigEntry, ...]:
values: the (intermediate) raw values for config entries sent with the action.
"""
if values is None:
- values = self.get(f"{CONF_CORE}/{core_controller}/values", {})
- controller: CoreController = getattr(self.mass, core_controller)
+ values = self.get(f"{CONF_CORE}/{domain}/values", {})
+ controller: CoreController = getattr(self.mass, domain)
return (
await controller.get_config_entries(action=action, values=values)
+ DEFAULT_CORE_CONFIG_ENTRIES
@api_command("config/core/save")
async def save_core_config(
self,
- core_controller: str,
+ domain: str,
values: dict[str, ConfigValueType],
) -> CoreConfig:
"""Save CoreController Config values."""
- config = await self.get_core_config(core_controller)
+ config = await self.get_core_config(domain)
changed_keys = config.update(values)
# validate the new config
config.validate()
# no changes
return config
# try to load the provider first to catch errors before we save it.
- controller: CoreController = getattr(self.mass, core_controller)
+ controller: CoreController = getattr(self.mass, domain)
await controller.reload()
# reload succeeded, save new config
config.last_error = None
- conf_key = f"{CONF_CORE}/{core_controller}"
+ conf_key = f"{CONF_CORE}/{domain}"
self.set(conf_key, config.to_raw())
# return full config, just in case
- return await self.get_core_config(core_controller)
+ return await self.get_core_config(domain)
def get_raw_core_config_value(
self, core_module: str, key: str, default: ConfigValueType = None
class MetaDataController(CoreController):
"""Several helpers to search and store metadata for mediaitems."""
- name: str = "metadata"
- friendly_name: str = "Metadata controller"
+ domain: str = "metadata"
def __init__(self, *args, **kwargs) -> None:
"""Initialize class."""
self.cache = self.mass.cache
self._pref_lang: str | None = None
self.scan_busy: bool = False
+ self.manifest.name = "Metadata controller"
+ self.manifest.description = (
+ "Music Assistant's core controller which handles all metadata for music."
+ )
+ self.manifest.icon = "mdi-book-information-variant"
async def setup(self) -> None:
"""Async initialize of module."""
class MusicController(CoreController):
"""Several helpers around the musicproviders."""
- name: str = "music"
- friendly_name: str = "Music library"
+ domain: str = "music"
database: DatabaseConnection | None = None
self.playlists = PlaylistController(self.mass)
self.in_progress_syncs: list[SyncTask] = []
self._sync_lock = asyncio.Lock()
+ self.manifest.name = "Music controller"
+ self.manifest.description = (
+ "Music Assistant's core controller which manages all music from all providers."
+ )
+ self.manifest.icon = "mdi-archive-music"
async def setup(self):
"""Async initialize of module."""
class PlayerController(CoreController):
"""Controller holding all logic to control registered players."""
- name: str = "players"
- friendly_name: str = "Players controller"
+ domain: str = "players"
def __init__(self, *args, **kwargs) -> None:
"""Initialize core controller."""
self._players: dict[str, Player] = {}
self._prev_states: dict[str, dict] = {}
self.queues = PlayerQueuesController(self)
+ self.manifest.name = "Players controller"
+ self.manifest.description = (
+ "Music Assistant's core controller which manages all players from all providers."
+ )
+ self.manifest.icon = "mdi-speaker-multiple"
async def setup(self) -> None:
"""Async initialize of module."""
from aiohttp import web
from music_assistant.common.helpers.util import get_ip, select_free_port
-from music_assistant.common.models.config_entries import (
- DEFAULT_CORE_CONFIG_ENTRIES,
- ConfigEntry,
- ConfigValueType,
-)
+from music_assistant.common.models.config_entries import ConfigEntry, ConfigValueType
from music_assistant.common.models.enums import ConfigEntryType, ContentType
from music_assistant.common.models.errors import MediaNotFoundError, QueueEmpty
from music_assistant.common.models.media_items import AudioFormat
}
FLOW_MAX_SAMPLE_RATE = 96000
FLOW_MAX_BIT_DEPTH = 24
-WORKAROUND_PLAYERS_CACHE_KEY = "streams.workaround_players"
class MultiClientStreamJob:
class StreamsController(CoreController):
"""Webserver Controller to stream audio to players."""
- name: str = "streams"
- friendly_name: str = "Streamserver"
+ domain: str = "streams"
def __init__(self, *args, **kwargs):
"""Initialize instance."""
self.multi_client_jobs: dict[str, MultiClientStreamJob] = {}
self.register_dynamic_route = self._server.register_dynamic_route
self.unregister_dynamic_route = self._server.unregister_dynamic_route
- self.workaround_players: set[str] = set()
+ self.manifest.name = "Streamserver"
+ self.manifest.description = (
+ "Music Assistant's core server that is responsible for "
+ "streaming audio to players on the local network as well as "
+ "some player specific local control callbacks."
+ )
+ self.manifest.icon = "mdi-cast-audio"
@property
def base_url(self) -> str:
values: dict[str, ConfigValueType] | None = None, # noqa: ARG002
) -> tuple[ConfigEntry, ...]:
"""Return all Config Entries for this core module (if any)."""
- return DEFAULT_CORE_CONFIG_ENTRIES + (
+ default_ip = await get_ip()
+ default_port = await select_free_port(8096, 9200)
+ return (
ConfigEntry(
key=CONF_BIND_PORT,
- type=ConfigEntryType.STRING,
- default_value=self._default_port,
+ type=ConfigEntryType.INTEGER,
+ default_value=default_port,
label="TCP Port",
description="The TCP port to run the server. "
"Make sure that this server can be reached "
ConfigEntry(
key=CONF_BIND_IP,
type=ConfigEntryType.STRING,
- default_value=self._default_ip,
+ default_value=default_ip,
label="Bind to IP/interface",
- description="Start the (web)server on this specific interface. \n"
+ description="Start the streamserver on this specific interface. \n"
"This IP address is communicated to players where to find this server. "
"Override the default in advanced scenarios, such as multi NIC configurations. \n"
"Make sure that this server can be reached "
"on the given IP and TCP port by players on the local network. \n"
"This is an advanced setting that should normally "
"not be adjusted in regular setups.",
- advanced=True,
),
)
async def setup(self) -> None:
"""Async initialize of module."""
- self._default_ip = await get_ip()
- self._default_port = await select_free_port(8096, 9200)
ffmpeg_present, libsoxr_support, version = await check_audio_support()
if not ffmpeg_present:
self.logger.error("FFmpeg binary not found on your system, playback will NOT work!.")
version,
"with libsoxr support" if libsoxr_support else "",
)
- # restore known workaround players
- if cache := await self.mass.cache.get(WORKAROUND_PLAYERS_CACHE_KEY):
- self.workaround_players.update(cache)
# start the webserver
- self.publish_port = bind_port = self.mass.config.get_raw_core_config_value(
- self.name, CONF_BIND_IP, self._default_port
- )
- self.publish_ip = bind_ip = self.mass.config.get_raw_core_config_value(
- self.name, CONF_BIND_IP, self._default_ip
+ self.publish_port = await self.mass.config.get_core_config_value(
+ self.domain, CONF_BIND_PORT
)
+ self.publish_ip = await self.mass.config.get_core_config_value(self.domain, CONF_BIND_IP)
await self._server.setup(
- bind_ip=bind_ip,
- bind_port=bind_port,
- base_url=f"http://{bind_ip}:{bind_port}",
+ bind_ip=self.publish_ip,
+ bind_port=self.publish_port,
+ base_url=f"http://{self.publish_ip}:{self.publish_port}",
static_routes=[
("GET", "/preview", self.serve_preview_stream),
(
async def close(self) -> None:
"""Cleanup on exit."""
await self._server.close()
- await self.mass.cache.set(WORKAROUND_PLAYERS_CACHE_KEY, self.workaround_players)
async def resolve_stream_url(
self,
MessageType,
SuccessResultMessage,
)
-from music_assistant.common.models.config_entries import DEFAULT_CORE_CONFIG_ENTRIES, ConfigEntry
+from music_assistant.common.models.config_entries import ConfigEntry
from music_assistant.common.models.enums import ConfigEntryType
from music_assistant.common.models.errors import InvalidCommand
from music_assistant.common.models.event import MassEvent
from music_assistant.constants import CONF_BIND_IP, CONF_BIND_PORT
from music_assistant.server.helpers.api import APICommandHandler, parse_arguments
+from music_assistant.server.helpers.util import get_ips, is_hass_supervisor
from music_assistant.server.helpers.webserver import Webserver
from music_assistant.server.models.core_controller import CoreController
class WebserverController(CoreController):
"""Core Controller that manages the builtin webserver that hosts the api and frontend."""
- name: str = "webserver"
- friendly_name: str = "Web Server (frontend and api)"
+ domain: str = "webserver"
def __init__(self, *args, **kwargs):
"""Initialize instance."""
super().__init__(*args, **kwargs)
self._server = Webserver(self.logger, enable_dynamic_routes=False)
self.clients: set[WebsocketClientHandler] = set()
+ self.manifest.name = "Web Server (frontend and api)"
+ self.manifest.description = (
+ "The built-in webserver that hosts the Music Assistant Websockets API and frontend"
+ )
+ self.manifest.icon = "mdi-web-box"
@property
def base_url(self) -> str:
values: dict[str, ConfigValueType] | None = None, # noqa: ARG002
) -> tuple[ConfigEntry, ...]:
"""Return all Config Entries for this core module (if any)."""
- return DEFAULT_CORE_CONFIG_ENTRIES + (
- ConfigEntry(
- key=CONF_BIND_PORT,
- type=ConfigEntryType.STRING,
- default_value=self._default_port,
- label="TCP Port",
- description="The TCP port to run the webserver.",
- ),
+ if await is_hass_supervisor():
+ # if we're running on the HA supervisor the webserver is secured by HA ingress
+ # we only start the webserver on the internal docker network and ingress connects
+ # to that internally and exposes the webUI securely
+ # if a user also wants to expose a the webserver non securely on his internal
+ # network he/she should open the port in the add-on config.
+ internal_ip = next((x for x in await get_ips() if x.startswith("172")), await get_ip())
+ base_url = f"http://{internal_ip:8095}"
+ return (
+ ConfigEntry(
+ key=CONF_BIND_PORT,
+ type=ConfigEntryType.STRING,
+ # hardcoded/static value
+ default_value=8095,
+ value=8095,
+ label="TCP Port",
+ description="",
+ hidden=True,
+ ),
+ ConfigEntry(
+ key=CONF_BIND_IP,
+ type=ConfigEntryType.STRING,
+ # hardcoded/static value
+ default_value=internal_ip,
+ value=internal_ip,
+ label=CONF_BIND_IP,
+ description="",
+ hidden=True,
+ ),
+ ConfigEntry(
+ key=CONF_BASE_URL,
+ type=ConfigEntryType.STRING,
+ # hardcoded/static value
+ default_value=base_url,
+ value=base_url,
+ label=CONF_BASE_URL,
+ hidden=True,
+ ),
+ )
+
+ # HA supervisor not present: user is responsible for securing the webserver
+ # we give the tools to do so by presenting config options
+ default_ip = await get_ip()
+ default_port = await select_free_port(8095, 9200)
+ default_base_url = f"http://{default_ip}:{default_port}"
+ return (
ConfigEntry(
key=CONF_BASE_URL,
type=ConfigEntryType.STRING,
- default_value=self._default_base_url,
+ default_value=default_base_url,
label="Base URL",
description="The (base) URL to reach this webserver in the network. \n"
"Override this in advanced scenarios where for example you're running "
"the webserver behind a reverse proxy.",
- advanced=True,
+ ),
+ ConfigEntry(
+ key=CONF_BIND_PORT,
+ type=ConfigEntryType.INTEGER,
+ default_value=default_port,
+ label="TCP Port",
+ description="The TCP port to run the webserver.",
),
ConfigEntry(
key=CONF_BIND_IP,
"to enhance security and protect outside access to the webinterface and API. \n\n"
"This is an advanced setting that should normally "
"not be adjusted in regular setups.",
- advanced=True,
),
)
async def setup(self) -> None:
"""Async initialize of module."""
- self._default_ip = await get_ip()
- self._default_port = await select_free_port(8095, 9200)
- self._default_base_url = f"http://{self._default_ip}:{self._default_port}"
# work out all routes
routes: list[tuple[str, str, Awaitable]] = []
# frontend routes
# add websocket api
routes.append(("GET", "/ws", self._handle_ws_client))
# start the webserver
- bind_port = self.mass.config.get_raw_core_config_value(
- self.name, CONF_BIND_IP, self._default_port
- )
- bind_ip = self.mass.config.get_raw_core_config_value(
- self.name, CONF_BIND_IP, self._default_ip
- )
- base_url = self.mass.config.get_raw_core_config_value(
- self.name, CONF_BASE_URL, self._default_ip
- )
await self._server.setup(
- bind_ip=bind_ip,
- bind_port=bind_port,
- base_url=base_url,
+ bind_ip=await self.mass.config.get_core_config_value(self.domain, CONF_BIND_IP),
+ bind_port=await self.mass.config.get_core_config_value(self.domain, CONF_BIND_PORT),
+ base_url=await self.mass.config.get_core_config_value(self.domain, CONF_BASE_URL),
static_routes=routes,
# add assets subdir as static_content
static_content=("/assets", os.path.join(frontend_dir, "assets"), "assets"),
import importlib
import logging
import platform
+import socket
import tempfile
+import urllib.error
+import urllib.parse
+import urllib.request
+from contextlib import suppress
from functools import lru_cache
from typing import TYPE_CHECKING
raise RuntimeError(msg)
+async def get_ips(include_ipv6: bool = False) -> set[str]:
+ """Return all IP-adresses of all network interfaces."""
+
+ def call() -> set[str]:
+ result: set[str] = set()
+ for item in socket.getaddrinfo(socket.gethostname(), None):
+ protocol, *_, (ip, *_) = item
+ if protocol == socket.AddressFamily.AF_INET or (
+ include_ipv6 and protocol == socket.AddressFamily.AF_INET6
+ ):
+ result.add(ip)
+ return result
+
+ return await asyncio.to_thread(call)
+
+
+async def is_hass_supervisor() -> bool:
+ """Return if we're running inside the HA Supervisor (e.g. HAOS)."""
+ with suppress(urllib.error.URLError):
+ res = await asyncio.to_thread(urllib.request.urlopen, "ws://supervisor/core/websocket")
+ return res.code == 401
+ return False
+
+
async def get_provider_module(domain: str) -> ProviderModuleType:
"""Return module for given provider domain."""
import logging
from typing import TYPE_CHECKING
+from music_assistant.common.models.enums import ProviderType
+from music_assistant.common.models.provider import ProviderManifest
from music_assistant.constants import CONF_LOG_LEVEL, ROOT_LOGGER_NAME
if TYPE_CHECKING:
class CoreController:
"""Base representation of a Core controller within Music Assistant."""
- name: str
- friendly_name: str
+ domain: str # used as identifier (=name of the module)
+ manifest: ProviderManifest # some info for the UI only
def __init__(self, mass: MusicAssistant) -> None:
"""Initialize MusicProvider."""
self.mass = mass
- self.logger = logging.getLogger(f"{ROOT_LOGGER_NAME}.{self.name}")
- log_level = self.mass.config.get_raw_core_config_value(self.name, CONF_LOG_LEVEL, "GLOBAL")
+ self.logger = logging.getLogger(f"{ROOT_LOGGER_NAME}.{self.domain}")
+ log_level = self.mass.config.get_raw_core_config_value(
+ self.domain, CONF_LOG_LEVEL, "GLOBAL"
+ )
if log_level != "GLOBAL":
self.logger.setLevel(log_level)
+ self.manifest = ProviderManifest(
+ type=ProviderType.CORE,
+ domain=self.domain,
+ name=f"{self.domain.title()} Core controller",
+ description=f"{self.domain.title()} Core controller",
+ codeowners=["@music-assistant"],
+ icon="mdi:puzzle-outline",
+ )
async def get_config_entries(
self,
async def reload(self) -> None:
"""Reload this core controller."""
await self.close()
- log_level = self.mass.config.get_raw_core_config_value(self.name, CONF_LOG_LEVEL, "GLOBAL")
+ log_level = self.mass.config.get_raw_core_config_value(
+ self.domain, CONF_LOG_LEVEL, "GLOBAL"
+ )
if log_level == "GLOBAL":
log_level = logging.getLogger(ROOT_LOGGER_NAME).level
self.logger.setLevel(log_level)
def on_new_cast_status(self, castplayer: CastPlayer, status: CastStatus) -> None:
"""Handle updated CastStatus."""
+ if status is None:
+ return # guard
castplayer.logger.debug(
"Received cast status - app_id: %s - volume: %s",
status.app_id,
"python-slugify==8.0.1",
"mashumaro==3.7",
"memory-tempfile==2.2.3",
- "music-assistant-frontend==20230707.0",
+ "music-assistant-frontend==20230707.1",
"pillow==9.5.0",
"unidecode==1.3.6",
"xmltodict==0.13.0",
git+https://github.com/pytube/pytube.git@refs/pull/1680/head
mashumaro==3.7
memory-tempfile==2.2.3
-music-assistant-frontend==20230707.0
+music-assistant-frontend==20230707.1
orjson==3.9.1
pillow==9.5.0
plexapi==4.14.0