from aiofiles.threadpool.binary import AsyncFileIO
from tinytag.tinytag import TinyTag
+from music_assistant.helpers.compare import compare_strings
from music_assistant.helpers.util import (
create_clean_string,
parse_title_and_version,
for entry in scantree(self.config.path):
# mtime is used as file checksum
- checksum = str(entry.stat().st_mtime)
+ checksum = int(entry.stat().st_mtime)
if checksum == checksums.get(entry.path):
continue
# we don't want the whole sync to crash on one file so we catch all exceptions here
self.logger.exception("Error processing %s", entry.path)
+ # save checksum in cache for next sync
+ checksums[entry.path] = checksum
+
# TODO: Handle deletions
await self.mass.cache.set(cache_key, checksums)
async def get_artist(self, prov_artist_id: str) -> Artist:
"""Get full artist details by id."""
itempath = await self.get_filepath(prov_artist_id)
- return await self._parse_artist(itempath)
+ return await self._parse_artist(artist_path=itempath)
async def get_album(self, prov_album_id: str) -> Album:
"""Get full album details by id."""
itempath = await self.get_filepath(prov_album_id)
- return await self._parse_album(itempath)
+ return await self._parse_album(album_path=itempath)
async def get_track(self, prov_track_id: str) -> Track:
"""Get full track details by id."""
result = []
for entry in scantree(itempath):
# mtime is used as file checksum
- checksum = str(entry.stat().st_mtime)
+ checksum = int(entry.stat().st_mtime)
if track := await self._parse_track(entry.path, checksum):
result.append(track)
return result
result = []
playlist_path = await self.get_filepath(prov_playlist_id)
if not self.exists(playlist_path):
- raise MediaNotFoundError(f"playlist path does not exist: {playlist_path}")
+ raise MediaNotFoundError(f"Playlist path does not exist: {playlist_path}")
checksum = self._get_checksum(playlist_path)
cache_key = f"{self.id}_playlist_tracks_{prov_playlist_id}"
if cache := await self.mass.cache.get(cache_key, checksum):
result = []
for entry in scantree(itempath):
# mtime is used as file checksum
- checksum = str(entry.stat().st_mtime)
+ checksum = int(entry.stat().st_mtime)
if track := await self._parse_track(entry.path, checksum):
result.append(track)
return result
)
async def _parse_track(
- self, track_path: str, checksum: Optional[str] = None
+ self, track_path: str, checksum: Optional[int] = None
) -> Track | None:
"""Try to parse a track from a filename by reading its tags."""
if self.config.path not in track_path:
# work out if we have an artist/album/track.ext structure
track_parts = track_path_base.rsplit(os.sep)
- if len(track_parts) == 3:
- album_path = os.path.dirname(track_path)
- artist_path = os.path.dirname(album_path)
- album_artist = await self._parse_artist(artist_path, True)
- track.album = await self._parse_album(album_path, album_artist, True)
-
- if track.album is None and tags.album:
- # no artist/album structure found, create a basic album object instead
- if tags.albumartist:
- album_path = f"{tags.albumartist}/{tags.album}"
- album_artist = await self._parse_artist(tags.albumartist)
- else:
- album_path = tags.album
- album_artist = None
- track.album = await self._parse_album(album_path, album_artist)
+ album_folder = None
+ artist_folder = None
+ parentdir = os.path.dirname(track_path)
+ for _ in range(len(track_parts)):
+ dirname = parentdir.rsplit(os.sep)[-1]
+ if compare_strings(dirname, tags.albumartist or tags.artist):
+ artist_folder = parentdir
+ if compare_strings(dirname, tags.album):
+ album_folder = parentdir
+ parentdir = os.path.dirname(parentdir)
+
+ # album artist
+ if tags.albumartist or artist_folder:
+ album_artist = await self._parse_artist(
+ name=tags.albumartist, artist_path=artist_folder, in_library=True
+ )
+ else:
+ album_artist = None
+
+ # album
+ if tags.album:
+ track.album = await self._parse_album(
+ name=tags.album,
+ album_path=album_folder,
+ artist=album_artist,
+ in_library=True,
+ )
- # try to guess the album type
- if track.album:
+ # try to guess the album type
if name.lower() == track.album.name.lower():
track.album.album_type = AlbumType.SINGLE
elif track.album.artist not in (x.name for x in track.artists):
else:
track.album.album_type = AlbumType.ALBUM
- # Parse track artist(s) from artist string using common splitters used in ID3 tags
- # NOTE: do not use a '/' or '&' to prevent artists like AC/DC become messed up
- track_artists_str = tags.artist or FALLBACK_ARTIST
- track.artists = [
- await self._parse_artist(item)
- for item in split_items(track_artists_str, ARTIST_SPLITTERS)
- ]
+ if (
+ track.album
+ and track.album.artist
+ and track.album.artist.name == tags.artist
+ ):
+ track.artists = [track.album.artist]
+ else:
+ # Parse track artist(s) from artist string using common splitters used in ID3 tags
+ # NOTE: do not use a '/' or '&' to prevent artists like AC/DC become messed up
+ track_artists_str = tags.artist or FALLBACK_ARTIST
+ track.artists = [
+ await self._parse_artist(item, in_library=False)
+ for item in split_items(track_artists_str, ARTIST_SPLITTERS)
+ ]
# Check if track has embedded metadata
img = await self.mass.loop.run_in_executor(None, tags.get_image)
await self.mass.cache.set(cache_key, track.to_dict(), checksum, 86400 * 365 * 5)
return track
- async def _parse_artist(self, artist_path: str, skip_cache=False) -> Artist | None:
+ async def _parse_artist(
+ self,
+ name: Optional[str] = None,
+ artist_path: Optional[str] = None,
+ in_library: bool = True,
+ skip_cache=False,
+ ) -> Artist | None:
"""Lookup metadata in Artist folder."""
- if self.config.path not in artist_path:
- artist_path = os.path.join(self.config.path, artist_path)
+ assert name or artist_path
+ if not artist_path:
+ artist_path = self._get_absolute_path(name)
+
artist_path_base = self._get_relative_path(artist_path)
artist_item_id = self._get_item_id(artist_path_base)
- name = artist_path.split(os.sep)[-1]
+ if not name:
+ name = artist_path.split(os.sep)[-1]
cache_key = f"{self.id}.artist.{artist_item_id}"
if not skip_cache:
artist_item_id, self.type, self.id, url=artist_path_base
)
},
+ in_library=in_library,
)
if not self.exists(artist_path):
- # return basic object if there is no path on disk
- # happens if disk structure does not conform
+ # return basic object if there is no dedicated artist folder
return artist
- # mark artist as in-library when it exists as folder on disk
+ # always mark artist as in-library when it exists as folder on disk
artist.in_library = True
nfo_file = os.path.join(artist_path, "artist.nfo")
return artist
async def _parse_album(
- self, album_path: str, artist: Optional[Artist] = None, skip_cache=False
+ self,
+ name: Optional[str] = None,
+ album_path: Optional[str] = None,
+ artist: Optional[Artist] = None,
+ in_library: bool = True,
+ skip_cache=False,
) -> Album | None:
"""Lookup metadata in Album folder."""
- if self.config.path not in album_path:
- album_path = os.path.join(self.config.path, album_path)
+ assert name or album_path
+ if not album_path and artist:
+ album_path = self._get_absolute_path(f"{artist.name}{os.sep}{name}")
+ elif not album_path:
+ album_path = self._get_absolute_path(name)
+
album_path_base = self._get_relative_path(album_path)
album_item_id = self._get_item_id(album_path_base)
- name = album_path.split(os.sep)[-1]
+ if not name:
+ name = album_path.split(os.sep)[-1]
cache_key = f"{self.id}.album.{album_item_id}"
if not skip_cache:
album_item_id, self.type, self.id, url=album_path_base
)
},
+ in_library=in_library,
)
if not self.exists(album_path):
- # return basic object if there is no path on disk
- # happens if disk structure does not conform
+ # return basic object if there is no dedicated album folder
return album
- # mark album as in-library when it exists as folder on disk
+ # always mark as in-library when it exists as folder on disk
album.in_library = True
nfo_file = os.path.join(album_path, "album.nfo")
filename = filename[:-1]
return filename
+ def _get_absolute_path(self, filename: str) -> str:
+ """Get absolute path for filename (including the base dir)."""
+ return os.path.join(self.config.path, filename)
+
def _get_item_id(self, filename: str) -> str:
"""Create item id from filename."""
return create_clean_string(self._get_relative_path(filename))
@staticmethod
- def _get_checksum(filename: str) -> str:
+ def _get_checksum(filename: str) -> int:
"""Get checksum for file."""
# use last modified time as checksum
- return str(os.path.getmtime(filename))
+ return int(os.path.getmtime(filename))