import os
import urllib.parse
from contextlib import asynccontextmanager
-from time import time
from typing import Generator, List, Optional, Tuple
import aiofiles
async def sync_library(self) -> None:
"""Run library sync for this provider."""
- last_save = 0
cache_key = f"{self.id}.checksums"
checksums = await self.mass.cache.get(cache_key)
if checksums is None:
checksums = {}
# find all music files in the music directory and all subfolders
- # we work bottom down, as-in we derive all info from the tracks
+ # we work bottom up, as-in we derive all info from the tracks
for entry in scantree(self.config.path):
# mtime is used as file checksum
# we don't want the whole sync to crash on one file so we catch all exceptions here
self.logger.exception("Error processing %s", entry.path)
- # save current checksum cache every 5 mins for large listings
- checksums[entry.path] = checksum
- if (time() - last_save) > 60:
- await self.mass.cache.set(cache_key, checksums)
- last_save = time()
# TODO: Handle deletions
await self.mass.cache.set(cache_key, checksums)
async def get_album_tracks(self, prov_album_id: str) -> List[Track]:
"""Get album tracks for given album id."""
itempath = await self.get_filepath(prov_album_id)
+ if not self.exists(itempath):
+ query = f"SELECT * FROM tracks WHERE album LIKE '%\"{prov_album_id}\"%'"
+ query += f" and album LIKE '%\"{self.type.value}\"%'"
+ return await self.mass.music.tracks.get_db_items(query)
result = []
for entry in scantree(itempath):
# mtime is used as file checksum
async def get_playlist_tracks(self, prov_playlist_id: str) -> List[Track]:
"""Get playlist tracks for given playlist id."""
result = []
- itempath = await self.get_filepath(prov_playlist_id)
- if not self.exists(itempath):
- raise MediaNotFoundError(f"playlist path does not exist: {itempath}")
+ playlist_path = await self.get_filepath(prov_playlist_id)
+ if not self.exists(playlist_path):
+ raise MediaNotFoundError(f"playlist path does not exist: {playlist_path}")
+ checksum = self._get_checksum(playlist_path)
+ cache_key = f"{self.id}_playlist_tracks_{prov_playlist_id}"
+ if cache := await self.mass.cache.get(cache_key, checksum):
+ return [Track.from_dict(x) for x in cache]
index = 0
- async with self.open_file(itempath, "r") as _file:
+ async with self.open_file(playlist_path, "r") as _file:
for line in await _file.readlines():
line = urllib.parse.unquote(line.strip())
if line and not line.startswith("#"):
track.position = index
result.append(track)
index += 1
+ await self.mass.cache.set(cache_key, [x.to_dict() for x in result], checksum)
return result
async def get_artist_albums(self, prov_artist_id: str) -> List[Album]:
prov_artist_id, self.type
)
result = []
- for entry in scantree(self.config.path):
+ for entry in scantree(itempath):
# mtime is used as file checksum
checksum = str(entry.stat().st_mtime)
if track := await self._parse_track(entry.path, checksum):
if genre := info.get("genre"):
artist.metadata.genres = set(split_items(genre))
if not artist.musicbrainz_id:
- for uid in info.get("uniqueid", []):
+ for uid in info.get("uniqueid") or []:
+ if not uid.get("@type"):
+ continue
if uid["@type"] == "MusicBrainzArtist":
artist.musicbrainz_id = uid["#text"]
# find local images
album.year = int(year)
if genre := info.get("genre"):
album.metadata.genres = set(split_items(genre))
- for uid in info.get("uniqueid", []):
+ for uid in info.get("uniqueid") or []:
+ if not uid.get("@type"):
+ continue
if uid["@type"] == "MusicBrainzReleaseGroup":
if not album.musicbrainz_id:
album.musicbrainz_id = uid["#text"]
"""Add item to library."""
result = None
if media_type == MediaType.ARTIST:
- result = await self._get_data(
- "favorite/create", {"artist_ids": prov_item_id}
- )
+ result = await self._get_data("favorite/create", artist_id=prov_item_id)
elif media_type == MediaType.ALBUM:
- result = await self._get_data(
- "favorite/create", {"album_ids": prov_item_id}
- )
+ result = await self._get_data("favorite/create", album_ids=prov_item_id)
elif media_type == MediaType.TRACK:
- result = await self._get_data(
- "favorite/create", {"track_ids": prov_item_id}
- )
+ result = await self._get_data("favorite/create", track_ids=prov_item_id)
elif media_type == MediaType.PLAYLIST:
result = await self._get_data(
- "playlist/subscribe", {"playlist_id": prov_item_id}
+ "playlist/subscribe", playlist_id=prov_item_id
)
return result
"""Remove item from library."""
result = None
if media_type == MediaType.ARTIST:
- result = await self._get_data(
- "favorite/delete", {"artist_ids": prov_item_id}
- )
+ result = await self._get_data("favorite/delete", artist_ids=prov_item_id)
elif media_type == MediaType.ALBUM:
- result = await self._get_data(
- "favorite/delete", {"album_ids": prov_item_id}
- )
+ result = await self._get_data("favorite/delete", album_ids=prov_item_id)
elif media_type == MediaType.TRACK:
- result = await self._get_data(
- "favorite/delete", {"track_ids": prov_item_id}
- )
+ result = await self._get_data("favorite/delete", track_ids=prov_item_id)
elif media_type == MediaType.PLAYLIST:
playlist = await self.get_playlist(prov_item_id)
if playlist.is_editable:
result = await self._get_data(
- "playlist/delete", {"playlist_id": prov_item_id}
+ "playlist/delete", playlist_id=prov_item_id
)
else:
result = await self._get_data(
- "playlist/unsubscribe", {"playlist_id": prov_item_id}
+ "playlist/unsubscribe", playlist_id=prov_item_id
)
return result