Prefer ID3 tags for filesystem provider (#315)
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Sun, 15 May 2022 23:57:30 +0000 (01:57 +0200)
committerGitHub <noreply@github.com>
Sun, 15 May 2022 23:57:30 +0000 (01:57 +0200)
* Prefer ID3 tags for filesystem provider

music_assistant/controllers/music/providers/filesystem.py
music_assistant/controllers/music/tracks.py
music_assistant/controllers/stream.py
music_assistant/helpers/compare.py

index da06216ee485e98bb1cf0b80be56b5f89b36483e..3ebb424daecddb9ddca2efa723aaa43fc3ed8fdf 100644 (file)
@@ -12,6 +12,7 @@ import xmltodict
 from aiofiles.threadpool.binary import AsyncFileIO
 from tinytag.tinytag import TinyTag
 
+from music_assistant.helpers.compare import compare_strings
 from music_assistant.helpers.util import (
     create_clean_string,
     parse_title_and_version,
@@ -116,7 +117,7 @@ class FileSystemProvider(MusicProvider):
         for entry in scantree(self.config.path):
 
             # mtime is used as file checksum
-            checksum = str(entry.stat().st_mtime)
+            checksum = int(entry.stat().st_mtime)
             if checksum == checksums.get(entry.path):
                 continue
 
@@ -139,18 +140,21 @@ class FileSystemProvider(MusicProvider):
                 # we don't want the whole sync to crash on one file so we catch all exceptions here
                 self.logger.exception("Error processing %s", entry.path)
 
+            # save checksum in cache for next sync
+            checksums[entry.path] = checksum
+
         # TODO: Handle deletions
         await self.mass.cache.set(cache_key, checksums)
 
     async def get_artist(self, prov_artist_id: str) -> Artist:
         """Get full artist details by id."""
         itempath = await self.get_filepath(prov_artist_id)
-        return await self._parse_artist(itempath)
+        return await self._parse_artist(artist_path=itempath)
 
     async def get_album(self, prov_album_id: str) -> Album:
         """Get full album details by id."""
         itempath = await self.get_filepath(prov_album_id)
-        return await self._parse_album(itempath)
+        return await self._parse_album(album_path=itempath)
 
     async def get_track(self, prov_track_id: str) -> Track:
         """Get full track details by id."""
@@ -172,7 +176,7 @@ class FileSystemProvider(MusicProvider):
         result = []
         for entry in scantree(itempath):
             # mtime is used as file checksum
-            checksum = str(entry.stat().st_mtime)
+            checksum = int(entry.stat().st_mtime)
             if track := await self._parse_track(entry.path, checksum):
                 result.append(track)
         return result
@@ -182,7 +186,7 @@ class FileSystemProvider(MusicProvider):
         result = []
         playlist_path = await self.get_filepath(prov_playlist_id)
         if not self.exists(playlist_path):
-            raise MediaNotFoundError(f"playlist path does not exist: {playlist_path}")
+            raise MediaNotFoundError(f"Playlist path does not exist: {playlist_path}")
         checksum = self._get_checksum(playlist_path)
         cache_key = f"{self.id}_playlist_tracks_{prov_playlist_id}"
         if cache := await self.mass.cache.get(cache_key, checksum):
@@ -223,7 +227,7 @@ class FileSystemProvider(MusicProvider):
         result = []
         for entry in scantree(itempath):
             # mtime is used as file checksum
-            checksum = str(entry.stat().st_mtime)
+            checksum = int(entry.stat().st_mtime)
             if track := await self._parse_track(entry.path, checksum):
                 result.append(track)
         return result
@@ -278,7 +282,7 @@ class FileSystemProvider(MusicProvider):
         )
 
     async def _parse_track(
-        self, track_path: str, checksum: Optional[str] = None
+        self, track_path: str, checksum: Optional[int] = None
     ) -> Track | None:
         """Try to parse a track from a filename by reading its tags."""
         if self.config.path not in track_path:
@@ -327,24 +331,35 @@ class FileSystemProvider(MusicProvider):
 
         # work out if we have an artist/album/track.ext structure
         track_parts = track_path_base.rsplit(os.sep)
-        if len(track_parts) == 3:
-            album_path = os.path.dirname(track_path)
-            artist_path = os.path.dirname(album_path)
-            album_artist = await self._parse_artist(artist_path, True)
-            track.album = await self._parse_album(album_path, album_artist, True)
-
-        if track.album is None and tags.album:
-            # no artist/album structure found, create a basic album object instead
-            if tags.albumartist:
-                album_path = f"{tags.albumartist}/{tags.album}"
-                album_artist = await self._parse_artist(tags.albumartist)
-            else:
-                album_path = tags.album
-                album_artist = None
-            track.album = await self._parse_album(album_path, album_artist)
+        album_folder = None
+        artist_folder = None
+        parentdir = os.path.dirname(track_path)
+        for _ in range(len(track_parts)):
+            dirname = parentdir.rsplit(os.sep)[-1]
+            if compare_strings(dirname, tags.albumartist or tags.artist):
+                artist_folder = parentdir
+            if compare_strings(dirname, tags.album):
+                album_folder = parentdir
+            parentdir = os.path.dirname(parentdir)
+
+        # album artist
+        if tags.albumartist or artist_folder:
+            album_artist = await self._parse_artist(
+                name=tags.albumartist, artist_path=artist_folder, in_library=True
+            )
+        else:
+            album_artist = None
+
+        # album
+        if tags.album:
+            track.album = await self._parse_album(
+                name=tags.album,
+                album_path=album_folder,
+                artist=album_artist,
+                in_library=True,
+            )
 
-        # try to guess the album type
-        if track.album:
+            # try to guess the album type
             if name.lower() == track.album.name.lower():
                 track.album.album_type = AlbumType.SINGLE
             elif track.album.artist not in (x.name for x in track.artists):
@@ -352,13 +367,20 @@ class FileSystemProvider(MusicProvider):
             else:
                 track.album.album_type = AlbumType.ALBUM
 
-        # Parse track artist(s) from artist string using common splitters used in ID3 tags
-        # NOTE: do not use a '/' or '&' to prevent artists like AC/DC become messed up
-        track_artists_str = tags.artist or FALLBACK_ARTIST
-        track.artists = [
-            await self._parse_artist(item)
-            for item in split_items(track_artists_str, ARTIST_SPLITTERS)
-        ]
+        if (
+            track.album
+            and track.album.artist
+            and track.album.artist.name == tags.artist
+        ):
+            track.artists = [track.album.artist]
+        else:
+            # Parse track artist(s) from artist string using common splitters used in ID3 tags
+            # NOTE: do not use a '/' or '&' to prevent artists like AC/DC become messed up
+            track_artists_str = tags.artist or FALLBACK_ARTIST
+            track.artists = [
+                await self._parse_artist(item, in_library=False)
+                for item in split_items(track_artists_str, ARTIST_SPLITTERS)
+            ]
 
         # Check if track has embedded metadata
         img = await self.mass.loop.run_in_executor(None, tags.get_image)
@@ -416,13 +438,22 @@ class FileSystemProvider(MusicProvider):
         await self.mass.cache.set(cache_key, track.to_dict(), checksum, 86400 * 365 * 5)
         return track
 
-    async def _parse_artist(self, artist_path: str, skip_cache=False) -> Artist | None:
+    async def _parse_artist(
+        self,
+        name: Optional[str] = None,
+        artist_path: Optional[str] = None,
+        in_library: bool = True,
+        skip_cache=False,
+    ) -> Artist | None:
         """Lookup metadata in Artist folder."""
-        if self.config.path not in artist_path:
-            artist_path = os.path.join(self.config.path, artist_path)
+        assert name or artist_path
+        if not artist_path:
+            artist_path = self._get_absolute_path(name)
+
         artist_path_base = self._get_relative_path(artist_path)
         artist_item_id = self._get_item_id(artist_path_base)
-        name = artist_path.split(os.sep)[-1]
+        if not name:
+            name = artist_path.split(os.sep)[-1]
 
         cache_key = f"{self.id}.artist.{artist_item_id}"
         if not skip_cache:
@@ -438,14 +469,14 @@ class FileSystemProvider(MusicProvider):
                     artist_item_id, self.type, self.id, url=artist_path_base
                 )
             },
+            in_library=in_library,
         )
 
         if not self.exists(artist_path):
-            # return basic object if there is no path on disk
-            # happens if disk structure does not conform
+            # return basic object if there is no dedicated artist folder
             return artist
 
-        # mark artist as in-library when it exists as folder on disk
+        # always mark artist as in-library when it exists as folder on disk
         artist.in_library = True
 
         nfo_file = os.path.join(artist_path, "artist.nfo")
@@ -490,14 +521,24 @@ class FileSystemProvider(MusicProvider):
         return artist
 
     async def _parse_album(
-        self, album_path: str, artist: Optional[Artist] = None, skip_cache=False
+        self,
+        name: Optional[str] = None,
+        album_path: Optional[str] = None,
+        artist: Optional[Artist] = None,
+        in_library: bool = True,
+        skip_cache=False,
     ) -> Album | None:
         """Lookup metadata in Album folder."""
-        if self.config.path not in album_path:
-            album_path = os.path.join(self.config.path, album_path)
+        assert name or album_path
+        if not album_path and artist:
+            album_path = self._get_absolute_path(f"{artist.name}{os.sep}{name}")
+        elif not album_path:
+            album_path = self._get_absolute_path(name)
+
         album_path_base = self._get_relative_path(album_path)
         album_item_id = self._get_item_id(album_path_base)
-        name = album_path.split(os.sep)[-1]
+        if not name:
+            name = album_path.split(os.sep)[-1]
 
         cache_key = f"{self.id}.album.{album_item_id}"
         if not skip_cache:
@@ -514,14 +555,14 @@ class FileSystemProvider(MusicProvider):
                     album_item_id, self.type, self.id, url=album_path_base
                 )
             },
+            in_library=in_library,
         )
 
         if not self.exists(album_path):
-            # return basic object if there is no path on disk
-            # happens if disk structure does not conform
+            # return basic object if there is no dedicated album folder
             return album
 
-        # mark album as in-library when it exists as folder on disk
+        # always mark as in-library when it exists as folder on disk
         album.in_library = True
 
         nfo_file = os.path.join(album_path, "album.nfo")
@@ -660,12 +701,16 @@ class FileSystemProvider(MusicProvider):
             filename = filename[:-1]
         return filename
 
+    def _get_absolute_path(self, filename: str) -> str:
+        """Get absolute path for filename (including the base dir)."""
+        return os.path.join(self.config.path, filename)
+
     def _get_item_id(self, filename: str) -> str:
         """Create item id from filename."""
         return create_clean_string(self._get_relative_path(filename))
 
     @staticmethod
-    def _get_checksum(filename: str) -> str:
+    def _get_checksum(filename: str) -> int:
         """Get checksum for file."""
         # use last modified time as checksum
-        return str(os.path.getmtime(filename))
+        return int(os.path.getmtime(filename))
index 9e6e3c2a4bce312265c4ac3c715fb05a483ae231..80e0ba08a7fdaa2717472f62ff5d217dcfabca5c 100644 (file)
@@ -64,7 +64,7 @@ class TracksController(MediaControllerBase[Track]):
     ) -> List[Track]:
         """Return all versions of a track we can find on all providers."""
         track = await self.get(item_id, provider, provider_id)
-        prov_types = {item.types for item in self.mass.music.providers}
+        prov_types = {item.type for item in self.mass.music.providers}
         first_artist = next(iter(track.artists))
         search_query = f"{first_artist.name} {track.name}"
         return [
index 6bd911cb22245e9e9d41555d9636e4d2b4cfc10d..c956d7e6b1bb797b06ddcebe2f2724e5900b9c1d 100644 (file)
@@ -24,6 +24,7 @@ from music_assistant.models.enums import (
     CrossFadeMode,
     EventType,
     MediaType,
+    ProviderType,
 )
 from music_assistant.models.errors import MediaNotFoundError, MusicAssistantError
 from music_assistant.models.event import MassEvent
@@ -59,13 +60,13 @@ class StreamController:
             return f"http://{self._ip}:{self._port}/{queue_id}/{child_player}.{ext}"
         return f"http://{self._ip}:{self._port}/{queue_id}.{ext}"
 
-    async def get_preview_url(self, provider_id: str, track_id: str) -> str:
+    async def get_preview_url(self, provider: ProviderType, track_id: str) -> str:
         """Return url to short preview sample."""
-        track = await self.mass.music.tracks.get_provider_item(track_id, provider_id)
+        track = await self.mass.music.tracks.get_provider_item(track_id, provider)
         if preview := track.metadata.preview:
             return preview
         enc_track_id = urllib.parse.quote(track_id)
-        return f"http://{self._ip}:{self._port}/preview?provider_id={provider_id}&item_id={enc_track_id}"
+        return f"http://{self._ip}:{self._port}/preview?provider_id={provider.value}&item_id={enc_track_id}"
 
     async def setup(self) -> None:
         """Async initialize of module."""
index c2e8ac5f7fad8b0c9f92a408a0ec67692fcb49fd..e5f0e7642fc4124d2e31ff2727e2070b04fa31c7 100644 (file)
@@ -15,6 +15,8 @@ from music_assistant.models.media_items import (
 
 def compare_strings(str1, str2, strict=False):
     """Compare strings and return True if we have an (almost) perfect match."""
+    if str1 is None or str2 is None:
+        return False
     if not strict:
         return create_clean_string(str1) == create_clean_string(str2)
     return str1.lower().strip() == str2.lower().strip()