import statistics
from typing import TYPE_CHECKING, Dict, List, Optional, Tuple, Union
-from databases import Database as Db
-
from music_assistant.controllers.music.albums import AlbumsController
from music_assistant.controllers.music.artists import ArtistsController
from music_assistant.controllers.music.playlists import PlaylistController
from music_assistant.controllers.music.radio import RadioController
from music_assistant.controllers.music.tracks import TracksController
-from music_assistant.helpers.database import (
- TABLE_PLAYLOG,
- TABLE_PROV_MAPPINGS,
- TABLE_TRACK_LOUDNESS,
-)
+from music_assistant.helpers.database import TABLE_PLAYLOG, TABLE_TRACK_LOUDNESS
from music_assistant.helpers.datetime import utc_timestamp
from music_assistant.helpers.uri import parse_uri
from music_assistant.models.config import MusicProviderConfig
from music_assistant.models.enums import MediaType, ProviderType
from music_assistant.models.errors import MusicAssistantError, SetupFailedError
-from music_assistant.models.media_items import (
- MediaItem,
- MediaItemProviderId,
- MediaItemType,
-)
+from music_assistant.models.media_items import MediaItem, MediaItemType
from music_assistant.models.provider import MusicProvider
from .providers.filesystem import FileSystemProvider
provider_item_id, provider=provider, provider_id=provider_id
)
- async def get_provider_mapping(
- self,
- media_type: Optional[MediaType] = None,
- provider_item_id: Optional[str] = None,
- provider: Optional[ProviderType] = None,
- provider_id: Optional[str] = None,
- url: Optional[str] = None,
- db: Optional[Db] = None,
- return_key: str = "item_id",
- ) -> int | None:
- """Lookup database id for media item from provider id."""
- match = {}
- if media_type is not None:
- match["media_type"] = media_type.value
- if provider_item_id is not None:
- match["prov_item_id"] = provider_item_id
- if provider is not None:
- match["prov_type"] = provider.value
- if provider_id is not None:
- match["prov_id"] = provider_id
- if url is not None:
- match["url"] = url
- if result := await self.mass.database.get_row(
- TABLE_PROV_MAPPINGS,
- match,
- db=db,
- ):
- return result[return_key] if return_key else result
- return None
-
- async def get_provider_mappings(
- self,
- media_type: MediaType,
- provider: Optional[ProviderType] = None,
- provider_id: Optional[str] = None,
- db: Optional[Db] = None,
- ) -> List[int]:
- """Lookup all database id's for media type for given provider id."""
- match = {
- "media_type": media_type.value,
- }
- if provider is not None:
- match["prov_type"] = provider.value
- if provider_id is not None:
- match["prov_id"] = provider_id
- if result := await self.mass.database.get_rows(
- TABLE_PROV_MAPPINGS,
- match,
- db=db,
- ):
- return [x["item_id"] for x in result]
- return None
-
- async def set_provider_mappings(
- self,
- item_id: int,
- media_type: MediaType,
- prov_ids: List[MediaItemProviderId],
- db: Optional[Db] = None,
- ):
- """Store provider ids for media item to database."""
- async with self.mass.database.get_db(db) as db:
- # make sure that existing items are deleted first
- await self.mass.database.delete(
- TABLE_PROV_MAPPINGS,
- {"item_id": int(item_id), "media_type": media_type.value},
- db=db,
- )
- for prov_id in prov_ids:
- await self.mass.database.insert(
- TABLE_PROV_MAPPINGS,
- {
- "item_id": item_id,
- "media_type": media_type.value,
- "prov_item_id": prov_id.item_id,
- "prov_id": prov_id.prov_id,
- "prov_type": prov_id.prov_type.value,
- "quality": prov_id.quality.value if prov_id.quality else None,
- "details": prov_id.details,
- "url": prov_id.url,
- },
- allow_replace=True,
- db=db,
- )
-
async def refresh_items(self, items: List[MediaItem]) -> None:
"""
Refresh MediaItems to force retrieval of full info and matches.
db=db,
)
item_id = new_item["item_id"]
- # store provider mappings
- await self.mass.music.set_provider_mappings(
- item_id, MediaType.ALBUM, album.provider_ids, db=db
- )
self.logger.debug("added %s to database", album.name)
# return created object
return await self.get_db_item(item_id, db=db)
},
db=db,
)
- await self.mass.music.set_provider_mappings(
- item_id, MediaType.ALBUM, provider_ids, db=db
- )
self.logger.debug("updated %s in database: %s", album.name, item_id)
return await self.get_db_item(item_id, db=db)
self.db_table, artist.to_db_row(), db=db
)
item_id = new_item["item_id"]
- # store provider mappings
- await self.mass.music.set_provider_mappings(
- item_id, MediaType.ARTIST, artist.provider_ids, db=db
- )
self.logger.debug("added %s to database", artist.name)
# return created object
return await self.get_db_item(item_id, db=db)
},
db=db,
)
- await self.mass.music.set_provider_mappings(
- item_id, MediaType.ARTIST, provider_ids, db=db
- )
self.logger.debug("updated %s in database: %s", artist.name, item_id)
return await self.get_db_item(item_id, db=db)
self.db_table, playlist.to_db_row(), db=db
)
item_id = new_item["item_id"]
- # store provider mappings
- await self.mass.music.set_provider_mappings(
- item_id, MediaType.PLAYLIST, playlist.provider_ids, db=db
- )
self.logger.debug("added %s to database", playlist.name)
# return created object
return await self.get_db_item(item_id, db=db)
},
db=db,
)
- await self.mass.music.set_provider_mappings(
- item_id, MediaType.PLAYLIST, provider_ids, db=db
- )
self.logger.debug("updated %s in database: %s", playlist.name, item_id)
db_item = await self.get_db_item(item_id, db=db)
self.mass.signal_event(
"""Recursively yield DirEntry objects for given directory."""
loop = asyncio.get_running_loop()
for entry in await loop.run_in_executor(None, os.scandir, path):
- if entry.is_dir(follow_symlinks=False):
+ if await loop.run_in_executor(None, entry.is_dir):
async for subitem in scantree(entry.path):
yield subitem
else:
async def get_album_tracks(self, prov_album_id: str) -> List[Track]:
"""Get album tracks for given album id."""
# filesystem items are always stored in db so we can query the database
- db_id = await self.mass.music.get_provider_mapping(
- MediaType.ALBUM, provider=self.type, provider_item_id=prov_album_id
+ db_album = await self.mass.music.albums.get_db_item_by_prov_id(
+ prov_album_id, provider_id=self.id
)
- if db_id is None:
+ if db_album is None:
raise MediaNotFoundError(f"Album not found: {prov_album_id}")
- query = f"SELECT * FROM tracks WHERE album LIKE '%\"{db_id}\"%'"
+ # TODO: adjust to json query instead of text search
+ query = f"SELECT * FROM tracks WHERE album LIKE '%\"{db_album.item_id}\"%'"
query += f" AND provider_ids LIKE '%\"{self.type.value}\"%'"
- return await self.mass.music.tracks.get_db_items(query)
+ result = []
+ for track in await self.mass.music.tracks.get_db_items(query):
+ track.album = db_album
+ result.append(track)
+ return result
async def get_playlist_tracks(self, prov_playlist_id: str) -> List[Track]:
"""Get playlist tracks for given playlist id."""
async def get_artist_albums(self, prov_artist_id: str) -> List[Album]:
"""Get a list of albums for the given artist."""
# filesystem items are always stored in db so we can query the database
- db_id = await self.mass.music.get_provider_mapping(
- MediaType.ARTIST, provider=self.type, provider_item_id=prov_artist_id
+ db_artist = await self.mass.music.artists.get_db_item_by_prov_id(
+ prov_artist_id, provider_id=self.id
)
- if db_id is None:
+ if db_artist is None:
raise MediaNotFoundError(f"Artist not found: {prov_artist_id}")
- query = f"SELECT * FROM albums WHERE artist LIKE '%\"{db_id}\"%'"
+ # TODO: adjust to json query instead of text search
+ query = f"SELECT * FROM albums WHERE artist LIKE '%\"{db_artist.item_id}\"%'"
query += f" AND provider_ids like '%\"{self.type.value}\"%'"
return await self.mass.music.albums.get_db_items(query)
async def get_artist_toptracks(self, prov_artist_id: str) -> List[Track]:
"""Get a list of all tracks as we have no clue about preference."""
# filesystem items are always stored in db so we can query the database
- db_id = await self.mass.music.get_provider_mapping(
- MediaType.ARTIST, provider=self.type, provider_item_id=prov_artist_id
+ db_artist = await self.mass.music.artists.get_db_item_by_prov_id(
+ prov_artist_id, provider_id=self.id
)
- if db_id is None:
+ if db_artist is None:
raise MediaNotFoundError(f"Artist not found: {prov_artist_id}")
- query = f"SELECT * FROM tracks WHERE artists LIKE '%\"{db_id}\"%'"
+ # TODO: adjust to json query instead of text search
+ query = f"SELECT * FROM tracks WHERE artists LIKE '%\"{db_artist.item_id}\"%'"
query += f" AND provider_ids like '%\"{self.type.value}\"%'"
return await self.mass.music.tracks.get_db_items(query)
return await self.mass.loop.run_in_executor(None, _get_data)
- async def get_filepath(self, media_type: MediaType, item_id: str) -> str | None:
+ async def get_filepath(
+ self, media_type: MediaType, prov_item_id: str
+ ) -> str | None:
"""Get full filepath on disk for item_id."""
- if item_id is None:
+ if prov_item_id is None:
return None # guard
- file_path = await self.mass.music.get_provider_mapping(
- media_type, provider_id=self.id, provider_item_id=item_id, return_key="url"
+ # funky sql queries go here ;-)
+ query = (
+ "SELECT json_extract(json_each.value, '$.url') as url FROM :table, "
+ "json_each(provider_ids) WHERE "
+ "json_extract(json_each.value, '$.prov_id') = :prov_id"
+ "AND json_extract(json_each.value, '$.item_id') = :item_id"
+ )
+ params = {
+ "table": f"{media_type.value}s",
+ "prov_id": self.id,
+ "item_id": prov_item_id,
+ }
+ file_path = next(
+ await self.mass.database.get_rows_from_query(query, params), None
)
if file_path is not None:
# ensure we have a full path and not relative
self.db_table, radio.to_db_row(), db=db
)
item_id = new_item["item_id"]
- # store provider mappings
- await self.mass.music.set_provider_mappings(
- item_id, MediaType.RADIO, radio.provider_ids, db=db
- )
self.logger.debug("added %s to database", radio.name)
# return created object
return await self.get_db_item(item_id, db=db)
},
db=db,
)
- await self.mass.music.set_provider_mappings(
- item_id, MediaType.RADIO, provider_ids, db=db
- )
self.logger.debug("updated %s in database: %s", radio.name, item_id)
return await self.get_db_item(item_id, db=db)
from music_assistant.models.enums import EventType, MediaType, ProviderType
from music_assistant.models.event import MassEvent
from music_assistant.models.media_controller import MediaControllerBase
-from music_assistant.models.media_items import Artist, ItemMapping, Track
+from music_assistant.models.media_items import Album, Artist, ItemMapping, Track
class TracksController(MediaControllerBase[Track]):
assert track.provider_ids, "Track is missing provider id(s)"
cur_item = None
async with self.mass.database.get_db(db) as db:
- track_album = await self._get_track_album(track)
# always try to grab existing item by external_id
if track.musicbrainz_id:
# no existing match found: insert new track
track_artists = await self._get_track_artists(track, db=db)
+ track_albums = await self._get_track_albums(track, db=db)
new_item = await self.mass.database.insert(
self.db_table,
{
**track.to_db_row(),
"artists": json_serializer(track_artists),
- "album": json_serializer(track_album) or None,
+ "albums": json_serializer(track_albums),
},
db=db,
)
item_id = new_item["item_id"]
- # store provider mappings
- await self.mass.music.set_provider_mappings(
- item_id, MediaType.TRACK, track.provider_ids, db=db
- )
-
# return created object
self.logger.debug("added %s to database: %s", track.name, item_id)
return await self.get_db_item(item_id, db=db)
"""Update Track record in the database, merging data."""
async with self.mass.database.get_db(db) as db:
cur_item = await self.get_db_item(item_id, db=db)
- track_album = await self._get_track_album(cur_item, track)
if overwrite:
provider_ids = track.provider_ids
else:
provider_ids = {*cur_item.provider_ids, *track.provider_ids}
metadata = cur_item.metadata.update(track.metadata, overwrite)
- # we store a mapping to artists on the track for easier access/listings
- track_artists = await self._get_track_artists(cur_item, track)
+ # we store a mapping to artists/albums on the track for easier access/listings
+ track_artists = await self._get_track_artists(cur_item, track, db=db)
+ track_albums = await self._get_track_albums(cur_item, track, db=db)
await self.mass.database.update(
self.db_table,
{"item_id": item_id},
"version": track.version if overwrite else cur_item.version,
"duration": track.duration if overwrite else cur_item.duration,
"artists": json_serializer(track_artists),
- "album": json_serializer(track_album),
+ "albums": json_serializer(track_albums),
"metadata": json_serializer(metadata),
"provider_ids": json_serializer(provider_ids),
"isrc": track.isrc or cur_item.isrc,
},
db=db,
)
- await self.mass.music.set_provider_mappings(
- item_id, MediaType.TRACK, provider_ids, db=db
- )
-
self.logger.debug("updated %s in database: %s", track.name, item_id)
return await self.get_db_item(item_id, db=db)
# use intermediate set to clear out duplicates
return list({await self._get_artist_mapping(x, db=db) for x in track_artists})
- async def _get_track_album(
+ async def _get_track_albums(
self,
base_track: Track,
upd_track: Optional[Track] = None,
db: Optional[Db] = None,
- ) -> ItemMapping | None:
- """Extract (database) track album as ItemMapping."""
- for track in (upd_track, base_track):
- if not track or not track.album:
- continue
-
- if track.album.provider == ProviderType.DATABASE:
- if isinstance(track.album, ItemMapping):
- return track.album
- return ItemMapping.from_item(track.album)
+ ) -> List[ItemMapping]:
+ """Extract all (unique) artists of track as ItemMapping."""
+ track_albums = []
+ if upd_track and upd_track.album:
+ track_albums.append(upd_track.album)
+ if base_track.album and base_track.album not in track_albums:
+ track_albums.append(base_track.album)
+ for item in base_track.albums:
+ if item not in track_albums:
+ track_albums.append(item)
+ if upd_track:
+ for item in upd_track.albums:
+ if item not in track_albums:
+ track_albums.append(item)
+ # use intermediate set to clear out duplicates
+ return [await self._get_album_mapping(x, db=db) for x in track_albums]
- if db_album := await self.mass.music.albums.get_db_item_by_prov_id(
- track.album.item_id, provider=track.album.provider, db=db
- ):
- return ItemMapping.from_item(db_album)
+ async def _get_album_mapping(
+ self, album: Union[Album, ItemMapping], db: Optional[Db] = None
+ ) -> ItemMapping:
+ """Extract (database) album as ItemMapping."""
+ if album.provider == ProviderType.DATABASE:
+ if isinstance(album, ItemMapping):
+ return album
+ return ItemMapping.from_item(album)
- db_album = await self.mass.music.albums.add_db_item(track.album, db=db)
- return ItemMapping.from_item(db_album)
+ if db_artist := await self.mass.music.albums.get_db_item_by_prov_id(
+ album.item_id, provider=album.provider, db=db
+ ):
+ return ItemMapping.from_item(db_artist)
- return None
+ db_artist = await self.mass.music.albums.add_db_item(album, db=db)
+ return ItemMapping.from_item(db_artist)
async def _get_artist_mapping(
self, artist: Union[Artist, ItemMapping], db: Optional[Db] = None
# exact album match = 100% match
if compare_album(left_track.album, right_track.album):
return True
+ if left_track.albums and right_track.albums:
+ for left_album in left_track.albums:
+ for right_album in right_track.albums:
+ if compare_album(left_album, right_album):
+ return True
# fallback: both albums are compilations and (near-exact) track duration match
- return (
+ if (
abs(left_track.duration - right_track.duration) <= 1
and left_track.album.album_type in (AlbumType.UNKNOWN, AlbumType.COMPILATION)
and right_track.album.album_type in (AlbumType.UNKNOWN, AlbumType.COMPILATION)
- )
+ ):
+ return True
+ return False
from music_assistant.mass import MusicAssistant
-SCHEMA_VERSION = 13
+SCHEMA_VERSION = 14
-TABLE_PROV_MAPPINGS = "provider_mappings"
TABLE_TRACK_LOUDNESS = "track_loudness"
TABLE_PLAYLOG = "playlog"
TABLE_ARTISTS = "artists"
await db.execute(f"DROP TABLE IF EXISTS {TABLE_TRACKS}")
await db.execute(f"DROP TABLE IF EXISTS {TABLE_PLAYLISTS}")
await db.execute(f"DROP TABLE IF EXISTS {TABLE_RADIOS}")
- await db.execute(f"DROP TABLE IF EXISTS {TABLE_PROV_MAPPINGS}")
await db.execute(f"DROP TABLE IF EXISTS {TABLE_CACHE}")
await db.execute(f"DROP TABLE IF EXISTS {TABLE_THUMBS}")
# recreate missing tables
await self.__create_database_tables(db)
+ if prev_version < 14:
+ # album --> albums on track entity
+ # no more need for prov_mappings table
+ await db.execute(f"DROP TABLE IF EXISTS {TABLE_TRACKS}")
+ await db.execute("DROP TABLE IF EXISTS provider_mappings")
+ await db.execute(f"DROP TABLE IF EXISTS {TABLE_CACHE}")
+ # recreate missing tables
+ await self.__create_database_tables(db)
+
# store current schema version
await self.set_setting("version", str(SCHEMA_VERSION), db=db)
@staticmethod
async def __create_database_tables(db: Db) -> None:
- """Init generic database tables."""
- await db.execute(
- f"""CREATE TABLE IF NOT EXISTS {TABLE_PROV_MAPPINGS}(
- item_id INTEGER NOT NULL,
- media_type TEXT NOT NULL,
- prov_item_id TEXT NOT NULL,
- prov_type TEXT NOT NULL,
- prov_id TEXT NOT NULL,
- quality INTEGER NULL,
- details TEXT NULL,
- url TEXT NULL,
- UNIQUE(item_id, media_type, prov_item_id, prov_id)
- );"""
- )
+ """Init database tables."""
+ # TODO: create indexes, especially for the json columns
+
await db.execute(
f"""CREATE TABLE IF NOT EXISTS {TABLE_TRACK_LOUDNESS}(
item_id INTEGER NOT NULL,
isrc TEXT,
musicbrainz_id TEXT,
artists json,
- album json,
+ albums json,
metadata json,
disc_number INTEGER NULL,
track_number INTEGER NULL,
from databases import Database as Db
-from music_assistant.helpers.database import TABLE_PROV_MAPPINGS
from music_assistant.models.errors import MediaNotFoundError, ProviderUnavailableError
from .enums import MediaType, ProviderType
assert provider or provider_id, "provider or provider_id must be supplied"
if provider == ProviderType.DATABASE or provider_id == "database":
return await self.get_db_item(provider_item_id, db=db)
- if item_id := await self.mass.music.get_provider_mapping(
- self.media_type, provider_item_id, provider, provider_id=provider_id, db=db
+ for item in await self.get_db_items_by_prov_id(
+ provider=provider,
+ provider_id=provider_id,
+ provider_item_ids=(provider_item_id,),
+ db=db,
):
- return await self.get_db_item(item_id, db=db)
+ return item
return None
async def get_db_items_by_prov_id(
self,
provider: Optional[ProviderType] = None,
provider_id: Optional[str] = None,
+ provider_item_ids: Optional[Tuple[str]] = None,
db: Optional[Db] = None,
) -> List[ItemCls]:
"""Fetch all records from database for given provider."""
assert provider or provider_id, "provider or provider_id must be supplied"
- db_ids = await self.mass.music.get_provider_mappings(
- self.media_type, provider=provider, provider_id=provider_id, db=db
- )
- query = f"SELECT * FROM tracks WHERE item_id in {str(tuple(db_ids))}"
+ if provider == ProviderType.DATABASE or provider_id == "database":
+ return await self.get_db_items(db=db)
+
+ query = f"SELECT * FROM {self.db_table}, json_each(provider_ids)"
+ if provider_id is not None:
+ query += (
+ f" WHERE json_extract(json_each.value, '$.prov_id') = '{provider_id}'"
+ )
+ elif provider is not None:
+ query += f" WHERE json_extract(json_each.value, '$.prov_type') = '{provider.value}'"
+ if provider_item_ids is not None:
+ prov_ids = str(tuple(provider_item_ids))
+ if prov_ids.endswith(",)"):
+ prov_ids = prov_ids.replace(",)", ")")
+ query += f" AND json_extract(json_each.value, '$.item_id') in {prov_ids}"
+
return await self.get_db_items(query, db=db)
async def set_db_library(
"""Delete record from the database."""
async with self.mass.database.get_db(db) as db:
- # delete prov mappings
- await self.mass.database.delete(
- TABLE_PROV_MAPPINGS,
- {"item_id": int(item_id), "media_type": self.media_type.value},
- db=db,
- )
# delete item
await self.mass.database.delete(
self.db_table,
MetadataTypes = Union[int, bool, str, List[str]]
-JSON_KEYS = ("artists", "artist", "album", "metadata", "provider_ids")
+JSON_KEYS = ("artists", "artist", "albums", "metadata", "provider_ids")
@dataclass(frozen=True)
artists: List[Union[Artist, ItemMapping]] = field(default_factory=list)
# album track only
album: Union[Album, ItemMapping, None] = None
+ albums: List[ItemMapping] = field(default_factory=list)
disc_number: Optional[int] = None
track_number: Optional[int] = None
# playlist track only