from music_assistant.controllers.music.playlists import PlaylistController
from music_assistant.controllers.music.radio import RadioController
from music_assistant.controllers.music.tracks import TracksController
-from music_assistant.helpers.database import TABLE_PLAYLOG, TABLE_TRACK_LOUDNESS
+from music_assistant.helpers.database import (
+ TABLE_CACHE,
+ TABLE_PLAYLOG,
+ TABLE_TRACK_LOUDNESS,
+)
from music_assistant.helpers.datetime import utc_timestamp
from music_assistant.helpers.uri import parse_uri
from music_assistant.models.config import MusicProviderConfig
await asyncio.sleep(3600 * schedule)
self.mass.create_task(do_sync())
+ # add job to cleanup old records from db
+ self.mass.add_job(
+ self._cleanup_library(),
+ "Cleanup removed items from database",
+ allow_duplicate=False,
+ )
@property
def provider_count(self) -> int:
raise SetupFailedError(
f"Setup failed of provider {provider.type.value}: {str(err)}"
) from err
+
+ async def _cleanup_library(self) -> None:
+ """Cleanup deleted items from library/database."""
+ prev_providers = await self.mass.cache.get("prov_ids", default=[])
+ cur_providers = list(self._providers.keys())
+ removed_providers = {x for x in prev_providers if x not in cur_providers}
+
+ async with self.mass.database.get_db() as db:
+ for prov_id in removed_providers:
+
+ # clean cache items from deleted provider(s)
+ await self.mass.database.delete_where_query(
+ TABLE_CACHE, f"key LIKE '%{prov_id}%'", db=db
+ )
+
+ # cleanup media items from db matched to deleted provider
+ for ctrl in (
+ self.mass.music.artists,
+ self.mass.music.albums,
+ self.mass.music.tracks,
+ self.mass.music.radio,
+ self.mass.music.playlists,
+ ):
+ prov_items = await ctrl.get_db_items_by_prov_id(
+ provider_id=prov_id, db=db
+ )
+ for item in prov_items:
+ await ctrl.remove_prov_mapping(item.item_id, prov_id, db=db)
+
+ await self.mass.cache.set("prov_ids", cur_providers)
return []
return await prov.get_album_tracks(item_id)
- async def add_db_item(self, album: Album, db: Optional[Db] = None) -> Album:
- """Add a new album record to the database."""
- assert album.provider_ids, f"Album {album.name} is missing provider id(s)"
- assert album.artist, f"Album {album.name} is missing artist"
+ async def add_db_item(self, item: Album, db: Optional[Db] = None) -> Album:
+ """Add a new record to the database."""
+ assert item.provider_ids, f"Album {item.name} is missing provider id(s)"
+ assert item.artist, f"Album {item.name} is missing artist"
cur_item = None
async with self.mass.database.get_db(db) as db:
# always try to grab existing item by musicbrainz_id
- if album.musicbrainz_id:
- match = {"musicbrainz_id": album.musicbrainz_id}
+ if item.musicbrainz_id:
+ match = {"musicbrainz_id": item.musicbrainz_id}
cur_item = await self.mass.database.get_row(self.db_table, match, db=db)
- if not cur_item and album.upc:
- match = {"upc": album.upc}
+ if not cur_item and item.upc:
+ match = {"upc": item.upc}
cur_item = await self.mass.database.get_row(self.db_table, match, db=db)
if not cur_item:
# fallback to matching
- match = {"sort_name": album.sort_name}
+ match = {"sort_name": item.sort_name}
for row in await self.mass.database.get_rows(
self.db_table, match, db=db
):
row_album = Album.from_db_row(row)
- if compare_album(row_album, album):
+ if compare_album(row_album, item):
cur_item = row_album
break
if cur_item:
# update existing
- return await self.update_db_item(cur_item.item_id, album, db=db)
+ return await self.update_db_item(cur_item.item_id, item, db=db)
- # insert new album
- album_artists = await self._get_album_artists(album, cur_item, db=db)
+ # insert new item
+ album_artists = await self._get_album_artists(item, cur_item, db=db)
new_item = await self.mass.database.insert(
self.db_table,
{
- **album.to_db_row(),
+ **item.to_db_row(),
"artists": json_serializer(album_artists) or None,
},
db=db,
)
item_id = new_item["item_id"]
- self.logger.debug("added %s to database", album.name)
+ self.logger.debug("added %s to database", item.name)
# return created object
db_item = await self.get_db_item(item_id, db=db)
self.mass.signal_event(
async def update_db_item(
self,
item_id: int,
- album: Album,
+ item: Album,
overwrite: bool = False,
db: Optional[Db] = None,
) -> Album:
"""Update Album record in the database."""
- assert album.provider_ids, f"Album {album.name} is missing provider id(s)"
- assert album.artist, f"Album {album.name} is missing artist"
+ assert item.provider_ids, f"Album {item.name} is missing provider id(s)"
+ assert item.artist, f"Album {item.name} is missing artist"
async with self.mass.database.get_db(db) as db:
cur_item = await self.get_db_item(item_id)
- album_artists = await self._get_album_artists(album, cur_item, db=db)
+ album_artists = await self._get_album_artists(item, cur_item, db=db)
if overwrite:
- metadata = album.metadata
- provider_ids = album.provider_ids
+ metadata = item.metadata
+ provider_ids = item.provider_ids
else:
- metadata = cur_item.metadata.update(album.metadata)
- provider_ids = {*cur_item.provider_ids, *album.provider_ids}
+ metadata = cur_item.metadata.update(item.metadata)
+ provider_ids = {*cur_item.provider_ids, *item.provider_ids}
- if album.album_type != AlbumType.UNKNOWN:
- album_type = album.album_type
+ if item.album_type != AlbumType.UNKNOWN:
+ album_type = item.album_type
else:
album_type = cur_item.album_type
self.db_table,
{"item_id": item_id},
{
- "name": album.name if overwrite else cur_item.name,
- "sort_name": album.sort_name if overwrite else cur_item.sort_name,
- "version": album.version if overwrite else cur_item.version,
- "year": album.year or cur_item.year,
- "upc": album.upc or cur_item.upc,
+ "name": item.name if overwrite else cur_item.name,
+ "sort_name": item.sort_name if overwrite else cur_item.sort_name,
+ "version": item.version if overwrite else cur_item.version,
+ "year": item.year or cur_item.year,
+ "upc": item.upc or cur_item.upc,
"album_type": album_type.value,
"artists": json_serializer(album_artists) or None,
"metadata": json_serializer(metadata),
},
db=db,
)
- self.logger.debug("updated %s in database: %s", album.name, item_id)
+ self.logger.debug("updated %s in database: %s", item.name, item_id)
db_item = await self.get_db_item(item_id, db=db)
self.mass.signal_event(
MassEvent(
return []
return await provider.get_artist_albums(item_id)
- async def add_db_item(self, artist: Artist, db: Optional[Db] = None) -> Artist:
- """Add a new artist record to the database."""
- assert artist.provider_ids, "Album is missing provider id(s)"
+ async def add_db_item(self, item: Artist, db: Optional[Db] = None) -> Artist:
+ """Add a new item record to the database."""
+ assert item.provider_ids, "Album is missing provider id(s)"
async with self.mass.database.get_db(db) as db:
# always try to grab existing item by musicbrainz_id
cur_item = None
- if artist.musicbrainz_id:
- match = {"musicbrainz_id": artist.musicbrainz_id}
+ if item.musicbrainz_id:
+ match = {"musicbrainz_id": item.musicbrainz_id}
cur_item = await self.mass.database.get_row(self.db_table, match, db=db)
if not cur_item:
# fallback to matching
# NOTE: we match an artist by name which could theoretically lead to collisions
# but the chance is so small it is not worth the additional overhead of grabbing
# the musicbrainz id upfront
- match = {"sort_name": artist.sort_name}
+ match = {"sort_name": item.sort_name}
for row in await self.mass.database.get_rows(
self.db_table, match, db=db
):
row_artist = Artist.from_db_row(row)
- if row_artist.sort_name == artist.sort_name:
+ if row_artist.sort_name == item.sort_name:
# just to be sure ?!
cur_item = row_artist
break
if cur_item:
# update existing
- return await self.update_db_item(cur_item.item_id, artist, db=db)
+ return await self.update_db_item(cur_item.item_id, item, db=db)
- # insert artist
+ # insert item
new_item = await self.mass.database.insert(
- self.db_table, artist.to_db_row(), db=db
+ self.db_table, item.to_db_row(), db=db
)
item_id = new_item["item_id"]
- self.logger.debug("added %s to database", artist.name)
+ self.logger.debug("added %s to database", item.name)
# return created object
db_item = await self.get_db_item(item_id, db=db)
self.mass.signal_event(
async def update_db_item(
self,
item_id: int,
- artist: Artist,
+ item: Artist,
overwrite: bool = False,
db: Optional[Db] = None,
) -> Artist:
"""Update Artist record in the database."""
cur_item = await self.get_db_item(item_id)
if overwrite:
- metadata = artist.metadata
- provider_ids = artist.provider_ids
+ metadata = item.metadata
+ provider_ids = item.provider_ids
else:
- metadata = cur_item.metadata.update(artist.metadata)
- provider_ids = {*cur_item.provider_ids, *artist.provider_ids}
+ metadata = cur_item.metadata.update(item.metadata)
+ provider_ids = {*cur_item.provider_ids, *item.provider_ids}
async with self.mass.database.get_db(db) as db:
await self.mass.database.update(
self.db_table,
{"item_id": item_id},
{
- "name": artist.name if overwrite else cur_item.name,
- "sort_name": artist.sort_name if overwrite else cur_item.sort_name,
- "musicbrainz_id": artist.musicbrainz_id or cur_item.musicbrainz_id,
+ "name": item.name if overwrite else cur_item.name,
+ "sort_name": item.sort_name if overwrite else cur_item.sort_name,
+ "musicbrainz_id": item.musicbrainz_id or cur_item.musicbrainz_id,
"metadata": json_serializer(metadata),
"provider_ids": json_serializer(provider_ids),
},
db=db,
)
- self.logger.debug("updated %s in database: %s", artist.name, item_id)
+ self.logger.debug("updated %s in database: %s", item.name, item_id)
db_item = await self.get_db_item(item_id, db=db)
self.mass.signal_event(
MassEvent(
)
)
- async def add_db_item(
- self, playlist: Playlist, db: Optional[Db] = None
- ) -> Playlist:
- """Add a new playlist record to the database."""
+ async def add_db_item(self, item: Playlist, db: Optional[Db] = None) -> Playlist:
+ """Add a new record to the database."""
async with self.mass.database.get_db(db) as db:
- match = {"name": playlist.name, "owner": playlist.owner}
+ match = {"name": item.name, "owner": item.owner}
if cur_item := await self.mass.database.get_row(
self.db_table, match, db=db
):
# update existing
- return await self.update_db_item(cur_item["item_id"], playlist, db=db)
+ return await self.update_db_item(cur_item["item_id"], item, db=db)
- # insert new playlist
+ # insert new item
new_item = await self.mass.database.insert(
- self.db_table, playlist.to_db_row(), db=db
+ self.db_table, item.to_db_row(), db=db
)
item_id = new_item["item_id"]
- self.logger.debug("added %s to database", playlist.name)
+ self.logger.debug("added %s to database", item.name)
# return created object
db_item = await self.get_db_item(item_id, db=db)
self.mass.signal_event(
async def update_db_item(
self,
item_id: int,
- playlist: Playlist,
+ item: Playlist,
overwrite: bool = False,
db: Optional[Db] = None,
) -> Playlist:
cur_item = await self.get_db_item(item_id, db=db)
if overwrite:
- metadata = playlist.metadata
- provider_ids = playlist.provider_ids
+ metadata = item.metadata
+ provider_ids = item.provider_ids
else:
- metadata = cur_item.metadata.update(playlist.metadata)
- provider_ids = {*cur_item.provider_ids, *playlist.provider_ids}
+ metadata = cur_item.metadata.update(item.metadata)
+ provider_ids = {*cur_item.provider_ids, *item.provider_ids}
await self.mass.database.update(
self.db_table,
{"item_id": item_id},
{
- "name": playlist.name,
- "sort_name": playlist.sort_name,
- "owner": playlist.owner,
- "is_editable": playlist.is_editable,
+ "name": item.name,
+ "sort_name": item.sort_name,
+ "owner": item.owner,
+ "is_editable": item.is_editable,
"metadata": json_serializer(metadata),
"provider_ids": json_serializer(provider_ids),
},
db=db,
)
- self.logger.debug("updated %s in database: %s", playlist.name, item_id)
+ self.logger.debug("updated %s in database: %s", item.name, item_id)
db_item = await self.get_db_item(item_id, db=db)
self.mass.signal_event(
MassEvent(
import os
import urllib.parse
from contextlib import asynccontextmanager
+from pathlib import Path
from typing import AsyncGenerator, List, Optional, Set, Tuple
import aiofiles
FALLBACK_ARTIST = "Various Artists"
SPLITTERS = (";", ",", "Featuring", " Feat. ", " Feat ", "feat.", " & ", " / ")
+CONTENT_TYPE_EXT = {
+ # map of supported file extensions (mapped to ContentType)
+ "mp3": ContentType.MP3,
+ "m4a": ContentType.M4A,
+ "flac": ContentType.FLAC,
+ "wav": ContentType.WAV,
+ "ogg": ContentType.OGG,
+ "wma": ContentType.WMA,
+}
async def scantree(path: str) -> AsyncGenerator[os.DirEntry, None]:
) -> None:
"""Run library sync for this provider."""
cache_key = f"{self.id}.checksums"
- prev_checksums = await self.mass.cache.get(cache_key)
+ prev_checksums = await self.mass.cache.get(cache_key, SCHEMA_VERSION)
if prev_checksums is None:
prev_checksums = {}
# find all music files in the music directory and all subfolders
self.logger.exception("Error processing %s", entry.path)
# save checksums for next sync
- await self.mass.cache.set(cache_key, cur_checksums)
+ await self.mass.cache.set(cache_key, cur_checksums, SCHEMA_VERSION)
# work out deletions
deleted_files = set(prev_checksums.keys()) - set(cur_checksums.keys())
if db_item := await self.mass.music.tracks.get_db_item_by_prov_id(
item_id, self.type
):
- # remove provider mapping from track
- db_item.provider_ids = {
- x for x in db_item.provider_ids if x.item_id != item_id
- }
- if not db_item.provider_ids:
- # track has no more provider_ids left, it is completely deleted
- await self.mass.music.tracks.delete_db_item(db_item.item_id)
- else:
- await self.mass.music.tracks.update_db_item(
- db_item.item_id, db_item
- )
+ await self.mass.music.tracks.remove_prov_mapping(
+ db_item.item_id, self.id
+ )
+ # gather artists(s) attached to this track
for artist in db_item.artists:
- artists.add(artist)
+ artists.add(artist.item_id)
+ # gather album and albumartist(s) attached to this track
if db_item.album:
- albums.add(db_item.album)
- # check if artists are deleted
- for artist in artists:
- if db_item := await self.mass.music.artists.get_db_item_by_prov_id(
- artist.item_id, artist.provider
- ):
- if len(db_item.provider_ids) > 1:
- continue
- artist_tracks = await self.mass.music.artists.toptracks(
- db_item.item_id, db_item.provider
- )
- if artist_tracks:
- continue
- artist_albums = await self.mass.music.artists.albums(
- db_item.item_id, db_item.provider
- )
- if artist_albums:
- continue
- # artist has no more items attached, delete it
- await self.mass.music.artists.delete_db_item(db_item.item_id)
+ albums.add(db_item.album.item_id)
+ for artist in db_item.album.artists:
+ artists.add(artist.item_id)
# check if albums are deleted
- for album in albums:
- if db_item := await self.mass.music.albums.get_db_item_by_prov_id(
- album.item_id, album.provider
- ):
- if len(db_item.provider_ids) > 1:
- continue
- album_tracks = await self.mass.music.albums.tracks(
- db_item.item_id, db_item.provider
- )
- if album_tracks:
- continue
- # album has no more tracks attached, delete it
- await self.mass.music.albums.delete_db_item(db_item.item_id)
+ for album_id in albums:
+ album = await self.mass.music.albums.get_db_item(album_id)
+ prov_album_id = next(
+ x.item_id for x in album.provider_ids if x.prov_id == self.id
+ )
+ album_tracks = await self.get_album_tracks(prov_album_id)
+ if album_tracks:
+ continue
+ # album has no more tracks attached, delete prov mapping
+ await self.mass.music.albums.remove_prov_mapping(album_id)
+ # check if artists are deleted
+ for artist_id in artists:
+ artist = await self.mass.music.artists.get_db_item(artist_id)
+ prov_artist_id = next(
+ x.item_id for x in artist.provider_ids if x.prov_id == self.id
+ )
+ artist_tracks = await self.get_artist_toptracks(prov_artist_id)
+ if artist_tracks:
+ continue
+ artist_albums = await self.get_artist_albums(prov_artist_id)
+ if artist_albums:
+ continue
+ # artist has no more tracks attached, delete prov mapping
+ await self.mass.music.artists.remove_prov_mapping(artist_id)
async def get_artist(self, prov_artist_id: str) -> Artist:
"""Get full artist details by id."""
itempath = await self.get_filepath(MediaType.ARTIST, prov_artist_id)
if await self.exists(itempath):
+ # if path exists on disk allow parsing full details to allow refresh of metadata
return await self._parse_artist(artist_path=itempath)
return await self.mass.music.artists.get_db_item_by_prov_id(
provider_item_id=prov_artist_id, provider_id=self.id
async def get_album(self, prov_album_id: str) -> Album:
"""Get full album details by id."""
- itempath = await self.get_filepath(MediaType.ALBUM, prov_album_id)
- if await self.exists(itempath):
- return await self._parse_album(album_path=itempath)
- return await self.mass.music.albums.get_db_item_by_prov_id(
+ db_album = await self.mass.music.albums.get_db_item_by_prov_id(
provider_item_id=prov_album_id, provider_id=self.id
)
+ if db_album is None:
+ raise MediaNotFoundError(f"Album not found: {prov_album_id}")
+ itempath = await self.get_filepath(MediaType.ALBUM, prov_album_id)
+ if await self.exists(itempath):
+ # if path exists on disk allow parsing full details to allow refresh of metadata
+ return await self._parse_album(None, itempath, db_album.artists)
+ return db_album
async def get_track(self, prov_track_id: str) -> Track:
"""Get full track details by id."""
itempath = await self.get_filepath(MediaType.TRACK, prov_track_id)
- if not await self.exists(itempath):
- raise MediaNotFoundError(f"Track path does not exist: {itempath}")
return await self._parse_track(itempath)
async def get_playlist(self, prov_playlist_id: str) -> Playlist:
cache_key = f"playlist_{self.id}_tracks_{prov_playlist_id}"
if cache := await self.mass.cache.get(cache_key, checksum):
return [Track.from_dict(x) for x in cache]
+ playlist_base_path = Path(playlist_path).parent
index = 0
try:
async with self.open_file(playlist_path, "r") as _file:
for line in await _file.readlines():
line = urllib.parse.unquote(line.strip())
if line and not line.startswith("#"):
- if track := await self._parse_track_from_uri(line):
+ # TODO: add support for .pls playlist files
+ if track := await self._parse_playlist_line(
+ line, playlist_base_path
+ ):
track.position = index
result.append(track)
index += 1
await self.mass.cache.set(cache_key, [x.to_dict() for x in result], checksum)
return result
+ async def _parse_playlist_line(self, line: str, playlist_path: str) -> Track | None:
+ """Try to parse a track from a playlist line."""
+ if "://" in line:
+ # track is uri from external provider?
+ try:
+ return await self.mass.music.get_item_by_uri(line)
+ except MusicAssistantError as err:
+ self.logger.warning(
+ "Could not parse uri %s to track: %s", line, str(err)
+ )
+ return None
+ # try to treat uri as filename
+ if await self.exists(line):
+ return await self._parse_track(line)
+ rel_path = os.path.join(playlist_path, line)
+ if await self.exists(rel_path):
+ return await self._parse_track(rel_path)
+ return None
+
async def get_artist_albums(self, prov_artist_id: str) -> List[Album]:
"""Get a list of albums for the given artist."""
# filesystem items are always stored in db so we can query the database
if not await self.exists(track_path):
raise MediaNotFoundError(f"Track path does not exist: {track_path}")
- track_item_id = self._get_item_id(track_path)
+ if "." not in track_path or track_path.startswith("."):
+ # skip system files and files without extension
+ return None
- if not TinyTag.is_supported(track_path):
+ filename_base, ext = Path(track_path).name.rsplit(".", 1)
+ content_type = CONTENT_TYPE_EXT.get(ext.lower())
+ if content_type is None:
+ # unsupported file extension
return None
+ track_item_id = self._get_item_id(track_path)
+
# parse ID3 tags with TinyTag
def parse_tags():
return TinyTag.get(track_path, image=True, ignore_errors=True)
tags = await self.mass.loop.run_in_executor(None, parse_tags)
- # prefer title from tag, fallback to filename
- if tags.title:
- track_title = tags.title
- else:
- ext = track_path.split(".")[-1]
- track_title = track_path.split(os.sep)[-1]
- track_title = track_title.replace(f".{ext}", "").replace("_", " ")
+ # prefer title from tags, fallback to filename
+ if not tags.title or not tags.artist:
self.logger.warning(
- "%s is missing ID3 tag [title], using filename as fallback", track_path
+ "%s is missing ID3 tags, using filename as fallback", track_path
)
+ filename_parts = filename_base.split(" - ", 1)
+ if len(filename_parts) == 2:
+ tags.artist = tags.artist or filename_parts[0]
+ tags.title = tags.title or filename_parts[1]
+ else:
+ tags.artist = tags.artist or FALLBACK_ARTIST
+ tags.title = tags.title or filename_base
- name, version = parse_title_and_version(track_title)
+ name, version = parse_title_and_version(tags.title)
track = Track(
item_id=track_item_id,
provider=self.type,
)
track.album = await self._parse_album(
- name=tags.album,
- album_path=album_folder,
+ tags.album,
+ album_folder,
artists=album_artists,
in_library=True,
)
self.logger.warning("%s is missing ID3 tag [album]", track_path)
# track artist(s)
- if tags.artist == tags.albumartist:
+ if tags.artist == tags.albumartist and track.album:
track.artists = track.album.artists
else:
# Parse track artist(s) from artist string using common splitters used in ID3 tags
track.metadata.lyrics = tags.extra["lyrics"]
quality_details = ""
- if track_path.endswith(".flac"):
+ if content_type == ContentType.FLAC:
# TODO: get bit depth
quality = MediaQuality.FLAC_LOSSLESS
if tags.samplerate > 192000:
async def _parse_album(
self,
- name: Optional[str] = None,
- album_path: Optional[str] = None,
- artists: List[Artist] = None,
+ name: Optional[str],
+ album_path: Optional[str],
+ artists: List[Artist],
in_library: bool = True,
) -> Album | None:
"""Lookup metadata in Album folder."""
assert (name or album_path) and artists
- # create fake path
- album_path = os.path.join(self.config.path, artists[0].name, name)
+ if not album_path:
+ # create fake path
+ album_path = os.path.join(self.config.path, artists[0].name, name)
album_item_id = self._get_item_id(album_path)
if not name:
playlist.owner = self._attr_name
return playlist
- async def _parse_track_from_uri(self, uri: str):
- """Try to parse a track from an uri found in playlist."""
- if "://" in uri:
- # track is uri from external provider?
- try:
- return await self.mass.music.get_item_by_uri(uri)
- except MusicAssistantError as err:
- self.logger.warning(
- "Could not parse uri %s to track: %s", uri, str(err)
- )
- return None
- # try to treat uri as filename
- if self.config.path not in uri:
- uri = os.path.join(self.config.path, uri)
- try:
- return await self._parse_track(uri)
- except MediaNotFoundError:
- return None
-
async def exists(self, file_path: str) -> bool:
"""Return bool is this FileSystem musicprovider has given file/dir."""
if not file_path:
await self.mass.metadata.get_radio_metadata(item)
return await self.add_db_item(item)
- async def add_db_item(self, radio: Radio, db: Optional[Db] = None) -> Radio:
- """Add a new radio record to the database."""
- assert radio.provider_ids
+ async def add_db_item(self, item: Radio, db: Optional[Db] = None) -> Radio:
+ """Add a new item record to the database."""
+ assert item.provider_ids
async with self.mass.database.get_db(db) as db:
- match = {"name": radio.name}
+ match = {"name": item.name}
if cur_item := await self.mass.database.get_row(
self.db_table, match, db=db
):
# update existing
- return await self.update_db_item(cur_item["item_id"], radio, db=db)
+ return await self.update_db_item(cur_item["item_id"], item, db=db)
- # insert new radio
+ # insert new item
new_item = await self.mass.database.insert(
- self.db_table, radio.to_db_row(), db=db
+ self.db_table, item.to_db_row(), db=db
)
item_id = new_item["item_id"]
- self.logger.debug("added %s to database", radio.name)
+ self.logger.debug("added %s to database", item.name)
# return created object
db_item = await self.get_db_item(item_id, db=db)
self.mass.signal_event(
async def update_db_item(
self,
item_id: int,
- radio: Radio,
+ item: Radio,
overwrite: bool = False,
db: Optional[Db] = None,
) -> Radio:
async with self.mass.database.get_db(db) as db:
cur_item = await self.get_db_item(item_id, db=db)
if overwrite:
- metadata = radio.metadata
- provider_ids = radio.provider_ids
+ metadata = item.metadata
+ provider_ids = item.provider_ids
else:
- metadata = cur_item.metadata.update(radio.metadata)
- provider_ids = {*cur_item.provider_ids, *radio.provider_ids}
+ metadata = cur_item.metadata.update(item.metadata)
+ provider_ids = {*cur_item.provider_ids, *item.provider_ids}
match = {"item_id": item_id}
await self.mass.database.update(
self.db_table,
match,
{
- "name": radio.name,
- "sort_name": radio.sort_name,
+ "name": item.name,
+ "sort_name": item.sort_name,
"metadata": json_serializer(metadata),
"provider_ids": json_serializer(provider_ids),
},
db=db,
)
- self.logger.debug("updated %s in database: %s", radio.name, item_id)
+ self.logger.debug("updated %s in database: %s", item.name, item_id)
db_item = await self.get_db_item(item_id, db=db)
self.mass.signal_event(
MassEvent(
provider.name,
)
- async def add_db_item(self, track: Track, db: Optional[Db] = None) -> Track:
- """Add a new track record to the database."""
- assert track.artists, "Track is missing artist(s)"
- assert track.provider_ids, "Track is missing provider id(s)"
+ async def add_db_item(self, item: Track, db: Optional[Db] = None) -> Track:
+ """Add a new item record to the database."""
+ assert item.artists, "Track is missing artist(s)"
+ assert item.provider_ids, "Track is missing provider id(s)"
cur_item = None
async with self.mass.database.get_db(db) as db:
# always try to grab existing item by external_id
- if track.musicbrainz_id:
- match = {"musicbrainz_id": track.musicbrainz_id}
+ if item.musicbrainz_id:
+ match = {"musicbrainz_id": item.musicbrainz_id}
cur_item = await self.mass.database.get_row(self.db_table, match, db=db)
- if not cur_item and track.isrc:
- match = {"isrc": track.isrc}
+ if not cur_item and item.isrc:
+ match = {"isrc": item.isrc}
cur_item = await self.mass.database.get_row(self.db_table, match, db=db)
if not cur_item:
# fallback to matching
- match = {"sort_name": track.sort_name}
+ match = {"sort_name": item.sort_name}
for row in await self.mass.database.get_rows(
self.db_table, match, db=db
):
row_track = Track.from_db_row(row)
- if compare_track(row_track, track):
+ if compare_track(row_track, item):
cur_item = row_track
break
if cur_item:
# update existing
- return await self.update_db_item(cur_item.item_id, track, db=db)
+ return await self.update_db_item(cur_item.item_id, item, db=db)
- # no existing match found: insert new track
- track_artists = await self._get_track_artists(track, db=db)
- track_albums = await self._get_track_albums(track, db=db)
+ # no existing match found: insert new item
+ track_artists = await self._get_track_artists(item, db=db)
+ track_albums = await self._get_track_albums(item, db=db)
new_item = await self.mass.database.insert(
self.db_table,
{
- **track.to_db_row(),
+ **item.to_db_row(),
"artists": json_serializer(track_artists),
"albums": json_serializer(track_albums),
},
)
item_id = new_item["item_id"]
# return created object
- self.logger.debug("added %s to database: %s", track.name, item_id)
+ self.logger.debug("added %s to database: %s", item.name, item_id)
db_item = await self.get_db_item(item_id, db=db)
self.mass.signal_event(
MassEvent(
async def update_db_item(
self,
item_id: int,
- track: Track,
+ item: Track,
overwrite: bool = False,
db: Optional[Db] = None,
) -> Track:
async with self.mass.database.get_db(db) as db:
cur_item = await self.get_db_item(item_id, db=db)
if overwrite:
- provider_ids = track.provider_ids
+ provider_ids = item.provider_ids
else:
- provider_ids = {*cur_item.provider_ids, *track.provider_ids}
- metadata = cur_item.metadata.update(track.metadata, overwrite)
+ provider_ids = {*cur_item.provider_ids, *item.provider_ids}
+ metadata = cur_item.metadata.update(item.metadata, overwrite)
- # we store a mapping to artists/albums on the track for easier access/listings
- track_artists = await self._get_track_artists(cur_item, track, db=db)
- track_albums = await self._get_track_albums(cur_item, track, db=db)
+ # we store a mapping to artists/albums on the item for easier access/listings
+ track_artists = await self._get_track_artists(cur_item, item, db=db)
+ track_albums = await self._get_track_albums(cur_item, item, db=db)
await self.mass.database.update(
self.db_table,
{"item_id": item_id},
{
- "name": track.name if overwrite else cur_item.name,
- "sort_name": track.sort_name if overwrite else cur_item.sort_name,
- "version": track.version if overwrite else cur_item.version,
- "duration": track.duration if overwrite else cur_item.duration,
+ "name": item.name if overwrite else cur_item.name,
+ "sort_name": item.sort_name if overwrite else cur_item.sort_name,
+ "version": item.version if overwrite else cur_item.version,
+ "duration": item.duration if overwrite else cur_item.duration,
"artists": json_serializer(track_artists),
"albums": json_serializer(track_albums),
"metadata": json_serializer(metadata),
"provider_ids": json_serializer(provider_ids),
- "isrc": track.isrc or cur_item.isrc,
+ "isrc": item.isrc or cur_item.isrc,
},
db=db,
)
- self.logger.debug("updated %s in database: %s", track.name, item_id)
+ self.logger.debug("updated %s in database: %s", item.name, item_id)
db_item = await self.get_db_item(item_id, db=db)
self.mass.signal_event(
MassEvent(
"""Compare two track items and return True if they match."""
if left_track is None or right_track is None:
return False
- # album is required for track linking
- if left_track.album is None or right_track.album is None:
- return False
# return early on exact item_id match
if compare_item_id(left_track, right_track):
return True
if left_track.musicbrainz_id == right_track.musicbrainz_id:
# musicbrainz_id is always 100% accurate match
return True
+ # album is required for track linking
+ if left_track.album is None or right_track.album is None:
+ return False
# track name and version must match
if not left_track.sort_name:
left_track.sort_name = create_clean_string(left_track.name)
from __future__ import annotations
from contextlib import asynccontextmanager
-from typing import TYPE_CHECKING, Any, Dict, List, Mapping, Optional
+from typing import TYPE_CHECKING, Any, AsyncGenerator, Dict, List, Mapping, Optional
from databases import Database as Db
from music_assistant.mass import MusicAssistant
-SCHEMA_VERSION = 16
+SCHEMA_VERSION = 17
TABLE_TRACK_LOUDNESS = "track_loudness"
TABLE_PLAYLOG = "playlog"
async with self.get_db(db) as _db:
return await _db.fetch_all(query, params)
+ async def iterate_rows(
+ self,
+ table: str,
+ match: dict = None,
+ db: Optional[Db] = None,
+ ) -> AsyncGenerator[Mapping, None]:
+ """Iterate rows for given table."""
+ async with self.get_db(db) as _db:
+ sql_query = f"SELECT * FROM {table}"
+ if match is not None:
+ sql_query += " WHERE " + " AND ".join((f"{x} = :{x}" for x in match))
+ async for row in _db.iterate(sql_query, match):
+ yield row
+
async def search(
self, table: str, search: str, column: str = "name", db: Optional[Db] = None
) -> List[Mapping]:
sql_query += " WHERE " + " AND ".join((f"{x} = :{x}" for x in match))
await _db.execute(sql_query, match)
+ async def delete_where_query(
+ self, table: str, query: str, db: Optional[Db] = None
+ ) -> None:
+ """Delete data in given table using given where clausule."""
+ async with self.get_db(db) as _db:
+ sql_query = f"DELETE FROM {table} WHERE {query}"
+ await _db.execute(sql_query)
+
async def _migrate(self):
"""Perform database migration actions if needed."""
async with self.get_db() as db:
from abc import ABCMeta, abstractmethod
from time import time
-from typing import TYPE_CHECKING, Generic, List, Optional, Tuple, TypeVar
+from typing import (
+ TYPE_CHECKING,
+ AsyncGenerator,
+ Generic,
+ List,
+ Optional,
+ Tuple,
+ TypeVar,
+)
from databases import Database as Db
"""Add item to local db and return the database item."""
raise NotImplementedError
+ @abstractmethod
+ async def add_db_item(self, item: ItemCls, db: Optional[Db] = None) -> ItemCls:
+ """Add a new record for this mediatype to the database."""
+ raise NotImplementedError
+
+ @abstractmethod
+ async def update_db_item(
+ self,
+ item_id: int,
+ item: ItemCls,
+ overwrite: bool = False,
+ db: Optional[Db] = None,
+ ) -> ItemCls:
+ """Update record in the database, merging data."""
+ raise NotImplementedError
+
async def library(self) -> List[ItemCls]:
"""Get all in-library items."""
match = {"in_library": True}
return await self.get_db_items(query, db=db)
+ async def iterate_db_items(
+ self,
+ db: Optional[Db] = None,
+ ) -> AsyncGenerator[ItemCls, None]:
+ """Iterate all records from database."""
+ async for db_row in self.mass.database.iterate_rows(self.db_table, db=db):
+ yield self.item_cls.from_db_row(db_row)
+
async def set_db_library(
self, item_id: int, in_library: bool, db: Optional[Db] = None
) -> None:
)
return item
+ async def remove_prov_mapping(
+ self, item_id: int, prov_id: str, db: Optional[Db] = None
+ ) -> None:
+ """Remove provider id(s) from item."""
+ async with self.mass.database.get_db(db) as db:
+ if db_item := await self.get_db_item(item_id, db=db):
+ db_item.provider_ids = {
+ x for x in db_item.provider_ids if x.prov_id != prov_id
+ }
+ if not db_item.provider_ids:
+ # item has no more provider_ids left, it is completely deleted
+ await self.delete_db_item(db_item.item_id)
+ return
+ await self.update_db_item(
+ db_item.item_id, db_item, overwrite=True, db=db
+ )
+
+ self.logger.debug("removed provider %s from item id %s", prov_id, item_id)
+
async def delete_db_item(self, item_id: int, db: Optional[Db] = None) -> None:
"""Delete record from the database."""
async with self.mass.database.get_db(db) as db:
{"item_id": int(item_id)},
db=db,
)
- # NOTE: this does not delete any references to this item in other records
+ # NOTE: this does not delete any references to this item in other records!
self.logger.debug("deleted item with id %s from database", item_id)