- SYS_ADMIN
- DAC_READ_SEARCH
privileged: true
+ environment:
+ # Provide logging level as environment variable.
+ # default=info, possible=(critical, error, warning, info, debug, verbose)
+ - LOG_LEVEL=info
from colorlog import ColoredFormatter
from music_assistant.common.helpers.json import json_loads
-from music_assistant.constants import ROOT_LOGGER_NAME, VERBOSE_LOG_LEVEL
+from music_assistant.constants import MASS_LOGGER_NAME, VERBOSE_LOG_LEVEL
from music_assistant.server import MusicAssistant
from music_assistant.server.helpers.logging import activate_log_queue_handler
MAX_LOG_FILESIZE = 1000000 * 10 # 10 MB
ALPINE_RELEASE_FILE = "/etc/alpine-release"
+LOGGER = logging.getLogger(MASS_LOGGER_NAME)
+
def get_arguments():
"""Arguments handling."""
parser.add_argument(
"--log-level",
type=str,
- default="info",
+ default=os.environ.get("LOG_LEVEL", "info"),
help="Provide logging level. Example --log-level debug, "
- "default=info, possible=(critical, error, warning, info, debug)",
+ "default=info, possible=(critical, error, warning, info, debug, verbose)",
)
return parser.parse_args()
logging.addLevelName(VERBOSE_LOG_LEVEL, "VERBOSE")
# apply the configured global log level to the (root) music assistant logger
- logging.getLogger(ROOT_LOGGER_NAME).setLevel(level)
+ logging.getLogger(MASS_LOGGER_NAME).setLevel(level)
# silence some noisy loggers
logging.getLogger("asyncio").setLevel(logging.WARNING)
else:
hass_options = {}
+ # prefer value in hass_options
log_level = hass_options.get("log_level", args.log_level).upper()
dev_mode = os.environ.get("PYTHONDEVMODE", "0") == "1"
MIN_SCHEMA_VERSION: Final[int] = 24
DB_SCHEMA_VERSION: Final[int] = 28
-ROOT_LOGGER_NAME: Final[str] = "music_assistant"
+MASS_LOGGER_NAME: Final[str] = "music_assistant"
UNKNOWN_ARTIST: Final[str] = "Unknown Artist"
VARIOUS_ARTISTS_NAME: Final[str] = "Various Artists"
DB_SCHEMA_VERSION,
DB_TABLE_CACHE,
DB_TABLE_SETTINGS,
- ROOT_LOGGER_NAME,
+ MASS_LOGGER_NAME,
)
from music_assistant.server.helpers.database import DatabaseConnection
from music_assistant.server.models.core_controller import CoreController
if TYPE_CHECKING:
from music_assistant.common.models.config_entries import CoreConfig
-LOGGER = logging.getLogger(f"{ROOT_LOGGER_NAME}.cache")
+LOGGER = logging.getLogger(f"{MASS_LOGGER_NAME}.cache")
CONF_CLEAR_CACHE = "clear_cache"
Note that this only returns the stored value without any validation or default.
"""
- return self.get(f"{CONF_CORE}/{core_module}/{key}", default)
+ return self.get(
+ f"{CONF_CORE}/{core_module}/values/{key}",
+ self.get(f"{CONF_CORE}/{core_module}/{key}", default),
+ )
def get_raw_provider_config_value(
self, provider_instance: str, key: str, default: ConfigValueType = None
Note that this only returns the stored value without any validation or default.
"""
- return self.get(f"{CONF_PROVIDERS}/{provider_instance}/{key}", default)
+ return self.get(
+ f"{CONF_PROVIDERS}/{provider_instance}/values/{key}",
+ self.get(f"{CONF_PROVIDERS}/{provider_instance}/{key}", default),
+ )
def set_raw_provider_config_value(
self, provider_instance: str, key: str, value: ConfigValueType
Track,
media_from_dict,
)
-from music_assistant.constants import DB_TABLE_PROVIDER_MAPPINGS, ROOT_LOGGER_NAME
+from music_assistant.constants import DB_TABLE_PROVIDER_MAPPINGS, MASS_LOGGER_NAME
if TYPE_CHECKING:
from collections.abc import AsyncGenerator, Iterable, Mapping
"""Initialize class."""
self.mass = mass
self.base_query = f"SELECT * FROM {self.db_table}"
- self.logger = logging.getLogger(f"{ROOT_LOGGER_NAME}.music.{self.media_type.value}")
+ self.logger = logging.getLogger(f"{MASS_LOGGER_NAME}.music.{self.media_type.value}")
@abstractmethod
async def add_item_to_library(self, item: ItemCls, metadata_lookup: bool = True) -> ItemCls:
from __future__ import annotations
import asyncio
-import logging
import os
import urllib.parse
from base64 import b64encode
Radio,
Track,
)
-from music_assistant.constants import (
- ROOT_LOGGER_NAME,
- VARIOUS_ARTISTS_ID_MBID,
- VARIOUS_ARTISTS_NAME,
-)
+from music_assistant.constants import VARIOUS_ARTISTS_ID_MBID, VARIOUS_ARTISTS_NAME
from music_assistant.server.helpers.compare import compare_strings
from music_assistant.server.helpers.images import create_collage, get_image_thumb
from music_assistant.server.models.core_controller import CoreController
from music_assistant.server.models.metadata_provider import MetadataProvider
from music_assistant.server.providers.musicbrainz import MusicbrainzProvider
-LOGGER = logging.getLogger(f"{ROOT_LOGGER_NAME}.metadata")
-
class MetaDataController(CoreController):
"""Several helpers to search and store metadata for mediaitems."""
if self.scan_busy:
return
- LOGGER.debug("Start scan for missing artist metadata")
+ self.logger.debug("Start scan for missing artist metadata")
self.scan_busy = True
async for artist in self.mass.music.artists.iter_library_items():
if artist.metadata.last_refresh is not None:
# this is slow on purpose to not cause stress on the metadata providers
await asyncio.sleep(30)
self.scan_busy = False
- LOGGER.debug("Finished scan for missing artist metadata")
+ self.logger.debug("Finished scan for missing artist metadata")
self.mass.create_task(scan_artist_metadata)
continue
if metadata := await provider.get_artist_metadata(artist):
artist.metadata.update(metadata)
- LOGGER.debug(
+ self.logger.debug(
"Fetched metadata for Artist %s on provider %s",
artist.name,
provider.name,
continue
if metadata := await provider.get_album_metadata(album):
album.metadata.update(metadata)
- LOGGER.debug(
+ self.logger.debug(
"Fetched metadata for Album %s on provider %s",
album.name,
provider.name,
continue
if metadata := await provider.get_track_metadata(track):
track.metadata.update(metadata)
- LOGGER.debug(
+ self.logger.debug(
"Fetched metadata for Track %s on provider %s",
track.name,
provider.name,
MediaItemImage(type=ImageType.THUMB, path=img_path, provider="file")
]
except Exception as err:
- LOGGER.warning(
+ self.logger.warning(
"Error while creating playlist image: %s",
str(err),
exc_info=err if self.logger.isEnabledFor(10) else None,
# lookup failed
ref_albums_str = "/".join(x.name for x in ref_albums) or "none"
ref_tracks_str = "/".join(x.name for x in ref_tracks) or "none"
- LOGGER.debug(
+ self.logger.debug(
"Unable to get musicbrainz ID for artist %s\n"
" - using lookup-album(s): %s\n"
" - using lookup-track(s): %s\n",
from __future__ import annotations
-import logging
import random
import time
from collections.abc import AsyncGenerator
from music_assistant.common.models.media_items import MediaItemType, media_from_dict
from music_assistant.common.models.player_queue import PlayerQueue
from music_assistant.common.models.queue_item import QueueItem
-from music_assistant.constants import FALLBACK_DURATION, ROOT_LOGGER_NAME
+from music_assistant.constants import FALLBACK_DURATION
from music_assistant.server.helpers.api import api_command
from music_assistant.server.helpers.audio import get_stream_details
from music_assistant.server.models.core_controller import CoreController
from music_assistant.common.models.player import Player
-LOGGER = logging.getLogger(f"{ROOT_LOGGER_NAME}.players.queue")
CONF_DEFAULT_ENQUEUE_SELECT_ARTIST = "default_enqueue_select_artist"
CONF_DEFAULT_ENQUEUE_SELECT_ALBUM = "default_enqueue_select_album"
def set_shuffle(self, queue_id: str, shuffle_enabled: bool) -> None:
"""Configure shuffle setting on the the queue."""
if (player := self.mass.players.get(queue_id)) and player.announcement_in_progress:
- LOGGER.warning("Ignore queue command: An announcement is in progress")
+ self.logger.warning("Ignore queue command: An announcement is in progress")
return
queue = self._queues[queue_id]
if queue.shuffle_enabled == shuffle_enabled:
def set_repeat(self, queue_id: str, repeat_mode: RepeatMode) -> None:
"""Configure repeat setting on the the queue."""
if (player := self.mass.players.get(queue_id)) and player.announcement_in_progress:
- LOGGER.warning("Ignore queue command: An announcement is in progress")
+ self.logger.warning("Ignore queue command: An announcement is in progress")
return
queue = self._queues[queue_id]
if queue.repeat_mode == repeat_mode:
# ruff: noqa: PLR0915,PLR0912
queue = self._queues[queue_id]
if (player := self.mass.players.get(queue_id)) and player.announcement_in_progress:
- LOGGER.warning("Ignore queue command: An announcement is in progress")
+ self.logger.warning("Ignore queue command: An announcement is in progress")
return
# a single item or list of items may be provided
- pos_shift: move item to top of queue as next item if 0.
"""
if (player := self.mass.players.get(queue_id)) and player.announcement_in_progress:
- LOGGER.warning("Ignore queue command: An announcement is in progress")
+ self.logger.warning("Ignore queue command: An announcement is in progress")
return
queue = self._queues[queue_id]
item_index = self.index_by_id(queue_id, queue_item_id)
def delete_item(self, queue_id: str, item_id_or_index: int | str) -> None:
"""Delete item (by id or index) from the queue."""
if (player := self.mass.players.get(queue_id)) and player.announcement_in_progress:
- LOGGER.warning("Ignore queue command: An announcement is in progress")
+ self.logger.warning("Ignore queue command: An announcement is in progress")
return
if isinstance(item_id_or_index, str):
item_index = self.index_by_id(queue_id, item_id_or_index)
if item_index <= queue.index_in_buffer:
# ignore request if track already loaded in the buffer
# the frontend should guard so this is just in case
- LOGGER.warning("delete requested for item already loaded in buffer")
+ self.logger.warning("delete requested for item already loaded in buffer")
return
queue_items = self._queue_items[queue_id]
queue_items.pop(item_index)
def clear(self, queue_id: str) -> None:
"""Clear all items in the queue."""
if (player := self.mass.players.get(queue_id)) and player.announcement_in_progress:
- LOGGER.warning("Ignore queue command: An announcement is in progress")
+ self.logger.warning("Ignore queue command: An announcement is in progress")
return
queue = self._queues[queue_id]
queue.radio_source = []
- queue_id: queue_id of the playerqueue to handle the command.
"""
if (player := self.mass.players.get(queue_id)) and player.announcement_in_progress:
- LOGGER.warning("Ignore queue command: An announcement is in progress")
+ self.logger.warning("Ignore queue command: An announcement is in progress")
return
# simply forward the command to underlying player
await self.mass.players.cmd_stop(queue_id)
- queue_id: queue_id of the playerqueue to handle the command.
"""
if (player := self.mass.players.get(queue_id)) and player.announcement_in_progress:
- LOGGER.warning("Ignore queue command: An announcement is in progress")
+ self.logger.warning("Ignore queue command: An announcement is in progress")
return
if self._queues[queue_id].state == PlayerState.PAUSED:
# simply forward the command to underlying player
- queue_id: queue_id of the playerqueue to handle the command.
"""
if (player := self.mass.players.get(queue_id)) and player.announcement_in_progress:
- LOGGER.warning("Ignore queue command: An announcement is in progress")
+ self.logger.warning("Ignore queue command: An announcement is in progress")
return
player = self.mass.players.get(queue_id, True)
if PlayerFeature.PAUSE not in player.supported_features:
- queue_id: queue_id of the queue to handle the command.
"""
if (player := self.mass.players.get(queue_id)) and player.announcement_in_progress:
- LOGGER.warning("Ignore queue command: An announcement is in progress")
+ self.logger.warning("Ignore queue command: An announcement is in progress")
return
current_index = self._queues[queue_id].current_index
next_index = self._get_next_index(queue_id, current_index, True)
- queue_id: queue_id of the queue to handle the command.
"""
if (player := self.mass.players.get(queue_id)) and player.announcement_in_progress:
- LOGGER.warning("Ignore queue command: An announcement is in progress")
+ self.logger.warning("Ignore queue command: An announcement is in progress")
return
current_index = self._queues[queue_id].current_index
if current_index is None:
- seconds: number of seconds to skip in track. Use negative value to skip back.
"""
if (player := self.mass.players.get(queue_id)) and player.announcement_in_progress:
- LOGGER.warning("Ignore queue command: An announcement is in progress")
+ self.logger.warning("Ignore queue command: An announcement is in progress")
return
await self.seek(queue_id, self._queues[queue_id].elapsed_time + seconds)
- position: position in seconds to seek to in the current playing item.
"""
if (player := self.mass.players.get(queue_id)) and player.announcement_in_progress:
- LOGGER.warning("Ignore queue command: An announcement is in progress")
+ self.logger.warning("Ignore queue command: An announcement is in progress")
return
queue = self._queues[queue_id]
assert queue.current_item, "No item loaded"
if prev_state.get("state") != PlayerState.PLAYING:
return
if (player := self.mass.players.get(queue.queue_id)) and player.announcement_in_progress:
- LOGGER.warning("Ignore queue command: An announcement is in progress")
+ self.logger.warning("Ignore queue command: An announcement is in progress")
return
current_item = self.get_item(queue.queue_id, queue.current_index)
if not current_item:
if (
player := self.mass.players.get(queue.queue_id)
) and player.announcement_in_progress:
- LOGGER.warning("Ignore queue command: An announcement is in progress")
+ self.logger.warning("Ignore queue command: An announcement is in progress")
return
with suppress(QueueEmpty):
next_item = await self.preload_next_item(queue.queue_id, index)
import asyncio
import functools
-import logging
from contextlib import suppress
from typing import TYPE_CHECKING, Any, Concatenate, ParamSpec, TypeVar, cast
CONF_GROUP_MEMBERS,
CONF_HIDE_PLAYER,
CONF_PLAYERS,
- ROOT_LOGGER_NAME,
SYNCGROUP_PREFIX,
UGP_PREFIX,
)
from music_assistant.common.models.config_entries import CoreConfig
-LOGGER = logging.getLogger(f"{ROOT_LOGGER_NAME}.players")
-
_PlayerControllerT = TypeVar("_PlayerControllerT", bound="PlayerController")
_R = TypeVar("_R")
_P = ParamSpec("_P")
# initialize sync groups as soon as a player is registered
self.mass.loop.create_task(self._register_syncgroups())
- LOGGER.info(
+ self.logger.info(
"Player registered: %s/%s",
player_id,
player.name,
player = self._players.pop(player_id, None)
if player is None:
return
- LOGGER.info("Player removed: %s", player.name)
+ self.logger.info("Player removed: %s", player.name)
self.mass.player_queues.on_player_remove(player_id)
if cleanup_config:
self.mass.config.remove(f"players/{player_id}")
msg = f"Player {player.name} does not support syncing"
raise UnsupportedFeaturedException(msg)
if not player.synced_to:
- LOGGER.info(
+ self.logger.info(
"Ignoring command to unsync player %s "
"because it is currently not synced to another player.",
player.display_name,
return sync_leader.player_id
if player.synced_to:
sync_leader = self.get(player.synced_to, True)
- LOGGER.warning(
+ self.logger.warning(
"Player %s is synced to %s and can not accept "
"playback related commands itself, "
"redirected the command to the sync leader.",
player.state = PlayerState.IDLE
player.powered = False
except Exception as err: # pylint: disable=broad-except
- LOGGER.warning(
+ self.logger.warning(
"Error while requesting latest state from player %s: %s",
player.display_name,
str(err),
CONF_OUTPUT_CHANNELS,
CONF_VOLUME_NORMALIZATION,
CONF_VOLUME_NORMALIZATION_TARGET,
- ROOT_LOGGER_NAME,
+ MASS_LOGGER_NAME,
VERBOSE_LOG_LEVEL,
)
from music_assistant.server.helpers.playlists import (
from music_assistant.common.models.player_queue import QueueItem
from music_assistant.server import MusicAssistant
-LOGGER = logging.getLogger(f"{ROOT_LOGGER_NAME}.audio")
+LOGGER = logging.getLogger(f"{MASS_LOGGER_NAME}.audio")
# pylint:disable=consider-using-f-string,too-many-locals,too-many-statements
# ruff: noqa: PLR0915
from music_assistant.common.models.enums import AlbumType
from music_assistant.common.models.errors import InvalidDataError
from music_assistant.common.models.media_items import MediaItemChapter
-from music_assistant.constants import ROOT_LOGGER_NAME, UNKNOWN_ARTIST
+from music_assistant.constants import MASS_LOGGER_NAME, UNKNOWN_ARTIST
from music_assistant.server.helpers.process import AsyncProcess
if TYPE_CHECKING:
from collections.abc import AsyncGenerator
-LOGGER = logging.getLogger(ROOT_LOGGER_NAME).getChild("tags")
+LOGGER = logging.getLogger(f"{MASS_LOGGER_NAME}.tags")
# the only multi-item splitter we accept is the semicolon,
# which is also the default in Musicbrainz Picard.
from music_assistant.common.models.enums import ProviderType
from music_assistant.common.models.provider import ProviderManifest
-from music_assistant.constants import CONF_LOG_LEVEL, ROOT_LOGGER_NAME
+from music_assistant.constants import CONF_LOG_LEVEL, MASS_LOGGER_NAME
if TYPE_CHECKING:
from music_assistant.common.models.config_entries import (
def _set_logger(self, log_level: str | None = None) -> None:
"""Set the logger settings."""
- mass_logger = logging.getLogger(ROOT_LOGGER_NAME)
- self.logger = logging.getLogger(f"{ROOT_LOGGER_NAME}.{self.domain}")
+ self.logger = logging.getLogger(f"{MASS_LOGGER_NAME}.{self.domain}")
if log_level is None:
log_level = self.mass.config.get_raw_core_config_value(
self.domain, CONF_LOG_LEVEL, "GLOBAL"
)
if log_level == "GLOBAL":
+ mass_logger = logging.getLogger(MASS_LOGGER_NAME)
self.logger.setLevel(mass_logger.level)
else:
self.logger.setLevel(log_level)
import logging
from typing import TYPE_CHECKING
-from music_assistant.constants import CONF_LOG_LEVEL, ROOT_LOGGER_NAME
+from music_assistant.constants import CONF_LOG_LEVEL, MASS_LOGGER_NAME
if TYPE_CHECKING:
from zeroconf import ServiceStateChange
self.mass = mass
self.manifest = manifest
self.config = config
- mass_logger = logging.getLogger(ROOT_LOGGER_NAME)
- self.logger = mass_logger.getChild(f"providers.{self.domain}")
+ self.logger = logging.getLogger(f"{MASS_LOGGER_NAME}.providers.{self.domain}")
log_level = config.get_value(CONF_LOG_LEVEL)
if log_level == "GLOBAL":
+ mass_logger = logging.getLogger(MASS_LOGGER_NAME)
self.logger.setLevel(mass_logger.level)
- elif logging.getLogger().level > self.logger.level:
+ if logging.getLogger().level > self.logger.level:
# if the root logger's level is higher, we need to adjust that too
logging.getLogger().setLevel(self.logger.level)
self.logger.debug("Log level configured to %s", log_level)
from __future__ import annotations
import asyncio
+import logging
import pathlib
import random
import socket
async def handle_async_init(self) -> None:
"""Handle async initialization of the provider."""
+ # set snapcast logging
+ logging.getLogger("snapcast").setLevel(self.logger.level)
self._use_builtin_server = not self.config.get_value(CONF_USE_EXTERNAL_SERVER)
if self._use_builtin_server:
self._snapcast_server_host = "127.0.0.1"
from music_assistant.common.models.enums import MediaType
from music_assistant.common.models.errors import MediaNotFoundError
-from music_assistant.constants import ROOT_LOGGER_NAME
DEFAULT_LIMIT = 50
-LOGGER = logging.getLogger(f"{ROOT_LOGGER_NAME}.tidal.helpers")
+LOGGER = logging.getLogger(__name__)
async def get_library_artists(
CONF_PROVIDERS,
CONF_SERVER_ID,
CONFIGURABLE_CORE_CONTROLLERS,
+ MASS_LOGGER_NAME,
MIN_SCHEMA_VERSION,
- ROOT_LOGGER_NAME,
VERBOSE_LOG_LEVEL,
)
from music_assistant.server.controllers.cache import CacheController
EventCallBackType, tuple[EventType, ...] | None, tuple[str, ...] | None
]
-LOGGER = logging.getLogger(ROOT_LOGGER_NAME)
+LOGGER = logging.getLogger(MASS_LOGGER_NAME)
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
PROVIDERS_PATH = os.path.join(BASE_DIR, "providers")