import asyncio
import logging
import os
-from logging.handlers import TimedRotatingFileHandler
+import sys
+import threading
+from contextlib import suppress
+from logging.handlers import RotatingFileHandler
+from typing import Final
-import coloredlogs
from aiorun import run
+from colorlog import ColoredFormatter
from music_assistant.common.helpers.json import json_loads
+from music_assistant.constants import ROOT_LOGGER_NAME
from music_assistant.server import MusicAssistant
+from music_assistant.server.helpers.logging import activate_log_queue_handler
+
+FORMAT_DATE: Final = "%Y-%m-%d"
+FORMAT_TIME: Final = "%H:%M:%S"
+FORMAT_DATETIME: Final = f"{FORMAT_DATE} {FORMAT_TIME}"
+MAX_LOG_FILESIZE = 1000000 * 10 # 10 MB
def get_arguments():
logs_dir = os.path.join(data_path, "logs")
if not os.path.isdir(logs_dir):
os.mkdir(logs_dir)
- logger = logging.getLogger()
- log_fmt = "%(asctime)-15s %(levelname)-5s %(name)s -- %(message)s"
- log_formatter = logging.Formatter(log_fmt)
- consolehandler = logging.StreamHandler()
- consolehandler.setFormatter(log_formatter)
- consolehandler.setLevel(logging.DEBUG)
- logger.addHandler(consolehandler)
- log_filename = os.path.join(logs_dir, "musicassistant.log")
- file_handler = TimedRotatingFileHandler(
- log_filename, when="midnight", interval=1, backupCount=10
+
+ # define log formatter
+ log_fmt = "%(asctime)s.%(msecs)03d %(levelname)s (%(threadName)s) [%(name)s] %(message)s"
+
+ # base logging config for the root logger
+ logging.basicConfig(level=logging.INFO)
+
+ colorfmt = f"%(log_color)s{log_fmt}%(reset)s"
+ logging.getLogger().handlers[0].setFormatter(
+ ColoredFormatter(
+ colorfmt,
+ datefmt=FORMAT_DATETIME,
+ reset=True,
+ log_colors={
+ "DEBUG": "cyan",
+ "INFO": "green",
+ "WARNING": "yellow",
+ "ERROR": "red",
+ "CRITICAL": "red",
+ },
+ )
)
- file_handler.setLevel(logging.DEBUG)
- file_handler.setFormatter(log_formatter)
+
+ # Capture warnings.warn(...) and friends messages in logs.
+ # The standard destination for them is stderr, which may end up unnoticed.
+ # This way they're where other messages are, and can be filtered as usual.
+ logging.captureWarnings(True)
+
+ # setup file handler
+ log_filename = os.path.join(logs_dir, "musicassistant.log")
+ file_handler = RotatingFileHandler(log_filename, maxBytes=MAX_LOG_FILESIZE, backupCount=1)
+ # rotate log at each start
+ with suppress(OSError):
+ file_handler.doRollover()
+ file_handler.setFormatter(logging.Formatter(log_fmt, datefmt=FORMAT_DATETIME))
+ # file_handler.setLevel(logging.INFO)
+
+ logger = logging.getLogger()
logger.addHandler(file_handler)
- # global level is debug by default unless overridden
- logger.setLevel(level)
+ # apply the configured global log level to the (root) music assistant logger
+ logging.getLogger(ROOT_LOGGER_NAME).setLevel(level)
- # silence some loggers
+ # silence some noisy loggers
logging.getLogger("asyncio").setLevel(logging.WARNING)
logging.getLogger("aiosqlite").setLevel(logging.WARNING)
logging.getLogger("databases").setLevel(logging.WARNING)
+ logging.getLogger("requests").setLevel(logging.WARNING)
+ logging.getLogger("urllib3").setLevel(logging.WARNING)
+ logging.getLogger("aiohttp.access").setLevel(logging.WARNING)
+ logging.getLogger("httpx").setLevel(logging.WARNING)
+ logging.getLogger("charset_normalizer").setLevel(logging.WARNING)
+
+ sys.excepthook = lambda *args: logging.getLogger(None).exception(
+ "Uncaught exception", exc_info=args # type: ignore[arg-type]
+ )
+ threading.excepthook = lambda args: logging.getLogger(None).exception(
+ "Uncaught thread exception",
+ exc_info=( # type: ignore[arg-type]
+ args.exc_type,
+ args.exc_value,
+ args.exc_traceback,
+ ),
+ )
- # enable coloredlogs
- coloredlogs.install(level=level, fmt=log_fmt)
return logger
async def start_mass():
loop = asyncio.get_running_loop()
- if dev_mode:
+ activate_log_queue_handler()
+ if dev_mode or log_level == "DEBUG":
loop.set_debug(True)
await mass.start()
routes.append(("GET", "/", handler))
# add info
routes.append(("GET", "/info", self._handle_server_info))
+ # add logging
+ routes.append(("GET", "/log", self._handle_application_log))
# add websocket api
routes.append(("GET", "/ws", self._handle_ws_client))
# also host the image proxy on the webserver
finally:
self.clients.remove(connection)
+ async def _handle_application_log(self, request: web.Request) -> web.Response: # noqa: ARG002
+ """Handle request to get the application log."""
+ log_data = await self.mass.get_application_log()
+ return web.Response(text=log_data, content_type="text/text")
+
class WebSocketLogAdapter(logging.LoggerAdapter):
"""Add connection id to websocket log messages."""
--- /dev/null
+"""
+Logging utilities.
+
+A lot in this file has been copied from Home Assistant:
+https://github.com/home-assistant/core/blob/e5ccd85e7e26c167d0b73669a88bc3a7614dd456/homeassistant/util/logging.py#L78
+
+All rights reserved.
+"""
+from __future__ import annotations
+
+import asyncio
+import inspect
+import logging
+import logging.handlers
+import queue
+import traceback
+from collections.abc import Callable, Coroutine
+from functools import partial, wraps
+from typing import Any, TypeVar, cast, overload
+
+_T = TypeVar("_T")
+
+
+class LoggingQueueHandler(logging.handlers.QueueHandler):
+ """Process the log in another thread."""
+
+ listener: logging.handlers.QueueListener | None = None
+
+ def prepare(self, record: logging.LogRecord) -> logging.LogRecord:
+ """Prepare a record for queuing.
+
+ This is added as a workaround for https://bugs.python.org/issue46755
+ """
+ record = super().prepare(record)
+ record.stack_info = None
+ return record
+
+ def handle(self, record: logging.LogRecord) -> Any:
+ """Conditionally emit the specified logging record.
+
+ Depending on which filters have been added to the handler, push the new
+ records onto the backing Queue.
+
+ The default python logger Handler acquires a lock
+ in the parent class which we do not need as
+ SimpleQueue is already thread safe.
+
+ See https://bugs.python.org/issue24645
+ """
+ return_value = self.filter(record)
+ if return_value:
+ self.emit(record)
+ return return_value
+
+ def close(self) -> None:
+ """Tidy up any resources used by the handler.
+
+ This adds shutdown of the QueueListener
+ """
+ super().close()
+ if not self.listener:
+ return
+ self.listener.stop()
+ self.listener = None
+
+
def activate_log_queue_handler() -> None:
    """Migrate the existing log handlers to use the queue.

    This allows us to avoid blocking I/O and formatting messages
    in the event loop as log messages are written in another thread.
    """
    # The queue transports LogRecords (not Handlers) between the producing
    # threads and the listener thread.
    simple_queue: queue.SimpleQueue[logging.LogRecord] = queue.SimpleQueue()
    queue_handler = LoggingQueueHandler(simple_queue)
    logging.root.addHandler(queue_handler)

    # Detach every pre-existing root handler and hand it to the listener,
    # so all blocking emit/format work happens on the listener's thread.
    migrated_handlers: list[logging.Handler] = []
    for handler in logging.root.handlers[:]:
        if handler is queue_handler:
            continue
        logging.root.removeHandler(handler)
        migrated_handlers.append(handler)

    listener = logging.handlers.QueueListener(simple_queue, *migrated_handlers)
    # Stored on the handler so LoggingQueueHandler.close() can stop it.
    queue_handler.listener = listener

    listener.start()
+
+
def log_exception(format_err: Callable[..., Any], *args: Any) -> None:
    """Log the currently handled exception with additional context.

    Must be called from inside an ``except`` block: the traceback text is
    taken from the active exception via ``traceback.format_exc``.

    format_err: callable that builds a human-friendly message from *args*.
    """
    # Attribute the log record to the caller's module rather than this helper.
    module = inspect.getmodule(inspect.stack(context=0)[1].frame)
    if module is not None:  # noqa: SIM108
        module_name = module.__name__
    else:
        # If Python is unable to access the sources files, the call stack frame
        # will be missing information, so let's guard.
        # https://github.com/home-assistant/core/issues/24982
        module_name = __name__

    # Do not print the wrapper in the traceback
    frames = len(inspect.trace()) - 1
    exc_msg = traceback.format_exc(-frames)
    friendly_msg = format_err(*args)
    logging.getLogger(module_name).error("%s\n%s", friendly_msg, exc_msg)
+
+
@overload
def catch_log_exception(
    func: Callable[..., Coroutine[Any, Any, Any]], format_err: Callable[..., Any]
) -> Callable[..., Coroutine[Any, Any, None]]:
    ...


@overload
def catch_log_exception(
    func: Callable[..., Any], format_err: Callable[..., Any]
) -> Callable[..., None] | Callable[..., Coroutine[Any, Any, None]]:
    ...


def catch_log_exception(
    func: Callable[..., Any], format_err: Callable[..., Any]
) -> Callable[..., None] | Callable[..., Coroutine[Any, Any, None]]:
    """Decorate a function func to catch and log exceptions.

    If func is a coroutine function, a coroutine function will be returned.
    If func is a callback, a callback will be returned.
    """
    # Unwrap functools.partial layers so iscoroutinefunction sees the
    # underlying callable.
    innermost = func
    while isinstance(innermost, partial):
        innermost = innermost.func

    if asyncio.iscoroutinefunction(innermost):
        coro_func = cast(Callable[..., Coroutine[Any, Any, None]], func)

        @wraps(coro_func)
        async def async_wrapper(*args: Any) -> None:
            """Catch and log exception."""
            try:
                await coro_func(*args)
            except Exception:  # pylint: disable=broad-except
                log_exception(format_err, *args)

        return async_wrapper

    @wraps(func)
    def sync_wrapper(*args: Any) -> None:
        """Catch and log exception."""
        try:
            func(*args)
        except Exception:  # pylint: disable=broad-except
            log_exception(format_err, *args)

    return sync_wrapper
+
+
def catch_log_coro_exception(
    target: Coroutine[Any, Any, _T], format_err: Callable[..., Any], *args: Any
) -> Coroutine[Any, Any, _T | None]:
    """Wrap coroutine *target* so any exception is caught and logged.

    Returns a new coroutine that yields the target's result, or None
    when the target raised.
    """

    async def guarded(*inner_args: Any) -> _T | None:
        """Await the target, logging (not propagating) any exception."""
        try:
            return await target
        except Exception:  # pylint: disable=broad-except
            log_exception(format_err, *inner_args)
            return None

    return guarded(*args)
+
+
def async_create_catching_coro(target: Coroutine[Any, Any, _T]) -> Coroutine[Any, Any, _T | None]:
    """Wrap a coroutine to catch and log exceptions.

    The exception will be logged together with a stacktrace of where the
    coroutine was wrapped.

    target: target coroutine.
    """
    # Capture the call site now (minus this frame) so the eventual log
    # message points at where the coroutine was wrapped.
    call_site = traceback.extract_stack()[:-1]
    return catch_log_coro_exception(
        target,
        lambda: "Exception in {} called from\n {}".format(
            target.__name__,
            "".join(traceback.format_list(call_site)),
        ),
    )
self.mass = mass
self.manifest = manifest
self.config = config
- self.logger = logging.getLogger(f"{ROOT_LOGGER_NAME}.providers.{self.instance_id}")
+ mass_logger = logging.getLogger(ROOT_LOGGER_NAME)
+ self.logger = mass_logger.getChild(f"providers.{self.instance_id}")
log_level = config.get_value(CONF_LOG_LEVEL)
- if log_level != "GLOBAL":
+ if log_level == "GLOBAL":
+ self.logger.setLevel(mass_logger.level)
+ else:
self.logger.setLevel(log_level)
+ # if the root logger's level is higher, we need to adjust that too
+ if logging.getLogger().level > self.logger.level:
+ logging.getLogger().setLevel(self.logger.level)
+ # apply logger settings to modules/packages used by this provider
+ for pkg_name in manifest.requirements:
+ dependency = pkg_name.split("=")[0].split("<")[0].split(">")[0]
+ # unless log level is explicitly set to debug,
+ # we silence the dependency logger to warning level
+ conf_log_level = self.config.get_value(CONF_LOG_LEVEL)
+ level = logging.DEBUG if conf_log_level == logging.DEBUG else logging.WARNING
+ logging.getLogger(dependency).setLevel(level)
+ self.logger.debug("Log level configured to %s", log_level)
self.cache = mass.cache
self.available = False
import asyncio
import contextlib
-import logging
import threading
import time
from dataclasses import dataclass
"""Handle async initialization of the provider."""
self._discover_lock = threading.Lock()
self.castplayers = {}
- # silence the cast logger a bit
- logging.getLogger("pychromecast.socket_client").setLevel(logging.INFO)
- logging.getLogger("pychromecast.controllers").setLevel(logging.INFO)
- logging.getLogger("pychromecast.dial").setLevel(logging.INFO)
self.mz_mgr = MultizoneManager()
self.browser = CastBrowser(
SimpleCastListener(
import asyncio
import functools
-import logging
import time
from collections.abc import Awaitable, Callable, Coroutine, Sequence
from dataclasses import dataclass, field
self.dlnaplayers = {}
self.lock = asyncio.Lock()
self.requester = AiohttpSessionRequester(self.mass.http_session, with_sleep=True)
- # silence the async_upnp_client logger a bit
- logging.getLogger("async_upnp_client").setLevel(logging.INFO)
- logging.getLogger("charset_normalizer").setLevel(logging.INFO)
-
self.upnp_factory = UpnpFactory(self.requester, non_strict=True)
self.notify_server = DLNANotifyServer(self.requester, self.mass)
self.mass.create_task(self._run_discovery())
from typing import TYPE_CHECKING, Any
from uuid import uuid4
+import aiofiles
from aiohttp import ClientSession, TCPConnector
from zeroconf import InterfaceChoice, NonUniqueNameException, ServiceInfo, Zeroconf
for task in self._tracked_tasks.values():
task.cancel()
# cleanup all providers
- for prov_id in list(self._providers.keys()):
- asyncio.create_task(self.unload_provider(prov_id))
+ async with asyncio.TaskGroup() as tg:
+ for prov_id in list(self._providers.keys()):
+ tg.create_task(self.unload_provider(prov_id))
# stop core controllers
await self.streams.close()
await self.webserver.close()
x for x in self._providers.values() if provider_type is None or provider_type == x.type
]
+ @api_command("logging/get")
+ async def get_application_log(self) -> str:
+ """Return the application log from file."""
+ logfile = os.path.join(self.storage_path, "logs", "musicassistant.log")
+ async with aiofiles.open(logfile, "r") as _file:
+ return await _file.read()
+
@property
def providers(self) -> list[ProviderInstanceType]:
"""Return all loaded/running Providers (instances)."""
"asyncio-throttle==1.0.2",
"aiofiles==23.1.0",
"aiorun==2022.11.1",
- "coloredlogs==15.0.1",
+ "colorlog==6.7.0",
"aiosqlite==0.19.0",
"python-slugify==8.0.1",
"mashumaro==3.7",
aiosqlite==0.19.0
async-upnp-client==0.33.2
asyncio-throttle==1.0.2
-coloredlogs==15.0.1
+colorlog==6.7.0
cryptography==41.0.1
deezer-python==5.12.0
faust-cchardet>=2.1.18