if entry.type == ConfigEntryType.LABEL:
result.value = result.label
if not isinstance(result.value, expected_type):
- if result.value is None and allow_none:
- # In some cases we allow this (e.g. create default config)
- result.value = result.default_value
+ # value type does not match
+ try:
+ # try to simply convert it
+ result.value = expected_type(result.value)
return result
+ except ValueError:
+ pass
# handle common conversions/mistakes
if expected_type == float and isinstance(result.value, int):
result.value = float(result.value)
if expected_type == int and isinstance(result.value, float):
result.value = int(result.value)
return result
+ # fallback to default
+ if result.value is None and allow_none:
+ # In some cases we allow this (e.g. create default config)
+ result.value = result.default_value
+ return result
if entry.default_value:
LOGGER.warning(
"%s has unexpected type: %s, fallback to default",
ConfigValueOption("info", "INFO"),
ConfigValueOption("warning", "WARNING"),
ConfigValueOption("error", "ERROR"),
- ConfigValueOption("debug", "DEBIG"),
+ ConfigValueOption("debug", "DEBUG"),
],
default_value="GLOBAL",
description="Set the log verbosity for this provider",
# try to load the provider first to catch errors before we save it.
if config.enabled:
await self.mass.load_provider(config)
+ else:
+ await self.mass.unload_provider(config.instance_id)
# load succeeded, save new config
conf_key = f"{CONF_PROVIDERS}/{instance_id}"
self.set(conf_key, config.to_raw())
for prov_item in prov_items
if loose_compare_strings(album.name, prov_item.name)
}
- # make sure that the 'base' version is included
+ # make sure that the 'base' version is NOT included
for prov_version in album.provider_mappings:
- if prov_version.item_id in all_versions:
- continue
- album_copy = Album.from_dict(album.to_dict())
- album_copy.item_id = prov_version.item_id
- album_copy.provider = prov_version.provider_domain
- album_copy.provider_mappings = {prov_version}
- all_versions[prov_version.item_id] = album_copy
+ all_versions.pop(prov_version.item_id, None)
# return the aggregated result
return all_versions.values()
assert not (db_rows and not recursive), "Tracks attached to album"
for db_row in db_rows:
with contextlib.suppress(MediaNotFoundError):
- await self.mass.music.albums.delete_db_item(db_row["item_id"], recursive)
+ await self.mass.music.tracks.delete_db_item(db_row["item_id"], recursive)
# delete the album itself from db
await super().delete_db_item(item_id)
for prov_item in prov_items
if loose_compare_strings(radio.name, prov_item.name)
}
- # make sure that the 'base' version is included
+ # make sure that the 'base' version is NOT included
for prov_version in radio.provider_mappings:
- if prov_version.item_id in all_versions:
- continue
- radio_copy = Radio.from_dict(radio.to_dict())
- radio_copy.item_id = prov_version.item_id
- radio_copy.provider = prov_version.provider_domain
- radio_copy.provider_mappings = {prov_version}
- all_versions[prov_version.item_id] = radio_copy
+ all_versions.pop(prov_version.item_id, None)
# return the aggregated result
return all_versions.values()
if loose_compare_strings(track.name, prov_item.name)
and compare_artists(prov_item.artists, track.artists, any_match=True)
}
- # make sure that the 'base' version is included
+ # make sure that the 'base' version is NOT included
for prov_version in track.provider_mappings:
- if prov_version.item_id in all_versions:
- continue
- # grab full item here including album details etc
- prov_track = await self.get_provider_item(
- prov_version.item_id, prov_version.provider_instance
- )
- all_versions[prov_version.item_id] = prov_track
+ all_versions.pop(prov_version.item_id, None)
# return the aggregated result
return all_versions.values()
from __future__ import annotations
import asyncio
+import contextlib
import logging
import os
import urllib.parse
from aiohttp import web
from music_assistant.common.models.enums import ImageType, MediaType, ProviderFeature, ProviderType
+from music_assistant.common.models.errors import ProviderUnavailableError
from music_assistant.common.models.media_items import (
Album,
Artist,
self._pref_lang = lang.upper()
def start_scan(self) -> None:
- """Start background scan for missing Artist metadata."""
+ """Start background scan for missing metadata."""
async def scan_artist_metadata():
"""Background task that scans for artists missing metadata on filesystem providers."""
LOGGER.info("Start scan for missing artist metadata")
self.scan_busy = True
- for prov in self.mass.music.providers:
- if not prov.is_file():
+ async for artist in self.mass.music.artists.iter_db_items():
+ if artist.metadata.last_refresh is not None:
continue
- async for artist in self.mass.music.artists.iter_db_items_by_prov_id(
- provider_instance=prov.instance_id
- ):
- if artist.metadata.last_refresh is not None:
- continue
- # simply grabbing the full artist will trigger a full fetch
+ # most important is to see artist thumb in listings
+ # so if that is already present, move on
+ # full details can be grabbed later
+ if artist.image:
+ continue
+ # simply grabbing the full artist will trigger a full fetch
+ with contextlib.suppress(ProviderUnavailableError):
await self.mass.music.artists.get(artist.item_id, artist.provider, lazy=False)
- # this is slow on purpose to not cause stress on the metadata providers
- await asyncio.sleep(30)
+ # this is slow on purpose to not cause stress on the metadata providers
+ await asyncio.sleep(30)
self.scan_busy = False
LOGGER.info("Finished scan for missing artist metadata")
from music_assistant.server import MusicAssistant
LOGGER = logging.getLogger(f"{ROOT_LOGGER_NAME}.music")
+SYNC_INTERVAL = 3 * 3600
class MusicController:
"""Async initialize of module."""
# setup library database
await self._setup_database()
+ self.mass.create_task(self.start_sync(reschedule=SYNC_INTERVAL))
async def close(self) -> None:
"""Cleanup on exit."""
self,
media_types: list[MediaType] | None = None,
providers: list[str] | None = None,
+ reschedule: int | None = None,
) -> None:
"""Start running the sync of (all or selected) musicproviders.
if provider.instance_id not in providers:
continue
self._start_provider_sync(provider.instance_id, media_types)
- # trgger metadata scan after provider sync completed
+ # trigger metadata scan after provider sync completed
self.mass.metadata.start_scan()
+ # reschedule task if needed
+ def create_sync_task():
+ self.mass.create_task(self.start_sync, media_types, providers, reschedule)
+
+ if reschedule is not None:
+ self.mass.loop.call_later(reschedule, create_sync_task)
+
@api_command("music/synctasks")
def get_running_sync_tasks(self) -> list[SyncTask]:
"""Return list with providers that are currently syncing."""
"""Retrieve metadata for an artist on this Metadata provider."""
if ProviderFeature.ARTIST_METADATA in self.supported_features:
raise NotImplementedError
- return
async def get_album_metadata(self, album: Album) -> MediaItemMetadata | None:
"""Retrieve metadata for an album on this Metadata provider."""
if ProviderFeature.ALBUM_METADATA in self.supported_features:
raise NotImplementedError
- return
async def get_track_metadata(self, track: Track) -> MediaItemMetadata | None:
"""Retrieve metadata for a track on this Metadata provider."""
if ProviderFeature.TRACK_METADATA in self.supported_features:
raise NotImplementedError
- return
async def get_musicbrainz_artist_id(
self, artist: Artist, ref_albums: Iterable[Album], ref_tracks: Iterable[Track]
"""Discover MusicBrainzArtistId for an artist given some reference albums/tracks."""
if ProviderFeature.GET_ARTIST_MBID in self.supported_features:
raise NotImplementedError
- return
from music_assistant.common.helpers.util import parse_title_and_version
from music_assistant.common.models.enums import ProviderFeature
-from music_assistant.common.models.errors import MediaNotFoundError, MusicAssistantError
+from music_assistant.common.models.errors import (
+ InvalidDataError,
+ MediaNotFoundError,
+ MusicAssistantError,
+)
from music_assistant.common.models.media_items import (
Album,
AlbumType,
from .helpers import get_parentdir
+CONF_MISSING_ALBUM_ARTIST_ACTION = "missing_album_artist_action"
+
TRACK_EXTENSIONS = ("mp3", "m4a", "mp4", "flac", "wav", "ogg", "aiff", "wma", "dsf")
PLAYLIST_EXTENSIONS = ("m3u", "pls")
SUPPORTED_EXTENSIONS = TRACK_EXTENSIONS + PLAYLIST_EXTENSIONS
if prev_checksums is None:
prev_checksums = {}
+ # process all deleted (or renamed) files first
+ cur_filenames = set()
+ async for item in self.listdir("", recursive=True):
+ if "." not in item.name or not item.ext:
+ # skip system files and files without extension
+ continue
+
+ if item.ext not in SUPPORTED_EXTENSIONS:
+ # unsupported file extension
+ continue
+ cur_filenames.add(item.path)
+ # work out deletions
+ deleted_files = set(prev_checksums.keys()) - cur_filenames
+ await self._process_deletions(deleted_files)
+
# find all music files in the music directory and all subfolders
# we work bottom up, as-in we derive all info from the tracks
cur_checksums = {}
# store (final) checksums in cache
await self.mass.cache.set(cache_key, cur_checksums, SCHEMA_VERSION)
- # work out deletions
- deleted_files = set(prev_checksums.keys()) - set(cur_checksums.keys())
- await self._process_deletions(deleted_files)
async def _process_deletions(self, deleted_files: set[str]) -> None:
"""Process all deletions."""
if db_item := await controller.get_db_item_by_prov_id(
file_path, provider_instance=self.instance_id
):
- await controller.remove_prov_mapping(db_item.item_id, self.instance_id)
+ await controller.delete_db_item(db_item.item_id, True)
async def get_artist(self, prov_artist_id: str) -> Artist:
"""Get full artist details by id."""
artist.musicbrainz_id = tags.musicbrainz_albumartistids[index]
album_artists.append(artist)
else:
- # always fallback to various artists as album artist if user did not tag album artist
- # ID3 tag properly because we must have an album artist
- self.logger.warning(
- "%s is missing ID3 tag [albumartist], using %s as fallback",
- file_item.path,
- VARIOUS_ARTISTS,
- )
- album_artists = [await self._parse_artist(name=VARIOUS_ARTISTS)]
+ # album artist tag is missing, determine fallback
+ fallback_action = self.config.get_value(CONF_MISSING_ALBUM_ARTIST_ACTION)
+ if fallback_action == "various_artists":
+ self.logger.warning(
+ "%s is missing ID3 tag [albumartist], using %s as fallback",
+ file_item.path,
+ VARIOUS_ARTISTS,
+ )
+ album_artists = [await self._parse_artist(name=VARIOUS_ARTISTS)]
+ elif fallback_action == "track_artist":
+ self.logger.warning(
+ "%s is missing ID3 tag [albumartist], using track artist(s) as fallback",
+ file_item.path,
+ )
+ album_artists = [
+ await self._parse_artist(name=track_artist_str)
+ for track_artist_str in tags.artists
+ ]
+ else:
+ # default action is to skip the track
+ raise InvalidDataError(f"{file_item.path} is missing ID3 tag [albumartist]")
track.album = await self._parse_album(
tags.album,
artist := next((x for x in track.album.artists if x.name == track_artist_str), None)
):
track.artists.append(artist)
- continue
- artist = await self._parse_artist(track_artist_str)
+ else:
+ artist = await self._parse_artist(track_artist_str)
if not artist.musicbrainz_id:
with contextlib.suppress(IndexError):
artist.musicbrainz_id = tags.musicbrainz_artistids[index]
# try to parse albumtype
if track.album and track.album.album_type == AlbumType.UNKNOWN:
album_type = tags.album_type
- if album_type and "compilation" in album_type:
- track.album.album_type = AlbumType.COMPILATION
- elif album_type and "single" in album_type:
- track.album.album_type = AlbumType.SINGLE
- elif album_type and "album" in album_type:
- track.album.album_type = AlbumType.ALBUM
- elif track.album.sort_name in track.sort_name:
- track.album.album_type = AlbumType.SINGLE
+ try:
+ track.album.album_type = AlbumType(album_type)
+ except (ValueError, KeyError):
+ if track.album.sort_name in track.sort_name:
+ track.album.album_type = AlbumType.SINGLE
# set checksum to invalidate any cached listings
checksum_timestamp = str(int(time()))
parentdir = os.path.dirname(base_path)
for _ in range(3):
dirname = parentdir.rsplit(os.sep)[-1]
+ dirname = dirname.split("(")[0].split("[")[0].strip()
if compare_strings(name, dirname, False):
return parentdir
parentdir = os.path.dirname(parentdir)
"type": "string",
"label": "Path",
"default_value": "/media"
+ },
+ {
+ "key": "missing_album_artist_action",
+ "type": "string",
+ "label": "Action when a track is missing the Albumartist ID3 tag",
+ "default_value": "skip",
+ "description": "Music Assistant prefers information stored in ID3 tags and only uses online sources for additional metadata. This means that the ID3 tags need to be accurate, preferably tagged with MusicBrainz Picard.",
+ "advanced": true,
+ "required": false,
+ "options": [
+ { "title": "Skip track and log warning", "value": "skip" },
+ { "title": "Use Track artist(s)", "value": "track_artist" },
+ { "title": "Use Various Artists", "value": "various_artists" }
+ ]
}
],
"description": "Controls whether the NetBIOS over TCP/IP (is_direct_tcp=False) or the newer Direct hosting of SMB over TCP/IP (is_direct_tcp=True) will be used for the communication. The default parameter is False which will use NetBIOS over TCP/IP for wider compatibility (TCP port: 139).",
"advanced": true,
"required": false
+ },
+ {
+ "key": "missing_album_artist_action",
+ "type": "string",
+ "label": "Action when a track is missing the Albumartist ID3 tag",
+ "default_value": "skip",
+ "description": "Music Assistant prefers information stored in ID3 tags and only uses online sources for additional metadata. This means that the ID3 tags need to be accurate, preferably tagged with MusicBrainz Picard.",
+ "advanced": true,
+ "required": false,
+ "options": [
+ { "title": "Skip track and log warning", "value": "skip" },
+ { "title": "Use Track artist(s)", "value": "track_artist" },
+ { "title": "Use Various Artists", "value": "various_artists" }
+ ]
}
],
def playlist_item_from_mass(queue_item: QueueItem, index: int = 0) -> PlaylistItem:
"""Parse PlaylistItem for the Json RPC interface from MA QueueItem."""
if queue_item.media_item and queue_item.media_type == MediaType.TRACK:
- artist = queue_item.media_item.artist.name
- album = queue_item.media_item.album.name
+ artist = queue_item.media_item.artist.name if queue_item.media_item.artist else ""
+ album = queue_item.media_item.album.name if queue_item.media_item.album else ""
title = queue_item.media_item.name
elif queue_item.streamdetails and queue_item.streamdetails.stream_title:
if " - " in queue_item.streamdetails.stream_title:
from __future__ import annotations
import asyncio
+import contextlib
import json
import os
import platform
album = Album(item_id=album_obj["id"], provider=self.domain, name=name, version=version)
for artist_obj in album_obj["artists"]:
album.artists.append(await self._parse_artist(artist_obj))
- if album_obj["album_type"] == "single":
- album.album_type = AlbumType.SINGLE
- elif album_obj["album_type"] == "compilation":
- album.album_type = AlbumType.COMPILATION
- elif album_obj["album_type"] == "album":
- album.album_type = AlbumType.ALBUM
+
+ with contextlib.suppress(ValueError):
+ album.album_type = AlbumType(album_obj["album_type"])
+
if "genres" in album_obj:
album.metadata.genre = set(album_obj["genres"])
if album_obj.get("images"):
"Single": AlbumType.SINGLE,
"Compilation": AlbumType.COMPILATION,
"Album": AlbumType.ALBUM,
+ "EP": AlbumType.EP,
}
provider_instance=self.instance_id,
)
)
+ if authors := playlist_obj.get("author"):
+ playlist.owner = authors[0]["name"] if isinstance(authors, list) else authors["name"]
+ else:
+ playlist.owner = self.instance_id
playlist.metadata.checksum = playlist_obj.get("checksum")
return playlist