"""Arguments handling."""
parser = argparse.ArgumentParser(description="MusicAssistant")
- data_dir = os.getenv("APPDATA") if os.name == "nt" else os.path.expanduser("~")
- data_dir = os.path.join(data_dir, ".musicassistant")
- if not os.path.isdir(data_dir):
- os.makedirs(data_dir)
+ default_data_dir = (
+ os.getenv("APPDATA") if os.name == "nt" else os.path.expanduser("~")
+ )
+ default_data_dir = os.path.join(default_data_dir, ".musicassistant")
parser.add_argument(
"-c",
"--config",
metavar="path_to_config_dir",
- default=data_dir,
+ default=default_data_dir,
help="Directory that contains the MusicAssistant configuration",
)
parser.add_argument(
# parse arguments
args = get_arguments()
data_dir = args.config
+ if not os.path.isdir(data_dir):
+ os.makedirs(data_dir)
# config debug settings if needed
if args.debug:
logger.setLevel(logging.DEBUG)
- logging.getLogger("aiosqlite").setLevel(logging.INFO)
- logging.getLogger("asyncio").setLevel(logging.WARNING)
else:
logger.setLevel(logging.INFO)
+ # cool down logging for asyncio and aiosqlite
+ logging.getLogger("asyncio").setLevel(logging.WARNING)
+ logging.getLogger("aiosqlite").setLevel(logging.INFO)
- mass = MusicAssistant(data_dir)
+ mass = MusicAssistant(data_dir, args.debug)
def on_shutdown(loop):
logger.info("shutdown requested!")
--- /dev/null
+"""Various helpers for data encryption/decryption."""
+
+import asyncio
+
+from cryptography.fernet import Fernet, InvalidToken
+from music_assistant.helpers.app_vars import get_app_var # noqa # pylint: disable=all
+
+
+async def async_encrypt_string(str_value: str) -> str:
+ """Encrypt a string with Fernet."""
+ return await asyncio.get_running_loop().run_in_executor(
+ None, encrypt_string, str_value
+ )
+
+
+def encrypt_string(str_value: str) -> str:
+ """Encrypt a string with Fernet."""
+ return Fernet(get_app_var(3)).encrypt(str_value.encode()).decode()
+
+
+async def async_encrypt_bytes(bytes_value: bytes) -> bytes:
+ """Encrypt bytes with Fernet."""
+ return await asyncio.get_running_loop().run_in_executor(
+ None, encrypt_bytes, bytes_value
+ )
+
+
+def encrypt_bytes(bytes_value: bytes) -> bytes:
+ """Encrypt bytes with Fernet."""
+ return Fernet(get_app_var(3)).encrypt(bytes_value)
+
+
+async def async_decrypt_string(encrypted_str: str) -> str:
+ """Decrypt a string with Fernet."""
+ return await asyncio.get_running_loop().run_in_executor(
+ None, decrypt_string, encrypted_str
+ )
+
+
+def decrypt_string(encrypted_str: str) -> str:
+ """Decrypt a string with Fernet."""
+ try:
+ return Fernet(get_app_var(3)).decrypt(encrypted_str.encode()).decode()
+ except (InvalidToken, AttributeError):
+ return None
+
+
+async def async_decrypt_bytes(bytes_value: bytes) -> bytes:
+ """Decrypt bytes with Fernet."""
+ return await asyncio.get_running_loop().run_in_executor(
+ None, decrypt_bytes, bytes_value
+ )
+
+
+def decrypt_bytes(bytes_value):
+ """Decrypt bytes with Fernet."""
+ try:
+ return Fernet(get_app_var(3)).decrypt(bytes_value)
+ except (InvalidToken, AttributeError):
+ return None
import memory_tempfile
import orjson
import unidecode
-from cryptography.fernet import Fernet, InvalidToken
-from music_assistant.helpers.app_vars import get_app_var # noqa # pylint: disable=all
# pylint: disable=invalid-name
T = TypeVar("T")
return tempfile.NamedTemporaryFile(buffering=0)
-def encrypt_string(str_value):
- """Encrypt a string with Fernet."""
- return Fernet(get_app_var(3)).encrypt(str_value.encode()).decode()
-
-
-def encrypt_bytes(bytes_value):
- """Encrypt bytes with Fernet."""
- return Fernet(get_app_var(3)).encrypt(bytes_value)
-
-
-def yield_chunks(_obj, chunk_size):
+async def async_yield_chunks(_obj, chunk_size):
"""Yield successive n-sized chunks from list/str/bytes."""
chunk_size = int(chunk_size)
for i in range(0, len(_obj), chunk_size):
yield _obj[i : i + chunk_size]
-def decrypt_string(str_value):
- """Decrypt a string with Fernet."""
- try:
- return Fernet(get_app_var(3)).decrypt(str_value.encode()).decode()
- except (InvalidToken, AttributeError):
- return None
-
-
-def decrypt_bytes(bytes_value):
- """Decrypt bytes with Fernet."""
- try:
- return Fernet(get_app_var(3)).decrypt(bytes_value)
- except (InvalidToken, AttributeError):
- return None
-
-
class CustomIntEnum(int, Enum):
"""Base for IntEnum with some helpers."""
"""All classes and helpers for the Configuration."""
+import copy
import logging
import os
import shutil
-from collections import OrderedDict
-from enum import Enum
from typing import List
import orjson
CONF_ENABLED,
CONF_EXTERNAL_URL,
CONF_FALLBACK_GAIN_CORRECT,
+ CONF_GROUP_DELAY,
CONF_HTTP_PORT,
CONF_HTTPS_PORT,
CONF_KEY_BASE,
CONF_MAX_SAMPLE_RATE,
CONF_NAME,
CONF_PASSWORD,
+ CONF_POWER_CONTROL,
CONF_SSL_CERTIFICATE,
CONF_SSL_KEY,
CONF_TARGET_VOLUME,
CONF_USERNAME,
+ CONF_VOLUME_CONTROL,
CONF_VOLUME_NORMALISATION,
EVENT_CONFIG_CHANGED,
)
-from music_assistant.helpers.util import (
- decrypt_string,
- encrypt_string,
- get_external_ip,
- merge_dict,
- try_load_json_file,
-)
+from music_assistant.helpers.encryption import decrypt_string, encrypt_string
+from music_assistant.helpers.typing import MusicAssistantType
+from music_assistant.helpers.util import get_external_ip, merge_dict, try_load_json_file
from music_assistant.models.config_entry import ConfigEntry, ConfigEntryType
+from music_assistant.models.player import PlayerControlType
from music_assistant.models.provider import ProviderType
from passlib.hash import pbkdf2_sha256
-LOGGER = logging.getLogger("mass")
+LOGGER = logging.getLogger("config")
DEFAULT_PLAYER_CONFIG_ENTRIES = [
ConfigEntry(
}
-class ConfigBaseType(Enum):
- """Enum with config base types."""
-
- BASE = CONF_KEY_BASE
- PLAYER_SETTINGS = CONF_KEY_PLAYER_SETTINGS
- MUSIC_PROVIDERS = CONF_KEY_MUSIC_PROVIDERS
- PLAYER_PROVIDERS = CONF_KEY_PLAYER_PROVIDERS
- METADATA_PROVIDERS = CONF_KEY_METADATA_PROVIDERS
- PLUGINS = CONF_KEY_PLUGINS
-
-
-PROVIDER_TYPES = [
- ConfigBaseType.MUSIC_PROVIDERS,
- ConfigBaseType.PLAYER_PROVIDERS,
- ConfigBaseType.METADATA_PROVIDERS,
- ConfigBaseType.PLUGINS,
-]
-
-
-class ConfigItem:
- """
- Configuration Item connected to Config Entries.
-
- Returns default value from config entry if no value present.
- """
-
- def __init__(self, mass, parent_item_key: str, base_type: ConfigBaseType):
- """Initialize class."""
- self._parent_item_key = parent_item_key
- self._base_type = base_type
- self.mass = mass
- self.stored_config = OrderedDict()
-
- def __repr__(self):
- """Print class."""
- return f"{OrderedDict}({self.to_dict()})"
-
- def to_dict(self, lang="en") -> dict:
- """Return entire config as dict."""
- result = OrderedDict()
- for entry in self.get_config_entries():
- if entry.entry_key in self.stored_config:
- # use saved value
- entry.value = self.stored_config[entry.entry_key]
- else:
- # use default value for config entry
- entry.value = entry.default_value
- result[entry.entry_key] = entry
- # get translated values
- for entry_key in ["label", "description"]:
- org_value = getattr(result[entry.entry_key], entry_key, None)
- if not org_value:
- org_value = entry.entry_key
- translated_value = self.mass.config.get_translation(org_value, lang)
- if translated_value != org_value:
- setattr(result[entry.entry_key], entry_key, translated_value)
- return result
-
- def get(self, key, default=None):
- """Return value if key exists, default if not."""
- try:
- return self[key]
- except KeyError:
- return default
-
- def get_entry(self, key):
- """Return complete ConfigEntry for specified key."""
- for entry in self.get_config_entries():
- if entry.entry_key == key:
- if key in self.stored_config:
- # use saved value
- entry.value = self.stored_config[key]
- else:
- # use default value for config entry
- entry.value = entry.default_value
- return entry
- raise KeyError(
- "%s\\%s has no key %s!" % (self._base_type, self._parent_item_key, key)
- )
-
- def __getitem__(self, key) -> ConfigEntry:
- """Return default value from ConfigEntry if needed."""
- entry = self.get_entry(key)
- if entry.entry_type == ConfigEntryType.PASSWORD:
- # decrypted password is only returned if explicitly asked for this key
- decrypted_value = decrypt_string(entry.value)
- if decrypted_value:
- return decrypted_value
- return entry.value
-
- def __setitem__(self, key, value):
- """Store value and validate."""
- for entry in self.get_config_entries():
- if entry.entry_key != key:
- continue
- # do some simple type checking
- if entry.multi_value:
- # multi value item
- if not isinstance(value, list):
- raise ValueError
- else:
- # single value item
- if entry.entry_type == ConfigEntryType.STRING and not isinstance(
- value, str
- ):
- if not value:
- value = ""
- else:
- raise ValueError
- if entry.entry_type == ConfigEntryType.BOOL and not isinstance(
- value, bool
- ):
- raise ValueError
- if entry.entry_type == ConfigEntryType.FLOAT and not isinstance(
- value, (float, int)
- ):
- raise ValueError
- if value != self[key]:
- if entry.store_hashed:
- value = pbkdf2_sha256.hash(value)
- if entry.entry_type == ConfigEntryType.PASSWORD:
- value = encrypt_string(value)
- self.stored_config[key] = value
-
- self.mass.add_job(self.mass.config.save)
- # reload provider/plugin if value changed
- if self._base_type in PROVIDER_TYPES:
- self.mass.add_job(
- self.mass.async_reload_provider(self._parent_item_key)
- )
- if self._base_type == ConfigBaseType.PLAYER_SETTINGS:
- # force update of player if it's config changed
- self.mass.add_job(
- self.mass.players.async_trigger_player_update(
- self._parent_item_key
- )
- )
- # signal config changed event
- self.mass.signal_event(
- EVENT_CONFIG_CHANGED, (self._base_type, self._parent_item_key)
- )
- return
- # raise KeyError if we're trying to set a value not defined as ConfigEntry
- raise KeyError
-
- def get_config_entries(self) -> List[ConfigEntry]:
- """Return config entries for this item."""
- if self._base_type == ConfigBaseType.PLAYER_SETTINGS:
- return self.mass.config.get_player_config_entries(self._parent_item_key)
- if self._base_type in PROVIDER_TYPES:
- return self.mass.config.get_provider_config_entries(self._parent_item_key)
- return self.mass.config.get_base_config_entries(self._parent_item_key)
-
-
-class ConfigBase(OrderedDict):
- """Configuration class with ConfigItem items."""
-
- def __init__(self, mass, base_type=ConfigBaseType):
- """Initialize class."""
- self.mass = mass
- self._base_type = base_type
- super().__init__()
-
- def __getitem__(self, item_key):
- """Return convenience method for get."""
- if item_key not in self:
- # create new ConfigDictItem on the fly
- super().__setitem__(
- item_key, ConfigItem(self.mass, item_key, self._base_type)
- )
- return super().__getitem__(item_key)
-
- def to_dict(self, lang="en") -> dict:
- """Return entire config as dict."""
- return {key: value.to_dict(lang) for key, value in self.items()}
+PROVIDER_TYPE_MAPPINGS = {
+ CONF_KEY_MUSIC_PROVIDERS: ProviderType.MUSIC_PROVIDER,
+ CONF_KEY_PLAYER_PROVIDERS: ProviderType.PLAYER_PROVIDER,
+ CONF_KEY_METADATA_PROVIDERS: ProviderType.METADATA_PROVIDER,
+ CONF_KEY_PLUGINS: ProviderType.PLUGIN,
+}
class ConfigManager:
def __init__(self, mass, data_path: str):
"""Initialize class."""
self._data_path = data_path
+ self._stored_config = {}
self.loading = False
self.mass = mass
- self._conf_base = ConfigBase(mass, ConfigBaseType.BASE)
- self._conf_player_settings = ConfigBase(mass, ConfigBaseType.PLAYER_SETTINGS)
- self._conf_player_providers = ConfigBase(mass, ConfigBaseType.PLAYER_PROVIDERS)
- self._conf_music_providers = ConfigBase(mass, ConfigBaseType.MUSIC_PROVIDERS)
- self._conf_metadata_providers = ConfigBase(
- mass, ConfigBaseType.METADATA_PROVIDERS
- )
- self._conf_plugins = ConfigBase(mass, ConfigBaseType.PLUGINS)
if not os.path.isdir(data_path):
raise FileNotFoundError(f"data directory {data_path} does not exist!")
self._translations = self.__get_all_translations()
@property
def base(self):
"""Return base config."""
- return self._conf_base
+ return BaseSettings(self)
@property
def player_settings(self):
"""Return all player configs."""
- return self._conf_player_settings
+ return PlayerSettings(self)
@property
def music_providers(self):
"""Return all music provider configs."""
- return self._conf_music_providers
+ return ProviderSettings(self, CONF_KEY_MUSIC_PROVIDERS)
@property
def player_providers(self):
"""Return all player provider configs."""
- return self._conf_player_providers
+ return ProviderSettings(self, CONF_KEY_PLAYER_PROVIDERS)
@property
def metadata_providers(self):
"""Return all metadata provider configs."""
- return self._conf_metadata_providers
+ return ProviderSettings(self, CONF_KEY_METADATA_PROVIDERS)
@property
def plugins(self):
"""Return all plugin configs."""
- return self._conf_plugins
+ return ProviderSettings(self, CONF_KEY_PLUGINS)
+
+ @property
+ def stored_config(self):
+ """Return the config that is actually stored on disk."""
+ return self._stored_config
def get_provider_config(self, provider_id: str, provider_type: ProviderType = None):
"""Return config for given provider."""
if provider:
provider_type = provider.type
if provider_type == ProviderType.METADATA_PROVIDER:
- return self._conf_metadata_providers[provider_id]
+ return self.metadata_providers[provider_id]
if provider_type == ProviderType.MUSIC_PROVIDER:
- return self._conf_music_providers[provider_id]
+ return self.music_providers[provider_id]
if provider_type == ProviderType.PLAYER_PROVIDER:
- return self._conf_player_providers[provider_id]
+ return self.player_providers[provider_id]
if provider_type == ProviderType.PLUGIN:
- return self._conf_plugins[provider_id]
+ return self.plugins[provider_id]
raise RuntimeError("Invalid provider type")
- def get_player_config(self, player_id):
+ def get_player_config(self, player_id: str):
"""Return config for given player."""
- return self._conf_player_settings[player_id]
-
- def get_provider_config_entries(self, provider_id: str) -> List[ConfigEntry]:
- """Return all config entries for the given provider."""
- provider = self.mass.get_provider(provider_id)
- if provider:
- specials = [
- ConfigEntry(
- "__name__", ConfigEntryType.LABEL, label=provider.name, hidden=True
- )
- ]
- return specials + DEFAULT_PROVIDER_CONFIG_ENTRIES + provider.config_entries
- return DEFAULT_PROVIDER_CONFIG_ENTRIES
-
- def get_player_config_entries(self, player_id: str) -> List[ConfigEntry]:
- """Return all config entries for the given player."""
- player_state = self.mass.players.get_player_state(player_id)
- if player_state:
- return DEFAULT_PLAYER_CONFIG_ENTRIES + player_state.config_entries
- return DEFAULT_PLAYER_CONFIG_ENTRIES
-
- @staticmethod
- def get_base_config_entries(base_key) -> List[ConfigEntry]:
- """Return all base config entries."""
- return DEFAULT_BASE_CONFIG_ENTRIES[base_key]
+ return self.player_settings[player_id]
def validate_credentials(self, username: str, password: str) -> bool:
"""Check if credentials matches."""
"""Save config on exit."""
self.save()
- def get_translation(self, org_string: str, lang: str):
+ def get_translation(self, org_string: str, language: str):
"""Get translated value for a string, fallback to english."""
- for lang in [lang, "en"]:
- translated_value = self.mass.config.translations.get(lang, {}).get(
- org_string
- )
+ for lang in [language, "en"]:
+ translated_value = self._translations.get(lang, {}).get(org_string)
if translated_value:
return translated_value
return org_string
conf_file_backup = os.path.join(self.data_path, "config.json.backup")
if os.path.isfile(conf_file):
shutil.move(conf_file, conf_file_backup)
- # create dict for stored config
- stored_conf = {
- CONF_KEY_BASE: {},
- CONF_KEY_PLAYER_SETTINGS: {},
- CONF_KEY_MUSIC_PROVIDERS: {},
- CONF_KEY_METADATA_PROVIDERS: {},
- CONF_KEY_PLAYER_PROVIDERS: {},
- CONF_KEY_PLUGINS: {},
- }
- for conf_key in stored_conf:
- for key, value in self[conf_key].items():
- stored_conf[conf_key][key] = value.stored_config
-
# write current config to file
with open(conf_file, "wb") as _file:
- _file.write(orjson.dumps(stored_conf, option=orjson.OPT_INDENT_2))
+ _file.write(orjson.dumps(self._stored_config, option=orjson.OPT_INDENT_2))
LOGGER.info("Config saved!")
self.loading = False
conf_file_backup = os.path.join(self.data_path, "config.json.backup")
data = try_load_json_file(conf_file_backup)
if data:
+ self._stored_config = data
+ self.loading = False
- for conf_key in [
- CONF_KEY_BASE,
- CONF_KEY_PLAYER_SETTINGS,
- CONF_KEY_MUSIC_PROVIDERS,
- CONF_KEY_METADATA_PROVIDERS,
- CONF_KEY_PLAYER_PROVIDERS,
- CONF_KEY_PLUGINS,
- ]:
- if not data.get(conf_key):
- continue
- for key, value in data[conf_key].items():
- for subkey, subvalue in value.items():
- self[conf_key][key].stored_config[subkey] = subvalue
- self.loading = False
+class ConfigBaseItem:
+ """Configuration class that holds the ConfigSubItem items."""
+
+ def __init__(self, conf_mgr: ConfigManager, conf_key: str):
+ """Initialize class."""
+ self.conf_mgr = conf_mgr
+ self.mass = conf_mgr.mass
+ self.conf_key = conf_key
+
+ @classmethod
+ def all_keys(cls):
+ """Return all possible keys of this Config object."""
+ return []
+
+ def __getitem__(self, item_key: str):
+ """Return ConfigSubItem for given key."""
+ return ConfigSubItem(self, item_key)
+
+ def all_items(self, translation="en") -> dict:
+ """Return entire config as dict."""
+ return {
+ key: copy.deepcopy(ConfigSubItem(self, key).all_items(translation))
+ for key in self.all_keys()
+ }
+
+
+class BaseSettings(ConfigBaseItem):
+ """Configuration class that holds the base settings."""
+
+ def __init__(self, mass: MusicAssistantType):
+ """Initialize class."""
+ super().__init__(mass, CONF_KEY_BASE)
+
+ def all_keys(self):
+ """Return all possible keys of this Config object."""
+ return list(DEFAULT_BASE_CONFIG_ENTRIES.keys())
+
+ @staticmethod
+ def get_config_entries(child_key) -> List[ConfigEntry]:
+ """Return all base config entries."""
+ return list(DEFAULT_BASE_CONFIG_ENTRIES[child_key])
+
+
+class PlayerSettings(ConfigBaseItem):
+ """Configuration class that holds the player settings."""
+
+ def __init__(self, mass: MusicAssistantType):
+ """Initialize class."""
+ super().__init__(mass, CONF_KEY_PLAYER_SETTINGS)
+
+ def all_keys(self):
+ """Return all possible keys of this Config object."""
+ return (item.player_id for item in self.mass.players.players)
+
+ def get_config_entries(self, child_key: str) -> List[ConfigEntry]:
+ """Return all config entries for the given child entry."""
+ entries = []
+ entries += DEFAULT_PLAYER_CONFIG_ENTRIES
+ player_state = self.mass.players.get_player_state(child_key)
+ if player_state:
+ entries += player_state.player.config_entries
+ # append power control config entries
+ power_controls = self.mass.players.get_player_controls(
+ PlayerControlType.POWER
+ )
+ if power_controls:
+ controls = [
+ {"text": f"{item.provider}: {item.name}", "value": item.control_id}
+ for item in power_controls
+ ]
+ entries.append(
+ ConfigEntry(
+ entry_key=CONF_POWER_CONTROL,
+ entry_type=ConfigEntryType.STRING,
+ description=CONF_POWER_CONTROL,
+ values=controls,
+ )
+ )
+ # append volume control config entries
+ volume_controls = self.mass.players.get_player_controls(
+ PlayerControlType.VOLUME
+ )
+ if volume_controls:
+ controls = [
+ {"text": f"{item.provider}: {item.name}", "value": item.control_id}
+ for item in volume_controls
+ ]
+ entries.append(
+ ConfigEntry(
+ entry_key=CONF_VOLUME_CONTROL,
+ entry_type=ConfigEntryType.STRING,
+ description=CONF_VOLUME_CONTROL,
+ values=controls,
+ )
+ )
+ # append special group player entries
+ for parent_id in player_state.group_parents:
+ parent_player = self.mass.players.get_player_state(parent_id)
+ if parent_player and parent_player.provider_id == "group_player":
+ entries.append(
+ ConfigEntry(
+ entry_key=CONF_GROUP_DELAY,
+ entry_type=ConfigEntryType.INT,
+ default_value=0,
+ range=(0, 500),
+ description=CONF_GROUP_DELAY,
+ )
+ )
+ break
+ return entries
+
+
+class ProviderSettings(ConfigBaseItem):
+ """Configuration class that holds the music provider settings."""
+
+ def all_keys(self):
+ """Return all possible keys of this Config object."""
+ prov_type = PROVIDER_TYPE_MAPPINGS[self.conf_key]
+ return (item.id for item in self.mass.get_providers(prov_type))
+
+ def get_config_entries(self, child_key: str) -> List[ConfigEntry]:
+ """Return all config entries for the given provider."""
+ provider = self.mass.get_provider(child_key)
+ if provider:
+ # append a hidden label with the provider's name
+ specials = [
+ ConfigEntry(
+ "__name__", ConfigEntryType.LABEL, label=provider.name, hidden=True
+ )
+ ]
+ return specials + DEFAULT_PROVIDER_CONFIG_ENTRIES + provider.config_entries
+ return DEFAULT_PROVIDER_CONFIG_ENTRIES
+
+
+class ConfigSubItem:
+ """
+ Configuration Item connected to Config Entries.
+
+ Returns default value from config entry if no value present.
+ """
+
+ def __init__(self, conf_parent: ConfigBaseItem, conf_key: str):
+ """Initialize class."""
+ self.conf_parent = conf_parent
+ self.conf_key = conf_key
+ self.conf_mgr = conf_parent.conf_mgr
+ self.parent_conf_key = conf_parent.conf_key
+
+ def all_items(self, translation="en") -> dict:
+ """Return entire config as dict."""
+ return {
+ item.entry_key: self.get_entry(item.entry_key, translation)
+ for item in self.conf_parent.get_config_entries(self.conf_key)
+ }
+
+ def get(self, key, default=None):
+ """Return value if key exists, default if not."""
+ try:
+ return self[key]
+ except KeyError:
+ return default
+
+ def __getitem__(self, key) -> ConfigEntry:
+ """Get value for ConfigEntry."""
+ # always lookup the config entry because config entries are dynamic
+ # and values may be transformed (e.g. encrypted)
+ entry = self.get_entry(key)
+ if entry.entry_type == ConfigEntryType.PASSWORD:
+ # decrypted password is only returned if explicitly asked for this key
+ decrypted_value = decrypt_string(entry.value)
+ if decrypted_value:
+ return decrypted_value
+ return entry.value
+
+ def get_entry(self, key, translation=None):
+ """Return complete ConfigEntry for specified key."""
+ stored_config = self.conf_mgr.stored_config.get(self.conf_parent.conf_key, {})
+ stored_config = stored_config.get(self.conf_key, {})
+ for conf_entry in self.conf_parent.get_config_entries(self.conf_key):
+ if conf_entry.entry_key == key:
+ if key in stored_config:
+ # use stored value
+ conf_entry.value = stored_config[key]
+ else:
+ # use default value for config entry
+ conf_entry.value = conf_entry.default_value
+ # get translated labels
+ if translation is not None:
+ for entry_subkey in ["label", "description", "__name__"]:
+ org_value = getattr(conf_entry, entry_subkey, None)
+ if not org_value:
+ org_value = conf_entry.entry_key
+ translated_value = self.conf_parent.conf_mgr.get_translation(
+ org_value, translation
+ )
+ if translated_value and translated_value != org_value:
+ setattr(conf_entry, entry_subkey, translated_value)
+ return conf_entry
+ raise KeyError(
+ "%s\\%s has no key %s!" % (self.conf_parent.conf_key, self.conf_key, key)
+ )
+
+ def __setitem__(self, key, value):
+ """Store value and validate."""
+ assert isinstance(key, str)
+ for entry in self.conf_parent.get_config_entries(self.conf_key):
+ if entry.entry_key != key:
+ continue
+ # do some simple type checking
+ if entry.multi_value:
+ # multi value item
+ if not isinstance(value, list):
+ raise ValueError
+ else:
+ # single value item
+ if entry.entry_type == ConfigEntryType.STRING and not isinstance(
+ value, str
+ ):
+ if not value:
+ value = ""
+ else:
+ raise ValueError
+ if entry.entry_type == ConfigEntryType.BOOL and not isinstance(
+ value, bool
+ ):
+ raise ValueError
+ if entry.entry_type == ConfigEntryType.FLOAT and not isinstance(
+ value, (float, int)
+ ):
+ raise ValueError
+ if value != self[key]:
+ if entry.store_hashed:
+ value = pbkdf2_sha256.hash(value)
+ if entry.entry_type == ConfigEntryType.PASSWORD:
+ value = encrypt_string(value)
+
+ # write value to stored config
+ stored_conf = self.conf_mgr.stored_config
+ if self.parent_conf_key not in stored_conf:
+ stored_conf[self.parent_conf_key] = {}
+ if self.conf_key not in stored_conf[self.parent_conf_key]:
+ stored_conf[self.parent_conf_key][self.conf_key] = {}
+ stored_conf[self.parent_conf_key][self.conf_key][key] = value
+
+ self.conf_mgr.mass.add_job(self.conf_mgr.save)
+ # reload provider/plugin if value changed
+ if self.parent_conf_key in PROVIDER_TYPE_MAPPINGS:
+ self.conf_mgr.mass.add_job(
+ self.conf_mgr.mass.async_reload_provider(self.conf_key)
+ )
+ if self.parent_conf_key == CONF_KEY_PLAYER_SETTINGS:
+ # force update of player if it's config changed
+ self.conf_mgr.mass.add_job(
+ self.conf_mgr.mass.players.async_trigger_player_update(
+ self.conf_key
+ )
+ )
+ # signal config changed event
+ self.conf_mgr.mass.signal_event(
+ EVENT_CONFIG_CHANGED, (self.parent_conf_key, self.conf_key)
+ )
+ return
+ # raise KeyError if we're trying to set a value not defined as ConfigEntry
+ raise KeyError
TrackQuality,
)
-LOGGER = logging.getLogger("mass")
+LOGGER = logging.getLogger("database")
class DbConnect:
from music_assistant.helpers.util import merge_dict
from music_assistant.models.provider import MetadataProvider, ProviderType
-LOGGER = logging.getLogger("mass")
+LOGGER = logging.getLogger("metadata_manager")
class MetaDataManager:
import aiohttp
from music_assistant.constants import EVENT_MUSIC_SYNC_STATUS, EVENT_PROVIDER_REGISTERED
from music_assistant.helpers.cache import async_cached, async_cached_generator
+from music_assistant.helpers.encryption import async_encrypt_string
from music_assistant.helpers.musicbrainz import MusicBrainz
-from music_assistant.helpers.util import (
- callback,
- compare_strings,
- encrypt_string,
- run_periodic,
-)
+from music_assistant.helpers.util import callback, compare_strings, run_periodic
from music_assistant.models.media_types import (
Album,
Artist,
from music_assistant.models.streamdetails import ContentType, StreamDetails, StreamType
from PIL import Image
-LOGGER = logging.getLogger("mass")
+LOGGER = logging.getLogger("music_manager")
def sync_task(desc):
# set player_id on the streamdetails so we know what players stream
streamdetails.player_id = player_id
# store the path encrypted as we do not want it to be visible in the api
- streamdetails.path = encrypt_string(streamdetails.path)
+ streamdetails.path = await async_encrypt_string(streamdetails.path)
# set streamdetails as attribute on the media_item
# this way the app knows what content is playing
media_item.streamdetails = streamdetails
POLL_INTERVAL = 30
-LOGGER = logging.getLogger("mass")
+LOGGER = logging.getLogger("player_manager")
class PlayerManager:
return self._player_states.get(player_id)
@callback
- def get_player(
- self, player_id: str, return_player_state: bool = True
- ) -> PlayerState:
+ def get_player(self, player_id: str) -> PlayerState:
"""Return Player by player_id or None if player does not exist."""
player_state = self._player_states.get(player_id)
if player_state:
async def async_add_player(self, player: Player) -> None:
"""Register a new player or update an existing one."""
- if not player or not player.available:
+ if not player or not player.available or self.mass.exit:
return
if player.player_id in self._player_states:
return await self.async_update_player(player)
async def async_update_player(self, player: Player):
"""Update an existing player (or register as new if non existing)."""
+ if self.mass.exit:
+ return
if player.player_id not in self._player_states:
return await self.async_add_player(player)
await self._player_states[player.player_id].async_update(player)
"""
import asyncio
import gc
-import gzip
import io
import logging
import os
import soundfile
from aiofile import AIOFile, Reader
from music_assistant.constants import EVENT_STREAM_ENDED, EVENT_STREAM_STARTED
+from music_assistant.helpers.encryption import (
+ async_decrypt_bytes,
+ async_decrypt_string,
+ encrypt_bytes,
+)
from music_assistant.helpers.typing import MusicAssistantType
from music_assistant.helpers.util import (
+ async_yield_chunks,
create_tempfile,
- decrypt_bytes,
- decrypt_string,
- encrypt_bytes,
get_ip,
try_parse_int,
- yield_chunks,
)
from music_assistant.models.streamdetails import ContentType, StreamDetails, StreamType
-LOGGER = logging.getLogger("mass")
+LOGGER = logging.getLogger("stream_manager")
class SoxOutputFormat(Enum):
output_format: SoxOutputFormat = SoxOutputFormat.FLAC,
resample: Optional[int] = None,
gain_db_adjust: Optional[float] = None,
- chunk_size: int = 128000,
+ chunk_size: int = 5000000,
) -> AsyncGenerator[Tuple[bool, bytes], None]:
"""Get the sox manipulated audio data for the given streamdetails."""
# collect all args for sox
args += ["vol", str(gain_db_adjust), "dB"]
if resample:
args += ["rate", "-v", str(resample)]
+ if not chunk_size:
+ chunk_size = int(
+ streamdetails.sample_rate * (streamdetails.bit_depth / 8) * 2 * 10
+ )
LOGGER.debug(
"[async_get_sox_stream] [%s/%s] started using args: %s",
streamdetails.provider,
*args,
stdout=asyncio.subprocess.PIPE,
stdin=asyncio.subprocess.PIPE,
- bufsize=0,
+ limit=chunk_size * 5,
)
async def fill_buffer():
)
# feed audio data into sox stdin for processing
async for chunk in self.async_get_media_stream(streamdetails):
+ if self.mass.exit:
+ break
sox_proc.stdin.write(chunk)
await sox_proc.stdin.drain()
+ # send eof when last chunk received
sox_proc.stdin.write_eof()
await sox_proc.stdin.drain()
LOGGER.debug(
prev_chunk = chunk
await asyncio.wait([fill_buffer_task])
- LOGGER.debug(
- "[async_get_sox_stream] [%s/%s] finished",
- streamdetails.provider,
- streamdetails.item_id,
- )
+
except (GeneratorExit, Exception): # pylint: disable=broad-except
LOGGER.warning(
"[async_get_sox_stream] [%s/%s] aborted",
async def async_queue_stream_flac(self, player_id) -> AsyncGenerator[bytes, None]:
"""Stream the PlayerQueue's tracks as constant feed in flac format."""
+ chunk_size = 571392 # 74,7% of pcm
args = ["sox", "-t", "s32", "-c", "2", "-r", "96000", "-", "-t", "flac", "-"]
sox_proc = await asyncio.create_subprocess_exec(
*args,
stdout=asyncio.subprocess.PIPE,
stdin=asyncio.subprocess.PIPE,
+ limit=chunk_size,
)
LOGGER.debug(
"[async_queue_stream_flac] [%s] started using args: %s",
player_id,
" ".join(args),
)
- chunk_size = 571392 # 74,7% of pcm
# feed stdin with pcm samples
async def fill_buffer():
"[async_queue_stream_flac] [%s] fill buffer started", player_id
)
async for chunk in self.async_queue_stream_pcm(player_id, 96000, 32):
+ if self.mass.exit:
+ return
sox_proc.stdin.write(chunk)
await sox_proc.stdin.drain()
+ # write eof when no more data
sox_proc.stdin.write_eof()
await sox_proc.stdin.drain()
LOGGER.debug(
queue_conf = self.mass.config.get_player_config(player_id)
fade_length = try_parse_int(queue_conf["crossfade_duration"])
pcm_args = ["s32", "-c", "2", "-r", str(sample_rate)]
- chunk_size = int(sample_rate * (bit_depth / 8) * 2) # 1 second
+ sample_size = int(sample_rate * (bit_depth / 8) * 2) # 1 second
if fade_length:
- buffer_size = chunk_size * fade_length
+ buffer_size = sample_size * fade_length
else:
- buffer_size = chunk_size * 10
+ buffer_size = sample_size * 10
LOGGER.info("Start Queue Stream for player %s ", player_id)
break
if cur_chunk <= 2 and not last_fadeout_data:
# no fadeout_part available so just pass it to the output directly
- for small_chunk in yield_chunks(chunk, chunk_size):
- yield small_chunk
+ yield chunk
bytes_written += len(chunk)
del chunk
elif cur_chunk == 1 and last_fadeout_data:
fade_in_part, last_fadeout_data, pcm_args, fade_length
)
# send crossfade_part
- for small_chunk in yield_chunks(crossfade_part, chunk_size):
- yield small_chunk
+ yield crossfade_part
bytes_written += len(crossfade_part)
del crossfade_part
del fade_in_part
last_fadeout_data = b""
# also write the leftover bytes from the strip action
- for small_chunk in yield_chunks(remaining_bytes, chunk_size):
- yield small_chunk
+ yield remaining_bytes
bytes_written += len(remaining_bytes)
del remaining_bytes
del chunk
or len(last_part) < buffer_size
):
# crossfading is not enabled so just pass the (stripped) audio data
- for small_chunk in yield_chunks(last_part, chunk_size):
- yield small_chunk
+ yield last_part
bytes_written += len(last_part)
del last_part
del chunk
last_fadeout_data = last_part[-buffer_size:]
remaining_bytes = last_part[:-buffer_size]
# write remaining bytes
- for small_chunk in yield_chunks(remaining_bytes, chunk_size):
- yield small_chunk
+ yield remaining_bytes
bytes_written += len(remaining_bytes)
del last_part
del remaining_bytes
# keep previous chunk in memory so we have enough
# samples to perform the crossfade
if prev_chunk:
- for small_chunk in yield_chunks(prev_chunk, chunk_size):
- yield small_chunk
+ yield prev_chunk
bytes_written += len(prev_chunk)
prev_chunk = chunk
else:
del chunk
# end of the track reached
# update actual duration to the queue for more accurate now playing info
- accurate_duration = bytes_written / chunk_size
+ accurate_duration = bytes_written / sample_size
queue_track.duration = accurate_duration
LOGGER.debug(
"Finished Streaming queue track: %s (%s) on queue %s",
# run garbage collect manually to avoid too much memory fragmentation
gc.collect()
# end of queue reached, pass last fadeout bits to final output
- for small_chunk in yield_chunks(last_fadeout_data, chunk_size):
- yield small_chunk
+ yield last_fadeout_data
del last_fadeout_data
# END OF QUEUE STREAM
# run garbage collect manually to avoid too much memory fragmentation
self, streamdetails: StreamDetails
) -> AsyncGenerator[bytes, None]:
"""Get the (original/untouched) audio data for the given streamdetails. Generator."""
- stream_path = decrypt_string(streamdetails.path)
+ stream_path = await async_decrypt_string(streamdetails.path)
stream_type = StreamType(streamdetails.type)
audio_data = b""
# Handle (optional) caching of audio data
- cache_file = "/tmp/" + f"{streamdetails.item_id}{streamdetails.provider}"[::-1]
+ cache_id = f"{streamdetails.item_id}{streamdetails.provider}"[::-1]
+ cache_file = os.path.join(self.mass.config.data_path, ".audio_cache", cache_id)
if os.path.isfile(cache_file):
- with gzip.open(cache_file, "rb") as _file:
- audio_data = decrypt_bytes(_file.read())
+ async with AIOFile(cache_file, "rb") as afp:
+ audio_data = await afp.read()
+ audio_data = await async_decrypt_bytes(audio_data)
if audio_data:
stream_type = StreamType.CACHE
)
if stream_type == StreamType.CACHE:
- yield audio_data
+ async for chunk in async_yield_chunks(audio_data, 512000):
+ yield chunk
elif stream_type == StreamType.URL:
async with self.mass.http_session.get(stream_path) as response:
async for chunk in response.content.iter_any():
- audio_data += chunk
yield chunk
+ if len(audio_data) < 100000000:
+ audio_data += chunk
elif stream_type == StreamType.FILE:
async with AIOFile(stream_path) as afp:
async for chunk in Reader(afp):
- audio_data += chunk
yield chunk
+ if len(audio_data) < 100000000:
+ audio_data += chunk
elif stream_type == StreamType.EXECUTABLE:
args = shlex.split(stream_path)
+ chunk_size = 512000
process = await asyncio.create_subprocess_exec(
- *args, stdout=asyncio.subprocess.PIPE
+ *args, stdout=asyncio.subprocess.PIPE, limit=chunk_size
)
try:
- async for chunk in process.stdout:
- audio_data += chunk
+ while True:
+ # read exactly chunksize of data
+ try:
+ chunk = await process.stdout.readexactly(chunk_size)
+ except asyncio.IncompleteReadError as exc:
+ chunk = exc.partial
yield chunk
+ if len(audio_data) < 100000000:
+ audio_data += chunk
+ if len(chunk) < chunk_size:
+ # last chunk
+ break
except (GeneratorExit, Exception) as exc: # pylint: disable=broad-except
LOGGER.warning(
"[async_get_media_stream] [%s/%s] Aborted: %s",
str(exc),
)
# read remaining bytes
+ process.terminate()
await process.communicate()
if process and process.returncode is None:
process.terminate()
await process.wait()
+ raise GeneratorExit from exc
# signal end of stream event
self.mass.signal_event(EVENT_STREAM_ENDED, streamdetails)
# send analyze job to background worker
- self.mass.add_job(self.__analyze_audio, streamdetails, audio_data)
+ if not stream_type == StreamType.CACHE:
+ self.mass.add_job(self.__analyze_audio, streamdetails, audio_data)
LOGGER.debug(
"[async_get_media_stream] [%s/%s] Finished",
streamdetails.provider,
if item_key in self.analyze_jobs:
return # prevent multiple analyze jobs for same track
self.analyze_jobs[item_key] = True
- # do we need saving to disk ?
- cache_file = "/tmp/" + f"{streamdetails.item_id}{streamdetails.provider}"[::-1]
- if not os.path.isfile(cache_file):
- with gzip.open(cache_file, "wb") as _file:
+
+ # Save cache file to disk
+ cache_id = f"{streamdetails.item_id}{streamdetails.provider}"[::-1]
+ cache_dir = os.path.join(self.mass.config.data_path, ".audio_cache")
+ if not os.path.isdir(cache_dir):
+ os.mkdir(cache_dir)
+ cache_file = os.path.join(cache_dir, cache_id)
+ if not os.path.isfile(cache_file) and len(audio_data) < 100000000:
+ with open(cache_file, "wb") as _file:
_file.write(encrypt_bytes(audio_data))
+
# get track loudness
track_loudness = self.mass.add_job(
self.mass.database.async_get_track_loudness(
fadeinfile = create_tempfile()
args = ["sox", "--ignore-length", "-t"] + pcm_args
args += ["-", "-t"] + pcm_args + [fadeinfile.name, "fade", "t", str(fade_length)]
- process = await asyncio.create_subprocess_exec(*args, stdin=asyncio.subprocess.PIPE)
+ process = await asyncio.create_subprocess_exec(
+ *args, stdin=asyncio.subprocess.PIPE, limit=10000000
+ )
await process.communicate(fade_in_part)
# create fade-out part
fadeoutfile = create_tempfile()
args = ["sox", "--ignore-length", "-t"] + pcm_args + ["-", "-t"] + pcm_args
args += [fadeoutfile.name, "reverse", "fade", "t", str(fade_length), "reverse"]
process = await asyncio.create_subprocess_exec(
- *args, stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE
+ *args,
+ stdout=asyncio.subprocess.PIPE,
+ stdin=asyncio.subprocess.PIPE,
+ limit=10000000,
)
await process.communicate(fade_out_part)
# create crossfade using sox and some temp files
args = ["sox", "-m", "-v", "1.0", "-t"] + pcm_args + [fadeoutfile.name, "-v", "1.0"]
args += ["-t"] + pcm_args + [fadeinfile.name, "-t"] + pcm_args + ["-"]
process = await asyncio.create_subprocess_exec(
- *args, stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE
+ *args,
+ stdout=asyncio.subprocess.PIPE,
+ stdin=asyncio.subprocess.PIPE,
+ limit=10000000,
)
crossfade_part, _ = await process.communicate()
fadeinfile.close()
if reverse:
args.append("reverse")
process = await asyncio.create_subprocess_exec(
- *args, stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE
+ *args,
+ stdin=asyncio.subprocess.PIPE,
+ stdout=asyncio.subprocess.PIPE,
+ limit=10000000,
)
stripped_data, _ = await process.communicate(audio_data)
return stripped_data
class MusicAssistant:
"""Main MusicAssistant object."""
- def __init__(self, datapath):
+ def __init__(self, datapath: str, debug: bool = False):
"""
Create an instance of MusicAssistant.
:param datapath: file location to store the data
"""
+ self._exit = False
self._loop = None
+ self._debug = debug
self._http_session = None
self._event_listeners = []
self._providers = {}
self._streams = StreamManager(self)
# shared zeroconf instance
self.zeroconf = Zeroconf()
- self._exit = False
async def async_start(self):
"""Start running the music assistant server."""
# initialize loop
self._loop = asyncio.get_event_loop()
self._loop.set_exception_handler(self.__handle_exception)
- if LOGGER.level == logging.DEBUG:
- self._loop.set_debug(True)
+ self._loop.set_debug(self._debug)
# create shared aiohttp ClientSession
self._http_session = aiohttp.ClientSession(
loop=self.loop,
async def async_stop(self):
"""Stop running the music assistant server."""
+ self._exit = True
LOGGER.info("Application shutdown")
self.signal_event(EVENT_SHUTDOWN)
- self._exit = True
await self.config.async_close()
await self._web.async_stop()
for prov in self._providers.values():
"""Return the running event loop."""
return self._loop
+ @property
+ def exit(self) -> bool:
+ """Return bool if the main process is exiting."""
+ return self._exit
+
@property
def players(self) -> PlayerManager:
"""Return the Players controller/manager."""
"""Return the Database controller/manager."""
return self._database
+ @property
+ def metadata(self) -> MetaDataManager:
+ """Return the Metadata controller/manager."""
+ return self._metadata
+
@property
def web(self) -> WebServer:
"""Return the webserver instance."""
task = self.loop.run_in_executor(None, target, *args, *kwargs) # type: ignore
return task
- @staticmethod
- def __handle_exception(loop: asyncio.AbstractEventLoop, context: Dict) -> None:
+ def __handle_exception(
+ self, loop: asyncio.AbstractEventLoop, context: Dict
+ ) -> None:
"""Global exception handler."""
LOGGER.error("Caught exception: %s", context)
- if loop.get_debug():
+ if self._debug:
loop.default_exception_handler(context)
async def __async_setup_discovery(self) -> None:
self._items = []
self._shuffle_enabled = False
self._repeat_enabled = False
+ self._crossfade_enabled = False
self._cur_index = 0
self._cur_item_time = 0
self._last_item = None
@property
def crossfade_enabled(self) -> bool:
"""Return if crossfade is enabled for this player's queue."""
- return (
- self.mass.config.player_settings[self.player_id]["crossfade_duration"] > 0
- )
+ return self._crossfade_enabled
@property
def cur_index(self) -> OptionalInt:
async def async_next(self) -> None:
"""Play the next track in the queue."""
+ self._crossfade_enabled = (
+ self.mass.config.player_settings[self.player_id]["crossfade_duration"] > 0
+ )
if self.cur_index is None:
return
if self.use_queue_stream:
async def async_previous(self) -> None:
"""Play the previous track in the queue."""
+ self._crossfade_enabled = (
+ self.mass.config.player_settings[self.player_id]["crossfade_duration"] > 0
+ )
if self.cur_index is None:
return
if self.use_queue_stream:
return await self.async_play_index(self.cur_index - 1)
- return await self.mass.players.async_cmd_previous(self.player_id)
+ return await self.player.async_cmd_previous()
async def async_resume(self) -> None:
"""Resume previous queue."""
+ self._crossfade_enabled = (
+ self.mass.config.player_settings[self.player_id]["crossfade_duration"] > 0
+ )
if self.items:
prev_index = self.cur_index
if self.use_queue_stream or not self.supports_queue:
async def async_play_index(self, index: int) -> None:
"""Play item at index X in queue."""
+ self._crossfade_enabled = (
+ self.mass.config.player_settings[self.player_id]["crossfade_duration"] > 0
+ )
if not isinstance(index, int):
index = self.__index_by_id(index)
if not len(self.items) > index:
async def async_load(self, queue_items: List[QueueItem]) -> None:
"""Load (overwrite) queue with new items."""
+ self._crossfade_enabled = (
+ self.mass.config.player_settings[self.player_id]["crossfade_duration"] > 0
+ )
for index, item in enumerate(queue_items):
item.sort_index = index
if self._shuffle_enabled:
EVENT_PLAYER_CHANGED,
)
from music_assistant.helpers.typing import MusicAssistantType
-from music_assistant.helpers.util import callback
-from music_assistant.models.config_entry import ConfigEntry, ConfigEntryType
from music_assistant.models.player import (
DeviceInfo,
PlaybackState,
Player,
- PlayerControlType,
PlayerFeature,
)
LOGGER = logging.getLogger("mass")
+# List of all player_state attributes
+PLAYER_ATTRIBUTES = [
+ ATTR_ACTIVE_QUEUE,
+ ATTR_AVAILABLE,
+ ATTR_CURRENT_URI,
+ ATTR_DEVICE_INFO,
+ ATTR_ELAPSED_TIME,
+ ATTR_FEATURES,
+ ATTR_GROUP_CHILDS,
+ ATTR_GROUP_PARENTS,
+ ATTR_IS_GROUP_PLAYER,
+ ATTR_MUTED,
+ ATTR_NAME,
+ ATTR_PLAYER_ID,
+ ATTR_POWERED,
+ ATTR_PROVIDER_ID,
+ ATTR_SHOULD_POLL,
+ ATTR_STATE,
+ ATTR_VOLUME_LEVEL,
+]
# list of Player attributes that can/will cause a player changed event
UPDATE_ATTRIBUTES = [
ATTR_MUTED,
ATTR_IS_GROUP_PLAYER,
ATTR_GROUP_CHILDS,
- ATTR_DEVICE_INFO,
- ATTR_FEATURES,
ATTR_SHOULD_POLL,
]
self._updated_at = datetime.utcnow()
self._group_parents = self.get_group_parents()
self._active_queue = self.get_active_queue()
- self._config_entries = self.get_player_config_entries()
+ self._group_delay = self.get_group_delay()
# schedule update to set the transforms
self.mass.add_job(self.async_update(player))
If provided, the property must return the REALTIME value while playing.
Used for synced playback in player groups.
"""
- return self.player.elapsed_milliseconds # always realtime returned from player
+ # always realtime returned from player
+ if self.player.elapsed_milliseconds is not None:
+ return self.player.elapsed_milliseconds - self.group_delay
+ return None
@property
def state(self) -> PlaybackState:
return self._features
@property
- def config_entries(self) -> List[ConfigEntry]:
- """Return player specific config entries (if any)."""
- return self._config_entries
+ def group_delay(self) -> int:
+ """Return group delay of this player in milliseconds (if configured)."""
+ return self._group_delay
async def async_update(self, player: Player):
+ """Run update player state task in executor."""
+ self.mass.add_job(self.update, player)
+
+ def update(self, player: Player):
"""Update attributes from player object."""
+ new_available = self.get_available(player.available)
+ if self.available == new_available and not new_available:
+ return # ignore players that are unavailable
+
# detect state changes
changed_keys = set()
- for attr in UPDATE_ATTRIBUTES:
- new_value = getattr(self._player, attr)
+ for attr in PLAYER_ATTRIBUTES:
+
+ new_value = getattr(self._player, attr, None)
# handle transformations
if attr == ATTR_NAME:
new_value = self.get_available(new_value)
elif attr == ATTR_VOLUME_LEVEL:
new_value = self.get_volume_level(new_value)
+ elif attr == ATTR_GROUP_PARENTS:
+ new_value = self.get_group_parents()
+ elif attr == ATTR_ACTIVE_QUEUE:
+ new_value = self.get_active_queue()
current_value = getattr(self, attr)
# value changed
setattr(self, "_" + attr, new_value)
changed_keys.add(attr)
- LOGGER.debug("Attribute %s changed on player %s", attr, self.player_id)
-
- # some attributes are always updated
- self._elapsed_time = player.elapsed_time
- self._updated_at = datetime.utcnow()
- self._group_parents = self.get_group_parents()
- self._active_queue = self.get_active_queue()
- self._config_entries = self.get_player_config_entries()
if changed_keys:
- self.mass.signal_event(EVENT_PLAYER_CHANGED, self)
- # update group player childs when parent updates
- if ATTR_GROUP_CHILDS in changed_keys:
+ self._updated_at = datetime.utcnow()
+
+ if changed_keys.intersection(set(UPDATE_ATTRIBUTES)):
+ self.mass.signal_event(EVENT_PLAYER_CHANGED, self)
+ # update group player childs when parent updates
for child_player_id in self.group_childs:
self.mass.add_job(
self.mass.players.async_trigger_player_update(child_player_id)
)
+ # update group player when child updates
+ for group_player_id in self.group_parents:
+ self.mass.add_job(
+ self.mass.players.async_trigger_player_update(group_player_id)
+ )
- # always update the player queue
- player_queue = self.mass.players.get_player_queue(self.active_queue)
- if player_queue:
- self.mass.add_job(player_queue.async_update_state())
+ # always update the player queue
+ player_queue = self.mass.players.get_player_queue(self.active_queue)
+ if player_queue:
+ self.mass.add_job(player_queue.async_update_state())
+ self._group_delay = self.get_group_delay()
- @callback
def get_name(self, name: str) -> str:
"""Return final/calculated player name."""
conf_name = self.mass.config.get_player_config(self.player_id)[CONF_NAME]
return conf_name if conf_name else name
- @callback
def get_power(self, power: bool) -> bool:
"""Return final/calculated player's power state."""
if not self.available:
return control.state
return power
- @callback
def get_state(self, state: PlaybackState) -> PlaybackState:
"""Return final/calculated player's playback state."""
if self.powered and self.active_queue != self.player_id:
return PlaybackState.Off
return state
- @callback
def get_available(self, available: bool) -> bool:
"""Return current availablity of player."""
- player_enabled = bool(
- self.mass.config.get_player_config(self.player_id)[CONF_ENABLED]
- )
+ player_enabled = self.mass.config.get_player_config(self.player_id)[
+ CONF_ENABLED
+ ]
return False if not player_enabled else available
- @callback
def get_volume_level(self, volume_level: int) -> int:
"""Return final/calculated player's volume_level."""
if not self.available:
"""Return all group players this player belongs to."""
return self._group_parents
- @callback
def get_group_parents(self) -> List[str]:
"""Return all group players this player belongs to."""
if self.is_group_player:
"""Return the active parent player/queue for a player."""
return self._active_queue
- @callback
def get_active_queue(self) -> str:
"""Return the active parent player/queue for a player."""
# if a group is powered on, all of it's childs will have/use
"""Return the datetime (UTC) that the player state was last updated."""
return self._updated_at
- @callback
- def get_player_config_entries(self):
- """Get final/calculated config entries for a player."""
- entries = []
- entries += self.player.config_entries
- # append power control config entries
- power_controls = self.mass.players.get_player_controls(PlayerControlType.POWER)
- if power_controls:
- controls = [
- {"text": f"{item.provider}: {item.name}", "value": item.control_id}
- for item in power_controls
- ]
- entries.append(
- ConfigEntry(
- entry_key=CONF_POWER_CONTROL,
- entry_type=ConfigEntryType.STRING,
- description=CONF_POWER_CONTROL,
- values=controls,
- )
- )
- # append volume control config entries
- volume_controls = self.mass.players.get_player_controls(
- PlayerControlType.VOLUME
- )
- if volume_controls:
- controls = [
- {"text": f"{item.provider}: {item.name}", "value": item.control_id}
- for item in volume_controls
- ]
- entries.append(
- ConfigEntry(
- entry_key=CONF_VOLUME_CONTROL,
- entry_type=ConfigEntryType.STRING,
- description=CONF_VOLUME_CONTROL,
- values=controls,
- )
- )
- # append group player entries
- for parent_id in self.group_parents:
- parent_player = self.mass.players.get_player_state(parent_id)
- if parent_player and parent_player.provider_id == "group_player":
- entries.append(
- ConfigEntry(
- entry_key=CONF_GROUP_DELAY,
- entry_type=ConfigEntryType.INT,
- default_value=0,
- range=(0, 500),
- description=CONF_GROUP_DELAY,
- )
- )
- break
- return entries
+ def get_group_delay(self):
+ """Get group delay for a player."""
+ player_settings = self.mass.config.get_player_config(self.player_id)
+ if player_settings:
+ return player_settings.get(CONF_GROUP_DELAY, 0)
+ return 0
- # @callback
def to_dict(self):
"""Instance attributes as dict so it can be serialized to json."""
return {
+++ /dev/null
-"""Local player provider."""
-import asyncio
-import logging
-import signal
-import subprocess
-from typing import List
-
-from music_assistant.models.config_entry import ConfigEntry
-from music_assistant.models.player import DeviceInfo, PlaybackState, Player
-from music_assistant.models.provider import PlayerProvider
-
-PROV_ID = "builtin"
-PROV_NAME = "Built-in (local) player"
-LOGGER = logging.getLogger(PROV_ID)
-
-
-async def async_setup(mass):
- """Perform async setup of this Plugin/Provider."""
- prov = BuiltinPlayerProvider()
- await mass.async_register_provider(prov)
-
-
-class BuiltinPlayerProvider(PlayerProvider):
- """Demo PlayerProvider which provides a single local player."""
-
- @property
- def id(self) -> str:
- """Return provider ID for this provider."""
- return PROV_ID
-
- @property
- def name(self) -> str:
- """Return provider Name for this provider."""
- return PROV_NAME
-
- @property
- def config_entries(self) -> List[ConfigEntry]:
- """Return Config Entries for this provider."""
- return []
-
- async def async_on_start(self) -> bool:
- """Handle initialization of the provider based on config."""
- player = BuiltinPlayer("local_player", "Built-in player on the server")
- self.mass.add_job(self.mass.players.async_add_player(player))
- return True
-
- async def async_on_stop(self):
- """Handle correct close/cleanup of the provider on exit."""
- for player in self.players:
- await player.async_cmd_stop()
-
-
-class BuiltinPlayer(Player):
- """Representation of a BuiltinPlayer."""
-
- def __init__(self, player_id: str, name: str) -> None:
- """Initialize the built-in player."""
- self._player_id = player_id
- self._name = name
- self._powered = False
- self._elapsed_time = 0
- self._state = PlaybackState.Stopped
- self._current_uri = ""
- self._volume_level = 100
- self._muted = False
- self._sox = None
- self._progress_task = None
-
- @property
- def player_id(self) -> str:
- """Return player id of this player."""
- return self._player_id
-
- @property
- def provider_id(self) -> str:
- """Return provider id of this player."""
- return PROV_ID
-
- @property
- def name(self) -> str:
- """Return name of the player."""
- return self._name
-
- @property
- def powered(self) -> bool:
- """Return current power state of player."""
- return self._powered
-
- @property
- def elapsed_time(self) -> float:
- """Return elapsed_time of current playing uri in seconds."""
- return self._elapsed_time
-
- @property
- def state(self) -> PlaybackState:
- """Return current PlaybackState of player."""
- return self._state
-
- @property
- def available(self) -> bool:
- """Return current availablity of player."""
- return True
-
- @property
- def current_uri(self) -> str:
- """Return currently loaded uri of player (if any)."""
- return self._current_uri
-
- @property
- def volume_level(self) -> int:
- """Return current volume level of player (scale 0..100)."""
- return self._volume_level
-
- @property
- def muted(self) -> bool:
- """Return current mute state of player."""
- return self._muted
-
- @property
- def is_group_player(self) -> bool:
- """Return True if this player is a group player."""
- return False
-
- @property
- def device_info(self) -> DeviceInfo:
- """Return the device info for this player."""
- return DeviceInfo(
- model="Demo", address="http://demo:12345", manufacturer=PROV_NAME
- )
-
- # SERVICE CALLS / PLAYER COMMANDS
-
- async def async_cmd_play_uri(self, uri: str):
- """Play the specified uri/url on the player."""
- if self._sox:
- await self.async_cmd_stop()
- self._current_uri = uri
- self._sox = subprocess.Popen(["play", "-t", "flac", "-q", uri])
- self._state = PlaybackState.Playing
- self._powered = True
- self.update_state()
-
- async def report_progress():
- """Report fake progress while sox is playing."""
- LOGGER.info("Playback started on player %s", self.name)
- self._elapsed_time = 0
- while self._sox and not self._sox.poll():
- await asyncio.sleep(1)
- self._elapsed_time += 1
- self.update_state()
- LOGGER.info("Playback stopped on player %s", self.name)
- self._elapsed_time = 0
- self._state = PlaybackState.Stopped
- self.update_state()
-
- if self._progress_task:
- self._progress_task.cancel()
- self._progress_task = self.mass.add_job(report_progress)
-
- async def async_cmd_stop(self) -> None:
- """Send STOP command to player."""
- if self._sox:
- self._sox.terminate()
- self._sox = None
- self._state = PlaybackState.Stopped
- self.update_state()
-
- async def async_cmd_play(self) -> None:
- """Send PLAY command to player."""
- if self._sox:
- self._sox.send_signal(signal.SIGCONT)
- self._state = PlaybackState.Playing
- self.update_state()
-
- async def async_cmd_pause(self):
- """Send PAUSE command to given player."""
- if self._sox:
- self._sox.send_signal(signal.SIGSTOP)
- self._state = PlaybackState.Paused
- self.update_state()
-
- async def async_cmd_power_on(self) -> None:
- """Send POWER ON command to player."""
- self._powered = True
- self.update_state()
-
- async def async_cmd_power_off(self) -> None:
- """Send POWER OFF command to player."""
- await self.async_cmd_stop()
- self._powered = False
- self.update_state()
-
- async def async_cmd_volume_set(self, volume_level: int) -> None:
- """
- Send volume level command to given player.
-
- :param volume_level: volume level to set (0..100).
- """
- self._volume_level = volume_level
- self.update_state()
-
- async def async_cmd_volume_mute(self, is_muted=False):
- """
- Send volume MUTE command to given player.
-
- :param is_muted: bool with new mute state.
- """
- self._muted = is_muted
- self.update_state()
+++ /dev/null
-{
- "nl": {
- "Built-in (local) player": "Ingebouwde speler van de server"
- }
-}
\ No newline at end of file
--- /dev/null
+"""Local player provider."""
+import asyncio
+import logging
+import signal
+import subprocess
+from typing import List
+
+from music_assistant.models.config_entry import ConfigEntry
+from music_assistant.models.player import DeviceInfo, PlaybackState, Player
+from music_assistant.models.provider import PlayerProvider
+
+PROV_ID = "builtin_player"
+PROV_NAME = "Built-in (local) player"
+LOGGER = logging.getLogger(PROV_ID)
+
+
+async def async_setup(mass):
+ """Perform async setup of this Plugin/Provider."""
+ prov = BuiltinPlayerProvider()
+ await mass.async_register_provider(prov)
+
+
+class BuiltinPlayerProvider(PlayerProvider):
+ """Demo PlayerProvider which provides a single local player."""
+
+ @property
+ def id(self) -> str:
+ """Return provider ID for this provider."""
+ return PROV_ID
+
+ @property
+ def name(self) -> str:
+ """Return provider Name for this provider."""
+ return PROV_NAME
+
+ @property
+ def config_entries(self) -> List[ConfigEntry]:
+ """Return Config Entries for this provider."""
+ return []
+
+ async def async_on_start(self) -> bool:
+ """Handle initialization of the provider based on config."""
+ player = BuiltinPlayer("local_player", "Built-in player on the server")
+ self.mass.add_job(self.mass.players.async_add_player(player))
+ return True
+
+ async def async_on_stop(self):
+ """Handle correct close/cleanup of the provider on exit."""
+ for player in self.players:
+ await player.async_cmd_stop()
+
+
+class BuiltinPlayer(Player):
+ """Representation of a BuiltinPlayer."""
+
+ def __init__(self, player_id: str, name: str) -> None:
+ """Initialize the built-in player."""
+ self._player_id = player_id
+ self._name = name
+ self._powered = False
+ self._elapsed_time = 0
+ self._state = PlaybackState.Stopped
+ self._current_uri = ""
+ self._volume_level = 100
+ self._muted = False
+ self._sox = None
+ self._progress_task = None
+
+ @property
+ def player_id(self) -> str:
+ """Return player id of this player."""
+ return self._player_id
+
+ @property
+ def provider_id(self) -> str:
+ """Return provider id of this player."""
+ return PROV_ID
+
+ @property
+ def name(self) -> str:
+ """Return name of the player."""
+ return self._name
+
+ @property
+ def powered(self) -> bool:
+ """Return current power state of player."""
+ return self._powered
+
+ @property
+ def elapsed_time(self) -> float:
+ """Return elapsed_time of current playing uri in seconds."""
+ return self._elapsed_time
+
+ @property
+ def state(self) -> PlaybackState:
+ """Return current PlaybackState of player."""
+ return self._state
+
+ @property
+ def available(self) -> bool:
+ """Return current availablity of player."""
+ return True
+
+ @property
+ def current_uri(self) -> str:
+ """Return currently loaded uri of player (if any)."""
+ return self._current_uri
+
+ @property
+ def volume_level(self) -> int:
+ """Return current volume level of player (scale 0..100)."""
+ return self._volume_level
+
+ @property
+ def muted(self) -> bool:
+ """Return current mute state of player."""
+ return self._muted
+
+ @property
+ def is_group_player(self) -> bool:
+ """Return True if this player is a group player."""
+ return False
+
+ @property
+ def device_info(self) -> DeviceInfo:
+ """Return the device info for this player."""
+ return DeviceInfo(
+ model="Demo", address="http://demo:12345", manufacturer=PROV_NAME
+ )
+
+ # SERVICE CALLS / PLAYER COMMANDS
+
+ async def async_cmd_play_uri(self, uri: str):
+ """Play the specified uri/url on the player."""
+ if self._sox:
+ await self.async_cmd_stop()
+ self._current_uri = uri
+ self._sox = subprocess.Popen(["play", "-t", "flac", "-q", uri])
+ self._state = PlaybackState.Playing
+ self._powered = True
+ self.update_state()
+
+ async def report_progress():
+ """Report fake progress while sox is playing."""
+ LOGGER.info("Playback started on player %s", self.name)
+ self._elapsed_time = 0
+ while self._sox and not self._sox.poll():
+ await asyncio.sleep(1)
+ self._elapsed_time += 1
+ self.update_state()
+ LOGGER.info("Playback stopped on player %s", self.name)
+ self._elapsed_time = 0
+ self._state = PlaybackState.Stopped
+ self.update_state()
+
+ if self._progress_task:
+ self._progress_task.cancel()
+ self._progress_task = self.mass.add_job(report_progress)
+
+ async def async_cmd_stop(self) -> None:
+ """Send STOP command to player."""
+ if self._sox:
+ self._sox.terminate()
+ self._sox = None
+ self._state = PlaybackState.Stopped
+ self.update_state()
+
+ async def async_cmd_play(self) -> None:
+ """Send PLAY command to player."""
+ if self._sox:
+ self._sox.send_signal(signal.SIGCONT)
+ self._state = PlaybackState.Playing
+ self.update_state()
+
+ async def async_cmd_pause(self):
+ """Send PAUSE command to given player."""
+ if self._sox:
+ self._sox.send_signal(signal.SIGSTOP)
+ self._state = PlaybackState.Paused
+ self.update_state()
+
+ async def async_cmd_power_on(self) -> None:
+ """Send POWER ON command to player."""
+ self._powered = True
+ self.update_state()
+
+ async def async_cmd_power_off(self) -> None:
+ """Send POWER OFF command to player."""
+ await self.async_cmd_stop()
+ self._powered = False
+ self.update_state()
+
+ async def async_cmd_volume_set(self, volume_level: int) -> None:
+ """
+ Send volume level command to given player.
+
+ :param volume_level: volume level to set (0..100).
+ """
+ self._volume_level = volume_level
+ self.update_state()
+
+ async def async_cmd_volume_mute(self, is_muted=False):
+ """
+ Send volume MUTE command to given player.
+
+ :param is_muted: bool with new mute state.
+ """
+ self._muted = is_muted
+ self.update_state()
--- /dev/null
+{
+ "nl": {
+ "Built-in (local) player": "Ingebouwde speler van de server"
+ }
+}
\ No newline at end of file
player_id = str(cast_service[1])
friendly_name = cast_service[3]
LOGGER.debug("Chromecast removed: %s - %s", friendly_name, player_id)
- self.mass.add_job(self.mass.players.async_remove_player(player_id))
+ # we ignore this event completely as the Chromecast socket clients handles this on its own
import pychromecast
from music_assistant.helpers.typing import MusicAssistantType
-from music_assistant.helpers.util import compare_strings, yield_chunks
+from music_assistant.helpers.util import async_yield_chunks, compare_strings
from music_assistant.models.config_entry import ConfigEntry
from music_assistant.models.player import (
DeviceInfo,
self.mz_mgr = None
self.mz_manager = None
self._available = False
- self._powered = False
self._status_listener: Optional[CastStatusListener] = None
self._is_speaker_group = False
@property
def powered(self) -> bool:
"""Return power state of this player."""
- return self._powered
+ return not self.cast_status.volume_muted if self.cast_status else False
@property
def should_poll(self) -> bool:
"""Handle updates of the media status."""
self.media_status = media_status
self.update_state()
- if media_status.player_is_playing:
- self._powered = True
def new_connection_status(self, connection_status) -> None:
"""Handle updates of connection status."""
self._available = new_available
self.update_state()
if self._cast_info.is_audio_group and new_available:
- self._chromecast.mz_controller.update_members()
+ self.mass.add_job(self._chromecast.mz_controller.update_members)
async def async_on_update(self) -> None:
"""Call when player is periodically polled by the player manager (should_poll=True)."""
if self.mass.players.get_player_state(self.player_id).active_queue.startswith(
"group_player"
):
+ # the group player wants very accurate elapsed_time state so we request it very often
self.mass.add_job(self._chromecast.media_controller.update_status)
self.update_state()
async def async_cmd_power_on(self) -> None:
"""Send power ON command to player."""
- self._powered = True
self.mass.add_job(self._chromecast.set_volume_muted, False)
async def async_cmd_power_off(self) -> None:
or self.media_status.player_is_idle
):
self.mass.add_job(self._chromecast.media_controller.stop)
- self._powered = False
# chromecast has no real poweroff so we send mute instead
self.mass.add_job(self._chromecast.set_volume_muted, True)
async def async_cmd_queue_append(self, queue_items: List[QueueItem]) -> None:
"""Append new items at the end of the queue."""
cc_queue_items = self.__create_queue_items(queue_items)
- for chunk in yield_chunks(cc_queue_items, 50):
+ async for chunk in async_yield_chunks(cc_queue_items, 50):
queuedata = {
"type": "QUEUE_INSERT",
"insertBefore": None,
import logging
from typing import List
-from music_assistant.constants import CONF_GROUP_DELAY
from music_assistant.helpers.typing import MusicAssistantType
from music_assistant.models.config_entry import ConfigEntry, ConfigEntryType
from music_assistant.models.player import DeviceInfo, PlaybackState, Player
self.connected_clients = {}
self.stream_task = None
self.sync_task = None
+ self._config_entries = self.__get_config_entries()
+ self._group_childs = self.__get_group_childs()
@property
def player_id(self) -> str:
@property
def group_childs(self):
"""Return group childs of this group player."""
- player_conf = self.mass.config.get_player_config(self.player_id)
- if player_conf and player_conf.get(CONF_PLAYERS):
- return player_conf[CONF_PLAYERS]
- return []
+ return self._group_childs
@property
def device_info(self) -> DeviceInfo:
@property
def config_entries(self):
+ """Return config entries for this group player."""
+ return self._config_entries
+
+ async def async_on_update(self) -> None:
+ """Call when player is periodically polled by the player manager (should_poll=True)."""
+ self._config_entries = self.__get_config_entries()
+ self._group_childs = self.__get_group_childs()
+ self.update_state()
+
+ def __get_group_childs(self):
+ """Return group childs of this group player."""
+ player_conf = self.mass.config.get_player_config(self.player_id)
+ if player_conf and player_conf.get(CONF_PLAYERS):
+ return player_conf[CONF_PLAYERS]
+ return []
+
+ def __get_config_entries(self):
"""Return config entries for this group player."""
all_players = [
{"text": item.name, "value": item.player_id}
)
default_master = ""
if selected_players:
- default_master = selected_players[0]
+ default_master = selected_players[0]["value"]
return [
ConfigEntry(
entry_key=CONF_PLAYERS,
entry_type=ConfigEntryType.STRING,
default_value=[],
values=all_players,
- description=CONF_PLAYERS,
+ label=CONF_PLAYERS,
+ description="group_player_players_desc",
multi_value=True,
),
ConfigEntry(
entry_type=ConfigEntryType.STRING,
default_value=default_master,
values=selected_players,
- description=CONF_MASTER,
+ label=CONF_MASTER,
+ description="group_player_master_desc",
multi_value=False,
- depends_on=CONF_MASTER,
+ depends_on=CONF_PLAYERS,
),
]
"""Handle streaming queue to connected child players."""
ticks = 0
while ticks < 60 and len(self.connected_clients) != len(self.group_childs):
- # TODO: Support situation where not alle clients of the group are powered
+ # TODO: Support situation where not all clients of the group are powered
await asyncio.sleep(0.1)
ticks += 1
if not self.connected_clients:
)
self.sync_task = asyncio.create_task(self.__synchronize_players())
- received_milliseconds = 0
- received_seconds = 0
async for audio_chunk in self.mass.streams.async_queue_stream_pcm(
self.player_id, sample_rate=96000, bit_depth=32
):
- received_seconds += 1
- received_milliseconds += 1000
- chunk_size = len(audio_chunk)
- start_bytes = 0
# make sure we still have clients connected
if not self.connected_clients:
return
# send the audio chunk to all connected players
- for child_player_id, writer in self.connected_clients.items():
-
- # work out startdelay
- if received_seconds == 1:
- player_delay = self.mass.config.player_settings[
- child_player_id
- ].get(CONF_GROUP_DELAY, 0)
- if player_delay:
- start_bytes = int(
- ((player_delay - received_milliseconds) / 1000) * chunk_size
- )
- else:
- start_bytes = 0
-
- # send the data to the client
- try:
- writer.write(audio_chunk[start_bytes:])
- await writer.drain()
- except (BrokenPipeError, ConnectionResetError, AssertionError):
- pass # happens at client disconnect
+ tasks = []
+ for _writer in self.connected_clients.values():
+ tasks.append(self.mass.add_job(_writer.write, audio_chunk))
+ tasks.append(self.mass.add_job(_writer.drain()))
+ # wait for clients to consume the data
+ await asyncio.wait(tasks)
if not self.connected_clients:
LOGGER.warning("no more clients!")
return
+ self.sync_task.cancel()
async def __synchronize_players(self):
"""Handle drifting/lagging by monitoring progress and compare to master player."""
while self.connected_clients:
- # check every 2 seconds for player sync
+ # check every 0.5 seconds for player sync
await asyncio.sleep(0.5)
for child_player_id in self.connected_clients:
- child_player.elapsed_milliseconds
)
prev_lags[child_player_id].append(lag)
- if len(prev_lags[child_player_id]) == 10:
- # if we have 10 samples calclate the average lag
+ if len(prev_lags[child_player_id]) == 5:
+ # if we have 5 samples calclate the average lag
avg_lag = sum(prev_lags[child_player_id]) / len(
prev_lags[child_player_id]
)
prev_lags[child_player_id] = []
- if avg_lag > 50:
+ if avg_lag > 25:
LOGGER.debug(
"child player %s is lagging behind with %s milliseconds",
child_player_id,
- master_player.elapsed_milliseconds
)
prev_drifts[child_player_id].append(drift)
- if len(prev_drifts[child_player_id]) == 10:
- # if we have 10 samples calculate the average drift
+ if len(prev_drifts[child_player_id]) == 5:
+ # if we have 5 samples calculate the average drift
avg_drift = sum(prev_drifts[child_player_id]) / len(
prev_drifts[child_player_id]
)
prev_drifts[child_player_id] = []
- if avg_drift > 50:
+ if avg_drift > 25:
LOGGER.debug(
"child player %s is drifting ahead with %s milliseconds",
child_player_id,
--- /dev/null
+{
+ "en": {
+ "Universal Group Players": "Universal Group Players",
+ "group_player_count": "Number of group players",
+ "group_player_count_desc": "Select how many Universal group players should be created.",
+ "group_player_players": "Players in group",
+ "group_player_players_desc": "Select the players that should be part of this group.",
+ "group_player_master": "Group master",
+ "group_player_master_desc": "Select the player that should act as group master."
+ },
+ "nl": {
+ "Universal Group Players": "Universele groep spelers",
+ "group_player_count": "Aantal groep spelers",
+ "group_player_count_desc": "Selecteer hoeveel groep spelers er aangemaakt moeten worden.",
+ "group_player_players": "Groepsspelers",
+ "group_player_players_desc": "Selecteer de spelers die deel uitmaken van deze groep.",
+ "group_player_master": "Groepsbeheerder",
+ "group_player_master_desc": "Selecteer de speler die dient als groepsbeheerder."
+ }
+}
\ No newline at end of file
result = orjson.loads(stdout)
except orjson.JSONDecodeError:
LOGGER.warning("Error while retrieving Spotify token!")
- result = None
+ return None
# transform token info to spotipy compatible format
if result and "accessToken" in result:
tokeninfo = result
tokeninfo["expiresAt"] = tokeninfo["expiresIn"] + int(time.time())
- return tokeninfo
+ return tokeninfo
+ return None
async def __async_get_all_items(self, endpoint, params=None, key="items"):
"""Get all items from a paged list."""
"""Handle a client connection on the socket."""
addr = writer.get_extra_info("peername")
LOGGER.debug("Socket client connected: %s", addr)
- socket_client = SqueezeSocketClient(reader, writer)
+ socket_client = SqueezeSocketClient(self.mass, reader, writer)
def handle_event(event: SqueezeEvent, socket_client: SqueezeSocketClient):
player_id = socket_client.player_id
from enum import Enum
from typing import Callable
-from music_assistant.helpers.util import callback, run_periodic
+from music_assistant.helpers.typing import MusicAssistantType
+from music_assistant.helpers.util import run_periodic
from .constants import PROV_ID
def __init__(
self,
+ mass: MusicAssistantType,
reader: asyncio.StreamReader,
writer: asyncio.StreamWriter,
event_callback: Callable = None,
):
"""Initialize the socket client."""
+ self.mass = mass
self._reader = reader
self._writer = writer
self._player_id = ""
power_int = 1 if powered else 0
await self.__async_send_frame(b"aude", struct.pack("2B", power_int, 1))
self._powered = powered
+ self.signal_event(SqueezeEvent.STATE_UPDATED)
async def async_cmd_volume_set(self, volume_level: int):
"""Send new volume level command to player."""
muted_int = 0 if muted else 1
await self.__async_send_frame(b"aude", struct.pack("2B", muted_int, 0))
self.muted = muted
+ self.signal_event(SqueezeEvent.STATE_UPDATED)
async def async_play_uri(
self, uri: str, send_flush: bool = True, crossfade_duration: int = 0
self._connected = False
self.signal_event(SqueezeEvent.DISCONNECTED)
- @callback
@staticmethod
def __pack_stream(
command,
server_ip,
)
- @callback
def _process_helo(self, data):
"""Process incoming HELO event from player (player connected)."""
# pylint: disable=unused-variable
asyncio.create_task(self.__async_initialize_player())
self.signal_event(SqueezeEvent.CONNECTED)
- @callback
def _process_stat(self, data):
"""Redirect incoming STAT event from player to correct method."""
event = data[:4].decode()
if event_handler is None:
LOGGER.debug("Unhandled event: %s - event_data: %s", event, event_data)
else:
- event_handler(data[4:])
+ self.mass.add_job(event_handler, data[4:])
- @callback
def _process_stat_aude(self, data):
"""Process incoming stat AUDe message (power level and mute)."""
(spdif_enable, dac_enable) = struct.unpack("2B", data[:4])
self._muted = not powered
self.signal_event(SqueezeEvent.STATE_UPDATED)
- @callback
def _process_stat_audg(self, data):
"""Process incoming stat AUDg message (volume level)."""
# TODO: process volume level
LOGGER.debug("AUDg received - Volume level: %s", data)
self.signal_event(SqueezeEvent.STATE_UPDATED)
- @callback
def _process_stat_stmd(self, data):
"""Process incoming stat STMd message (decoder ready)."""
# pylint: disable=unused-argument
LOGGER.debug("STMu received - Decoder Ready for next track.")
self.signal_event(SqueezeEvent.DECODER_READY)
- @callback
def _process_stat_stmf(self, data):
"""Process incoming stat STMf message (connection closed)."""
# pylint: disable=unused-argument
self._elapsed_seconds = 0
self.signal_event(SqueezeEvent.STATE_UPDATED)
- @callback
@classmethod
def _process_stat_stmo(cls, data):
"""
# pylint: disable=unused-argument
LOGGER.debug("STMo received - output underrun.")
- @callback
def _process_stat_stmp(self, data):
"""Process incoming stat STMp message: Pause confirmed."""
# pylint: disable=unused-argument
self._state = STATE_PAUSED
self.signal_event(SqueezeEvent.STATE_UPDATED)
- @callback
def _process_stat_stmr(self, data):
"""Process incoming stat STMr message: Resume confirmed."""
# pylint: disable=unused-argument
self._state = STATE_PLAYING
self.signal_event(SqueezeEvent.STATE_UPDATED)
- @callback
def _process_stat_stms(self, data):
# pylint: disable=unused-argument
"""Process incoming stat STMs message: Playback of new track has started."""
self._state = STATE_PLAYING
self.signal_event(SqueezeEvent.STATE_UPDATED)
- @callback
def _process_stat_stmt(self, data):
"""Process incoming stat STMt message: heartbeat from client."""
# pylint: disable=unused-variable
self._elapsed_seconds = elapsed_seconds
self.signal_event(SqueezeEvent.STATE_UPDATED)
- @callback
def _process_stat_stmu(self, data):
"""Process incoming stat STMu message: Buffer underrun: Normal end of playback."""
# pylint: disable=unused-argument
self.state = STATE_STOPPED
self.signal_event(SqueezeEvent.STATE_UPDATED)
- @callback
def _process_resp(self, data):
"""Process incoming RESP message: Response received at player."""
# pylint: disable=unused-argument
# send continue
asyncio.create_task(self.__async_send_frame(b"cont", b"0"))
- @callback
def _process_setd(self, data):
"""Process incoming SETD message: Get/set player firmware settings."""
cmd_id = data[0]
import time
from typing import List
+from music_assistant.helpers.typing import MusicAssistantType
+from music_assistant.helpers.util import run_periodic
from music_assistant.models.config_entry import ConfigEntry
-from music_assistant.models.player import PlaybackState, Player
-from music_assistant.models.playerprovider import PlayerProvider
-from music_assistant.utils import run_periodic
+from music_assistant.models.player import (
+ DeviceInfo,
+ PlaybackState,
+ Player,
+ PlayerFeature,
+)
+from music_assistant.models.provider import PlayerProvider
PROV_ID = "webplayer"
PROV_NAME = "WebPlayer"
LOGGER = logging.getLogger(PROV_ID)
CONFIG_ENTRIES = []
+PLAYER_CONFIG_ENTRIES = []
+PLAYER_FEATURES = []
EVENT_WEBPLAYER_CMD = "webplayer command"
EVENT_WEBPLAYER_STATE = "webplayer state"
)
self.mass.add_job(self.async_check_players())
- async def async_on_stop(self):
- """Handle correct close/cleanup of the provider on exit. Called on shutdown."""
- # nothing to do ?
-
async def async_handle_mass_event(self, msg, msg_details):
"""Handle received event for the webplayer component."""
- if msg == EVENT_WEBPLAYER_REGISTER:
+ player = self.mass.players.get_player(msg_details["player_id"])
+ if not player:
# register new player
- player_id = msg_details["player_id"]
- player = Player(
- player_id=player_id,
- provider_id=PROV_ID,
- name=msg_details["name"],
- powered=True,
- )
- await self.mass.player_manager.async_add_player(player)
-
- elif msg == EVENT_WEBPLAYER_STATE:
- await self.__async_handle_player_state(msg_details)
+ player = WebPlayer(self.mass, msg_details["player_id"], msg_details["name"])
+ await self.mass.players.async_add_player(player)
+ await player.handle_player_state(msg_details)
@run_periodic(30)
async def async_check_players(self) -> None:
"""Invalidate players that did not send a heartbeat message in a while."""
cur_time = time.time()
offline_players = []
- for player in self._players.values():
+ for player in self.players:
if cur_time - player.last_message > 30:
offline_players.append(player.player_id)
for player_id in offline_players:
- await self.mass.player_manager.async_remove_player(player_id)
- self._players.pop(player_id, None)
-
- async def async_cmd_stop(self, player_id: str) -> None:
- """Send stop command to player."""
- data = {"player_id": player_id, "cmd": "stop"}
- self.mass.signal_event(EVENT_WEBPLAYER_CMD, data)
-
- async def async_cmd_play(self, player_id: str) -> None:
- """Send play command to player."""
- data = {"player_id": player_id, "cmd": "play"}
- self.mass.signal_event(EVENT_WEBPLAYER_CMD, data)
-
- async def async_cmd_pause(self, player_id: str):
- """Send pause command to player."""
- data = {"player_id": player_id, "cmd": "pause"}
- self.mass.signal_event(EVENT_WEBPLAYER_CMD, data)
-
- async def async_cmd_power_on(self, player_id: str) -> None:
- """Send power ON command to player."""
- self._players[player_id].powered = True # not supported on webplayer
- data = {"player_id": player_id, "cmd": "stop"}
- self.mass.signal_event(EVENT_WEBPLAYER_CMD, data)
-
- async def async_cmd_power_off(self, player_id: str) -> None:
- """Send power OFF command to player."""
- await self.async_cmd_stop(player_id)
- self._players[player_id].powered = False
-
- async def async_cmd_volume_set(self, player_id: str, volume_level: int) -> None:
- """Send new volume level command to player."""
- data = {
- "player_id": player_id,
- "cmd": "volume_set",
- "volume_level": volume_level,
- }
- self.mass.signal_event(EVENT_WEBPLAYER_CMD, data)
-
- async def async_cmd_volume_mute(self, player_id: str, is_muted=False):
- """Send mute command to player."""
- data = {"player_id": player_id, "cmd": "volume_mute", "is_muted": is_muted}
- self.mass.signal_event(EVENT_WEBPLAYER_CMD, data)
-
- async def async_cmd_play_uri(self, player_id: str, uri: str):
- """Play single uri on player."""
- data = {"player_id": player_id, "cmd": "play_uri", "uri": uri}
- self.mass.signal_event(EVENT_WEBPLAYER_CMD, data)
+ await self.mass.players.async_remove_player(player_id)
async def __async_handle_player_state(self, data):
"""Handle state event from player."""
if "name" in data:
player.name = data["name"]
player.last_message = time.time()
- self.mass.add_job(self.mass.player_manager.async_update_player(player))
+ self.mass.add_job(self.mass.players.async_update_player(player))
+
+
+class WebPlayer(Player):
+ """Definition of a webplayer."""
+
+ def __init__(self, mass: MusicAssistantType, player_id: str, player_name: str):
+ """Initialize the webplayer."""
+ self._player_id = player_id
+ self._player_name = player_name
+ self._powered = True
+ self._elapsed_time = 0
+ self._state = PlaybackState.Stopped
+ self._current_uri = ""
+ self._volume_level = 100
+ self._muted = False
+ self.last_message = time.time()
+
+ async def handle_player_state(self, data: dict):
+ """Handle state event from player."""
+ if "volume_level" in data:
+ self._volume_level = data["volume_level"]
+ if "muted" in data:
+ self._muted = data["muted"]
+ if "state" in data:
+ self._state = PlaybackState(data["state"])
+ if "cur_time" in data:
+ self._elapsed_time = data["elapsed_time"]
+ if "current_uri" in data:
+ self._current_uri = data["current_uri"]
+ if "powered" in data:
+ self._powered = data["powered"]
+ if "name" in data:
+ self._player_name = data["name"]
+ self.last_message = time.time()
+ self.update_state()
+
+ @property
+ def player_id(self) -> str:
+ """Return player id of this player."""
+ return self._player_id
+
+ @property
+ def provider_id(self) -> str:
+ """Return provider id of this player."""
+ return PROV_ID
+
+ @property
+ def name(self) -> str:
+ """Return name of the player."""
+ return self._player_name
+
+ @property
+ def powered(self) -> bool:
+ """Return current power state of player."""
+ return self._powered
+
+ @property
+ def elapsed_time(self) -> int:
+ """Return elapsed time of current playing media in seconds."""
+ return self._elapsed_time
+
+ @property
+ def state(self) -> PlaybackState:
+ """Return current PlaybackState of player."""
+ return self._state
+
+ @property
+ def current_uri(self) -> str:
+ """Return currently loaded uri of player (if any)."""
+ return self._current_uri
+
+ @property
+ def volume_level(self) -> int:
+ """Return current volume level of player (scale 0..100)."""
+ return self._volume_level
+
+ @property
+ def muted(self) -> bool:
+ """Return current mute state of player."""
+ return self._muted
+
+ @property
+ def device_info(self) -> DeviceInfo:
+ """Return the device info for this player."""
+ return DeviceInfo()
+
+ @property
+ def should_poll(self) -> bool:
+ """Return True if this player should be polled for state updates."""
+ return False
+
+ @property
+ def features(self) -> List[PlayerFeature]:
+ """Return list of features this player supports."""
+ return PLAYER_FEATURES
+
+ @property
+ def config_entries(self) -> List[ConfigEntry]:
+ """Return player specific config entries (if any)."""
+ return PLAYER_CONFIG_ENTRIES
+
+ async def async_cmd_play_uri(self, uri: str) -> None:
+ """
+ Play the specified uri/url on the player.
+
+ :param uri: uri/url to send to the player.
+ """
+ data = {"player_id": self.player_id, "cmd": "play_uri", "uri": uri}
+ self.mass.signal_event(EVENT_WEBPLAYER_CMD, data)
+
+ async def async_cmd_stop(self) -> None:
+ """Send STOP command to player."""
+ data = {"player_id": self.player_id, "cmd": "stop"}
+ self.mass.signal_event(EVENT_WEBPLAYER_CMD, data)
+
+ async def async_cmd_play(self) -> None:
+ """Send PLAY command to player."""
+ data = {"player_id": self.player_id, "cmd": "play"}
+ self.mass.signal_event(EVENT_WEBPLAYER_CMD, data)
+
+ async def async_cmd_pause(self) -> None:
+ """Send PAUSE command to player."""
+ data = {"player_id": self.player_id, "cmd": "pause"}
+ self.mass.signal_event(EVENT_WEBPLAYER_CMD, data)
+
+ async def async_cmd_power_on(self) -> None:
+ """Send POWER ON command to player."""
+ data = {"player_id": self.player_id, "cmd": "power_on"}
+ self.mass.signal_event(EVENT_WEBPLAYER_CMD, data)
+
+ async def async_cmd_power_off(self) -> None:
+ """Send POWER OFF command to player."""
+ data = {"player_id": self.player_id, "cmd": "power_off"}
+ self.mass.signal_event(EVENT_WEBPLAYER_CMD, data)
+
+ async def async_cmd_volume_set(self, volume_level: int) -> None:
+ """
+ Send volume level command to player.
+
+ :param volume_level: volume level to set (0..100).
+ """
+ data = {
+ "player_id": self.player_id,
+ "cmd": "volume_set",
+ "volume_level": volume_level,
+ }
+ self.mass.signal_event(EVENT_WEBPLAYER_CMD, data)
+
+ async def async_cmd_volume_mute(self, is_muted: bool = False) -> None:
+ """
+ Send volume MUTE command to given player.
+
+ :param is_muted: bool with new mute state.
+ """
+ data = {"player_id": self.player_id, "cmd": "volume_mute", "is_muted": is_muted}
+ self.mass.signal_event(EVENT_WEBPLAYER_CMD, data)
"https_port": "HTTPS Port",
"ssl_certificate": "SSL Certificate file location",
"ssl_key": "Path to certificate key file",
-
+
"desc_sample_rate": "Set the maximum sample rate this player can handle.",
"desc_volume_normalisation": "Enable R128 volume normalisation to play music at an equally loud volume.",
"desc_target_volume": "Set the preferred target volume level in LUFS. The R128 default is -22 LUFS.",
"desc_target_volume": "Selecteer het gewenste doelvolume in LUFS. De R128 standaard is -22 LUFS.",
"desc_gain_correct": "Stel een fallback gain correctie in als er geen R128 meting beschikbaar is.",
"desc_crossfade": "Crossfade inschakelen door het instellen van een crossfade duur in seconden.",
- "desc_enable_provider": "Deze provider inschakelen."
+ "desc_enable_provider": "Deze provider inschakelen.",
+ "desc_http_port": "De TCP poort waarop de HTTP webserver gestart mag worden.",
+ "desc_https_port": "De TCP poort waarop de HTTPS webserver gestart mag worden. De HTTPS Server wordt alleen ingeschakeld indien er ook valide certificaat gegevens worden opgegegven.",
+ "desc_ssl_certificate": "Geef het pad op naar het certificaat bestand (PEM).",
+ "desc_ssl_key": "Geef het pad om naar het bestand met de private key.",
+ "desc_external_url": "Geef de URL waarop deze Music Assistant server extern te benaderen is. Zorg dat dit overeenomst met het certificaat.",
+ "desc_base_username": "Gebruikersnaam waarmee deze server beveiligd moet worden.",
+ "desc_base_password": "Wachtwoord waarmee deze server beveiligd moet worden. Mag worden leeggelaten maar dit is extreem gevaarlijk indien je besluit de server extern toegankelijk te maken."
}
}
websocket,
)
-LOGGER = logging.getLogger("mass")
+LOGGER = logging.getLogger("webserver")
routes = web.RouteTableDef()
app.add_routes(websocket.routes)
app.add_routes(routes)
- webdir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "web/static/")
+ webdir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "static/")
if os.path.isdir(webdir):
app.router.add_static("/", webdir, append_version=True)
else:
"""Get the index page, redirect if we do not have a web directory."""
# pylint: disable=unused-argument
html_app = os.path.join(
- os.path.dirname(os.path.abspath(__file__)), "web/static/index.html"
+ os.path.dirname(os.path.abspath(__file__)), "static/index.html"
)
if not os.path.isfile(html_app):
raise web.HTTPFound("https://music-assistant.github.io/app")
"""Albums API endpoints."""
-from aiohttp import web
+from aiohttp.web import Request, Response, RouteTableDef
from aiohttp_jwt import login_required
from music_assistant.helpers.util import json_serializer
from music_assistant.helpers.web import async_stream_json
-routes = web.RouteTableDef()
+routes = RouteTableDef()
@routes.get("/api/albums")
@login_required
-async def async_albums(request: web.Request):
+async def async_albums(request: Request):
"""Get all albums known in the database."""
generator = request.app["mass"].database.async_get_albums()
return await async_stream_json(request, generator)
@routes.get("/api/albums/{item_id}")
@login_required
-async def async_album(request: web.Request):
+async def async_album(request: Request):
"""Get full album details."""
item_id = request.match_info.get("item_id")
provider = request.rel_url.query.get("provider")
lazy = request.rel_url.query.get("lazy", "true") != "false"
if item_id is None or provider is None:
- return web.Response(text="invalid item or provider", status=501)
+ return Response(text="invalid item or provider", status=501)
result = await request.app["mass"].music.async_get_album(
item_id, provider, lazy=lazy
)
- return web.Response(body=json_serializer(result), content_type="application/json")
+ return Response(body=json_serializer(result), content_type="application/json")
@routes.get("/api/albums/{item_id}/tracks")
@login_required
-async def async_album_tracks(request: web.Request):
+async def async_album_tracks(request: Request):
"""Get album tracks from provider."""
item_id = request.match_info.get("item_id")
provider = request.rel_url.query.get("provider")
if item_id is None or provider is None:
- return web.Response(text="invalid item_id or provider", status=501)
+ return Response(text="invalid item_id or provider", status=501)
generator = request.app["mass"].music.async_get_album_tracks(item_id, provider)
return await async_stream_json(request, generator)
item_id = request.match_info.get("item_id")
provider = request.rel_url.query.get("provider")
if item_id is None or provider is None:
- return web.Response(text="invalid item_id or provider", status=501)
+ return Response(text="invalid item_id or provider", status=501)
generator = request.app["mass"].music.async_get_album_versions(item_id, provider)
return await async_stream_json(request, generator)
"""Artists API endpoints."""
-from aiohttp import web
+from aiohttp.web import Request, Response, RouteTableDef
from aiohttp_jwt import login_required
from music_assistant.helpers.util import json_serializer
from music_assistant.helpers.web import async_stream_json
-routes = web.RouteTableDef()
+routes = RouteTableDef()
@routes.get("/api/artists")
@login_required
-async def async_artists(request: web.Request):
+async def async_artists(request: Request):
"""Get all artists known in the database."""
generator = request.app["mass"].database.async_get_artists()
return await async_stream_json(request, generator)
@routes.get("/api/artists/{item_id}")
@login_required
-async def async_artist(request: web.Request):
+async def async_artist(request: Request):
"""Get full artist details."""
item_id = request.match_info.get("item_id")
provider = request.rel_url.query.get("provider")
lazy = request.rel_url.query.get("lazy", "true") != "false"
if item_id is None or provider is None:
- return web.Response(text="invalid item or provider", status=501)
+ return Response(text="invalid item or provider", status=501)
result = await request.app["mass"].music.async_get_artist(
item_id, provider, lazy=lazy
)
- return web.Response(body=json_serializer(result), content_type="application/json")
+ return Response(body=json_serializer(result), content_type="application/json")
@routes.get("/api/artists/{item_id}/toptracks")
@login_required
-async def async_artist_toptracks(request: web.Request):
+async def async_artist_toptracks(request: Request):
"""Get top tracks for given artist."""
item_id = request.match_info.get("item_id")
provider = request.rel_url.query.get("provider")
if item_id is None or provider is None:
- return web.Response(text="invalid item_id or provider", status=501)
+ return Response(text="invalid item_id or provider", status=501)
generator = request.app["mass"].music.async_get_artist_toptracks(item_id, provider)
return await async_stream_json(request, generator)
@routes.get("/api/artists/{item_id}/albums")
@login_required
-async def async_artist_albums(request: web.Request):
+async def async_artist_albums(request: Request):
"""Get (all) albums for given artist."""
item_id = request.match_info.get("item_id")
provider = request.rel_url.query.get("provider")
if item_id is None or provider is None:
- return web.Response(text="invalid item_id or provider", status=501)
+ return Response(text="invalid item_id or provider", status=501)
generator = request.app["mass"].music.async_get_artist_albums(item_id, provider)
return await async_stream_json(request, generator)
"""Config API endpoints."""
import orjson
-from aiohttp import web
+from aiohttp.web import Request, Response, RouteTableDef, json_response
from aiohttp_jwt import login_required
from music_assistant.constants import (
CONF_KEY_BASE,
)
from music_assistant.helpers.util import json_serializer
-routes = web.RouteTableDef()
+routes = RouteTableDef()
@routes.get("/api/config")
@login_required
-async def async_get_config(request: web.Request):
+async def async_get_config(request: Request):
"""Get the full config."""
- language = request.rel_url.query.get("lang", "en")
conf = {
- CONF_KEY_BASE: request.app["mass"].config.base.to_dict(language),
- CONF_KEY_MUSIC_PROVIDERS: request.app["mass"].config.music_providers.to_dict(
- language
- ),
- CONF_KEY_PLAYER_PROVIDERS: request.app["mass"].config.player_providers.to_dict(
- language
- ),
- CONF_KEY_METADATA_PROVIDERS: request.app[
- "mass"
- ].config.metadata_providers.to_dict(language),
- CONF_KEY_PLUGINS: request.app["mass"].config.plugins.to_dict(language),
- CONF_KEY_PLAYER_SETTINGS: request.app["mass"].config.player_settings.to_dict(
- language
- ),
+ key: f"/api/config/{key}"
+ for key in [
+ CONF_KEY_BASE,
+ CONF_KEY_MUSIC_PROVIDERS,
+ CONF_KEY_PLAYER_PROVIDERS,
+ CONF_KEY_METADATA_PROVIDERS,
+ CONF_KEY_PLUGINS,
+ CONF_KEY_PLAYER_SETTINGS,
+ ]
}
- return web.Response(body=json_serializer(conf), content_type="application/json")
+ return Response(body=json_serializer(conf), content_type="application/json")
@routes.get("/api/config/{base}")
@login_required
-async def async_get_config_item(request: web.Request):
+async def async_get_config_base_item(request: Request):
"""Get the config by base type."""
language = request.rel_url.query.get("lang", "en")
conf_base = request.match_info.get("base")
- conf = request.app["mass"].config[conf_base]
- return web.Response(
- body=json_serializer(conf.to_dict(language)), content_type="application/json"
- )
+ conf = request.app["mass"].config[conf_base].all_items(language)
+ return Response(body=json_serializer(conf), content_type="application/json")
+
+
+@routes.get("/api/config/{base}/{item}")
+@login_required
+async def async_get_config_item(request: Request):
+ """Get the config by base and item type."""
+ language = request.rel_url.query.get("lang", "en")
+ conf_base = request.match_info.get("base")
+ conf_item = request.match_info.get("item")
+ conf = request.app["mass"].config[conf_base][conf_item].all_items(language)
+ return Response(body=json_serializer(conf), content_type="application/json")
@routes.put("/api/config/{base}/{key}/{entry_key}")
@login_required
-async def async_put_config(request: web.Request):
+async def async_put_config(request: Request):
"""Save the given config item."""
conf_key = request.match_info.get("key")
conf_base = request.match_info.get("base")
.default_value
)
request.app["mass"].config[conf_base][conf_key][entry_key] = new_value
- return web.json_response(True)
+ return json_response(True)
import os
-from aiohttp import web
+from aiohttp.web import FileResponse, Request, Response, RouteTableDef
from music_assistant.models.media_types import MediaType
-routes = web.RouteTableDef()
+routes = RouteTableDef()
@routes.get("/api/providers/{provider_id}/icon")
-async def async_get_provider_icon(request: web.Request):
+async def async_get_provider_icon(request: Request):
"""Get Provider icon."""
provider_id = request.match_info.get("provider_id")
base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
icon_path = os.path.join(base_dir, "..", "providers", provider_id, "icon.png")
if os.path.isfile(icon_path):
headers = {"Cache-Control": "max-age=86400, public", "Pragma": "public"}
- return web.FileResponse(icon_path, headers=headers)
- return web.Response(status=404)
+ return FileResponse(icon_path, headers=headers)
+ return Response(status=404)
@routes.get("/api/{media_type}/{media_id}/thumb")
-async def async_get_image(request: web.Request):
+async def async_get_image(request: Request):
"""Get (resized) thumb image."""
media_type_str = request.match_info.get("media_type")
media_type = MediaType.from_string(media_type_str)
media_id = request.match_info.get("media_id")
provider = request.rel_url.query.get("provider")
if media_id is None or provider is None:
- return web.Response(text="invalid media_id or provider", status=501)
+ return Response(text="invalid media_id or provider", status=501)
size = int(request.rel_url.query.get("size", 0))
img_file = await request.app["mass"].music.async_get_image_thumb(
media_id, provider, media_type, size
)
if not img_file or not os.path.isfile(img_file):
- return web.Response(status=404)
+ return Response(status=404)
headers = {"Cache-Control": "max-age=86400, public", "Pragma": "public"}
- return web.FileResponse(img_file, headers=headers)
+ return FileResponse(img_file, headers=headers)
"""JSON RPC API endpoint."""
-from aiohttp import web
+from aiohttp.web import Request, Response, RouteTableDef
from music_assistant.helpers.web import require_local_subnet
-routes = web.RouteTableDef()
+routes = RouteTableDef()
@routes.route("get", "/jsonrpc.js")
@routes.route("post", "/jsonrpc.js")
@require_local_subnet
-async def async_json_rpc(request: web.Request):
+async def async_json_rpc(request: Request):
"""
Implement LMS jsonrpc interface.
elif cmd_str == "button power":
await request.app["mass"].players.async_cmd_power_toggle(player_id)
else:
- return web.Response(text="command not supported")
- return web.Response(text="success")
+ return Response(text="command not supported")
+ return Response(text="success")
"""Library API endpoints."""
-from aiohttp import web
+from aiohttp.web import Request, Response, RouteTableDef
from aiohttp_jwt import login_required
from music_assistant.helpers.util import json_serializer
from music_assistant.helpers.web import async_media_items_from_body, async_stream_json
-routes = web.RouteTableDef()
+routes = RouteTableDef()
@routes.get("/api/library/artists")
@login_required
-async def async_library_artists(request: web.Request):
+async def async_library_artists(request: Request):
"""Get all library artists."""
orderby = request.query.get("orderby", "name")
provider_filter = request.rel_url.query.get("provider")
@routes.get("/api/library/albums")
@login_required
-async def async_library_albums(request: web.Request):
+async def async_library_albums(request: Request):
"""Get all library albums."""
orderby = request.query.get("orderby", "name")
provider_filter = request.rel_url.query.get("provider")
@routes.get("/api/library/tracks")
@login_required
-async def async_library_tracks(request: web.Request):
+async def async_library_tracks(request: Request):
"""Get all library tracks."""
orderby = request.query.get("orderby", "name")
provider_filter = request.rel_url.query.get("provider")
@routes.get("/api/library/radios")
@login_required
-async def async_library_radios(request: web.Request):
+async def async_library_radios(request: Request):
"""Get all library radios."""
orderby = request.query.get("orderby", "name")
provider_filter = request.rel_url.query.get("provider")
@routes.get("/api/library/playlists")
@login_required
-async def async_library_playlists(request: web.Request):
+async def async_library_playlists(request: Request):
"""Get all library playlists."""
orderby = request.query.get("orderby", "name")
provider_filter = request.rel_url.query.get("provider")
@routes.put("/api/library")
@login_required
-async def async_library_add(request: web.Request):
+async def async_library_add(request: Request):
"""Add item(s) to the library."""
body = await request.json()
media_items = await async_media_items_from_body(request.app["mass"], body)
result = await request.app["mass"].music.async_library_add(media_items)
- return web.Response(body=json_serializer(result), content_type="application/json")
+ return Response(body=json_serializer(result), content_type="application/json")
@routes.delete("/api/library")
@login_required
-async def async_library_remove(request: web.Request):
+async def async_library_remove(request: Request):
"""Remove item(s) from the library."""
body = await request.json()
media_items = await async_media_items_from_body(request.app["mass"], body)
result = await request.app["mass"].music.async_library_remove(media_items)
- return web.Response(body=json_serializer(result), content_type="application/json")
+ return Response(body=json_serializer(result), content_type="application/json")
import datetime
import jwt
-from aiohttp import web
+from aiohttp.web import HTTPUnauthorized, Request, Response, RouteTableDef
from music_assistant.helpers.typing import MusicAssistantType
from music_assistant.helpers.util import json_serializer
-routes = web.RouteTableDef()
+routes = RouteTableDef()
@routes.post("/login")
@routes.post("/api/login")
-async def async_login(request: web.Request):
+async def async_login(request: Request):
"""Handle the retrieval of a JWT token."""
form = await request.json()
username = form.get("username")
password = form.get("password")
token_info = await async_get_token(request.app["mass"], username, password)
if token_info:
- return web.Response(
+ return Response(
body=json_serializer(token_info), content_type="application/json"
)
- return web.HTTPUnauthorized(body="Invalid username and/or password provided!")
+ return HTTPUnauthorized(body="Invalid username and/or password provided!")
async def async_get_token(
"""Players API endpoints."""
import orjson
-from aiohttp import web
+from aiohttp.web import Request, Response, RouteTableDef
from aiohttp_jwt import login_required
from music_assistant.helpers.util import json_serializer
from music_assistant.helpers.web import async_media_items_from_body, async_stream_json
from music_assistant.models.player_queue import QueueOption
-routes = web.RouteTableDef()
+routes = RouteTableDef()
@routes.get("/api/players")
@login_required
-async def async_players(request: web.Request):
+async def async_players(request: Request):
# pylint: disable=unused-argument
"""Get all playerstates."""
player_states = request.app["mass"].players.player_states
player_states.sort(key=lambda x: str(x.name), reverse=False)
players = [player_state.to_dict() for player_state in player_states]
- return web.Response(body=json_serializer(players), content_type="application/json")
+ return Response(body=json_serializer(players), content_type="application/json")
@routes.post("/api/players/{player_id}/cmd/{cmd}")
@login_required
-async def async_player_command(request: web.Request):
+async def async_player_command(request: Request):
"""Issue player command."""
success = False
player_id = request.match_info.get("player_id")
cmd = request.match_info.get("cmd")
try:
cmd_args = await request.json(loads=orjson.loads)
+ if cmd_args in ["", {}, []]:
+ cmd_args = None
except orjson.JSONDecodeError:
cmd_args = None
player_cmd = getattr(request.app["mass"].players, f"async_cmd_{cmd}", None)
elif player_cmd:
success = await player_cmd(player_id)
else:
- return web.Response(text="invalid command", status=501)
+ return Response(text="invalid command", status=501)
result = {"success": success in [True, None]}
- return web.Response(body=json_serializer(result), content_type="application/json")
+ return Response(body=json_serializer(result), content_type="application/json")
@routes.post("/api/players/{player_id}/play_media/{queue_opt}")
@login_required
-async def async_player_play_media(request: web.Request):
+async def async_player_play_media(request: Request):
"""Issue player play media command."""
player_id = request.match_info.get("player_id")
player_state = request.app["mass"].players.get_player_state(player_id)
if not player_state:
- return web.Response(status=404)
+ return Response(status=404)
queue_opt = QueueOption(request.match_info.get("queue_opt", "play"))
body = await request.json()
media_items = await async_media_items_from_body(request.app["mass"], body)
player_id, media_items, queue_opt
)
result = {"success": success in [True, None]}
- return web.Response(body=json_serializer(result), content_type="application/json")
+ return Response(body=json_serializer(result), content_type="application/json")
@routes.get("/api/players/{player_id}/queue/items/{queue_item}")
@login_required
-async def async_player_queue_item(request: web.Request):
+async def async_player_queue_item(request: Request):
"""Return item (by index or queue item id) from the player's queue."""
player_id = request.match_info.get("player_id")
item_id = request.match_info.get("queue_item")
queue_item = player_queue.get_item(item_id)
except ValueError:
queue_item = player_queue.by_item_id(item_id)
- return web.Response(
- body=json_serializer(queue_item), content_type="application/json"
- )
+ return Response(body=json_serializer(queue_item), content_type="application/json")
@routes.get("/api/players/{player_id}/queue/items")
@login_required
-async def async_player_queue_items(request: web.Request):
+async def async_player_queue_items(request: Request):
"""Return the items in the player's queue."""
player_id = request.match_info.get("player_id")
player_queue = request.app["mass"].players.get_player_queue(player_id)
@routes.get("/api/players/{player_id}/queue")
@login_required
-async def async_player_queue(request: web.Request):
+async def async_player_queue(request: Request):
"""Return the player queue details."""
player_id = request.match_info.get("player_id")
player_queue = request.app["mass"].players.get_player_queue(player_id)
- return web.Response(
+ return Response(
body=json_serializer(player_queue.to_dict()), content_type="application/json"
)
@routes.put("/api/players/{player_id}/queue/{cmd}")
@login_required
-async def async_player_queue_cmd(request: web.Request):
+async def async_player_queue_cmd(request: Request):
"""Change the player queue details."""
player_id = request.match_info.get("player_id")
player_queue = request.app["mass"].players.get_player_queue(player_id)
await player_queue.async_move_item(cmd_args, 1)
elif cmd == "next":
await player_queue.async_move_item(cmd_args, 0)
- return web.Response(
+ return Response(
body=json_serializer(player_queue.to_dict()), content_type="application/json"
)
@routes.get("/api/players/{player_id}")
@login_required
-async def async_player(request: web.Request):
+async def async_player(request: Request):
"""Get state of single player."""
player_id = request.match_info.get("player_id")
player_state = request.app["mass"].players.get_player_state(player_id)
if not player_state:
- return web.Response(text="invalid player", status=404)
- return web.Response(
+ return Response(text="invalid player", status=404)
+ return Response(
body=json_serializer(player_state.to_dict()), content_type="application/json"
)
"""Playlists API endpoints."""
-from aiohttp import web
+from aiohttp.web import Request, Response, RouteTableDef
from aiohttp_jwt import login_required
from music_assistant.helpers.util import json_serializer
from music_assistant.helpers.web import async_media_items_from_body, async_stream_json
-routes = web.RouteTableDef()
+routes = RouteTableDef()
@routes.get("/api/playlists/{item_id}")
@login_required
-async def async_playlist(request: web.Request):
+async def async_playlist(request: Request):
"""Get full playlist details."""
item_id = request.match_info.get("item_id")
provider = request.rel_url.query.get("provider")
if item_id is None or provider is None:
- return web.Response(text="invalid item or provider", status=501)
+ return Response(text="invalid item or provider", status=501)
result = await request.app["mass"].music.async_get_playlist(item_id, provider)
- return web.Response(body=json_serializer(result), content_type="application/json")
+ return Response(body=json_serializer(result), content_type="application/json")
@routes.get("/api/playlists/{item_id}/tracks")
@login_required
-async def async_playlist_tracks(request: web.Request):
+async def async_playlist_tracks(request: Request):
"""Get playlist tracks from provider."""
item_id = request.match_info.get("item_id")
provider = request.rel_url.query.get("provider")
if item_id is None or provider is None:
- return web.Response(text="invalid item_id or provider", status=501)
+ return Response(text="invalid item_id or provider", status=501)
generator = request.app["mass"].music.async_get_playlist_tracks(item_id, provider)
return await async_stream_json(request, generator)
@routes.put("/api/playlists/{item_id}/tracks")
@login_required
-async def async_add_playlist_tracks(request: web.Request):
+async def async_add_playlist_tracks(request: Request):
"""Add tracks to (editable) playlist."""
item_id = request.match_info.get("item_id")
body = await request.json()
tracks = await async_media_items_from_body(request.app["mass"], body)
result = await request.app["mass"].music.async_add_playlist_tracks(item_id, tracks)
- return web.Response(body=json_serializer(result), content_type="application/json")
+ return Response(body=json_serializer(result), content_type="application/json")
@routes.delete("/api/playlists/{item_id}/tracks")
@login_required
-async def async_remove_playlist_tracks(request: web.Request):
+async def async_remove_playlist_tracks(request: Request):
"""Remove tracks from (editable) playlist."""
item_id = request.match_info.get("item_id")
body = await request.json()
result = await request.app["mass"].music.async_remove_playlist_tracks(
item_id, tracks
)
- return web.Response(body=json_serializer(result), content_type="application/json")
+ return Response(body=json_serializer(result), content_type="application/json")
"""Tracks API endpoints."""
-from aiohttp import web
+from aiohttp.web import Request, Response, RouteTableDef
from aiohttp_jwt import login_required
from music_assistant.helpers.util import json_serializer
from music_assistant.helpers.web import async_stream_json
-routes = web.RouteTableDef()
+routes = RouteTableDef()
@routes.get("/api/radios")
@login_required
-async def async_radios(request: web.Request):
+async def async_radios(request: Request):
"""Get all radios known in the database."""
generator = request.app["mass"].database.async_get_radios()
return await async_stream_json(request, generator)
@routes.get("/api/radios/{item_id}")
@login_required
-async def async_radio(request: web.Request):
+async def async_radio(request: Request):
"""Get full radio details."""
item_id = request.match_info.get("item_id")
provider = request.rel_url.query.get("provider")
if item_id is None or provider is None:
- return web.Response(text="invalid item_id or provider", status=501)
+ return Response(text="invalid item_id or provider", status=501)
result = await request.app["mass"].music.async_get_radio(item_id, provider)
- return web.Response(body=json_serializer(result), content_type="application/json")
+ return Response(body=json_serializer(result), content_type="application/json")
"""Search API endpoints."""
-from aiohttp import web
+from aiohttp.web import Request, Response, RouteTableDef
from aiohttp_jwt import login_required
from music_assistant.helpers.util import json_serializer
from music_assistant.models.media_types import MediaType
-routes = web.RouteTableDef()
+routes = RouteTableDef()
@routes.get("/api/search")
@login_required
-async def async_search(request: web.Request):
+async def async_search(request: Request):
"""Search database and/or providers."""
searchquery = request.rel_url.query.get("query")
media_types_query = request.rel_url.query.get("media_types")
result = await request.app["mass"].music.async_global_search(
searchquery, media_types, limit=limit
)
- return web.Response(body=json_serializer(result), content_type="application/json")
+ return Response(body=json_serializer(result), content_type="application/json")
"""Players API endpoints."""
-from aiohttp import web
+from aiohttp.web import Request, Response, RouteTableDef, StreamResponse
from music_assistant.helpers.web import require_local_subnet
from music_assistant.models.media_types import MediaType
-routes = web.RouteTableDef()
+routes = RouteTableDef()
@routes.get("/stream/media/{media_type}/{item_id}")
-async def stream_media(request: web.Request):
+async def stream_media(request: Request):
"""Stream a single audio track."""
media_type = MediaType.from_string(request.match_info["media_type"])
if media_type not in [MediaType.Track, MediaType.Radio]:
- return web.Response(status=404, reason="Media item is not playable!")
+ return Response(status=404, reason="Media item is not playable!")
item_id = request.match_info["item_id"]
provider = request.rel_url.query.get("provider", "database")
media_item = await request.app["mass"].music.async_get_item(
# prepare request
content_type = streamdetails.content_type.value
- resp = web.StreamResponse(
+ resp = StreamResponse(
status=200, reason="OK", headers={"Content-Type": f"audio/{content_type}"}
)
resp.enable_chunked_encoding()
@routes.get("/stream/queue/{player_id}")
@require_local_subnet
-async def stream_queue(request: web.Request):
+async def stream_queue(request: Request):
"""Stream a player's queue."""
player_id = request.match_info["player_id"]
if not request.app["mass"].players.get_player_queue(player_id):
- return web.Response(text="invalid queue", status=404)
+ return Response(text="invalid queue", status=404)
# prepare request
- resp = web.StreamResponse(
+ resp = StreamResponse(
status=200, reason="OK", headers={"Content-Type": "audio/flac"}
)
resp.enable_chunked_encoding()
@routes.get("/stream/queue/{player_id}/{queue_item_id}")
@require_local_subnet
-async def stream_queue_item(request: web.Request):
+async def stream_queue_item(request: Request):
"""Stream a single queue item."""
player_id = request.match_info["player_id"]
queue_item_id = request.match_info["queue_item_id"]
# prepare request
- resp = web.StreamResponse(
+ resp = StreamResponse(
status=200, reason="OK", headers={"Content-Type": "audio/flac"}
)
resp.enable_chunked_encoding()
@routes.get("/stream/group/{group_player_id}")
@require_local_subnet
-async def stream_group(request: web.Request):
+async def stream_group(request: Request):
"""Handle streaming to all players of a group. Highly experimental."""
group_player_id = request.match_info["group_player_id"]
if not request.app["mass"].players.get_player_queue(group_player_id):
- return web.Response(text="invalid player id", status=404)
+ return Response(text="invalid player id", status=404)
child_player_id = request.rel_url.query.get("player_id", request.remote)
# prepare request
- resp = web.StreamResponse(
+ resp = StreamResponse(
status=200, reason="OK", headers={"Content-Type": "audio/flac"}
)
resp.enable_chunked_encoding()
"""Radio's API endpoints."""
-from aiohttp import web
+from aiohttp.web import Request, Response, RouteTableDef
from aiohttp_jwt import login_required
from music_assistant.helpers.util import json_serializer
from music_assistant.helpers.web import async_stream_json
-routes = web.RouteTableDef()
+routes = RouteTableDef()
@routes.get("/api/tracks")
@login_required
-async def async_tracks(request: web.Request):
+async def async_tracks(request: Request):
"""Get all tracks known in the database."""
generator = request.app["mass"].database.async_get_tracks()
return await async_stream_json(request, generator)
@routes.get("/api/tracks/{item_id}/versions")
@login_required
-async def async_track_versions(request: web.Request):
+async def async_track_versions(request: Request):
"""Get all versions of an track."""
item_id = request.match_info.get("item_id")
provider = request.rel_url.query.get("provider")
if item_id is None or provider is None:
- return web.Response(text="invalid item_id or provider", status=501)
+ return Response(text="invalid item_id or provider", status=501)
generator = request.app["mass"].music.async_get_track_versions(item_id, provider)
return await async_stream_json(request, generator)
@routes.get("/api/tracks/{item_id}")
@login_required
-async def async_track(request: web.Request):
+async def async_track(request: Request):
"""Get full track details."""
item_id = request.match_info.get("item_id")
provider = request.rel_url.query.get("provider")
lazy = request.rel_url.query.get("lazy", "true") != "false"
if item_id is None or provider is None:
- return web.Response(text="invalid item or provider", status=501)
+ return Response(text="invalid item or provider", status=501)
result = await request.app["mass"].music.async_get_track(
item_id, provider, lazy=lazy
)
- return web.Response(body=json_serializer(result), content_type="application/json")
+ return Response(body=json_serializer(result), content_type="application/json")
import logging
from asyncio import CancelledError
-import aiohttp
import jwt
import orjson
+from aiohttp import WSMsgType
+from aiohttp.web import Request, RouteTableDef, WebSocketResponse
from music_assistant.helpers.util import json_serializer
-routes = aiohttp.web.RouteTableDef()
+routes = RouteTableDef()
LOGGER = logging.getLogger("websocket")
@routes.get("/ws")
-async def async_websocket_handler(request: aiohttp.web.Request):
+async def async_websocket_handler(request: Request):
"""Handle websockets connection."""
ws_response = None
authenticated = False
remove_callbacks = []
try:
- ws_response = aiohttp.web.WebSocketResponse()
+ ws_response = WebSocketResponse()
await ws_response.prepare(request)
# callback for internal events
if hasattr(msg_details, "to_dict"):
msg_details = msg_details.to_dict()
ws_msg = {"message": msg, "message_details": msg_details}
- try:
- await ws_response.send_str(json_serializer(ws_msg).decode())
- except AssertionError:
- LOGGER.debug("trying to send message to ws while disconnected")
+ await ws_response.send_str(json_serializer(ws_msg).decode())
# process incoming messages
async for msg in ws_response:
- if msg.type != aiohttp.WSMsgType.TEXT:
+ if msg.type != WSMsgType.TEXT:
# not sure when/if this happens but log it anyway
LOGGER.warning(msg.data)
continue