CONF_ANNOUNCE_VOLUME_MIN: Final[str] = "announce_volume_min"
CONF_ANNOUNCE_VOLUME_MAX: Final[str] = "announce_volume_max"
CONF_ICON: Final[str] = "icon"
+CONF_LANGUAGE: Final[str] = "language"
# config default values
DEFAULT_HOST: Final[str] = "0.0.0.0"
# only allow setting raw values if main entry exists
msg = f"Invalid provider_instance: {provider_instance}"
raise KeyError(msg)
- self.set(f"{CONF_PROVIDERS}/{provider_instance}/{key}", value)
+ self.set(f"{CONF_PROVIDERS}/{provider_instance}/values/{key}", value)
+
+ def set_raw_core_config_value(self, core_module: str, key: str, value: ConfigValueType) -> None:
+ """
+ Set (raw) single config(entry) value for a core controller.
+
+ Note that this only stores the (raw) value without any validation or default.
+ """
+ if not self.get(f"{CONF_CORE}/{core_module}"):
+ # create base object first if needed
+ self.set(f"{CONF_CORE}/{core_module}", CoreConfig({}, core_module).to_raw())
+ self.set(f"{CONF_CORE}/{core_module}/values/{key}", value)
def set_raw_player_config_value(self, player_id: str, key: str, value: ConfigValueType) -> None:
"""
from base64 import b64encode
from contextlib import suppress
from time import time
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, cast
from uuid import uuid4
import aiofiles
from aiohttp import web
-from music_assistant.common.models.enums import ImageType, MediaType, ProviderFeature, ProviderType
+from music_assistant.common.models.config_entries import (
+ ConfigEntry,
+ ConfigValueOption,
+ ConfigValueType,
+)
+from music_assistant.common.models.enums import (
+ ConfigEntryType,
+ ImageType,
+ MediaType,
+ ProviderFeature,
+ ProviderType,
+)
from music_assistant.common.models.errors import MediaNotFoundError, ProviderUnavailableError
from music_assistant.common.models.media_items import (
Album,
Radio,
Track,
)
-from music_assistant.constants import VARIOUS_ARTISTS_ID_MBID, VARIOUS_ARTISTS_NAME
+from music_assistant.constants import CONF_LANGUAGE, VARIOUS_ARTISTS_ID_MBID, VARIOUS_ARTISTS_NAME
+from music_assistant.server.helpers.api import api_command
from music_assistant.server.helpers.compare import compare_strings
from music_assistant.server.helpers.images import create_collage, get_image_thumb
from music_assistant.server.models.core_controller import CoreController
from music_assistant.server.models.metadata_provider import MetadataProvider
from music_assistant.server.providers.musicbrainz import MusicbrainzProvider
+LOCALES = {
+ "af_ZA": "African",
+ "ar_AE": "Arabic (United Arab Emirates)",
+ "ar_EG": "Arabic (Egypt)",
+ "ar_SA": "Saudi Arabia",
+ "bg_BG": "Bulgarian",
+ "cs_CZ": "Czech",
+ "zh_CN": "Chinese",
+ "hr_HR": "Croatian",
+ "da_DK": "Danish",
+ "de_DE": "German",
+ "el_GR": "Greek",
+ "en_US": "English (US)",
+ "en_UK": "English (UK)",
+ "es_ES": "Spanish",
+ "et_EE": "Estonian",
+ "fi_FI": "Finnish",
+ "fr_FR": "French",
+ "hu_HU": "Hungarian",
+ "it_IT": "Italian",
+ "lt_LT": "Lithuanian",
+ "lv_LV": "Latvian",
+ "nl_NL": "Dutch",
+ "no_NO": "Norwegian",
+ "pl_PL": "Polish",
+ "pt_PT": "Portuguese",
+ "ro_RO": "Romanian",
+ "ru_RU": "Russian",
+ "sk_SK": "Slovak",
+ "sl_SI": "Slovenian",
+ "sv_SE": "Swedish",
+ "tr_TR": "Turkish",
+}
+
+DEFAULT_LANGUAGE = "en_US"
+
class MetaDataController(CoreController):
"""Several helpers to search and store metadata for mediaitems."""
)
self.manifest.icon = "book-information-variant"
+ async def get_config_entries(
+ self,
+ action: str | None = None,
+ values: dict[str, ConfigValueType] | None = None,
+ ) -> tuple[ConfigEntry, ...]:
+ """Return all Config Entries for this core module (if any)."""
+ return (
+ ConfigEntry(
+ key=CONF_LANGUAGE,
+ type=ConfigEntryType.STRING,
+ label="Preferred language",
+ required=False,
+ default_value=DEFAULT_LANGUAGE,
+ description="Preferred language for metadata.\n\n"
+ "Note that English will always be used as fallback when content "
+ "in your preferred language is not available.",
+ options=tuple(ConfigValueOption(value, key) for key, value in LOCALES.items()),
+ ),
+ )
+
async def setup(self, config: CoreConfig) -> None:
"""Async initialize of module."""
self.mass.streams.register_dynamic_route("/imageproxy", self.handle_imageproxy)
@property
def providers(self) -> list[MetadataProvider]:
"""Return all loaded/running MetadataProviders."""
- return self.mass.get_providers(ProviderType.METADATA) # type: ignore[return-value]
+ if TYPE_CHECKING:
+ return cast(list[MetadataProvider], self.mass.get_providers(ProviderType.METADATA))
+ return self.mass.get_providers(ProviderType.METADATA)
@property
def preferred_language(self) -> str:
- """Return preferred language for metadata as 2 letter country code (uppercase).
+ """Return preferred language for metadata (as 2 letter language code 'en')."""
+ return self.locale.split("_")[0]
- Defaults to English (EN).
- """
- return self._pref_lang or "EN"
+ @property
+ def locale(self) -> str:
+ """Return preferred language for metadata (as full locale code 'en_EN')."""
+ return self.mass.config.get_raw_core_config_value(
+ self.domain, CONF_LANGUAGE, DEFAULT_LANGUAGE
+ )
- @preferred_language.setter
- def preferred_language(self, lang: str) -> None:
- """Set preferred language to 2 letter country code.
+ @api_command("metadata/set_default_preferred_language")
+ def set_default_preferred_language(self, lang: str) -> None:
+ """
+ Set the (default) preferred language.
- Can only be set once.
+ Reasoning behind this is that the backend can not make a wise choice for the default,
+ so relies on some external source that knows better to set this info, like the frontend
+ or a streaming provider.
+ Can only be set once (by this call or the user).
"""
- if self._pref_lang is None:
- self._pref_lang = lang.upper()
+ if self.mass.config.get_raw_core_config_value(self.domain, CONF_LANGUAGE):
+ return # already set
+ if lang in LOCALES:
+ self.mass.config.set_raw_core_config_value(self.domain, CONF_LANGUAGE, lang)
+ return
+ lang = lang.lower()
+ # try strict match first
+ for locale_code, lang_name in LOCALES.items():
+ if lang in (locale_code.lower(), lang_name.lower()):
+ self.mass.config.set_raw_core_config_value(self.domain, CONF_LANGUAGE, locale_code)
+ return
+ # attempt loose match on either language or country code
+ for locale_code in tuple(LOCALES):
+ language_code, region_code = locale_code.lower().split("_", 1)
+ if lang in (language_code, region_code):
+ self.mass.config.set_raw_core_config_value(self.domain, CONF_LANGUAGE, locale_code)
+ return
+ # if we reach this point, we couldn't match the language
+ self.logger.warning("%s is not a valid language", lang)
def start_scan(self) -> None:
"""Start background scan for missing metadata."""
# handle regular provider listing, always add back folder first
if not prov or not sub_path:
yield BrowseFolder(item_id="root", provider="library", path="root", name="..")
+ if not prov:
+ return
else:
back_path = f"{provider_instance}://" + "/".join(sub_path.split("/")[:-1])
yield BrowseFolder(
async def _handle_ws_client(self, request: web.Request) -> web.WebSocketResponse:
connection = WebsocketClientHandler(self, request)
+ if lang := request.headers.get("Accept-Language"):
+ self.mass.metadata.set_default_preferred_language(lang.split(",")[0])
try:
self.clients.add(connection)
return await connection.handle_client()
key = f"{method}.{path}"
self._dynamic_routes.pop(key)
- async def serve_static(self, file_path: str, _request: web.Request) -> web.FileResponse:
+ async def serve_static(self, file_path: str, request: web.Request) -> web.FileResponse:
"""Serve file response."""
headers = {"Cache-Control": "no-cache"}
return web.FileResponse(file_path, headers=headers)
self.logger.info(
"Successfully logged in to Qobuz as %s", details["user"]["display_name"]
)
- self.mass.metadata.preferred_language = details["user"]["country_code"]
+ self.mass.metadata.set_default_preferred_language(details["user"]["country_code"])
return details["user_auth_token"]
return None
# pylint: disable=too-many-branches
url = f"http://www.qobuz.com/api.json/0.2/{endpoint}"
headers = {"X-App-Id": app_var(0)}
+ locale = self.mass.metadata.locale.replace("_", "-")
+ language = locale.split("-")[0]
+ headers["Accept-Language"] = f"{locale}, {language};q=0.9, *;q=0.5"
if endpoint != "user/login":
auth_token = await self._auth_token()
if not auth_token:
# return existing token if we have one in memory
if (
self._auth_token
- and asyncio.to_thread(os.path.isdir, self._cache_dir)
+ and await asyncio.to_thread(os.path.isdir, self._cache_dir)
and (self._auth_token["expiresAt"] > int(time.time()) + 600)
):
return self._auth_token
if tokeninfo and userinfo:
self._auth_token = tokeninfo
self._sp_user = userinfo
- self.mass.metadata.preferred_language = userinfo["country"]
+ self.mass.metadata.set_default_preferred_language(userinfo["country"])
self.logger.info("Successfully logged in to Spotify as %s", userinfo["id"])
self._auth_token = tokeninfo
return tokeninfo
if tokeninfo is None:
tokeninfo = await self.login()
headers = {"Authorization": f'Bearer {tokeninfo["accessToken"]}'}
+ locale = self.mass.metadata.locale.replace("_", "-")
+ language = locale.split("-")[0]
+ headers["Accept-Language"] = f"{locale}, {language};q=0.9, *;q=0.5"
async with (
self._throttler,
self.mass.http_session.get(
if link := artist_obj.get(key):
metadata.links.add(MediaItemLink(type=link_type, url=link))
# description/biography
- if desc := artist_obj.get(f"strBiography{self.mass.metadata.preferred_language}"):
+ lang_code, lang_country = self.mass.metadata.locale.split("_")
+ if desc := artist_obj.get(f"strBiography{lang_country}") or (
+ desc := artist_obj.get(f"strBiography{lang_code.upper()}")
+ ):
metadata.description = desc
else:
metadata.description = artist_obj.get("strBiographyEN")
)
# description
- if desc := album_obj.get(f"strDescription{self.mass.metadata.preferred_language}"):
+ lang_code, lang_country = self.mass.metadata.locale.split("_")
+ if desc := album_obj.get(f"strDescription{lang_country}") or (
+ desc := album_obj.get(f"strDescription{lang_code.upper()}")
+ ):
metadata.description = desc
else:
metadata.description = album_obj.get("strDescriptionEN")
metadata.genres = {genre}
metadata.mood = track_obj.get("strMood")
# description
- if desc := track_obj.get(f"strDescription{self.mass.metadata.preferred_language}"):
+ lang_code, lang_country = self.mass.metadata.locale.split("_")
+ if desc := track_obj.get(f"strDescription{lang_country}") or (
+ desc := track_obj.get(f"strDescription{lang_code.upper()}")
+ ):
metadata.description = desc
else:
metadata.description = track_obj.get("strDescriptionEN")
kwargs["username"] = self.config.get_value(CONF_USERNAME)
kwargs["partnerId"] = "1"
kwargs["render"] = "json"
+ locale = self.mass.metadata.locale.replace("_", "-")
+ language = locale.split("-")[0]
+ headers = {"Accept-Language": f"{locale}, {language};q=0.9, *;q=0.5"}
async with (
self._throttler,
- self.mass.http_session.get(url, params=kwargs, ssl=False) as response,
+ self.mass.http_session.get(url, params=kwargs, headers=headers, ssl=False) as response,
):
result = await response.json()
if not result or "error" in result:
from urllib.parse import unquote
import pytube
+from ytmusicapi.constants import SUPPORTED_LANGUAGES
from music_assistant.common.models.config_entries import ConfigEntry, ConfigValueType
from music_assistant.common.models.enums import ConfigEntryType, ProviderFeature, StreamType
await self._initialize_context()
self._cookies = {"CONSENT": "YES+1"}
self._signature_timestamp = await self._get_signature_timestamp()
+ # get default language (that is supported by YTM)
+ mass_locale = self.mass.metadata.locale
+ for lang_code in SUPPORTED_LANGUAGES:
+ if lang_code in (mass_locale, mass_locale.split("_")[0]):
+ self.language = lang_code
+ break
+ else:
+ self.language = "en"
@property
def supported_features(self) -> tuple[ProviderFeature, ...]:
ytm_filter = "songs"
if media_types[0] == MediaType.PLAYLIST:
ytm_filter = "playlists"
- results = await search(query=search_query, ytm_filter=ytm_filter, limit=limit)
+ results = await search(
+ query=search_query, ytm_filter=ytm_filter, limit=limit, language=self.language
+ )
parsed_results = SearchResults()
for result in results:
try:
async def get_library_artists(self) -> AsyncGenerator[Artist, None]:
"""Retrieve all library artists from Youtube Music."""
await self._check_oauth_token()
- artists_obj = await get_library_artists(
- headers=self._headers,
- )
+ artists_obj = await get_library_artists(headers=self._headers, language=self.language)
for artist in artists_obj:
yield await self._parse_artist(artist)
async def get_library_albums(self) -> AsyncGenerator[Album, None]:
"""Retrieve all library albums from Youtube Music."""
await self._check_oauth_token()
- albums_obj = await get_library_albums(
- headers=self._headers,
- )
+ albums_obj = await get_library_albums(headers=self._headers, language=self.language)
for album in albums_obj:
yield await self._parse_album(album, album["browseId"])
async def get_library_playlists(self) -> AsyncGenerator[Playlist, None]:
"""Retrieve all library playlists from the provider."""
await self._check_oauth_token()
- playlists_obj = await get_library_playlists(
- headers=self._headers,
- )
+ playlists_obj = await get_library_playlists(headers=self._headers, language=self.language)
for playlist in playlists_obj:
yield await self._parse_playlist(playlist)
async def get_library_tracks(self) -> AsyncGenerator[Track, None]:
"""Retrieve library tracks from Youtube Music."""
await self._check_oauth_token()
- tracks_obj = await get_library_tracks(
- headers=self._headers,
- )
+ tracks_obj = await get_library_tracks(headers=self._headers, language=self.language)
for track in tracks_obj:
# Library tracks sometimes do not have a valid artist id
# In that case, call the API for track details based on track id
async def get_album(self, prov_album_id) -> Album:
"""Get full album details by id."""
await self._check_oauth_token()
- if album_obj := await get_album(prov_album_id=prov_album_id):
+ if album_obj := await get_album(prov_album_id=prov_album_id, language=self.language):
return await self._parse_album(album_obj=album_obj, album_id=prov_album_id)
msg = f"Item {prov_album_id} not found"
raise MediaNotFoundError(msg)
async def get_album_tracks(self, prov_album_id: str) -> list[AlbumTrack]:
"""Get album tracks for given album id."""
await self._check_oauth_token()
- album_obj = await get_album(prov_album_id=prov_album_id)
+ album_obj = await get_album(prov_album_id=prov_album_id, language=self.language)
if not album_obj.get("tracks"):
return []
tracks = []
async def get_artist(self, prov_artist_id) -> Artist:
"""Get full artist details by id."""
await self._check_oauth_token()
- if artist_obj := await get_artist(prov_artist_id=prov_artist_id, headers=self._headers):
+ if artist_obj := await get_artist(
+ prov_artist_id=prov_artist_id, headers=self._headers, language=self.language
+ ):
return await self._parse_artist(artist_obj=artist_obj)
msg = f"Item {prov_artist_id} not found"
raise MediaNotFoundError(msg)
prov_track_id=prov_track_id,
headers=self._headers,
signature_timestamp=self._signature_timestamp,
+ language=self.language,
):
return await self._parse_track(track_obj)
msg = f"Item {prov_track_id} not found"
if YT_PLAYLIST_ID_DELIMITER in prov_playlist_id:
prov_playlist_id = prov_playlist_id.split(YT_PLAYLIST_ID_DELIMITER)[0]
if playlist_obj := await get_playlist(
- prov_playlist_id=prov_playlist_id, headers=self._headers
+ prov_playlist_id=prov_playlist_id, headers=self._headers, language=self.language
):
return await self._parse_playlist(playlist_obj)
msg = f"Item {prov_playlist_id} not found"
from music_assistant.server.helpers.auth import AuthenticationHelper
-async def get_artist(prov_artist_id: str, headers: dict[str, str]) -> dict[str, str]:
+async def get_artist(
+ prov_artist_id: str, headers: dict[str, str], language: str = "en"
+) -> dict[str, str]:
"""Async wrapper around the ytmusicapi get_artist function."""
def _get_artist():
- ytm = ytmusicapi.YTMusic(auth=headers)
+ ytm = ytmusicapi.YTMusic(auth=headers, language=language)
try:
artist = ytm.get_artist(channelId=prov_artist_id)
# ChannelId can sometimes be different and original ID is not part of the response
return await asyncio.to_thread(_get_artist)
-async def get_album(prov_album_id: str) -> dict[str, str]:
+async def get_album(prov_album_id: str, language: str = "en") -> dict[str, str]:
"""Async wrapper around the ytmusicapi get_album function."""
def _get_album():
- ytm = ytmusicapi.YTMusic()
+ ytm = ytmusicapi.YTMusic(language=language)
return ytm.get_album(browseId=prov_album_id)
return await asyncio.to_thread(_get_album)
-async def get_playlist(prov_playlist_id: str, headers: dict[str, str]) -> dict[str, str]:
+async def get_playlist(
+ prov_playlist_id: str, headers: dict[str, str], language: str = "en"
+) -> dict[str, str]:
"""Async wrapper around the ytmusicapi get_playlist function."""
def _get_playlist():
- ytm = ytmusicapi.YTMusic(auth=headers)
+ ytm = ytmusicapi.YTMusic(auth=headers, language=language)
playlist = ytm.get_playlist(playlistId=prov_playlist_id, limit=None)
playlist["checksum"] = get_playlist_checksum(playlist)
return playlist
async def get_track(
- prov_track_id: str, headers: dict[str, str], signature_timestamp: str
+ prov_track_id: str, headers: dict[str, str], signature_timestamp: str, language: str = "en"
) -> dict[str, str] | None:
"""Async wrapper around the ytmusicapi get_playlist function."""
def _get_song():
- ytm = ytmusicapi.YTMusic(auth=headers)
+ ytm = ytmusicapi.YTMusic(auth=headers, language=language)
track_obj = ytm.get_song(videoId=prov_track_id, signatureTimestamp=signature_timestamp)
track = {}
if "videoDetails" not in track_obj:
return await asyncio.to_thread(_get_song)
-async def get_library_artists(headers: dict[str, str]) -> dict[str, str]:
+async def get_library_artists(headers: dict[str, str], language: str = "en") -> dict[str, str]:
"""Async wrapper around the ytmusicapi get_library_artists function."""
def _get_library_artists():
- ytm = ytmusicapi.YTMusic(auth=headers)
+ ytm = ytmusicapi.YTMusic(auth=headers, language=language)
artists = ytm.get_library_subscriptions(limit=9999)
# Sync properties with uniformal artist object
for artist in artists:
return await asyncio.to_thread(_get_library_artists)
-async def get_library_albums(headers: dict[str, str]) -> dict[str, str]:
+async def get_library_albums(headers: dict[str, str], language: str = "en") -> dict[str, str]:
"""Async wrapper around the ytmusicapi get_library_albums function."""
def _get_library_albums():
- ytm = ytmusicapi.YTMusic(auth=headers)
+ ytm = ytmusicapi.YTMusic(auth=headers, language=language)
return ytm.get_library_albums(limit=9999)
return await asyncio.to_thread(_get_library_albums)
-async def get_library_playlists(headers: dict[str, str]) -> dict[str, str]:
+async def get_library_playlists(headers: dict[str, str], language: str = "en") -> dict[str, str]:
"""Async wrapper around the ytmusicapi get_library_playlists function."""
def _get_library_playlists():
- ytm = ytmusicapi.YTMusic(auth=headers)
+ ytm = ytmusicapi.YTMusic(auth=headers, language=language)
playlists = ytm.get_library_playlists(limit=9999)
# Sync properties with uniformal playlist object
for playlist in playlists:
return await asyncio.to_thread(_get_library_playlists)
-async def get_library_tracks(headers: dict[str, str]) -> dict[str, str]:
+async def get_library_tracks(headers: dict[str, str], language: str = "en") -> dict[str, str]:
"""Async wrapper around the ytmusicapi get_library_tracks function."""
def _get_library_tracks():
- ytm = ytmusicapi.YTMusic(auth=headers)
+ ytm = ytmusicapi.YTMusic(auth=headers, language=language)
return ytm.get_library_songs(limit=9999)
return await asyncio.to_thread(_get_library_tracks)
return await asyncio.to_thread(_get_song_radio_tracks)
-async def search(query: str, ytm_filter: str | None = None, limit: int = 20) -> list[dict]:
+async def search(
+ query: str, ytm_filter: str | None = None, limit: int = 20, language: str = "en"
+) -> list[dict]:
"""Async wrapper around the ytmusicapi search function."""
def _search():
- ytm = ytmusicapi.YTMusic()
+ ytm = ytmusicapi.YTMusic(language=language)
results = ytm.search(query=query, filter=ytm_filter, limit=limit)
# Sync result properties with uniformal objects
for result in results: