import asyncio
import itertools
-from typing import Dict, List, Optional, Union
+from typing import Any, Dict, List, Optional, Union
from databases import Database as Db
# get results from all providers
db_album = await self.get_db_item(item_id)
coros = [
- self.get_provider_album_tracks(item.item_id, item.prov_id)
+ self.get_provider_album_tracks(
+ item.item_id, item.prov_id, cache_checksum=db_album.metadata.checksum
+ )
for item in db_album.provider_ids
]
tracks = itertools.chain.from_iterable(await asyncio.gather(*coros))
# merge duplicates using a dict
final_items: Dict[str, Track] = {}
for track in tracks:
- key = f".{track.name.lower()}.{track.version}.{track.disc_number}.{track.track_number}"
+ key = f".{track.name.lower()}.{track.disc_number}.{track.track_number}"
if key in final_items:
final_items[key].provider_ids.update(track.provider_ids)
else:
item_id: str,
provider: Optional[ProviderType] = None,
provider_id: Optional[str] = None,
+ cache_checksum: Any = None,
) -> List[Track]:
"""Return album tracks for the given provider album id."""
prov = self.mass.music.get_provider(provider_id or provider)
if not prov:
return []
- # prefer cache items (if any)
+ # prefer cache items (if any) - do not use cache for filesystem
cache_key = f"{prov.type.value}.album_tracks.{item_id}"
- if cache := await self.mass.cache.get(cache_key):
+ if cache := await self.mass.cache.get(cache_key, checksum=cache_checksum):
return [Track.from_dict(x) for x in cache]
# no items in cache - get listing from provider
items = await prov.get_album_tracks(item_id)
# store (serializable items) in cache
self.mass.create_task(
- self.mass.cache.set(cache_key, [x.to_dict() for x in items])
+ self.mass.cache.set(
+ cache_key, [x.to_dict() for x in items], checksum=cache_checksum
+ )
)
return items
import asyncio
import itertools
-from typing import Dict, List, Optional
+from typing import Any, Dict, List, Optional
from databases import Database as Db
artist = await self.get(item_id, provider, provider_id)
# get results from all providers
coros = [
- self.get_provider_artist_toptracks(item.item_id, item.prov_id)
+ self.get_provider_artist_toptracks(
+ item.item_id, item.prov_id, cache_checksum=artist.metadata.checksum
+ )
for item in artist.provider_ids
]
tracks = itertools.chain.from_iterable(await asyncio.gather(*coros))
)
async def get_provider_artist_toptracks(
- self, item_id: str, provider_id: str
+ self,
+ item_id: str,
+ provider: Optional[ProviderType] = None,
+ provider_id: Optional[str] = None,
+ cache_checksum: Any = None,
) -> List[Track]:
"""Return top tracks for an artist on given provider."""
- prov = self.mass.music.get_provider(provider_id)
+ prov = self.mass.music.get_provider(provider_id or provider)
if not prov:
return []
# prefer cache items (if any)
cache_key = f"{prov.type.value}.artist_toptracks.{item_id}"
- if cache := await self.mass.cache.get(cache_key):
+ if cache := await self.mass.cache.get(cache_key, checksum=cache_checksum):
return [Track.from_dict(x) for x in cache]
# no items in cache - get listing from provider
items = await prov.get_artist_toptracks(item_id)
# store (serializable items) in cache
self.mass.create_task(
- self.mass.cache.set(cache_key, [x.to_dict() for x in items])
+ self.mass.cache.set(
+ cache_key, [x.to_dict() for x in items], checksum=cache_checksum
+ )
)
return items
async def get_provider_artist_albums(
- self, item_id: str, provider_id: str
+ self,
+ item_id: str,
+ provider: Optional[ProviderType] = None,
+ provider_id: Optional[str] = None,
+ cache_checksum: Any = None,
) -> List[Album]:
"""Return albums for an artist on given provider."""
- prov = self.mass.music.get_provider(provider_id)
+ prov = self.mass.music.get_provider(provider_id or provider)
if not prov:
return []
# prefer cache items (if any)
cache_key = f"{prov.type.value}.artist_albums.{item_id}"
- if cache := await self.mass.cache.get(cache_key):
+ if cache := await self.mass.cache.get(cache_key, checksum=cache_checksum):
return [Album.from_dict(x) for x in cache]
# no items in cache - get listing from provider
items = await prov.get_artist_albums(item_id)
# store (serializable items) in cache
self.mass.create_task(
- self.mass.cache.set(cache_key, [x.to_dict() for x in items])
+ self.mass.cache.set(
+ cache_key, [x.to_dict() for x in items], checksum=cache_checksum
+ )
)
return items
# no existing match found: insert new item
track_artists = await self._get_track_artists(item, db=db)
- track_albums = await self._get_track_albums(item, db=db)
+ track_albums = await self._get_track_albums(
+ item, overwrite=overwrite_existing, db=db
+ )
new_item = await self.mass.database.insert(
self.db_table,
{
metadata.last_refresh = None
# we store a mapping to artists/albums on the item for easier access/listings
track_artists = await self._get_track_artists(item, db=db)
- track_albums = await self._get_track_albums(item, db=db)
+ track_albums = await self._get_track_albums(item, overwrite=True, db=db)
else:
metadata = cur_item.metadata.update(item.metadata, overwrite)
provider_ids = {*cur_item.provider_ids, *item.provider_ids}
self,
base_track: Track,
upd_track: Optional[Track] = None,
+ overwrite: bool = False,
db: Optional[Db] = None,
) -> List[TrackAlbumMapping]:
"""Extract all (unique) albums of track as TrackAlbumMapping."""
track_albums = base_track.albums
# append update item album if needed
if upd_track and upd_track.album:
- mapping = await self._get_album_mapping(upd_track.album, db=db)
+ mapping = await self._get_album_mapping(
+ upd_track.album, overwrite=overwrite, db=db
+ )
mapping = TrackAlbumMapping.from_dict(
{
**mapping.to_dict(),
track_albums.append(mapping)
# append base item album if needed
elif base_track and base_track.album:
- mapping = await self._get_album_mapping(base_track.album, db=db)
+ mapping = await self._get_album_mapping(
+ base_track.album, overwrite=overwrite, db=db
+ )
mapping = TrackAlbumMapping.from_dict(
{
**mapping.to_dict(),
return track_albums
async def _get_album_mapping(
- self, album: Union[Album, ItemMapping], db: Optional[Db] = None
+ self,
+ album: Union[Album, ItemMapping],
+ overwrite: bool = False,
+ db: Optional[Db] = None,
) -> ItemMapping:
"""Extract (database) album as ItemMapping."""
if album.provider == ProviderType.DATABASE:
):
return ItemMapping.from_item(db_album)
- db_album = await self.mass.music.albums.add_db_item(album, db=db)
+ db_album = await self.mass.music.albums.add_db_item(
+ album, overwrite_existing=overwrite, db=db
+ )
return ItemMapping.from_item(db_album)
async def _get_artist_mapping(
# album is required for track linking
if left_track.album is None or right_track.album is None:
return False
- # track name and version must match
+ # track name must match
if not left_track.sort_name:
left_track.sort_name = create_clean_string(left_track.name)
if not right_track.sort_name:
right_track.sort_name = create_clean_string(right_track.name)
if left_track.sort_name != right_track.sort_name:
return False
+ # exact albumtrack match = 100% match
+ if (
+ compare_album(left_track.album, right_track.album)
+ and left_track.track_number
+ and right_track.track_number
+ and left_track.disc_number == right_track.disc_number
+ and left_track.track_number == right_track.track_number
+ ):
+ return True
+ # track version must match
if not compare_version(left_track.version, right_track.version):
return False
# track artist(s) must match
if not compare_explicit(left_track.metadata, right_track.metadata):
return False
# exact album match = 100% match
- if compare_album(left_track.album, right_track.album):
- return True
if left_track.albums and right_track.albums:
for left_album in left_track.albums:
for right_album in right_track.albums:
return True
# fallback: both albums are compilations and (near-exact) track duration match
if (
- abs(left_track.duration - right_track.duration) <= 1
+ abs(left_track.duration - right_track.duration) <= 2
and left_track.album.album_type in (AlbumType.UNKNOWN, AlbumType.COMPILATION)
and right_track.album.album_type in (AlbumType.UNKNOWN, AlbumType.COMPILATION)
):
cache_key = (
f"{prov.type.value}.search.{self.media_type.value}.{search_query}.{limit}"
)
- if cache := await self.mass.cache.get(cache_key):
- return [media_from_dict(x) for x in cache]
+ if not prov.type.is_file(): # do not cache filesystem results
+ if cache := await self.mass.cache.get(cache_key):
+ return [media_from_dict(x) for x in cache]
# no items in cache - get listing from provider
items = await prov.search(
search_query,
from __future__ import annotations
import asyncio
+import logging
import os
import urllib.parse
from contextlib import asynccontextmanager
"aiff": ContentType.AIFF,
}
SCHEMA_VERSION = 17
+LOGGER = logging.getLogger(__name__)
async def scantree(path: str) -> AsyncGenerator[os.DirEntry, None]:
return entry.is_dir(follow_symlinks=False)
loop = asyncio.get_running_loop()
- for entry in await loop.run_in_executor(None, os.scandir, path):
- if await loop.run_in_executor(None, is_dir, entry):
- async for subitem in scantree(entry.path):
- yield subitem
- else:
- yield entry
+ try:
+ entries = await loop.run_in_executor(None, os.scandir, path)
+ except (OSError, PermissionError) as err:
+ LOGGER.warning("Skip folder %s: %s", path, str(err))
+ else:
+ for entry in entries:
+ if await loop.run_in_executor(None, is_dir, entry):
+ async for subitem in scantree(entry.path):
+ yield subitem
+ else:
+ yield entry
def split_items(org_str: str) -> Tuple[str]:
continue
if track := await self._parse_track(entry.path):
+ # set checksum on track to invalidate any cached listings
+ track.metadata.checksum = checksum
# process album
if track.album:
+ # set checksum on album to invalidate cached albumtracks listings etc
+ track.album.metadata.checksum = checksum
db_album = await self.mass.music.albums.add_db_item(
track.album, overwrite_existing=True, db=db
)
)
# process (album)artist
if track.album.artist:
+ # set checksum on albumartist to invalidate cached artisttracks listings etc
+ track.album.artist.metadata.checksum = checksum
db_artist = await self.mass.music.artists.add_db_item(
track.album.artist, db=db
)
for img_type in ImageType:
if img_type.value in _filepath:
images.append(MediaItemImage(img_type, _filepath, True))
- elif _filename == "folder.jpg":
+ elif "folder." in _filepath:
images.append(MediaItemImage(ImageType.THUMB, _filepath, True))
if images:
album.metadata.images = images