from __future__ import annotations
import re
+from difflib import SequenceMatcher
import unidecode
-from music_assistant.common.helpers.util import create_sort_name
from music_assistant.common.models.enums import ExternalID, MediaType
from music_assistant.common.models.media_items import (
Album,
"""Compare strings and return True if we have an (almost) perfect match."""
if not str1 or not str2:
return False
+ str1_lower = str1.lower()
+ str2_lower = str2.lower()
+ if strict:
+ return str1_lower == str2_lower
# return early if total length mismatch
if abs(len(str1) - len(str2)) > 4:
return False
- if not strict:
- # handle '&' vs 'And'
- if " & " in str1 and " and " in str2.lower():
- str2 = str2.lower().replace(" and ", " & ")
- elif " and " in str1.lower() and " & " in str2:
- str2 = str2.replace(" & ", " and ")
- return create_safe_string(str1) == create_safe_string(str2)
-
- return create_sort_name(str1) == create_sort_name(str2)
+ # handle '&' vs 'And'
+ if " & " in str1_lower and " and " in str2_lower:
+ str2 = str2_lower.replace(" and ", " & ")
+ elif " and " in str1_lower and " & " in str2:
+ str2 = str2.replace(" & ", " and ")
+ if create_safe_string(str1) == create_safe_string(str2):
+ return True
+ # last resort: use difflib to compare strings
+ required_accuracy = 0.91 if len(str1) > 8 else 0.85
+ return SequenceMatcher(a=str1_lower, b=str2).ratio() > required_accuracy
def compare_version(base_version: str, compare_version: str) -> bool:
DB_TABLE_TRACK_ARTISTS,
VARIOUS_ARTISTS_NAME,
)
-from music_assistant.server.controllers.cache import use_cache
from music_assistant.server.controllers.music import DB_SCHEMA_VERSION
from music_assistant.server.helpers.compare import compare_strings
from music_assistant.server.helpers.playlists import parse_m3u, parse_pls
from music_assistant.server.helpers.tags import parse_tags, split_items
from music_assistant.server.models.music_provider import MusicProvider
-from .helpers import get_parentdir
+from .helpers import get_album_dir, get_artist_dir, get_disc_dir
if TYPE_CHECKING:
from collections.abc import AsyncGenerator
# disc_dir is the folder level where the tracks are located
# this may be a separate disc folder (Disc 1, Disc 2 etc) underneath the album folder
# or this is an album folder with the disc attached
- disc_dir = get_parentdir(file_item.path, f"disc {tags.disc or 1}")
- album_dir = get_parentdir(disc_dir or file_item.path, tags.album)
+ disc_dir = get_disc_dir(file_item.path, tags.album, tags.disc)
+ album_dir = get_album_dir(file_item.path, tags.album, disc_dir)
# album artist(s)
album_artists = []
if tags.album_artists:
for index, album_artist_str in enumerate(tags.album_artists):
- # work out if we have an artist folder
- artist_dir = get_parentdir(album_dir, album_artist_str, 1)
- artist = await self._parse_artist(album_artist_str, artist_path=artist_dir)
+ artist = await self._parse_artist(album_artist_str, album_path=album_dir)
if not artist.mbid:
with contextlib.suppress(IndexError):
artist.mbid = tags.musicbrainz_albumartistids[index]
track.album = await self._parse_album(
tags.album,
- album_dir,
- disc_dir,
+ track_path=file_item.path,
+ album_path=album_dir,
+ disc_path=disc_dir,
artists=album_artists,
barcode=tags.barcode,
)
async def _parse_artist(
self,
- name: str | None = None,
+ name: str,
artist_path: str | None = None,
+ album_path: str | None = None,
sort_name: str | None = None,
- ) -> Artist | None:
- """Lookup metadata in Artist folder."""
- assert name or artist_path
+ ) -> Artist:
+ """Parse Artist metadata into an Artist object."""
+ cache_key = f"{self.instance_id}-artistdata-{name}-{artist_path}"
+ if cache := await self.mass.cache.get(cache_key):
+ return cache
+ if not artist_path and album_path:
+ # try to find (album)artist folder based on album path
+ artist_path = get_artist_dir(album_path=album_path, artist_name=name)
if not artist_path:
- # check if we have an existing item
+ # check if we have an existing item to retrieve the artist path
async for item in self.mass.music.artists.iter_library_items(search=name):
if not compare_strings(name, item.name):
continue
artist_path = name
elif await self.exists(name.title()):
artist_path = name.title()
- else:
- # use fake artist path as item id which is just the name
- artist_path = name
-
- if not name:
- name = artist_path.split(os.sep)[-1]
artist = Artist(
- item_id=artist_path,
+ item_id=artist_path or name,
provider=self.instance_id,
name=name,
sort_name=sort_name,
provider_mappings={
ProviderMapping(
- item_id=artist_path,
+ item_id=artist_path or name,
provider_domain=self.domain,
provider_instance=self.instance_id,
- url=artist_path,
+ url=artist_path or name,
)
},
)
if not await self.exists(artist_path):
# return basic object if there is no dedicated artist folder
+ await self.mass.cache.set(cache_key, artist, expiration=120)
return artist
nfo_file = os.path.join(artist_path, "artist.nfo")
if images := await self._get_local_images(artist_path):
artist.metadata.images = images
+ await self.mass.cache.set(cache_key, artist, expiration=120)
return artist
async def _parse_album(
self,
- name: str | None,
+ name: str,
+ track_path: str,
album_path: str | None,
disc_path: str | None,
artists: list[Artist],
barcode: str | None = None,
sort_name: str | None = None,
- ) -> Album | None:
- """Lookup metadata in Album folder."""
- assert name or album_path
+ ) -> Album:
+ """Parse Album metadata into an Album object."""
+ cache_key = f"{self.instance_id}-albumdata-{name}-{album_path}"
+ if cache := await self.mass.cache.get(cache_key):
+ return cache
# create fake path if needed
if not album_path and artists:
album_path = artists[0].name + os.sep + name
if barcode:
album.external_ids.add((ExternalID.BARCODE, barcode))
- if not await self.exists(album_path):
- # return basic object if there is no dedicated album folder
- return album
-
- for folder_path in (disc_path, album_path):
+ # hunt for additional metadata and images in the folder structure
+ extra_path = os.path.dirname(track_path) if (track_path and not album_path) else None
+ for folder_path in (disc_path, album_path, extra_path):
if not folder_path:
continue
nfo_file = os.path.join(folder_path, "album.nfo")
else:
album.metadata.images += images
+ await self.mass.cache.set(cache_key, album, expiration=120)
return album
- @use_cache(120)
async def _get_local_images(self, folder: str) -> list[MediaItemImage]:
"""Return local images found in a given folderpath."""
images = []
from music_assistant.server.helpers.compare import compare_strings
-def get_parentdir(base_path: str, name: str, skip: int = 0) -> str | None:
- """Look for folder name in path (to find dedicated artist or album folder)."""
- if not base_path:
- return None
- parentdir = os.path.dirname(base_path)
- for _ in range(skip, 3):
+def get_artist_dir(album_path: str, artist_name: str) -> str | None:
+ """Look for (Album)Artist directory in path of album."""
+ parentdir = os.path.dirname(album_path)
+ dirname = parentdir.rsplit(os.sep)[-1]
+ if compare_strings(artist_name, dirname, False):
+ return parentdir
+ return None
+
+
+def get_disc_dir(track_path: str, album_name: str, disc_number: int | None) -> str | None:
+ """Look for disc directory in path of album/tracks."""
+ parentdir = os.path.dirname(track_path)
+ dirname = parentdir.rsplit(os.sep)[-1]
+ dirname_lower = dirname.lower()
+ if disc_number is not None and compare_strings(f"disc {disc_number}", dirname, False):
+ return parentdir
+ if dirname_lower.startswith(album_name.lower()) and "disc" in dirname_lower:
+ return parentdir
+ if dirname_lower.startswith(album_name.lower()) and dirname_lower.endswith(str(disc_number)):
+ return parentdir
+ return None
+
+
+def get_album_dir(track_path: str, album_name: str, disc_dir: str | None) -> str | None:
+ """Return album/parent directory of a track."""
+ parentdir = os.path.dirname(track_path)
+ # account for disc sublevel by ignoring 1 level if needed
+ for _ in range(2 if disc_dir else 1):
dirname = parentdir.rsplit(os.sep)[-1]
- dirname = dirname.split("(")[0].split("[")[0].strip()
- if compare_strings(name, dirname, False):
+ dirname_lower = dirname.lower()
+ if compare_strings(album_name, dirname, False):
+ return parentdir
+ if album_name in dirname_lower:
+ return parentdir
+ if dirname_lower in album_name:
return parentdir
parentdir = os.path.dirname(parentdir)
return None