):
return musicbrainz_id
# lookup failed
- self.logger.warning("Unable to get musicbrainz ID for artist %s !", artist.name)
+ ref_albums_str = "/".join(x.name for x in ref_albums) or "none"
+ ref_tracks_str = "/".join(x.name for x in ref_tracks) or "none"
+ self.logger.info(
+ "Unable to get musicbrainz ID for artist %s\n"
+ " - using lookup-album(s): %s\n"
+ " - using lookup-track(s): %s\n",
+ artist.name,
+ ref_albums_str,
+ ref_tracks_str,
+ )
return None
async def get_image_data_for_item(
await self._register_provider(prov_cls(self.mass, prov_conf), prov_conf)
# always register url provider
await self._register_provider(URLProvider(self.mass, URL_CONFIG), URL_CONFIG)
+ # add job to cleanup old records from db
+ self.mass.add_job(
+ self._cleanup_library(),
+ "Cleanup removed items from database",
+ allow_duplicate=False,
+ )
async def start_sync(
self,
await asyncio.sleep(3600 * schedule)
self.mass.create_task(do_sync())
- # add job to cleanup old records from db
- self.mass.add_job(
- self._cleanup_library(),
- "Cleanup removed items from database",
- allow_duplicate=False,
- )
@property
def provider_count(self) -> int:
metadata = item.metadata
metadata.last_refresh = None
provider_ids = item.provider_ids
- album_artists = await self._get_album_artists(cur_item)
+ album_artists = await self._get_album_artists(item, overwrite=True)
else:
metadata = cur_item.metadata.update(item.metadata)
provider_ids = {*cur_item.provider_ids, *item.provider_ids}
db_item = await self.get_db_item(item_id)
return db_item
- async def delete_db_item(self, item_id: int) -> None:
+ async def delete_db_item(self, item_id: int, recursive: bool = False) -> None:
"""Delete record from the database."""
- # delete tracks connected to this album
- await self.mass.database.delete_where_query(
- TABLE_TRACKS, f"albums LIKE '%\"{item_id}\"%'"
+ # check album tracks
+ db_rows = await self.mass.music.tracks.get_db_items_by_query(
+ f"SELECT item_id FROM {TABLE_TRACKS} WHERE albums LIKE '%\"{item_id}\"%'"
)
+ assert not (db_rows and not recursive), "Tracks attached to album"
+ for db_row in db_rows:
+ await self.mass.music.albums.delete_db_item(db_row["item_id"], recursive)
+
# delete the album itself from db
await super().delete_db_item(item_id)
self,
db_album: Album,
updated_album: Optional[Album] = None,
+ overwrite: bool = False,
) -> List[ItemMapping]:
"""Extract (database) album artist(s) as ItemMapping."""
album_artists = set()
if not album:
continue
for artist in album.artists:
- album_artists.add(await self._get_artist_mapping(artist))
+ album_artists.add(await self._get_artist_mapping(artist, overwrite))
# use intermediate set to prevent duplicates
# filter various artists if multiple artists
if len(album_artists) > 1:
return list(album_artists)
async def _get_artist_mapping(
- self, artist: Union[Artist, ItemMapping]
+ self, artist: Union[Artist, ItemMapping], overwrite: bool = False
) -> ItemMapping:
"""Extract (database) track artist as ItemMapping."""
+ if overwrite:
+ artist = await self.mass.music.artists.add_db_item(
+ artist, overwrite_existing=True
+ )
if artist.provider == ProviderType.DATABASE:
if isinstance(artist, ItemMapping):
return artist
for row in await self.mass.database.get_rows(self.db_table, match):
row_artist = Artist.from_db_row(row)
if row_artist.sort_name == item.sort_name:
- # just to be sure ?!
cur_item = row_artist
break
if cur_item:
db_item = await self.get_db_item(item_id)
return db_item
- async def delete_db_item(self, item_id: int) -> None:
+ async def delete_db_item(self, item_id: int, recursive: bool = False) -> None:
"""Delete record from the database."""
- # delete tracks/albums connected to this artist
- await self.mass.database.delete_where_query(
- TABLE_TRACKS, f"artists LIKE '%\"{item_id}\"%'"
+ # check artist albums
+ db_rows = await self.mass.music.albums.get_db_items_by_query(
+ f"SELECT item_id FROM {TABLE_ALBUMS} WHERE artists LIKE '%\"{item_id}\"%'"
)
- await self.mass.database.delete_where_query(
- TABLE_ALBUMS, f"artists LIKE '%\"{item_id}\"%'"
+ assert not (db_rows and not recursive), "Albums attached to artist"
+ for db_row in db_rows:
+ await self.mass.music.albums.delete_db_item(db_row["item_id"], recursive)
+
+ # check artist tracks
+ db_rows = await self.mass.music.tracks.get_db_items_by_query(
+ f"SELECT item_id FROM {TABLE_TRACKS} WHERE artists LIKE '%\"{item_id}\"%'"
)
+ assert not (db_rows and not recursive), "Tracks attached to artist"
+ for db_row in db_rows:
+ await self.mass.music.albums.delete_db_item(db_row["item_id"], recursive)
+
# delete the artist itself from db
await super().delete_db_item(item_id)
overwrite: bool = False,
) -> ItemMapping:
"""Extract (database) album as ItemMapping."""
+ if overwrite:
+ db_album = await self.mass.music.albums.add_db_item(
+ album, overwrite_existing=True
+ )
+
if album.provider == ProviderType.DATABASE:
if isinstance(album, ItemMapping):
return album
return ItemMapping.from_item(db_album)
async def _get_artist_mapping(
- self, artist: Union[Artist, ItemMapping]
+ self, artist: Union[Artist, ItemMapping], overwrite: bool = False
) -> ItemMapping:
"""Extract (database) track artist as ItemMapping."""
+ if overwrite:
+ artist = await self.mass.music.artists.add_db_item(
+ artist, overwrite_existing=True
+ )
if artist.provider == ProviderType.DATABASE:
if isinstance(artist, ItemMapping):
return artist
"""Helpers for creating/parsing URI's."""
+import os
from typing import Tuple
from music_assistant.models.enums import MediaType, ProviderType
provider, media_type_str, item_id = uri.split(":")
provider = ProviderType.parse(provider)
media_type = MediaType(media_type_str)
+ elif os.path.isfile(uri):
+ # Translate a local file (which is not from file provider) to the URL provider
+ provider = ProviderType.URL
+ media_type = MediaType.TRACK
+ item_id = uri
else:
raise KeyError
except (TypeError, AttributeError, ValueError, KeyError) as err:
from abc import ABCMeta, abstractmethod
from time import time
-from typing import TYPE_CHECKING, Generic, List, Optional, Tuple, TypeVar
+from typing import (
+ TYPE_CHECKING,
+ AsyncGenerator,
+ Generic,
+ List,
+ Optional,
+ Tuple,
+ TypeVar,
+)
from music_assistant.models.errors import MediaNotFoundError
sql_query, params, limit=limit, offset=offset
)
+ async def iter_db_items(
+ self,
+ in_library: Optional[bool] = None,
+ search: Optional[str] = None,
+ order_by: str = "sort_name",
+ ) -> AsyncGenerator[ItemCls, None]:
+ """Iterate all in-database items."""
+ limit: int = 500
+ offset: int = 0
+ while True:
+ next_items = await self.db_items(
+ in_library=in_library,
+ search=search,
+ limit=limit,
+ offset=offset,
+ order_by=order_by,
+ )
+ for item in next_items:
+ yield item
+ if len(next_items) < limit:
+ break
+ offset += limit
+
async def count(self, in_library: Optional[bool] = None) -> int:
"""Return number of in-library items for this MediaType."""
if in_library is not None:
self.logger.debug("removed provider %s from item id %s", prov_id, item_id)
- async def delete_db_item(self, item_id: int) -> None:
+ async def delete_db_item(self, item_id: int, recursive: bool = False) -> None:
"""Delete record from the database."""
# delete item
await self.mass.database.delete(
async def get_artist(self, prov_artist_id: str) -> Artist:
"""Get full artist details by id."""
- if MediaType.ARTIST in self.supported_mediatypes:
- raise NotImplementedError
+ raise NotImplementedError
async def get_artist_albums(self, prov_artist_id: str) -> List[Album]:
"""Get a list of all albums for the given artist."""
if MediaType.ALBUM in self.supported_mediatypes:
raise NotImplementedError
+ return []
async def get_artist_toptracks(self, prov_artist_id: str) -> List[Track]:
"""Get a list of most popular tracks for the given artist."""
if MediaType.TRACK in self.supported_mediatypes:
raise NotImplementedError
+ return []
async def get_album(self, prov_album_id: str) -> Album:
"""Get full album details by id."""
- if MediaType.ALBUM in self.supported_mediatypes:
- raise NotImplementedError
+ raise NotImplementedError
async def get_track(self, prov_track_id: str) -> Track:
"""Get full track details by id."""
- if MediaType.TRACK in self.supported_mediatypes:
- raise NotImplementedError
+ raise NotImplementedError
async def get_playlist(self, prov_playlist_id: str) -> Playlist:
"""Get full playlist details by id."""
- if MediaType.PLAYLIST in self.supported_mediatypes:
- raise NotImplementedError
+ raise NotImplementedError
async def get_radio(self, prov_radio_id: str) -> Radio:
"""Get full radio details by id."""
"""Get album tracks for given album id."""
if MediaType.ALBUM in self.supported_mediatypes:
raise NotImplementedError
+ return []
async def get_playlist_tracks(self, prov_playlist_id: str) -> List[Track]:
"""Get all playlist tracks for given playlist id."""
if MediaType.PLAYLIST in self.supported_mediatypes:
raise NotImplementedError
+ return []
async def library_add(self, prov_item_id: str, media_type: MediaType) -> bool:
"""Add item to provider's library. Return true on succes."""
continue
self.logger.debug("Start sync of %s items.", media_type.value)
controller = self.mass.music.get_controller(media_type)
-
- # create a set of all previous and current db id's
- # note we only store the items in the prev_ids list that are
- # unique to this provider to avoid getting into a mess where
- # for example an item still exists on disk (in case of file provider)
- # and no longer favorite on streaming provider.
- # Bottomline this means that we don't do a full 2 way sync if multiple
- # providers are attached to the same media item.
- prev_ids = set()
- for db_item in await controller.db_items(True):
- prov_types = {x.prov_type for x in db_item.provider_ids}
- if len(prov_types) > 1:
- continue
- for prov_id in db_item.provider_ids:
- if prov_id.prov_id == self.id:
- prev_ids.add(db_item.item_id)
- cur_ids = set()
+ cur_db_ids = set()
async for prov_item in self._get_library_gen(media_type)():
prov_item: MediaItemType = prov_item
db_item = await controller.update_db_item(
db_item.item_id, prov_item
)
- cur_ids.add(db_item.item_id)
+ cur_db_ids.add(db_item.item_id)
if not db_item.in_library:
await controller.set_db_library(db_item.item_id, True)
- # process deletions
- for item_id in prev_ids:
- if item_id not in cur_ids:
+ # process deletions (= no longer in library)
+ async for db_item in controller.iter_db_items(True):
+ if db_item.item_id in cur_db_ids:
+ continue
+ for prov_id in db_item.provider_ids:
+ prov_types = {x.prov_type for x in db_item.provider_ids}
+ if len(prov_types) > 1:
+ continue
+ if prov_id.prov_id != self.id:
+ continue
# only mark the item as not in library and leave the metadata in db
- await controller.set_db_library(item_id, False)
+ await controller.set_db_library(db_item.item_id, False)
# DO NOT OVERRIDE BELOW
)
from music_assistant.models.music_provider import MusicProvider
-VALID_EXTENSIONS = ("mp3", "m4a", "mp4", "flac", "wav", "ogg", "aiff", "wma", "dsf")
+TRACK_EXTENSIONS = ("mp3", "m4a", "mp4", "flac", "wav", "ogg", "aiff", "wma", "dsf")
+PLAYLIST_EXTENSIONS = ("m3u",)
+SUPPORTED_EXTENSIONS = TRACK_EXTENSIONS + PLAYLIST_EXTENSIONS
SCHEMA_VERSION = 17
LOGGER = logging.getLogger(__name__)
)
elif track := await self._parse_track(full_path):
result.append(track)
+ elif playlist := await self._parse_playlist(full_path):
+ result.append(playlist)
return result
async def sync_library(
save_checksum_interval = 0
if prev_checksums is None:
prev_checksums = {}
+
# find all music files in the music directory and all subfolders
# we work bottom up, as-in we derive all info from the tracks
cur_checksums = {}
async for entry in scantree(self.config.path):
+
+ if "." not in entry.path or entry.path.startswith("."):
+ # skip system files and files without extension
+ continue
+
+ _, ext = entry.path.rsplit(".", 1)
+ if ext not in SUPPORTED_EXTENSIONS:
+ # unsupported file extension
+ continue
+
try:
# mtime is used as file checksum
stat = await asyncio.get_running_loop().run_in_executor(
if checksum == prev_checksums.get(entry.path):
continue
- if track := await self._parse_track(entry.path):
+ if ext in TRACK_EXTENSIONS:
# add/update track to db
- await self.mass.music.tracks.add_db_item(track)
- elif playlist := await self._parse_playlist(entry.path):
+ track = await self._parse_track(entry.path)
+ # if the track was edited on disk, always overwrite existing db details
+ overwrite_existing = entry.path in prev_checksums
+ await self.mass.music.tracks.add_db_item(
+ track, overwrite_existing=overwrite_existing
+ )
+ elif ext in PLAYLIST_EXTENSIONS:
+ playlist = await self._parse_playlist(entry.path)
# add/update] playlist to db
playlist.metadata.checksum = checksum
await self.mass.music.playlists.add_db_item(playlist)
# we don't want the whole sync to crash on one file so we catch all exceptions here
self.logger.exception("Error processing %s - %s", entry.path, str(err))
- # save checksums every 50 processed items
+ # save checksums every 100 processed items
# this allows us to pickup where we leftoff when initial scan gets intterrupted
- if save_checksum_interval == 50:
+ if save_checksum_interval == 100:
await self.mass.cache.set(cache_key, cur_checksums, SCHEMA_VERSION)
save_checksum_interval = 0
else:
save_checksum_interval += 1
+ # store (final) checksums in cache
await self.mass.cache.set(cache_key, cur_checksums, SCHEMA_VERSION)
# work out deletions
deleted_files = set(prev_checksums.keys()) - set(cur_checksums.keys())
+ await self._process_deletions(deleted_files)
+
+ async def _process_deletions(self, deleted_files: set) -> None:
+ """Process all deletions."""
artists: Set[ItemMapping] = set()
albums: Set[ItemMapping] = set()
# process deleted tracks/playlists
for file_path in deleted_files:
+
+ if "." not in file_path.path or file_path.path.startswith("."):
+ # skip system files and files without extension
+ continue
+
+ _, ext = file_path.path.rsplit(".", 1)
+ if ext not in SUPPORTED_EXTENSIONS:
+ # unsupported file extension
+ continue
+
item_id = self._get_item_id(file_path)
- # try track first
- if db_item := await self.mass.music.tracks.get_db_item_by_prov_id(
- item_id, self.type
- ):
- await self.mass.music.tracks.remove_prov_mapping(
- db_item.item_id, self.id
- )
- # gather artists(s) attached to this track
- for artist in db_item.artists:
- artists.add(artist.item_id)
- # gather album and albumartist(s) attached to this track
- if db_item.album:
- albums.add(db_item.album.item_id)
- for artist in db_item.album.artists:
+ if ext in TRACK_EXTENSIONS:
+ if db_item := await self.mass.music.tracks.get_db_item_by_prov_id(
+ item_id, self.type
+ ):
+ await self.mass.music.tracks.remove_prov_mapping(
+ db_item.item_id, self.id
+ )
+ # gather artists(s) attached to this track
+ for artist in db_item.artists:
artists.add(artist.item_id)
- # fallback to playlist
- elif db_item := await self.mass.music.playlists.get_db_item_by_prov_id(
- item_id, self.type
- ):
- await self.mass.music.playlists.remove_prov_mapping(
- db_item.item_id, self.id
- )
+ # gather album and albumartist(s) attached to this track
+ if db_item.album:
+ albums.add(db_item.album.item_id)
+ for artist in db_item.album.artists:
+ artists.add(artist.item_id)
+ elif ext in PLAYLIST_EXTENSIONS:
+ if db_item := await self.mass.music.playlists.get_db_item_by_prov_id(
+ item_id, self.type
+ ):
+ await self.mass.music.playlists.remove_prov_mapping(
+ db_item.item_id, self.id
+ )
# check if albums are deleted
for album_id in albums:
album = await self.mass.music.albums.get_db_item(album_id)
async def get_artist(self, prov_artist_id: str) -> Artist:
"""Get full artist details by id."""
+ db_artist = await self.mass.music.artists.get_db_item_by_prov_id(
+ provider_item_id=prov_artist_id, provider_id=self.id
+ )
+ if db_artist is None:
+ raise MediaNotFoundError(f"Artist not found: {prov_artist_id}")
itempath = await self._get_filepath(MediaType.ARTIST, prov_artist_id)
if await self.exists(itempath):
# if path exists on disk allow parsing full details to allow refresh of metadata
- return await self._parse_artist(artist_path=itempath)
- return await self.mass.music.artists.get_db_item_by_prov_id(
- provider_item_id=prov_artist_id, provider_id=self.id
- )
+ return await self._parse_artist(db_artist.name, artist_path=itempath)
+ return db_artist
async def get_album(self, prov_album_id: str) -> Album:
"""Get full album details by id."""
itempath = await self._get_filepath(MediaType.ALBUM, prov_album_id)
if await self.exists(itempath):
# if path exists on disk allow parsing full details to allow refresh of metadata
- return await self._parse_album(None, itempath, db_album.artists)
+ return await self._parse_album(db_album.name, itempath, db_album.artists)
return db_album
async def get_track(self, prov_track_id: str) -> Track:
result = []
for track in await self.mass.music.tracks.get_db_items_by_query(query):
track.album = db_album
- album_mapping = next(
+ if album_mapping := next(
(x for x in track.albums if x.item_id == db_album.item_id), None
- )
- track.disc_number = album_mapping.disc_number
- track.track_number = album_mapping.track_number
- result.append(track)
+ ):
+ track.disc_number = album_mapping.disc_number
+ track.track_number = album_mapping.track_number
+ result.append(track)
return sorted(result, key=lambda x: (x.disc_number, x.track_number))
async def get_playlist_tracks(self, prov_playlist_id: str) -> List[Track]:
async def _parse_playlist_line(self, line: str, playlist_path: str) -> Track | None:
"""Try to parse a track from a playlist line."""
- if "://" in line:
- # track is uri from external provider?
- try:
- return await self.mass.music.get_item_by_uri(line)
- except MusicAssistantError as err:
- self.logger.warning(
- "Could not parse uri %s to track: %s", line, str(err)
- )
- return None
- # try to treat uri as filename
- if await self.exists(line):
- return await self._parse_track(line)
- rel_path = os.path.join(playlist_path, line)
- if await self.exists(rel_path):
- return await self._parse_track(rel_path)
- return None
+ try:
+ # try to treat uri as filename first
+ if await self.exists(line):
+ file_path = await self.resolve(line)
+ return await self._parse_track(file_path)
+ # fallback to generic uri parsing
+ return await self.mass.music.get_item_by_uri(line)
+ except MusicAssistantError as err:
+ self.logger.warning(
+ "Could not parse uri/file %s to track: %s", line, str(err)
+ )
+ return None
async def get_artist_albums(self, prov_artist_id: str) -> List[Album]:
"""Get a list of albums for the given artist."""
direct=itempath,
)
- async def _parse_track(self, track_path: str) -> Track | None:
+ async def _parse_track(self, track_path: str) -> Track:
"""Try to parse a track from a filename by reading its tags."""
if not await self.exists(track_path):
raise MediaNotFoundError(f"Track path does not exist: {track_path}")
- if "." not in track_path or track_path.startswith("."):
- # skip system files and files without extension
- return None
-
- _, ext = track_path.rsplit(".", 1)
- if ext not in VALID_EXTENSIONS:
- # unsupported file extension
- return None
-
track_item_id = self._get_item_id(track_path)
# parse tags
)
if not artist.musicbrainz_id:
try:
- artist.musicbrainz_id = tags.musicbrainz_artistids[index]
+ artist.musicbrainz_id = tags.musicbrainz_albumartistids[
+ index
+ ]
except IndexError:
pass
album_artists.append(artist)
# cover image - prefer album image, fallback to embedded
if track.album and track.album.image:
- track.album.metadata.images = [
+ track.metadata.images = [
MediaItemImage(ImageType.THUMB, track.album.image, True)
]
elif tags.has_cover_image:
track.album.metadata.images = track.metadata.images
# parse other info
+ assert tags.duration, "Invalid duration"
track.duration = tags.duration
track.metadata.genres = tags.genres
track.disc_number = tags.disc
return album
- async def _parse_playlist(self, playlist_path: str) -> Playlist | None:
+ async def _parse_playlist(self, playlist_path: str) -> Playlist:
"""Parse playlist from file."""
playlist_item_id = self._get_item_id(playlist_path)
- if not playlist_path.endswith(".m3u"):
- return None
-
if not await self.exists(playlist_path):
raise MediaNotFoundError(f"Playlist path does not exist: {playlist_path}")
"""Return bool is this FileSystem musicprovider has given file/dir."""
if not file_path:
return False # guard
- # ensure we have a full path and not relative
+ file_path = await self.resolve(file_path)
if self.config.path not in file_path:
- file_path = os.path.join(self.config.path, file_path)
+ # additional guard (needed for files within m3u files)
+ return False
_exists = wrap(os.path.exists)
return await _exists(file_path)
async def resolve(self, file_path: str) -> str:
"""Resolve local accessible file."""
# remote file locations should return a tempfile here so this is future proofing
+ if self.config.path not in file_path:
+ file_path = os.path.join(self.config.path, file_path)
return file_path
async def _get_filepath(