import pathlib
import threading
from collections.abc import AsyncGenerator, Awaitable, Callable, Coroutine
+from enum import Enum
from typing import TYPE_CHECKING, Any, Self, TypeGuard, TypeVar, cast, overload
from uuid import uuid4
return provider.type == ProviderType.PLAYER
+class CoreState(Enum):
+ """Enum representing the core state of the Music Assistant server."""
+
+ STARTING = "starting"
+ RUNNING = "running"
+ STOPPING = "stopping"
+ STOPPED = "stopped"
+
+
class MusicAssistant:
"""Main MusicAssistant (Server) object."""
def __init__(self, storage_path: str, cache_path: str, safe_mode: bool = False) -> None:
"""Initialize the MusicAssistant Server."""
+ self._state = CoreState.STARTING
self.storage_path = storage_path
self.cache_path = cache_path
self.safe_mode = safe_mode
self._providers: dict[str, ProviderInstanceType] = {}
self._tracked_tasks: dict[str, asyncio.Task[Any]] = {}
self._tracked_timers: dict[str, asyncio.TimerHandle] = {}
- self.closing = False
self.running_as_hass_addon: bool = False
self.version: str = "0.0.0"
+ self.logger = LOGGER
self.dev_mode = (
os.environ.get("PYTHONDEVMODE") == "1"
or pathlib.Path(__file__).parent.resolve().parent.resolve().joinpath(".venv").exists()
for controller_name in CONFIGURABLE_CORE_CONTROLLERS:
controller: CoreController = getattr(self, controller_name)
self._provider_manifests[controller.domain] = controller.manifest
+ # load webserver/api first so the api/frontend is available as soon as possible,
+ # other controllers are not yet available while we're starting (or performing migrations)
+ self._register_api_commands()
+ await self.webserver.setup(await self.config.get_core_config("webserver"))
await self.cache.setup(await self.config.get_core_config("cache"))
- # load streams controller early so we can abort if we can't load it
await self.streams.setup(await self.config.get_core_config("streams"))
await self.music.setup(await self.config.get_core_config("music"))
await self.metadata.setup(await self.config.get_core_config("metadata"))
await self.players.setup(await self.config.get_core_config("players"))
await self.player_queues.setup(await self.config.get_core_config("player_queues"))
- # load webserver/api last so the api/frontend is
- # not yet available while we're starting (or performing migrations)
- self._register_api_commands()
- await self.webserver.setup(await self.config.get_core_config("webserver"))
+ # load builtin providers (always needed, also in safe mode)
+ await self._load_builtin_providers()
# setup discovery
await self._setup_discovery()
- # load providers
+ # at this point we are fully up and running,
+ # set state to running to signal we're ready
+ self._state = CoreState.RUNNING
+ # load regular providers (skip when in safe mode)
+ # providers are loaded in background tasks so they won't block
+ # the startup if they fail or take a long time to load
if not self.safe_mode:
await self._load_providers()
"""Stop running the music assistant server."""
LOGGER.info("Stop called, cleaning up...")
self.signal_event(EventType.SHUTDOWN)
- self.closing = True
+ self._state = CoreState.STOPPING
# cancel all running tasks
for task in list(self._tracked_tasks.values()):
task.cancel()
self._http_session_no_ssl.detach()
if self._http_session_no_ssl.connector:
await self._http_session_no_ssl.connector.close()
+ self._state = CoreState.STOPPED
+
+ @property
+ def state(self) -> CoreState:
+ """Return current state of the core."""
+ return self._state
+
+ @property
+ def closing(self) -> bool:
+ """Return true if the server is (in the process of) closing."""
+ return self._state in (CoreState.STOPPING, CoreState.STOPPED)
@property
def server_id(self) -> str:
required_role = getattr(obj, "api_required_role", None)
self.register_api_command(obj.api_cmd, obj, authenticated, required_role)
- async def _load_providers(self) -> None:
- """Load providers from config."""
- # create default config for any 'builtin' providers (e.g. URL provider)
+ async def _load_builtin_providers(self) -> None:
+ """
+ Load all builtin providers.
+
+ Builtin providers are always needed (also in safe mode) and are fully awaited.
+ On error, setup will fail.
+ """
+ # create default config for any 'builtin' providers
for prov_manifest in self._provider_manifests.values():
if prov_manifest.type == ProviderType.CORE:
# core controllers are not real providers
if not prov_manifest.builtin:
continue
await self.config.create_builtin_provider_config(prov_manifest.domain)
+
+ # load all configured (and enabled) builtin providers
+ prov_configs = await self.config.get_provider_configs(include_values=True)
+ builtin_configs: list[ProviderConfig] = [
+ prov_conf
+ for prov_conf in prov_configs
+ if (manifest := self._provider_manifests.get(prov_conf.domain))
+ and manifest.builtin
+ and (prov_conf.enabled or manifest.allow_disable is False)
+ ]
+
+ # load builtin providers and wait for them to complete
+ await asyncio.gather(
+ *[self.load_provider(conf.instance_id, allow_retry=True) for conf in builtin_configs]
+ )
+
+ async def _load_providers(self) -> None:
+ """
+ Load regular (non-builtin) providers from config.
+
+ Regular providers are loaded in background tasks
+ and can fail without affecting core setup.
+ """
# handle default providers setup
self.config.set_default(CONF_DEFAULT_PROVIDERS_SETUP, set())
default_providers_setup = cast("set[str]", self.config.get(CONF_DEFAULT_PROVIDERS_SETUP))
if changes_made:
self.config.set(CONF_DEFAULT_PROVIDERS_SETUP, default_providers_setup)
self.config.save(True)
-
- # load all configured (and enabled) providers
- # builtin providers are loaded first (and awaited) before loading the rest
+ # load all configured (and enabled) regular (non-builtin) providers
prov_configs = await self.config.get_provider_configs(include_values=True)
- builtin_configs: list[ProviderConfig] = []
- other_configs: list[ProviderConfig] = []
- for prov_conf in prov_configs:
- if not prov_conf.enabled:
- continue
- manifest = self._provider_manifests.get(prov_conf.domain)
- if manifest and manifest.builtin:
- builtin_configs.append(prov_conf)
- else:
- other_configs.append(prov_conf)
-
- # load builtin providers first and wait for them to complete
- await asyncio.gather(
- *[self.load_provider(conf.instance_id, allow_retry=True) for conf in builtin_configs]
- )
-
- # load remaining providers concurrently via tasks
+ other_configs: list[ProviderConfig] = [
+ prov_conf
+ for prov_conf in prov_configs
+ if prov_conf.enabled
+ and (
+ not (manifest := self._provider_manifests.get(prov_conf.domain))
+ or not manifest.builtin
+ )
+ ]
+ # load providers concurrently via tasks
for prov_conf in other_configs:
# Use a task so we can load multiple providers at once.
# If a provider fails, that will not block the loading of other providers.
provider_manifest.allow_disable = False
self._provider_manifests[provider_manifest.domain] = provider_manifest
- LOGGER.debug("Loaded manifest for provider %s", provider_manifest.name)
+ LOGGER.log(
+ VERBOSE_LOG_LEVEL, "Loaded manifest for provider %s", provider_manifest.name
+ )
except Exception as exc:
LOGGER.exception(
"Error while loading manifest for provider %s",
if not await isdir(dir_path):
continue
tg.create_task(load_provider_manifest(dir_str, dir_path))
+ self.logger.debug("Loaded %s provider manifests", len(self._provider_manifests))
async def _setup_discovery(self) -> None:
"""Handle setup of MDNS discovery."""