DB_TABLE_ARTISTS,
DB_TABLE_PROVIDER_MAPPINGS,
DB_TABLE_TRACK_ARTISTS,
+ VARIOUS_ARTISTS_MBID,
VARIOUS_ARTISTS_NAME,
)
+from music_assistant.server.controllers.cache import use_cache
from music_assistant.server.helpers.compare import compare_strings, create_safe_string
from music_assistant.server.helpers.playlists import parse_m3u, parse_pls
-from music_assistant.server.helpers.tags import parse_tags, split_items
+from music_assistant.server.helpers.tags import AudioTags, parse_tags, split_items
from music_assistant.server.models.music_provider import MusicProvider
from .helpers import get_album_dir, get_artist_dir, get_disc_dir
db_artist = await self.mass.music.artists.get_library_item_by_prov_id(
prov_artist_id, self.instance_id
)
- if db_artist is None:
+ if not db_artist:
+ # this should not be possible, but just in case
msg = f"Artist not found: {prov_artist_id}"
raise MediaNotFoundError(msg)
+ # prov_artist_id is either an actual (relative) path or a name (as fallback)
+ safe_artist_name = create_safe_string(prov_artist_id, lowercase=False, replace_space=False)
if await self.exists(prov_artist_id):
- # if path exists on disk allow parsing full details to allow refresh of metadata
- return await self._parse_artist(db_artist.name, artist_path=prov_artist_id)
- return db_artist
+ artist_path = prov_artist_id
+ elif await self.exists(safe_artist_name):
+ artist_path = safe_artist_name
+ else:
+ for prov_mapping in db_artist.provider_mappings:
+ if prov_mapping.provider_instance != self.instance_id:
+ continue
+ if prov_mapping.url:
+ artist_path = prov_mapping.url
+ break
+ else:
+ # this is an artist without an actual path on disk
+ # return the info we already have in the db
+ return db_artist
+
+ artist = Artist(
+ item_id=prov_artist_id,
+ provider=self.instance_id,
+ name=db_artist.name,
+ sort_name=db_artist.sort_name,
+ provider_mappings={
+ ProviderMapping(
+ item_id=prov_artist_id,
+ provider_domain=self.domain,
+ provider_instance=self.instance_id,
+ url=artist_path,
+ )
+ },
+ )
+ # grab additional metadata within the Artist's folder
+ nfo_file = os.path.join(artist_path, "artist.nfo")
+ if await self.exists(nfo_file):
+ # found NFO file with metadata
+ # https://kodi.wiki/view/NFO_files/Artists
+ data = b""
+ async for chunk in self.read_file_content(nfo_file):
+ data += chunk
+ info = await asyncio.to_thread(xmltodict.parse, data)
+ info = info["artist"]
+ artist.name = info.get("title", info.get("name", db_artist.name))
+ if sort_name := info.get("sortname"):
+ artist.sort_name = sort_name
+ if mbid := info.get("musicbrainzartistid"):
+ artist.mbid = mbid
+ if description := info.get("biography"):
+ artist.metadata.description = description
+ if genre := info.get("genre"):
+ artist.metadata.genres = set(split_items(genre))
+ # find local images
+ if images := await self._get_local_images(artist_path):
+ artist.metadata.images = UniqueList(images)
+
+ return artist
async def get_album(self, prov_album_id: str) -> Album:
"""Get full album details by id."""
for track in await self.get_album_tracks(prov_album_id):
for prov_mapping in track.provider_mappings:
if prov_mapping.provider_instance == self.instance_id:
- full_track = await self.get_track(prov_mapping.item_id)
+ file_item = await self.resolve(prov_mapping.item_id)
+ full_track = await self._parse_track(file_item, full_album_metadata=True)
assert isinstance(full_track.album, Album)
return full_track.album
msg = f"Album not found: {prov_album_id}"
raise MediaNotFoundError(msg)
file_item = await self.resolve(prov_track_id)
- return await self._parse_track(file_item)
+ return await self._parse_track(file_item, full_album_metadata=True)
async def get_playlist(self, prov_playlist_id: str) -> Playlist:
"""Get full playlist details by id."""
return file_item.local_path
return file_item.absolute_path
- async def _parse_track(self, file_item: FileSystemItem) -> Track:
+ async def _parse_track(
+ self, file_item: FileSystemItem, full_album_metadata: bool = False
+ ) -> Track:
"""Get full track details by id."""
# ruff: noqa: PLR0915, PLR0912
if acoustid := tags.get("acoustidid"):
track.external_ids.add((ExternalID.ACOUSTID, acoustid))
- album: Album | None = None
- album_artists: list[Artist] = []
-
# album
- if tags.album:
- # work out if we have an album and/or disc folder
- # disc_dir is the folder level where the tracks are located
- # this may be a separate disc folder (Disc 1, Disc 2 etc) underneath the album folder
- # or this is an album folder with the disc attached
- disc_dir = get_disc_dir(file_item.path, tags.album, tags.disc)
- album_dir = get_album_dir(file_item.path, tags.album, disc_dir)
-
- # album artist(s)
- if tags.album_artists:
- for index, album_artist_str in enumerate(tags.album_artists):
- artist = await self._parse_artist(
- album_artist_str,
- album_path=album_dir,
- sort_name=(
- tags.album_artist_sort_names[index]
- if index < len(tags.album_artist_sort_names)
- else None
- ),
- )
- if not artist.mbid:
- with contextlib.suppress(IndexError):
- artist.mbid = tags.musicbrainz_albumartistids[index]
- album_artists.append(artist)
- else:
- # album artist tag is missing, determine fallback
- fallback_action = self.config.get_value(CONF_MISSING_ALBUM_ARTIST_ACTION)
- if fallback_action == "folder_name" and album_dir:
- possible_artist_folder = os.path.dirname(album_dir)
- self.logger.warning(
- "%s is missing ID3 tag [albumartist], using foldername %s as fallback",
- file_item.path,
- possible_artist_folder,
- )
- album_artist_str = possible_artist_folder.rsplit(os.sep)[-1]
- album_artists = [await self._parse_artist(name=album_artist_str)]
- # fallback to track artists (if defined by user)
- elif fallback_action == "track_artist":
- self.logger.warning(
- "%s is missing ID3 tag [albumartist], using track artist(s) as fallback",
- file_item.path,
- )
- album_artists = [
- await self._parse_artist(name=track_artist_str)
- for track_artist_str in tags.artists
- ]
- # all other: fallback to various artists
- else:
- self.logger.warning(
- "%s is missing ID3 tag [albumartist], using %s as fallback",
- file_item.path,
- VARIOUS_ARTISTS_NAME,
- )
- album_artists = [await self._parse_artist(name=VARIOUS_ARTISTS_NAME)]
-
- album = track.album = await self._parse_album(
- tags.album,
- track_path=file_item.path,
- album_path=album_dir,
- disc_path=disc_dir,
- artists=album_artists,
- barcode=tags.barcode,
+ album = track.album = (
+ await self._parse_album(
+ track_path=file_item.path, track_tags=tags, full_metadata=full_album_metadata
)
+ if tags.album
+ else None
+ )
# track artist(s)
for index, track_artist_str in enumerate(tags.artists):
- # reuse album artist details if possible
- if album_artist := next((x for x in album_artists if x.name == track_artist_str), None):
- artist = album_artist
- else:
- artist = await self._parse_artist(track_artist_str)
- if not artist.mbid:
- with contextlib.suppress(IndexError):
- artist.mbid = tags.musicbrainz_artistids[index]
- # artist sort name
- with contextlib.suppress(IndexError):
- artist.sort_name = tags.artist_sort_names[index]
+ artist = await self._create_artist_itemmapping(
+ track_artist_str,
+ sort_name=(
+ tags.artist_sort_names[index] if index < len(tags.artist_sort_names) else None
+ ),
+ mbid=(
+ tags.musicbrainz_artistids[index]
+ if index < len(tags.musicbrainz_artistids)
+ else None
+ ),
+ )
track.artists.append(artist)
# handle embedded cover image
if album and not album.metadata.images:
# set embedded cover on album if it does not have one yet
album.metadata.images = track.metadata.images
- # copy album image from track (only if the album itself doesn't have an image)
- # this deals with embedded images from filesystem providers
+ # copy (embedded) album image from track (if the album itself doesn't have an image)
if album and not album.image and track.image:
album.metadata.images = UniqueList([track.image])
if tags.musicbrainz_recordingid:
track.mbid = tags.musicbrainz_recordingid
track.metadata.chapters = UniqueList(tags.chapters)
- if album:
- if not album.mbid and tags.musicbrainz_albumid:
- album.mbid = tags.musicbrainz_albumid
- if tags.musicbrainz_releasegroupid:
- album.add_external_id(ExternalID.MB_RELEASEGROUP, tags.musicbrainz_releasegroupid)
- if not album.year:
- album.year = tags.year
- album.album_type = tags.album_type
- album.metadata.explicit = track.metadata.explicit
return track
- async def _parse_artist(
+ @use_cache(300)
+ async def _create_artist_itemmapping(
self,
name: str,
- artist_path: str | None = None,
album_path: str | None = None,
sort_name: str | None = None,
- ) -> Artist:
- """Parse Artist metadata into an Artist object."""
- cache_key = f"{self.instance_id}-artistdata-{name}-{artist_path}"
- if cache := await self.mass.cache.get(cache_key):
- assert isinstance(cache, Artist)
- return cache
- if not artist_path and album_path:
+ mbid: str | None = None,
+ ) -> ItemMapping:
+ """Create ItemMapping for a track/album artist."""
+ artist_path = None
+ if album_path:
# try to find (album)artist folder based on album path
artist_path = get_artist_dir(album_path=album_path, artist_name=name)
+ if not artist_path:
+ # check if we have an artist folder for this artist at root level
+ safe_artist_name = create_safe_string(name, lowercase=False, replace_space=False)
+ if await self.exists(name):
+ artist_path = name
+ elif await self.exists(safe_artist_name):
+ artist_path = safe_artist_name
if not artist_path:
# check if we have an existing item to retrieve the artist path
async for item in self.mass.music.artists.iter_library_items(search=name):
if not compare_strings(name, item.name):
continue
for prov_mapping in item.provider_mappings:
- if prov_mapping.provider_instance == self.instance_id:
+ if prov_mapping.provider_instance != self.instance_id:
+ continue
+ if prov_mapping.url:
artist_path = prov_mapping.url
break
- if artist_path:
- break
- else:
- # check if we have an artist folder for this artist at root level
- safe_artist_name = create_safe_string(name, lowercase=False, replace_space=False)
- if await self.exists(name):
- artist_path = name
- elif await self.exists(safe_artist_name):
- artist_path = safe_artist_name
-
- if artist_path: # noqa: SIM108
- # prefer the path as id
- item_id = artist_path
- else:
- # simply use the album name as item id
- item_id = name
- artist = Artist(
- item_id=item_id,
+ return ItemMapping(
+ media_type=MediaType.ARTIST,
+ # simply use the artist name as item id as fallback
+ item_id=artist_path or name,
provider=self.instance_id,
name=name,
sort_name=sort_name,
- provider_mappings={
- ProviderMapping(
- item_id=item_id,
- provider_domain=self.domain,
- provider_instance=self.instance_id,
- url=artist_path,
- )
- },
+ external_ids={(ExternalID.MB_ARTIST, mbid)} if mbid else set(),
)
- if artist_path is None or not await self.exists(artist_path):
- # return basic object if there is no dedicated artist folder
- await self.mass.cache.set(cache_key, artist, expiration=120)
- return artist
-
- nfo_file = os.path.join(artist_path, "artist.nfo")
- if await self.exists(nfo_file):
- # found NFO file with metadata
- # https://kodi.wiki/view/NFO_files/Artists
- data = b""
- async for chunk in self.read_file_content(nfo_file):
- data += chunk
- info = await asyncio.to_thread(xmltodict.parse, data)
- info = info["artist"]
- artist.name = info.get("title", info.get("name", name))
- if sort_name := info.get("sortname"):
- artist.sort_name = sort_name
- if mbid := info.get("musicbrainzartistid"):
- artist.mbid = mbid
- if description := info.get("biography"):
- artist.metadata.description = description
- if genre := info.get("genre"):
- artist.metadata.genres = set(split_items(genre))
- # find local images
- if images := await self._get_local_images(artist_path):
- artist.metadata.images = UniqueList(images)
-
- await self.mass.cache.set(cache_key, artist, expiration=120)
- return artist
-
async def _parse_album(
- self,
- name: str,
- track_path: str,
- album_path: str | None,
- disc_path: str | None,
- artists: list[Artist],
- barcode: str | None = None,
- sort_name: str | None = None,
+ self, track_path: str, track_tags: AudioTags, full_metadata: bool = False
) -> Album:
- """Parse Album metadata into an Album object."""
- cache_key = f"{self.instance_id}-albumdata-{name}-{album_path}"
- if cache := await self.mass.cache.get(cache_key):
- assert isinstance(cache, Album)
- return cache
+ """Parse Album metadata from Track tags."""
+ assert track_tags.album
+ # work out if we have an album and/or disc folder
+ # disc_dir is the folder level where the tracks are located
+ # this may be a separate disc folder (Disc 1, Disc 2 etc) underneath the album folder
+ # or this is an album folder with the disc attached
+ disc_dir = get_disc_dir(track_path, track_tags.album, track_tags.disc)
+ album_dir = get_album_dir(track_path, track_tags.album, disc_dir)
+
+ # album artist(s)
+ album_artists: UniqueList[Artist | ItemMapping] = UniqueList()
+ if track_tags.album_artists:
+ for index, album_artist_str in enumerate(track_tags.album_artists):
+ artist = await self._create_artist_itemmapping(
+ album_artist_str,
+ album_path=album_dir,
+ sort_name=(
+ track_tags.album_artist_sort_names[index]
+ if index < len(track_tags.album_artist_sort_names)
+ else None
+ ),
+ mbid=(
+ track_tags.musicbrainz_albumartistids[index]
+ if index < len(track_tags.musicbrainz_albumartistids)
+ else None
+ ),
+ )
+ album_artists.append(artist)
+ else:
+ # album artist tag is missing, determine fallback
+ fallback_action = self.config.get_value(CONF_MISSING_ALBUM_ARTIST_ACTION)
+ if fallback_action == "folder_name" and album_dir:
+ possible_artist_folder = os.path.dirname(album_dir)
+ self.logger.warning(
+ "%s is missing ID3 tag [albumartist], using foldername %s as fallback",
+ track_path,
+ possible_artist_folder,
+ )
+ album_artist_str = possible_artist_folder.rsplit(os.sep)[-1]
+ album_artists = UniqueList(
+ [await self._create_artist_itemmapping(name=album_artist_str)]
+ )
+ # fallback to track artists (if defined by user)
+ elif fallback_action == "track_artist":
+ self.logger.warning(
+ "%s is missing ID3 tag [albumartist], using track artist(s) as fallback",
+ track_path,
+ )
+ album_artists = UniqueList(
+ [
+ await self._create_artist_itemmapping(name=track_artist_str)
+ for track_artist_str in track_tags.artists
+ ]
+ )
+ # all other: fallback to various artists
+ else:
+ self.logger.warning(
+ "%s is missing ID3 tag [albumartist], using %s as fallback",
+ track_path,
+ VARIOUS_ARTISTS_NAME,
+ )
+ album_artists = UniqueList(
+ [
+ await self._create_artist_itemmapping(
+ name=VARIOUS_ARTISTS_NAME, mbid=VARIOUS_ARTISTS_MBID
+ )
+ ]
+ )
- if album_path:
+ if album_dir: # noqa: SIM108
# prefer the path as id
- item_id = album_path
- elif artists:
- # create fake item_id based on artist + album
- item_id = artists[0].name + os.sep + name
+ item_id = album_dir
else:
- # simply use the album name as item id
- album_path = name
+ # create fake item_id based on artist + album
+ item_id = album_artists[0].name + os.sep + track_tags.album
+ name, version = parse_title_and_version(track_tags.album)
album = Album(
item_id=item_id,
provider=self.instance_id,
name=name,
- sort_name=sort_name,
- artists=UniqueList(artists),
+ version=version,
+ sort_name=track_tags.album_sort,
+ artists=album_artists,
provider_mappings={
ProviderMapping(
item_id=item_id,
provider_domain=self.domain,
provider_instance=self.instance_id,
- url=album_path,
+ url=album_dir,
)
},
)
- if barcode:
- album.external_ids.add((ExternalID.BARCODE, barcode))
+ if track_tags.barcode:
+ album.external_ids.add((ExternalID.BARCODE, track_tags.barcode))
+
+ if track_tags.musicbrainz_albumid:
+ album.mbid = track_tags.musicbrainz_albumid
+ if track_tags.musicbrainz_releasegroupid:
+ album.add_external_id(ExternalID.MB_RELEASEGROUP, track_tags.musicbrainz_releasegroupid)
+ if track_tags.year:
+ album.year = track_tags.year
+ album.album_type = track_tags.album_type
# hunt for additional metadata and images in the folder structure
- extra_path = os.path.dirname(track_path) if (track_path and not album_path) else None
- for folder_path in (disc_path, album_path, extra_path):
+ if not full_metadata:
+ return album
+
+ extra_path = os.path.dirname(track_path) if (track_path and not album_dir) else None
+ for folder_path in (disc_dir, album_dir, extra_path):
if not folder_path or not await self.exists(folder_path):
continue
nfo_file = os.path.join(folder_path, "album.nfo")
else:
album.metadata.images += images
- await self.mass.cache.set(cache_key, album, expiration=120)
return album
async def _get_local_images(self, folder: str) -> list[MediaItemImage]: