import asyncio
import itertools
-from typing import Dict, List, Optional
+from typing import Dict, List, Optional, Union
from databases import Database as Db
from music_assistant.models.media_items import (
Album,
AlbumType,
+ Artist,
ItemMapping,
MediaType,
Track,
return await self.update_db_item(cur_item.item_id, album, db=db)
# insert new album
- album_artist = await self._get_album_artist(album, cur_item, db=db)
+ album_artists = await self._get_album_artists(album, cur_item, db=db)
new_item = await self.mass.database.insert(
self.db_table,
{
**album.to_db_row(),
- "artist": json_serializer(album_artist) or None,
+ "artists": json_serializer(album_artists) or None,
},
db=db,
)
assert album.artist, f"Album {album.name} is missing artist"
async with self.mass.database.get_db(db) as db:
cur_item = await self.get_db_item(item_id)
- album_artist = await self._get_album_artist(album, cur_item, db=db)
+ album_artists = await self._get_album_artists(album, cur_item, db=db)
if overwrite:
metadata = album.metadata
provider_ids = album.provider_ids
"year": album.year or cur_item.year,
"upc": album.upc or cur_item.upc,
"album_type": album_type.value,
- "artist": json_serializer(album_artist) or None,
+ "artists": json_serializer(album_artists) or None,
"metadata": json_serializer(metadata),
"provider_ids": json_serializer(provider_ids),
},
provider.name,
)
- async def _get_album_artist(
+ async def _get_album_artists(
self,
db_album: Album,
updated_album: Optional[Album] = None,
db: Optional[Db] = None,
- ) -> ItemMapping | None:
- """Extract (database) album artist as ItemMapping."""
+ ) -> List[ItemMapping]:
+ """Extract (database) album artist(s) as ItemMapping."""
+ album_artists = set()
for album in (updated_album, db_album):
- if not album or not album.artist:
+ if not album:
continue
+ for artist in album.artists:
+ album_artists.add(await self._get_artist_mapping(artist, db=db))
+ # use intermediate set to prevent duplicates
+ return list(album_artists)
- if album.artist.provider == ProviderType.DATABASE:
- if isinstance(album.artist, ItemMapping):
- return album.artist
- return ItemMapping.from_item(album.artist)
+ async def _get_artist_mapping(
+ self, artist: Union[Artist, ItemMapping], db: Optional[Db] = None
+ ) -> ItemMapping:
+ """Extract (database) track artist as ItemMapping."""
+ if artist.provider == ProviderType.DATABASE:
+ if isinstance(artist, ItemMapping):
+ return artist
+ return ItemMapping.from_item(artist)
- if db_artist := await self.mass.music.artists.get_db_item_by_prov_id(
- album.artist.item_id, provider=album.artist.provider, db=db
- ):
- return ItemMapping.from_item(db_artist)
-
- db_artist = await self.mass.music.artists.add_db_item(album.artist, db=db)
+ if db_artist := await self.mass.music.artists.get_db_item_by_prov_id(
+ artist.item_id, provider=artist.provider, db=db
+ ):
return ItemMapping.from_item(db_artist)
- return None
+ db_artist = await self.mass.music.artists.add_db_item(artist, db=db)
+ return ItemMapping.from_item(db_artist)
from tinytag.tinytag import TinyTag
from music_assistant.helpers.compare import compare_strings
+from music_assistant.helpers.database import SCHEMA_VERSION
from music_assistant.helpers.util import (
create_clean_string,
parse_title_and_version,
)
from music_assistant.models.provider import MusicProvider
+FALLBACK_ARTIST = "Various Artists"
+SPLITTERS = (";", ",", "Featuring", " Feat. ", " Feat ", "feat.", " & ", " / ")
+
async def scantree(path: str) -> AsyncGenerator[os.DirEntry, None]:
"""Recursively yield DirEntry objects for given directory."""
yield entry
-def split_items(org_str: str, splitters: Tuple[str] = None) -> Tuple[str]:
+def split_items(org_str: str) -> Tuple[str]:
"""Split up a tags string by common splitter."""
if isinstance(org_str, list):
return org_str
- if splitters is None:
- splitters = ("/", ";", ",")
if org_str is None:
return tuple()
- for splitter in splitters:
+ for splitter in SPLITTERS:
if splitter in org_str:
return tuple((x.strip() for x in org_str.split(splitter)))
return (org_str,)
-FALLBACK_ARTIST = "Various Artists"
-ARTIST_SPLITTERS = (";", ",", "Featuring", " Feat. ", " Feat ", "feat.", " & ")
-
-
class FileSystemProvider(MusicProvider):
"""
Implementation of a musicprovider for local files.
if checksum == prev_checksums.get(entry.path):
continue
try:
- if track := await self._parse_track(entry.path, checksum):
+ if track := await self._parse_track(entry.path):
# process album
if track.album:
db_album = await self.mass.music.albums.add_db_item(
await self.mass.music.tracks.set_db_library(
db_track.item_id, True, db=db
)
- elif playlist := await self._parse_playlist(entry.path, checksum):
+ elif playlist := await self._parse_playlist(entry.path):
# add/update] playlist to db
+ playlist.metadata.checksum = checksum
await self.mass.music.playlists.add_db_item(playlist, db=db)
except Exception: # pylint: disable=broad-except
# we don't want the whole sync to crash on one file so we catch all exceptions here
async def get_track(self, prov_track_id: str) -> Track:
"""Get full track details by id."""
itempath = await self.get_filepath(MediaType.TRACK, prov_track_id)
- if await self.exists(itempath):
- return await self._parse_track(itempath)
- return await self.mass.music.tracks.get_db_item_by_prov_id(
- provider_item_id=prov_track_id, provider_id=self.id
- )
+ if not await self.exists(itempath):
+ raise MediaNotFoundError(f"Track path does not exist: {itempath}")
+ return await self._parse_track(itempath)
async def get_playlist(self, prov_playlist_id: str) -> Playlist:
"""Get full playlist details by id."""
playlist_path = await self.get_filepath(MediaType.PLAYLIST, prov_playlist_id)
if not await self.exists(playlist_path):
raise MediaNotFoundError(f"Playlist path does not exist: {playlist_path}")
- checksum = await self._get_checksum(playlist_path)
- cache_key = f"{self.id}_playlist_tracks_{prov_playlist_id}"
+ mtime = await aiopath.getmtime(playlist_path)
+ checksum = f"{SCHEMA_VERSION}.{int(mtime)}"
+ cache_key = f"playlist_{self.id}_tracks_{prov_playlist_id}"
if cache := await self.mass.cache.get(cache_key, checksum):
return [Track.from_dict(x) for x in cache]
index = 0
if db_artist is None:
raise MediaNotFoundError(f"Artist not found: {prov_artist_id}")
# TODO: adjust to json query instead of text search
- query = f"SELECT * FROM albums WHERE artist LIKE '%\"{db_artist.item_id}\"%'"
+ query = f"SELECT * FROM albums WHERE artists LIKE '%\"{db_artist.item_id}\"%'"
query += f" AND provider_ids LIKE '%\"{self.type.value}\"%'"
return await self.mass.music.albums.get_db_items(query)
bit_depth=16, # TODO: parse bitdepth
)
- async def _parse_track(
- self, track_path: str, checksum: Optional[int] = None
- ) -> Track | None:
+ async def _parse_track(self, track_path: str) -> Track | None:
"""Try to parse a track from a filename by reading its tags."""
if not await self.exists(track_path):
track_item_id = self._get_item_id(track_path)
- # reading file/tags is slow so we keep a cache and checksum
- checksum = checksum or await self._get_checksum(track_path)
- cache_key = f"{self.id}_tracks_{track_item_id}"
- if cache := await self.mass.cache.get(cache_key, checksum):
- return Track.from_dict(cache)
-
if not TinyTag.is_supported(track_path):
return None
tags = await self.mass.loop.run_in_executor(None, parse_tags)
- assert tags.title, "Required tag title is missing"
- assert tags.artist, "Required tag artist is missing"
-
# prefer title from tag, fallback to filename
if tags.title:
track_title = tags.title
track_title = track_path.split(os.sep)[-1]
track_title = track_title.replace(f".{ext}", "").replace("_", " ")
self.logger.warning(
- "%s is missing ID3 tags, use filename as fallback", track_path
+ "%s is missing ID3 tag [title], using filename as fallback", track_path
)
name, version = parse_title_and_version(track_title)
in_library=True,
)
- # work out if we have an artist/album/track.ext structure
- track_parts = track_path.rsplit(os.sep)
- album_folder = None
- artist_folder = None
- parentdir = os.path.dirname(track_path)
- for _ in range(len(track_parts)):
- dirname = parentdir.rsplit(os.sep)[-1]
- if compare_strings(dirname, tags.albumartist):
- artist_folder = parentdir
- if compare_strings(dirname, tags.album):
- album_folder = parentdir
- parentdir = os.path.dirname(parentdir)
-
- # album artist
- if tags.albumartist or artist_folder:
- album_artist = await self._parse_artist(
- name=tags.albumartist, artist_path=artist_folder, in_library=True
- )
- else:
- # always fallback to various artists as album artist if user did not tag album artist
- # ID3 tag properly because we must have an album artist
- album_artist = await self._parse_artist(name="Various Artists")
-
# album
+ # work out if we have an artist/album/track.ext structure
if tags.album:
+ track_parts = track_path.rsplit(os.sep)
+ album_folder = None
+ artist_folder = None
+ parentdir = os.path.dirname(track_path)
+ for _ in range(len(track_parts)):
+ dirname = parentdir.rsplit(os.sep)[-1]
+ if compare_strings(dirname, tags.albumartist):
+ artist_folder = parentdir
+ if compare_strings(dirname, tags.album):
+ album_folder = parentdir
+ parentdir = os.path.dirname(parentdir)
+
+ # album artist
+ if artist_folder:
+ album_artists = [
+ await self._parse_artist(
+ name=tags.albumartist,
+ artist_path=artist_folder,
+ in_library=True,
+ )
+ ]
+ elif tags.albumartist:
+ album_artists = [
+ await self._parse_artist(name=item, in_library=True)
+ for item in split_items(tags.albumartist)
+ ]
+
+ else:
+ # always fallback to various artists as album artist if user did not tag album artist
+ # ID3 tag properly because we must have an album artist
+ album_artists = [await self._parse_artist(name=FALLBACK_ARTIST)]
+ self.logger.warning(
+ "%s is missing ID3 tag [albumartist], using %s as fallback",
+ track_path,
+ FALLBACK_ARTIST,
+ )
+
track.album = await self._parse_album(
name=tags.album,
album_path=album_folder,
- artist=album_artist,
+ artists=album_artists,
in_library=True,
)
+ else:
+ self.logger.warning("%s is missing ID3 tag [album]", track_path)
- if (
- track.album
- and track.album.artist
- and track.album.artist.name == tags.artist
- ):
- track.artists = [track.album.artist]
+ # track artist(s)
+ if tags.artist == tags.albumartist:
+ track.artists = track.album.artists
else:
# Parse track artist(s) from artist string using common splitters used in ID3 tags
# NOTE: do not use a '/' or '&' to prevent artists like AC/DC become messed up
track_artists_str = tags.artist or FALLBACK_ARTIST
track.artists = [
await self._parse_artist(item, in_library=False)
- for item in split_items(track_artists_str, ARTIST_SPLITTERS)
+ for item in split_items(track_artists_str)
]
# Check if track has embedded metadata
track.metadata.copyright = tags.extra["copyright"]
if "lyrics" in tags.extra:
track.metadata.lyrics = tags.extra["lyrics"]
- # store last modified time as checksum
- track.metadata.checksum = checksum
quality_details = ""
if track_path.endswith(".flac"):
url=track_path,
)
)
- await self.mass.cache.set(cache_key, track.to_dict(), checksum, 86400 * 365 * 5)
return track
async def _parse_artist(
name: Optional[str] = None,
artist_path: Optional[str] = None,
in_library: bool = True,
- skip_cache=False,
) -> Artist | None:
"""Lookup metadata in Artist folder."""
assert name or artist_path
if not name:
name = artist_path.split(os.sep)[-1]
- cache_key = f"{self.id}.artist.{artist_item_id}"
- if not skip_cache:
- if cache := await self.mass.cache.get(cache_key):
- return Artist.from_dict(cache)
-
artist = Artist(
artist_item_id,
self.type,
if images:
artist.metadata.images = images
- await self.mass.cache.set(cache_key, artist.to_dict())
return artist
async def _parse_album(
self,
name: Optional[str] = None,
album_path: Optional[str] = None,
- artist: Optional[Artist] = None,
+ artists: List[Artist] = None,
in_library: bool = True,
- skip_cache=False,
) -> Album | None:
"""Lookup metadata in Album folder."""
- assert name or album_path
- if not album_path and artist:
- # create fake path
- album_path = os.path.join(self.config.path, artist.name, name)
- elif not album_path:
- album_path = os.path.join("Albums", name)
+ assert (name or album_path) and artists
+ # create fake path
+ album_path = os.path.join(self.config.path, artists[0].name, name)
album_item_id = self._get_item_id(album_path)
if not name:
name = album_path.split(os.sep)[-1]
- cache_key = f"{self.id}.album.{album_item_id}"
- if not skip_cache:
- if cache := await self.mass.cache.get(cache_key):
- return Album.from_dict(cache)
-
album = Album(
album_item_id,
self.type,
name,
- artist=artist,
+ artists=artists,
provider_ids={
MediaItemProviderId(album_item_id, self.type, self.id, url=album_path)
},
album_tracks = [
x async for x in scantree(album_path) if TinyTag.is_supported(x.path)
]
- if artist and artist.sort_name == "variousartists":
+ if album.artist.sort_name == "variousartists":
album.album_type = AlbumType.COMPILATION
elif len(album_tracks) <= 5:
album.album_type = AlbumType.SINGLE
if images:
album.metadata.images = images
- await self.mass.cache.set(cache_key, album.to_dict())
return album
- async def _parse_playlist(
- self, playlist_path: str, checksum: Optional[str] = None
- ) -> Playlist | None:
+ async def _parse_playlist(self, playlist_path: str) -> Playlist | None:
"""Parse playlist from file."""
playlist_item_id = self._get_item_id(playlist_path)
- checksum = checksum or await self._get_checksum(playlist_path)
if not playlist_path.endswith(".m3u"):
return None
)
)
playlist.owner = self._attr_name
- playlist.metadata.checksum = checksum
return playlist
async def _parse_track_from_uri(self, uri: str):
)
return None
# try to treat uri as filename
+ if self.config.path not in uri:
+ uri = os.path.join(self.config.path, uri)
try:
- return await self.get_track(uri)
+ return await self._parse_track(uri)
except MediaNotFoundError:
return None
def _get_item_id(self, file_path: str) -> str:
"""Create item id from filename."""
return create_clean_string(file_path.replace(self.config.path, ""))
-
- @staticmethod
- async def _get_checksum(filename: str) -> int:
- """Get checksum for file."""
- # use last modified time as checksum
- return await aiopath.getmtime(filename)