Better handling of album directory for local files (#1445)
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Thu, 4 Jul 2024 23:25:03 +0000 (01:25 +0200)
committerGitHub <noreply@github.com>
Thu, 4 Jul 2024 23:25:03 +0000 (01:25 +0200)
music_assistant/server/helpers/compare.py
music_assistant/server/providers/filesystem_local/base.py
music_assistant/server/providers/filesystem_local/helpers.py

index 692890600028993a2b7d52ac8271393b3379d069..b2e51a466d9d96c57fec1fcbfac3d52ebc393d35 100644 (file)
@@ -3,10 +3,10 @@
 from __future__ import annotations
 
 import re
+from difflib import SequenceMatcher
 
 import unidecode
 
-from music_assistant.common.helpers.util import create_sort_name
 from music_assistant.common.models.enums import ExternalID, MediaType
 from music_assistant.common.models.media_items import (
     Album,
@@ -388,18 +388,23 @@ def compare_strings(str1: str, str2: str, strict: bool = True) -> bool:
     """Compare strings and return True if we have an (almost) perfect match."""
     if not str1 or not str2:
         return False
+    str1_lower = str1.lower()
+    str2_lower = str2.lower()
+    if strict:
+        return str1_lower == str2_lower
     # return early if total length mismatch
     if abs(len(str1) - len(str2)) > 4:
         return False
-    if not strict:
-        # handle '&' vs 'And'
-        if " & " in str1 and " and " in str2.lower():
-            str2 = str2.lower().replace(" and ", " & ")
-        elif " and " in str1.lower() and " & " in str2:
-            str2 = str2.replace(" & ", " and ")
-        return create_safe_string(str1) == create_safe_string(str2)
-
-    return create_sort_name(str1) == create_sort_name(str2)
+    # handle '&' vs 'And'
+    if " & " in str1_lower and " and " in str2_lower:
+        str2 = str2_lower.replace(" and ", " & ")
+    elif " and " in str1_lower and " & " in str2:
+        str2 = str2.replace(" & ", " and ")
+    if create_safe_string(str1) == create_safe_string(str2):
+        return True
+    # last resort: use difflib to compare strings
+    required_accuracy = 0.91 if len(str1) > 8 else 0.85
+    return SequenceMatcher(a=str1_lower, b=str2).ratio() > required_accuracy
 
 
 def compare_version(base_version: str, compare_version: str) -> bool:
index 7b3c2136e5839f6619eb74ad21b7e11f261c5ab5..c217664bad3aae58f5c1e4505c23b0b86f10fad3 100644 (file)
@@ -47,14 +47,13 @@ from music_assistant.constants import (
     DB_TABLE_TRACK_ARTISTS,
     VARIOUS_ARTISTS_NAME,
 )
-from music_assistant.server.controllers.cache import use_cache
 from music_assistant.server.controllers.music import DB_SCHEMA_VERSION
 from music_assistant.server.helpers.compare import compare_strings
 from music_assistant.server.helpers.playlists import parse_m3u, parse_pls
 from music_assistant.server.helpers.tags import parse_tags, split_items
 from music_assistant.server.models.music_provider import MusicProvider
 
-from .helpers import get_parentdir
+from .helpers import get_album_dir, get_artist_dir, get_disc_dir
 
 if TYPE_CHECKING:
     from collections.abc import AsyncGenerator
@@ -731,16 +730,14 @@ class FileSystemProviderBase(MusicProvider):
             # disc_dir is the folder level where the tracks are located
             # this may be a separate disc folder (Disc 1, Disc 2 etc) underneath the album folder
             # or this is an album folder with the disc attached
-            disc_dir = get_parentdir(file_item.path, f"disc {tags.disc or 1}")
-            album_dir = get_parentdir(disc_dir or file_item.path, tags.album)
+            disc_dir = get_disc_dir(file_item.path, tags.album, tags.disc)
+            album_dir = get_album_dir(file_item.path, tags.album, disc_dir)
 
             # album artist(s)
             album_artists = []
             if tags.album_artists:
                 for index, album_artist_str in enumerate(tags.album_artists):
-                    # work out if we have an artist folder
-                    artist_dir = get_parentdir(album_dir, album_artist_str, 1)
-                    artist = await self._parse_artist(album_artist_str, artist_path=artist_dir)
+                    artist = await self._parse_artist(album_artist_str, album_path=album_dir)
                     if not artist.mbid:
                         with contextlib.suppress(IndexError):
                             artist.mbid = tags.musicbrainz_albumartistids[index]
@@ -781,8 +778,9 @@ class FileSystemProviderBase(MusicProvider):
 
             track.album = await self._parse_album(
                 tags.album,
-                album_dir,
-                disc_dir,
+                track_path=file_item.path,
+                album_path=album_dir,
+                disc_path=disc_dir,
                 artists=album_artists,
                 barcode=tags.barcode,
             )
@@ -859,14 +857,20 @@ class FileSystemProviderBase(MusicProvider):
 
     async def _parse_artist(
         self,
-        name: str | None = None,
+        name: str,
         artist_path: str | None = None,
+        album_path: str | None = None,
         sort_name: str | None = None,
-    ) -> Artist | None:
-        """Lookup metadata in Artist folder."""
-        assert name or artist_path
+    ) -> Artist:
+        """Parse Artist metadata into an Artist object."""
+        cache_key = f"{self.instance_id}-artistdata-{name}-{artist_path}"
+        if cache := await self.mass.cache.get(cache_key):
+            return cache
+        if not artist_path and album_path:
+            # try to find (album)artist folder based on album path
+            artist_path = get_artist_dir(album_path=album_path, artist_name=name)
         if not artist_path:
-            # check if we have an existing item
+            # check if we have an existing item to retrieve the artist path
             async for item in self.mass.music.artists.iter_library_items(search=name):
                 if not compare_strings(name, item.name):
                     continue
@@ -882,30 +886,25 @@ class FileSystemProviderBase(MusicProvider):
                     artist_path = name
                 elif await self.exists(name.title()):
                     artist_path = name.title()
-                else:
-                    # use fake artist path as item id which is just the name
-                    artist_path = name
-
-        if not name:
-            name = artist_path.split(os.sep)[-1]
 
         artist = Artist(
-            item_id=artist_path,
+            item_id=artist_path or name,
             provider=self.instance_id,
             name=name,
             sort_name=sort_name,
             provider_mappings={
                 ProviderMapping(
-                    item_id=artist_path,
+                    item_id=artist_path or name,
                     provider_domain=self.domain,
                     provider_instance=self.instance_id,
-                    url=artist_path,
+                    url=artist_path or name,
                 )
             },
         )
 
         if not await self.exists(artist_path):
             # return basic object if there is no dedicated artist folder
+            await self.mass.cache.set(cache_key, artist, expiration=120)
             return artist
 
         nfo_file = os.path.join(artist_path, "artist.nfo")
@@ -930,19 +929,23 @@ class FileSystemProviderBase(MusicProvider):
         if images := await self._get_local_images(artist_path):
             artist.metadata.images = images
 
+        await self.mass.cache.set(cache_key, artist, expiration=120)
         return artist
 
     async def _parse_album(
         self,
-        name: str | None,
+        name: str,
+        track_path: str,
         album_path: str | None,
         disc_path: str | None,
         artists: list[Artist],
         barcode: str | None = None,
         sort_name: str | None = None,
-    ) -> Album | None:
-        """Lookup metadata in Album folder."""
-        assert name or album_path
+    ) -> Album:
+        """Parse Album metadata into an Album object."""
+        cache_key = f"{self.instance_id}-albumdata-{name}-{album_path}"
+        if cache := await self.mass.cache.get(cache_key):
+            return cache
         # create fake path if needed
         if not album_path and artists:
             album_path = artists[0].name + os.sep + name
@@ -970,11 +973,9 @@ class FileSystemProviderBase(MusicProvider):
         if barcode:
             album.external_ids.add((ExternalID.BARCODE, barcode))
 
-        if not await self.exists(album_path):
-            # return basic object if there is no dedicated album folder
-            return album
-
-        for folder_path in (disc_path, album_path):
+        # hunt for additional metadata and images in the folder structure
+        extra_path = os.path.dirname(track_path) if (track_path and not album_path) else None
+        for folder_path in (disc_path, album_path, extra_path):
             if not folder_path:
                 continue
             nfo_file = os.path.join(folder_path, "album.nfo")
@@ -1009,9 +1010,9 @@ class FileSystemProviderBase(MusicProvider):
                 else:
                     album.metadata.images += images
 
+        await self.mass.cache.set(cache_key, album, expiration=120)
         return album
 
-    @use_cache(120)
     async def _get_local_images(self, folder: str) -> list[MediaItemImage]:
         """Return local images found in a given folderpath."""
         images = []
index acc42a8bf0950ba20a59e5e4c2da91fc4a0d5e9a..9459f5e1d14e7586f44f6212444f36f94df6f759 100644 (file)
@@ -7,15 +7,41 @@ import os
 from music_assistant.server.helpers.compare import compare_strings
 
 
-def get_parentdir(base_path: str, name: str, skip: int = 0) -> str | None:
-    """Look for folder name in path (to find dedicated artist or album folder)."""
-    if not base_path:
-        return None
-    parentdir = os.path.dirname(base_path)
-    for _ in range(skip, 3):
+def get_artist_dir(album_path: str, artist_name: str) -> str | None:
+    """Look for (Album)Artist directory in path of album."""
+    parentdir = os.path.dirname(album_path)
+    dirname = parentdir.rsplit(os.sep)[-1]
+    if compare_strings(artist_name, dirname, False):
+        return parentdir
+    return None
+
+
+def get_disc_dir(track_path: str, album_name: str, disc_number: int | None) -> str | None:
+    """Look for disc directory in path of album/tracks."""
+    parentdir = os.path.dirname(track_path)
+    dirname = parentdir.rsplit(os.sep)[-1]
+    dirname_lower = dirname.lower()
+    if disc_number is not None and compare_strings(f"disc {disc_number}", dirname, False):
+        return parentdir
+    if dirname_lower.startswith(album_name.lower()) and "disc" in dirname_lower:
+        return parentdir
+    if dirname_lower.startswith(album_name.lower()) and dirname_lower.endswith(str(disc_number)):
+        return parentdir
+    return None
+
+
+def get_album_dir(track_path: str, album_name: str, disc_dir: str | None) -> str | None:
+    """Return album/parent directory of a track."""
+    parentdir = os.path.dirname(track_path)
+    # account for disc sublevel by ignoring 1 level if needed
+    for _ in range(2 if disc_dir else 1):
         dirname = parentdir.rsplit(os.sep)[-1]
-        dirname = dirname.split("(")[0].split("[")[0].strip()
-        if compare_strings(name, dirname, False):
+        dirname_lower = dirname.lower()
+        if compare_strings(album_name, dirname, False):
+            return parentdir
+        if album_name in dirname_lower:
+            return parentdir
+        if dirname_lower in album_name:
             return parentdir
         parentdir = os.path.dirname(parentdir)
     return None