improve matching logic - prevent duplicate tracks
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Mon, 16 May 2022 21:47:43 +0000 (23:47 +0200)
committerMarcel van der Veldt <m.vanderveldt@outlook.com>
Mon, 16 May 2022 21:47:43 +0000 (23:47 +0200)
music_assistant/controllers/music/providers/filesystem.py
music_assistant/helpers/compare.py

index efc05d63e742a3af711eda639d909d7611ee1e53..4160c57bad6b9f3ef023d4a517e40d5a85fca71f 100644 (file)
@@ -285,10 +285,7 @@ class FileSystemProvider(MusicProvider):
         self, track_path: str, checksum: Optional[int] = None
     ) -> Track | None:
         """Try to parse a track from a filename by reading its tags."""
-        if self.config.path not in track_path:
-            track_path = os.path.join(self.config.path, track_path)
-        track_path_base = self._get_relative_path(track_path)
-        track_item_id = self._get_item_id(track_path_base)
+        track_item_id = self._get_item_id(track_path)
 
         if not self.exists(track_path):
             raise MediaNotFoundError(f"Track path does not exist: {track_path}")
@@ -313,10 +310,11 @@ class FileSystemProvider(MusicProvider):
             track_title = tags.title
         else:
 
-            ext = track_path_base.split(".")[-1]
-            track_title = track_path_base.replace(f".{ext}", "").replace("_", " ")
+            ext = track_path.split(".")[-1]
+            track_title = track_path.split(os.sep)[-1]
+            track_title = track_title.replace(f".{ext}", "").replace("_", " ")
             self.logger.warning(
-                "%s is missing ID3 tags, use filename as fallback", track_path_base
+                "%s is missing ID3 tags, use filename as fallback", track_path
             )
 
         name, version = parse_title_and_version(track_title)
@@ -330,7 +328,7 @@ class FileSystemProvider(MusicProvider):
         )
 
         # work out if we have an artist/album/track.ext structure
-        track_parts = track_path_base.rsplit(os.sep)
+        track_parts = track_path.rsplit(os.sep)
         album_folder = None
         artist_folder = None
         parentdir = os.path.dirname(track_path)
@@ -432,7 +430,7 @@ class FileSystemProvider(MusicProvider):
                 prov_id=self.id,
                 quality=quality,
                 details=quality_details,
-                url=track_path_base,
+                url=track_path,
             )
         )
         await self.mass.cache.set(cache_key, track.to_dict(), checksum, 86400 * 365 * 5)
@@ -448,10 +446,10 @@ class FileSystemProvider(MusicProvider):
         """Lookup metadata in Artist folder."""
         assert name or artist_path
         if not artist_path:
-            artist_path = self._get_absolute_path(name)
+            # create fake path
+            artist_path = os.path.join(self.config.path, name)
 
-        artist_path_base = self._get_relative_path(artist_path)
-        artist_item_id = self._get_item_id(artist_path_base)
+        artist_item_id = self._get_item_id(artist_path)
         if not name:
             name = artist_path.split(os.sep)[-1]
 
@@ -465,9 +463,7 @@ class FileSystemProvider(MusicProvider):
             self.type,
             name,
             provider_ids={
-                MediaItemProviderId(
-                    artist_item_id, self.type, self.id, url=artist_path_base
-                )
+                MediaItemProviderId(artist_item_id, self.type, self.id, url=artist_path)
             },
             in_library=in_library,
         )
@@ -531,12 +527,12 @@ class FileSystemProvider(MusicProvider):
         """Lookup metadata in Album folder."""
         assert name or album_path
         if not album_path and artist:
-            album_path = self._get_absolute_path(f"{artist.name}{os.sep}{name}")
+            # create fake path
+            album_path = os.path.join(self.config.path, artist.name, name)
         elif not album_path:
-            album_path = self._get_absolute_path(name)
+            album_path = os.path.join(self.config.path, name)
 
-        album_path_base = self._get_relative_path(album_path)
-        album_item_id = self._get_item_id(album_path_base)
+        album_item_id = self._get_item_id(album_path)
         if not name:
             name = album_path.split(os.sep)[-1]
 
@@ -551,9 +547,7 @@ class FileSystemProvider(MusicProvider):
             name,
             artist=artist,
             provider_ids={
-                MediaItemProviderId(
-                    album_item_id, self.type, self.id, url=album_path_base
-                )
+                MediaItemProviderId(album_item_id, self.type, self.id, url=album_path)
             },
             in_library=in_library,
         )
@@ -617,9 +611,7 @@ class FileSystemProvider(MusicProvider):
         self, playlist_path: str, checksum: Optional[str] = None
     ) -> Playlist | None:
         """Parse playlist from file."""
-        playlist_path = self._get_absolute_path(playlist_path)
-        playlist_path_base = self._get_relative_path(playlist_path)
-        playlist_item_id = self._get_item_id(playlist_path_base)
+        playlist_item_id = self._get_item_id(playlist_path)
         checksum = checksum or self._get_checksum(playlist_path)
 
         if not playlist_path.endswith(".m3u"):
@@ -628,7 +620,7 @@ class FileSystemProvider(MusicProvider):
         if not self.exists(playlist_path):
             raise MediaNotFoundError(f"Playlist path does not exist: {playlist_path}")
 
-        name = playlist_path_base.split(os.sep)[-1].replace(".m3u", "")
+        name = playlist_path.split(os.sep)[-1].replace(".m3u", "")
 
         playlist = Playlist(playlist_item_id, provider=self.type, name=name)
         playlist.is_editable = True
@@ -638,7 +630,7 @@ class FileSystemProvider(MusicProvider):
                 item_id=playlist_item_id,
                 prov_type=self.type,
                 prov_id=self.id,
-                url=playlist_path_base,
+                url=playlist_path,
             )
         )
         playlist.owner = self._attr_name
@@ -691,26 +683,9 @@ class FileSystemProvider(MusicProvider):
             return file_path
         return None
 
-    def _get_relative_path(self, filename: str) -> str:
-        """Get relative path for filename (without the base dir)."""
-        if self.config.path not in filename:
-            return filename
-        filename = filename.replace(self.config.path, "")
-        if filename.startswith(os.sep):
-            filename = filename[1:]
-        if filename.endswith(os.sep):
-            filename = filename[:-1]
-        return filename
-
-    def _get_absolute_path(self, filename: str) -> str:
-        """Get absolute path for filename (including the base dir)."""
-        if self.config.path in filename:
-            return filename
-        return os.path.join(self.config.path, filename)
-
-    def _get_item_id(self, filename: str) -> str:
+    def _get_item_id(self, file_path: str) -> str:
         """Create item id from filename."""
-        return create_clean_string(self._get_relative_path(filename))
+        return create_clean_string(file_path.replace(self.config.path, ""))
 
     @staticmethod
     def _get_checksum(filename: str) -> int:
index e5f0e7642fc4124d2e31ff2727e2070b04fa31c7..0a100c4d7be924ba616efb02245d895dac11e1af 100644 (file)
@@ -8,12 +8,13 @@ from music_assistant.models.media_items import (
     Album,
     Artist,
     ItemMapping,
+    MediaItem,
     MediaItemMetadata,
     Track,
 )
 
 
-def compare_strings(str1, str2, strict=False):
+def compare_strings(str1, str2, strict=False) -> bool:
     """Compare strings and return True if we have an (almost) perfect match."""
     if str1 is None or str2 is None:
         return False
@@ -22,7 +23,7 @@ def compare_strings(str1, str2, strict=False):
     return str1.lower().strip() == str2.lower().strip()
 
 
-def compare_version(left_version: str, right_version: str):
+def compare_version(left_version: str, right_version: str) -> bool:
     """Compare version string."""
     if not left_version and not right_version:
         return True
@@ -38,28 +39,82 @@ def compare_version(left_version: str, right_version: str):
     return left_versions == right_versions
 
 
-def compare_explicit(left: MediaItemMetadata, right: MediaItemMetadata):
+def compare_explicit(left: MediaItemMetadata, right: MediaItemMetadata) -> bool:
     """Compare if explicit is same in metadata."""
     if left.explicit is None and right.explicit is None:
         return True
     return left == right
 
 
-def compare_artists(left_artists: List[Artist], right_artists: List[Artist]):
-    """Compare two lists of artist and return True if both lists match."""
+def compare_artist(
+    left_artist: Union[Artist, ItemMapping],
+    right_artist: Union[Artist, ItemMapping],
+    allow_none=False,
+) -> bool:
+    """Compare two artist items and return True if they match."""
+    if allow_none and left_artist is None and right_artist is None:
+        return True
+    if left_artist is None or right_artist is None:
+        return False
+    # return early on exact item_id match
+    if compare_item_id(left_artist, right_artist):
+        return True
+
+    # prefer match on musicbrainz_id
+    if getattr(left_artist, "musicbrainz_id", None) and getattr(
+        right_artist, "musicbrainz_id", None
+    ):
+        return left_artist.musicbrainz_id == right_artist.musicbrainz_id
+
+    # fallback to comparing
+
+    if not left_artist.sort_name:
+        left_artist.sort_name = create_clean_string(left_artist.name)
+    if not right_artist.sort_name:
+        right_artist.sort_name = create_clean_string(right_artist.name)
+    return left_artist.sort_name == right_artist.sort_name
+
+
+def compare_artists(
+    left_artists: List[Union[Artist, ItemMapping]],
+    right_artists: List[Union[Artist, ItemMapping]],
+) -> bool:
+    """Compare two lists of artist and return True if both lists match (exactly)."""
     matches = 0
     for left_artist in left_artists:
-        if not left_artist.sort_name:
-            left_artist.sort_name = create_clean_string(left_artist.name)
         for right_artist in right_artists:
-            if not right_artist.sort_name:
-                right_artist.sort_name = create_clean_string(right_artist.name)
-            if left_artist.sort_name == right_artist.sort_name:
+            if compare_artist(left_artist, right_artist):
                 matches += 1
     return len(left_artists) == matches
 
 
-def compare_albums(left_albums: List[Album], right_albums: List[Album]):
+def compare_item_id(
+    left_item: Union[MediaItem, ItemMapping], right_item: Union[MediaItem, ItemMapping]
+) -> bool:
+    """Compare two lists of artist and return True if both lists match."""
+    if (
+        left_item.provider == right_item.provider
+        and left_item.item_id == right_item.item_id
+    ):
+        return True
+
+    if not hasattr(left_item, "provider_ids") or not hasattr(
+        right_item, "provider_ids"
+    ):
+        return False
+    for prov_l in left_item.provider_ids:
+        for prov_r in right_item.provider_ids:
+            if prov_l.prov_type != prov_r.prov_type:
+                continue
+            if prov_l.item_id == prov_r.item_id:
+                return True
+    return False
+
+
+def compare_albums(
+    left_albums: List[Union[Album, ItemMapping]],
+    right_albums: List[Union[Album, ItemMapping]],
+):
     """Compare two lists of albums and return True if a match was found."""
     for left_album in left_albums:
         for right_album in right_albums:
@@ -69,16 +124,17 @@ def compare_albums(left_albums: List[Album], right_albums: List[Album]):
 
 
 def compare_album(
-    left_album: Union[Album, ItemMapping], right_album: Union[Album, ItemMapping]
+    left_album: Union[Album, ItemMapping],
+    right_album: Union[Album, ItemMapping],
+    allow_none=False,
 ):
     """Compare two album items and return True if they match."""
+    if left_album is None and right_album is None:
+        return True
     if left_album is None or right_album is None:
         return False
     # return early on exact item_id match
-    if (
-        left_album.provider == right_album.provider
-        and left_album.item_id == right_album.item_id
-    ):
+    if compare_item_id(left_album, right_album):
         return True
 
     # prefer match on UPC
@@ -96,31 +152,23 @@ def compare_album(
         left_album.sort_name = create_clean_string(left_album.name)
     if not right_album.sort_name:
         right_album.sort_name = create_clean_string(right_album.name)
-    if not compare_strings(left_album.name, right_album.name):
+    if left_album.sort_name != right_album.sort_name:
         return False
     if not compare_version(left_album.version, right_album.version):
         return False
     # album artist must be either set on both or not at all
-    if left_album.artist and not right_album.artist:
-        return False
-    if right_album.artist and not left_album.artist:
-        return False
-    if left_album.artist and right_album.artist:
-        if not left_album.artist.sort_name:
-            left_album.artist.sort_name = create_clean_string(left_album.artist.name)
-        if not right_album.artist.sort_name:
-            right_album.artist.sort_name = create_clean_string(right_album.artist.name)
-        if left_album.artist.sort_name != right_album.artist.sort_name:
+    if hasattr(left_album, "artist") and hasattr(right_album, "artist"):
+        if not compare_artist(left_album.artist, right_album.artist, True):
             return False
     return left_album.sort_name == right_album.sort_name
 
 
 def compare_track(left_track: Track, right_track: Track):
     """Compare two track items and return True if they match."""
-    if (
-        left_track.provider == right_track.provider
-        and left_track.item_id == right_track.item_id
-    ):
+    if left_track is None or right_track is None:
+        return False
+    # return early on exact item_id match
+    if compare_item_id(left_track, right_track):
         return True
     if left_track.isrc and left_track.isrc == right_track.isrc:
         # ISRC is always 100% accurate match
@@ -134,7 +182,7 @@ def compare_track(left_track: Track, right_track: Track):
         left_track.sort_name = create_clean_string(left_track.name)
     if not right_track.sort_name:
         right_track.sort_name = create_clean_string(right_track.name)
-    if not left_track.sort_name != right_track.sort_name:
+    if left_track.sort_name != right_track.sort_name:
         return False
     if not compare_version(left_track.version, right_track.version):
         return False
@@ -145,14 +193,8 @@ def compare_track(left_track: Track, right_track: Track):
     if not compare_explicit(left_track.metadata, right_track.metadata):
         return False
     # exact album match OR (near) exact duration match
-    if isinstance(left_track.album, Album) and isinstance(right_track.album, Album):
-        if compare_album(left_track.album, right_track.album):
-            return True
-    if isinstance(left_track.album, ItemMapping) and isinstance(
-        right_track.album, ItemMapping
-    ):
-        if left_track.album.sort_name == right_track.album.sort_name:
-            return True
+    if compare_album(left_track.album, right_track.album, True):
+        return True
     if abs(left_track.duration - right_track.duration) <= 2:
         # 100% match, all criteria passed
         return True