score += 1
return int(score)
- def __hash__(self):
+ def __hash__(self) -> int:
"""Return custom hash."""
- return hash((self.provider_domain, self.item_id))
+ return hash((self.provider_instance, self.item_id))
+
+ def __eq__(self, other: ProviderMapping) -> bool:
+ """Check equality of two items."""
+ return self.provider_instance == other.provider_instance and self.item_id == other.item_id
@dataclass(frozen=True)
type: LinkType
url: str
- def __hash__(self):
+ def __hash__(self) -> int:
"""Return custom hash."""
return hash(self.type)
+ def __eq__(self, other: MediaItemLink) -> bool:
+ """Check equality of two items."""
+ return self.type == other.type and self.url == other.url
+
@dataclass(frozen=True)
class MediaItemImage(DataClassDictMixin):
# if the path is just a plain (remotely accessible) URL, set it to 'url'
provider: str = "url"
- def __hash__(self):
+ def __hash__(self) -> int:
"""Return custom hash."""
- return hash(self.type.value, self.path)
+ return hash((self.type.value, self.path))
+
+ def __eq__(self, other: MediaItemImage) -> bool:
+ """Check equality of two items."""
+ return self.type == other.type and self.path == other.path
@dataclass(frozen=True)
position_end: float | None = None
title: str | None = None
- def __hash__(self):
+ def __hash__(self) -> int:
"""Return custom hash."""
return hash(self.chapter_id)
+ def __eq__(self, other: MediaItemChapter) -> bool:
+ """Check equality of two items."""
+ return self.chapter_id == other.chapter_id
+
@dataclass
class MediaItemMetadata(DataClassDictMixin):
}
self.provider_mappings.add(prov_mapping)
- def __hash__(self):
- """Return custom hash."""
- return hash((self.media_type, self.provider, self.item_id))
-
@dataclass
class ItemMapping(DataClassDictMixin):
result.available = item.available
return result
- def __hash__(self):
- """Return custom hash."""
- return hash((self.media_type, self.provider, self.item_id))
-
def __post_init__(self):
"""Call after init."""
if not self.uri:
if not self.sort_name:
self.sort_name = create_sort_name(self.name)
+ def __hash__(self) -> int:
+ """Return custom hash."""
+ return hash((self.media_type.value, self.provider, self.item_id))
+
@dataclass
class Artist(MediaItem):
media_type: MediaType = MediaType.ARTIST
musicbrainz_id: str | None = None
- def __hash__(self):
- """Return custom hash."""
- return hash((self.provider, self.item_id))
-
@dataclass
class Album(MediaItem):
barcode: set[str] = field(default_factory=set)
musicbrainz_id: str | None = None # release group id
- def __hash__(self):
- """Return custom hash."""
- return hash((self.provider, self.item_id))
-
@dataclass
class DbAlbum(Album):
# config default values
DEFAULT_HOST: Final[str] = "0.0.0.0"
DEFAULT_PORT: Final[int] = 8095
-DEFAULT_DB_LIBRARY: Final[str] = "sqlite:///[storage_path]/library.db"
-DEFAULT_DB_CACHE: Final[str] = "sqlite:///[storage_path]/cache.db"
# common db tables
DB_TABLE_TRACK_LOUDNESS: Final[str] = "track_loudness"
import functools
import json
import logging
+import os
import time
from collections import OrderedDict
from collections.abc import Iterator, MutableMapping
from typing import TYPE_CHECKING, Any
from music_assistant.constants import (
- CONF_DB_CACHE,
DB_TABLE_CACHE,
DB_TABLE_SETTINGS,
- DEFAULT_DB_CACHE,
ROOT_LOGGER_NAME,
SCHEMA_VERSION,
)
async def close(self) -> None:
"""Cleanup on exit."""
+ await self.database.close()
async def get(self, cache_key: str, checksum: str | None = None, default=None):
"""Get object from cache and return the results.
async def _setup_database(self):
"""Initialize database."""
- db_url: str = self.mass.config.get(CONF_DB_CACHE, DEFAULT_DB_CACHE)
- db_url = db_url.replace("[storage_path]", self.mass.storage_path)
- self.database = DatabaseConnection(db_url)
+ db_path = os.path.join(self.mass.storage_path, "cache.db")
+ self.database = DatabaseConnection(db_path)
+ await self.database.setup()
# always create db tables if they don't exist to prevent errors trying to access them later
await self.__create_database_tables()
data=config,
)
# signal update to the player manager
- with suppress(PlayerUnavailableError):
+ with suppress(PlayerUnavailableError, AttributeError):
player = self.mass.players.get(config.player_id)
player.enabled = config.enabled
self.mass.players.update(config.player_id, force_update=True)
# grab additional metadata
if not skip_metadata_lookup:
await self.mass.metadata.get_album_metadata(item)
- async with self._db_add_lock:
- # use the lock to prevent a race condition of the same item being added twice
- existing = await self.get_db_item_by_prov_id(item.item_id, item.provider)
- if existing:
- db_item = await self._update_db_item(existing.item_id, item)
+ if item.provider == "database":
+ db_item = await self._update_db_item(item.item_id, item)
else:
- db_item = await self._add_db_item(item)
+ # use the lock to prevent a race condition of the same item being added twice
+ async with self._db_add_lock:
+ db_item = await self._add_db_item(item)
# also fetch the same album on all providers
if not skip_metadata_lookup:
await self._match(db_item)
- # return final db_item after all match/metadata actions
- db_item = await self.get_db_item(db_item.item_id)
- # preload album tracks in db
+ # preload album tracks listing (do not load them in the db)
for prov_mapping in db_item.provider_mappings:
- for track in await self._get_provider_album_tracks(
+ await self._get_provider_album_tracks(
prov_mapping.item_id, prov_mapping.provider_instance
- ):
- if not await self.mass.music.tracks.get_db_item_by_prov_id(
- track.item_id, track.provider
- ):
- track.album = db_item
- await self.mass.music.tracks.add(track, skip_metadata_lookup=True)
- self.mass.signal_event(
- EventType.MEDIA_ITEM_UPDATED if existing else EventType.MEDIA_ITEM_ADDED,
- db_item.uri,
- db_item,
- )
- return db_item
+ )
+ # return final db_item after all match/metadata actions
+ return await self.get_db_item(db_item.item_id)
async def update(self, item_id: str | int, update: Album, overwrite: bool = False) -> Album:
"""Update existing record in the database."""
"""Add a new record to the database."""
assert item.provider_mappings, "Item is missing provider mapping(s)"
assert item.artists, f"Album {item.name} is missing artists"
- cur_item = None
+
# safety guard: check for existing item first
- # use the lock to prevent a race condition of the same item being added twice
- async with self._db_add_lock:
- # always try to grab existing item by musicbrainz_id
- if item.musicbrainz_id:
- match = {"musicbrainz_id": item.musicbrainz_id}
- cur_item = await self.mass.music.database.get_row(self.db_table, match)
- # try barcode/upc
- if not cur_item and item.barcode:
- for barcode in item.barcode:
- if search_result := await self.mass.music.database.search(
- self.db_table, barcode, "barcode"
- ):
- cur_item = Album.from_db_row(search_result[0])
- break
- if not cur_item:
- # fallback to search and match
- for row in await self.mass.music.database.search(self.db_table, item.name):
- row_album = Album.from_db_row(row)
- if compare_album(row_album, item):
- cur_item = row_album
- break
- if cur_item:
- # update existing
+ if cur_item := await self.get_db_item_by_prov_id(item.item_id, item.provider):
+ # existing item found: update it
return await self._update_db_item(cur_item.item_id, item)
+ if item.musicbrainz_id:
+ match = {"musicbrainz_id": item.musicbrainz_id}
+ if db_row := await self.mass.music.database.get_row(self.db_table, match):
+ cur_item = Album.from_db_row(db_row)
+ # existing item found: update it
+ return await self._update_db_item(cur_item.item_id, item)
+ # try barcode/upc
+ if not cur_item and item.barcode:
+ for barcode in item.barcode:
+ if search_result := await self.mass.music.database.search(
+ self.db_table, barcode, "barcode"
+ ):
+ cur_item = Album.from_db_row(search_result[0])
+ # existing item found: update it
+ return await self._update_db_item(cur_item.item_id, item)
+ # fallback to search and match
+ match = {"sort_name": item.sort_name}
+ for row in await self.mass.music.database.get_rows(self.db_table, match):
+ row_album = Album.from_db_row(row)
+ if compare_album(row_album, item):
+ cur_item = row_album
+ # existing item found: update it
+ return await self._update_db_item(cur_item.item_id, item)
# insert new item
album_artists = await self._get_artist_mappings(item, cur_item)
sort_artist = album_artists[0].sort_name if album_artists else ""
- async with self._db_add_lock:
- new_item = await self.mass.music.database.insert(
- self.db_table,
- {
- **item.to_db_row(),
- "artists": serialize_to_json(album_artists) or None,
- "sort_artist": sort_artist,
- "timestamp_added": int(utc_timestamp()),
- "timestamp_modified": int(utc_timestamp()),
- },
- )
- item_id = new_item["item_id"]
+ new_item = await self.mass.music.database.insert(
+ self.db_table,
+ {
+ **item.to_db_row(),
+ "artists": serialize_to_json(album_artists) or None,
+ "sort_artist": sort_artist,
+ "timestamp_added": int(utc_timestamp()),
+ "timestamp_modified": int(utc_timestamp()),
+ },
+ )
+ db_id = new_item["item_id"]
# update/set provider_mappings table
- await self._set_provider_mappings(item_id, item.provider_mappings)
+ await self._set_provider_mappings(db_id, item.provider_mappings)
self.logger.debug("added %s to database", item.name)
- # return created object
- return await self.get_db_item(item_id)
+ # get full created object
+ db_item = await self.get_db_item(db_id)
+ # only signal event if we're not running a sync (to prevent a floodstorm of events)
+ if not self.mass.music.get_running_sync_tasks():
+ self.mass.signal_event(
+ EventType.MEDIA_ITEM_ADDED,
+ db_item.uri,
+ db_item,
+ )
+ # return the full item we just added
+ return db_item
async def _update_db_item(
self, item_id: str | int, item: Album | ItemMapping, overwrite: bool = False
# update/set provider_mappings table
await self._set_provider_mappings(db_id, provider_mappings)
self.logger.debug("updated %s in database: %s", item.name, db_id)
- return await self.get_db_item(db_id)
+ # get full created object
+ db_item = await self.get_db_item(db_id)
+ # only signal event if we're not running a sync (to prevent a floodstorm of events)
+ if not self.mass.music.get_running_sync_tasks():
+ self.mass.signal_event(
+ EventType.MEDIA_ITEM_UPDATED,
+ db_item.uri,
+ db_item,
+ )
+ # return the full item we just updated
+ return db_item
async def _get_provider_album_tracks(
self, item_id: str, provider_instance_id_or_domain: str
# grab musicbrainz id and additional metadata
if not skip_metadata_lookup:
await self.mass.metadata.get_artist_metadata(item)
- async with self._db_add_lock:
- # use the lock to prevent a race condition of the same item being added twice
- existing = await self.get_db_item_by_prov_id(item.item_id, item.provider)
- if existing:
- db_item = await self._update_db_item(existing.item_id, item)
+ if item.provider == "database":
+ db_item = await self._update_db_item(item.item_id, item)
else:
- db_item = await self._add_db_item(item)
+ # use the lock to prevent a race condition of the same item being added twice
+ async with self._db_add_lock:
+ db_item = await self._add_db_item(item)
# also fetch same artist on all providers
if not skip_metadata_lookup:
await self.match_artist(db_item)
# return final db_item after all match/metadata actions
- db_item = await self.get_db_item(db_item.item_id)
- self.mass.signal_event(
- EventType.MEDIA_ITEM_UPDATED if existing else EventType.MEDIA_ITEM_ADDED,
- db_item.uri,
- db_item,
- )
- return db_item
+ return await self.get_db_item(db_item.item_id)
async def update(self, item_id: str | int, update: Artist, overwrite: bool = False) -> Artist:
"""Update existing record in the database."""
if item.musicbrainz_id == VARIOUS_ARTISTS_ID:
item.name = VARIOUS_ARTISTS
# safety guard: check for existing item first
- # use the lock to prevent a race condition of the same item being added twice
- async with self._db_add_lock:
- # always try to grab existing item by musicbrainz_id
- cur_item = None
- if musicbrainz_id := getattr(item, "musicbrainz_id", None):
- match = {"musicbrainz_id": musicbrainz_id}
- cur_item = await self.mass.music.database.get_row(self.db_table, match)
- if not cur_item:
- # fallback to exact name match
- # NOTE: we match an artist by name which could theoretically lead to collisions
- # but the chance is so small it is not worth the additional overhead of grabbing
- # the musicbrainz id upfront
- match = {"sort_name": item.sort_name}
- for row in await self.mass.music.database.get_rows(self.db_table, match):
- row_artist = Artist.from_db_row(row)
- if row_artist.sort_name == item.sort_name:
- cur_item = row_artist
- break
- if cur_item:
- # update existing
+ if isinstance(item, ItemMapping) and (
+ cur_item := await self.get_db_item_by_prov_id(item.item_id, item.provider)
+ ):
+ # existing item found: update it
+ return await self._update_db_item(cur_item.item_id, item)
+ if (provider_mappings := getattr(item, "provider_mappings", None)) and (
+ cur_item := await self.get_db_item_by_prov_mappings(provider_mappings)
+ ):
return await self._update_db_item(cur_item.item_id, item)
+ if musicbrainz_id := getattr(item, "musicbrainz_id", None):
+ match = {"musicbrainz_id": musicbrainz_id}
+ if db_row := await self.mass.music.database.get_row(self.db_table, match):
+ # existing item found: update it
+ cur_item = Artist.from_db_row(db_row)
+ return await self._update_db_item(cur_item.item_id, item)
+ # fallback to exact name match
+ # NOTE: we match an artist by name which could theoretically lead to collisions
+ # but the chance is so small it is not worth the additional overhead of grabbing
+ # the musicbrainz id upfront
+ match = {"sort_name": item.sort_name}
+ for row in await self.mass.music.database.get_rows(self.db_table, match):
+ row_artist = Artist.from_db_row(row)
+ if row_artist.sort_name == item.sort_name:
+ cur_item = row_artist
+ # existing item found: update it
+ return await self._update_db_item(cur_item.item_id, item)
- # insert item
+ # no existing item matched: insert item
item.timestamp_added = int(utc_timestamp())
item.timestamp_modified = int(utc_timestamp())
# edge case: item is an ItemMapping,
# try to construct (a half baken) Artist object from it
if isinstance(item, ItemMapping):
item = Artist.from_dict(item.to_dict())
- async with self._db_add_lock:
- new_item = await self.mass.music.database.insert(self.db_table, item.to_db_row())
- item_id = new_item["item_id"]
+ new_item = await self.mass.music.database.insert(self.db_table, item.to_db_row())
+ db_id = new_item["item_id"]
# update/set provider_mappings table
- await self._set_provider_mappings(item_id, item.provider_mappings)
+ await self._set_provider_mappings(db_id, item.provider_mappings)
self.logger.debug("added %s to database", item.name)
- # return created object
- return await self.get_db_item(item_id)
+ # get full created object
+ db_item = await self.get_db_item(db_id)
+ # only signal event if we're not running a sync (to prevent a floodstorm of events)
+ if not self.mass.music.get_running_sync_tasks():
+ self.mass.signal_event(
+ EventType.MEDIA_ITEM_ADDED,
+ db_item.uri,
+ db_item,
+ )
+ # return the full item we just added
+ return db_item
async def _update_db_item(
self, item_id: str | int, item: Artist | ItemMapping, overwrite: bool = False
# update/set provider_mappings table
await self._set_provider_mappings(db_id, provider_mappings)
self.logger.debug("updated %s in database: %s", item.name, db_id)
- return await self.get_db_item(db_id)
+ # get full created object
+ db_item = await self.get_db_item(db_id)
+ # only signal event if we're not running a sync (to prevent a floodstorm of events)
+ if not self.mass.music.get_running_sync_tasks():
+ self.mass.signal_event(
+ EventType.MEDIA_ITEM_UPDATED,
+ db_item.uri,
+ db_item,
+ )
+ # return the full item we just updated
+ return db_item
async def _get_provider_dynamic_tracks(
self,
"""Base (ABC) MediaType specific controller."""
from __future__ import annotations
-import asyncio
import logging
from abc import ABCMeta, abstractmethod
from collections.abc import AsyncGenerator
media_type: MediaType
item_cls: MediaItemType
db_table: str
- _db_add_lock = asyncio.Lock()
def __init__(self, mass: MusicAssistant):
"""Initialize class."""
return item
return None
+ async def get_db_item_by_prov_mappings(
+ self,
+ provider_mappings: list[ProviderMapping],
+ ) -> ItemCls | None:
+ """Get the database item matching any of the given provider mappings."""
+ # always prefer provider instance first
+ for mapping in provider_mappings:
+ for item in await self.get_db_items_by_prov_id(
+ mapping.provider_instance,
+ provider_item_ids=(mapping.item_id,),
+ ):
+ return item
+ # check by domain too
+ for mapping in provider_mappings:
+ for item in await self.get_db_items_by_prov_id(
+ mapping.provider_domain,
+ provider_item_ids=(mapping.item_id,),
+ ):
+ return item
+ return None
+
async def get_db_items_by_prov_id(
self,
provider_instance_id_or_domain: str,
"""Set the in-library bool on a database item."""
db_id = int(item_id) # ensure integer
match = {"item_id": db_id}
- async with self._db_add_lock:
- await self.mass.music.database.update(self.db_table, match, {"in_library": in_library})
+ await self.mass.music.database.update(self.db_table, match, {"in_library": in_library})
db_item = await self.get_db_item(db_id)
self.mass.signal_event(EventType.MEDIA_ITEM_UPDATED, db_item.uri, db_item)
return fallback_item
raise MediaNotFoundError(
f"{self.media_type.value}://{item_id} not "
- "found on provider {provider_instance_id_or_domain}"
+ f"found on provider {provider_instance_id_or_domain}"
)
async def remove_prov_mapping(self, item_id: str | int, provider_instance_id: str) -> None:
return
# update provider_mappings table
- async with self._db_add_lock:
- await self.mass.music.database.delete(
- DB_TABLE_PROVIDER_MAPPINGS,
- {
- "media_type": self.media_type.value,
- "item_id": db_id,
- "provider_instance": provider_instance_id,
- },
- )
+ await self.mass.music.database.delete(
+ DB_TABLE_PROVIDER_MAPPINGS,
+ {
+ "media_type": self.media_type.value,
+ "item_id": db_id,
+ "provider_instance": provider_instance_id,
+ },
+ )
# update the item in db (provider_mappings column only)
db_item.provider_mappings = {
}
match = {"item_id": db_id}
if db_item.provider_mappings:
- async with self._db_add_lock:
- await self.mass.music.database.update(
- self.db_table,
- match,
- {"provider_mappings": serialize_to_json(db_item.provider_mappings)},
- )
+ await self.mass.music.database.update(
+ self.db_table,
+ match,
+ {"provider_mappings": serialize_to_json(db_item.provider_mappings)},
+ )
self.logger.debug("removed provider %s from item id %s", provider_instance_id, db_id)
self.mass.signal_event(EventType.MEDIA_ITEM_UPDATED, db_item.uri, db_item)
else:
) -> None:
"""Update the provider_items table for the media item."""
db_id = int(item_id) # ensure integer
- # clear all records first
- async with self._db_add_lock:
- await self.mass.music.database.delete(
- DB_TABLE_PROVIDER_MAPPINGS,
- {"media_type": self.media_type.value, "item_id": db_id},
+ # get current mappings (if any)
+ cur_mappings = set()
+ match = {"media_type": self.media_type.value, "item_id": db_id}
+ for db_row in await self.mass.music.database.get_rows(DB_TABLE_PROVIDER_MAPPINGS, match):
+ cur_mappings.add(
+ ProviderMapping(
+ item_id=db_row["provider_item_id"],
+ provider_domain=db_row["provider_domain"],
+ provider_instance=db_row["provider_instance"],
+ )
)
- # add entries
- for provider_mapping in provider_mappings:
- await self.mass.music.database.insert_or_replace(
+ # delete removed mappings
+ for prov_mapping in cur_mappings:
+ if prov_mapping not in set(provider_mappings):
+ await self.mass.music.database.delete(
DB_TABLE_PROVIDER_MAPPINGS,
{
- "media_type": self.media_type.value,
- "item_id": db_id,
- "provider_domain": provider_mapping.provider_domain,
- "provider_instance": provider_mapping.provider_instance,
- "provider_item_id": provider_mapping.item_id,
+ **match,
+ "provider_domain": prov_mapping.provider_domain,
+ "provider_instance": prov_mapping.provider_instance,
+ "provider_item_id": prov_mapping.item_id,
},
)
+ # add entries
+ for provider_mapping in provider_mappings:
+ await self.mass.music.database.insert_or_replace(
+ DB_TABLE_PROVIDER_MAPPINGS,
+ {
+ **match,
+ "provider_domain": provider_mapping.provider_domain,
+ "provider_instance": provider_mapping.provider_instance,
+ "provider_item_id": provider_mapping.item_id,
+ },
+ )
def _get_provider_mappings(
self,
async def add(self, item: Playlist, skip_metadata_lookup: bool = False) -> Playlist:
"""Add playlist to local db and return the new database item."""
- if not skip_metadata_lookup:
- await self.mass.metadata.get_playlist_metadata(item)
+ if item.provider == "database":
+ db_item = await self._update_db_item(item.item_id, item)
+ else:
+ # use the lock to prevent a race condition of the same item being added twice
+ async with self._db_add_lock:
+ db_item = await self._add_db_item(item)
# preload playlist tracks listing (do not load them in the db)
- async for track in self.tracks(item.item_id, item.provider):
+ async for _ in self.tracks(item.item_id, item.provider):
pass
- async with self._db_add_lock:
- # use the lock to prevent a race condition of the same item being added twice
- existing = await self.get_db_item_by_prov_id(item.item_id, item.provider)
- if existing:
- db_item = await self._update_db_item(existing.item_id, item)
- else:
- db_item = await self._add_db_item(item)
- self.mass.signal_event(
- EventType.MEDIA_ITEM_UPDATED if existing else EventType.MEDIA_ITEM_ADDED,
- db_item.uri,
- db_item,
- )
+ # metadata lookup we need to do after adding it to the db
+ if not skip_metadata_lookup:
+ await self.mass.metadata.get_playlist_metadata(db_item)
+ db_item = await self._update_db_item(db_item.item_id, db_item)
return db_item
async def update(self, item_id: int, update: Playlist, overwrite: bool = False) -> Playlist:
async def _add_db_item(self, item: Playlist) -> Playlist:
"""Add a new record to the database."""
assert item.provider_mappings, "Item is missing provider mapping(s)"
- cur_item = None
# safety guard: check for existing item first
- # use the lock to prevent a race condition of the same item being added twice
- async with self._db_add_lock:
- match = {"sort_name": item.sort_name, "owner": item.owner}
- cur_item = await self.mass.music.database.get_row(self.db_table, match)
- if cur_item:
- # update existing
- return await self._update_db_item(cur_item["item_id"], item)
+ if cur_item := await self.get_db_item_by_prov_mappings(item.provider_mappings):
+ # existing item found: update it
+ return await self._update_db_item(cur_item.item_id, item)
+ # try name matching
+ match = {"name": item.name, "owner": item.owner}
+ if db_row := await self.mass.music.database.get_row(self.db_table, match):
+ cur_item = Playlist.from_db_row(db_row)
+ # existing item found: update it
+ return await self._update_db_item(cur_item.item_id, item)
# insert new item
item.timestamp_added = int(utc_timestamp())
item.timestamp_modified = int(utc_timestamp())
- async with self._db_add_lock:
- new_item = await self.mass.music.database.insert(self.db_table, item.to_db_row())
- item_id = new_item["item_id"]
+ new_item = await self.mass.music.database.insert(self.db_table, item.to_db_row())
+ db_id = new_item["item_id"]
# update/set provider_mappings table
- await self._set_provider_mappings(item_id, item.provider_mappings)
+ await self._set_provider_mappings(db_id, item.provider_mappings)
self.logger.debug("added %s to database", item.name)
- # return created object
- return await self.get_db_item(item_id)
+ # get full created object
+ db_item = await self.get_db_item(db_id)
+ # only signal event if we're not running a sync (to prevent a floodstorm of events)
+ if not self.mass.music.get_running_sync_tasks():
+ self.mass.signal_event(
+ EventType.MEDIA_ITEM_ADDED,
+ db_item.uri,
+ db_item,
+ )
+ # return the full item we just added
+ return db_item
async def _update_db_item(
self, item_id: str | int, item: Playlist, overwrite: bool = False
# update/set provider_mappings table
await self._set_provider_mappings(db_id, provider_mappings)
self.logger.debug("updated %s in database: %s", item.name, db_id)
- return await self.get_db_item(db_id)
+ # get full created object
+ db_item = await self.get_db_item(db_id)
+ # only signal event if we're not running a sync (to prevent a floodstorm of events)
+ if not self.mass.music.get_running_sync_tasks():
+ self.mass.signal_event(
+ EventType.MEDIA_ITEM_UPDATED,
+ db_item.uri,
+ db_item,
+ )
+ # return the full item we just updated
+ return db_item
async def _get_provider_playlist_tracks(
self,
"""Add radio to local db and return the new database item."""
if not skip_metadata_lookup:
await self.mass.metadata.get_radio_metadata(item)
- existing = await self.get_db_item_by_prov_id(item.item_id, item.provider)
- if existing:
- db_item = await self._update_db_item(existing.item_id, item)
+ if item.provider == "database":
+ db_item = await self._update_db_item(item.item_id, item)
else:
- db_item = await self._add_db_item(item)
- self.mass.signal_event(
- EventType.MEDIA_ITEM_UPDATED if existing else EventType.MEDIA_ITEM_ADDED,
- db_item.uri,
- db_item,
- )
+ # use the lock to prevent a race condition of the same item being added twice
+ async with self._db_add_lock:
+ db_item = await self._add_db_item(item)
return db_item
async def update(self, item_id: str | int, update: Radio, overwrite: bool = False) -> Radio:
assert item.provider_mappings, "Item is missing provider mapping(s)"
cur_item = None
# safety guard: check for existing item first
- # use the lock to prevent a race condition of the same item being added twice
- async with self._db_add_lock:
- match = {"name": item.name}
- cur_item = await self.mass.music.database.get_row(self.db_table, match)
- if cur_item:
- # update existing
- return await self._update_db_item(cur_item["item_id"], item)
+ if cur_item := await self.get_db_item_by_prov_id(item.item_id, item.provider):
+ # existing item found: update it
+ return await self._update_db_item(cur_item.item_id, item)
+ # try name matching
+ match = {"name": item.name}
+ if db_row := await self.mass.music.database.get_row(self.db_table, match):
+ cur_item = Radio.from_db_row(db_row)
+ # existing item found: update it
+ return await self._update_db_item(cur_item.item_id, item)
# insert new item
item.timestamp_added = int(utc_timestamp())
item.timestamp_modified = int(utc_timestamp())
- async with self._db_add_lock:
- new_item = await self.mass.music.database.insert(self.db_table, item.to_db_row())
- item_id = new_item["item_id"]
+ new_item = await self.mass.music.database.insert(self.db_table, item.to_db_row())
+ db_id = new_item["item_id"]
# update/set provider_mappings table
- await self._set_provider_mappings(item_id, item.provider_mappings)
+ await self._set_provider_mappings(db_id, item.provider_mappings)
self.logger.debug("added %s to database", item.name)
- # return created object
- return await self.get_db_item(item_id)
+ # get full created object
+ db_item = await self.get_db_item(db_id)
+ # only signal event if we're not running a sync (to prevent a floodstorm of events)
+ if not self.mass.music.get_running_sync_tasks():
+ self.mass.signal_event(
+ EventType.MEDIA_ITEM_ADDED,
+ db_item.uri,
+ db_item,
+ )
+ # return the full item we just added
+ return db_item
async def _update_db_item(
self, item_id: str | int, item: Radio, overwrite: bool = False
# update/set provider_mappings table
await self._set_provider_mappings(db_id, provider_mappings)
self.logger.debug("updated %s in database: %s", item.name, db_id)
- return await self.get_db_item(db_id)
+ # get full created object
+ db_item = await self.get_db_item(db_id)
+ # only signal event if we're not running a sync (to prevent a floodstorm of events)
+ if not self.mass.music.get_running_sync_tasks():
+ self.mass.signal_event(
+ EventType.MEDIA_ITEM_UPDATED,
+ db_item.uri,
+ db_item,
+ )
+ # return the full item we just updated
+ return db_item
async def _get_provider_dynamic_tracks(
self,
# grab additional metadata
if not skip_metadata_lookup:
await self.mass.metadata.get_track_metadata(item)
- async with self._db_add_lock:
- # use the lock to prevent a race condition of the same item being added twice
- existing = await self.get_db_item_by_prov_id(item.item_id, item.provider)
- if existing:
- db_item = await self._update_db_item(existing.item_id, item)
+ if item.provider == "database":
+ db_item = await self._update_db_item(item.item_id, item)
else:
- db_item = await self._add_db_item(item)
+ # use the lock to prevent a race condition of the same item being added twice
+ async with self._db_add_lock:
+ db_item = await self._add_db_item(item)
# also fetch same track on all providers (will also get other quality versions)
if not skip_metadata_lookup:
await self._match(db_item)
# return final db_item after all match/metadata actions
- db_item = await self.get_db_item(db_item.item_id)
- self.mass.signal_event(
- EventType.MEDIA_ITEM_UPDATED if existing else EventType.MEDIA_ITEM_ADDED,
- db_item.uri,
- db_item,
- )
- return db_item
+ return await self.get_db_item(db_item.item_id)
async def update(self, item_id: str | int, update: Track, overwrite: bool = False) -> Track:
"""Update existing record in the database."""
assert isinstance(item, Track), "Not a full Track object"
assert item.artists, "Track is missing artist(s)"
assert item.provider_mappings, "Track is missing provider mapping(s)"
- cur_item = None
-
# safety guard: check for existing item first
- # use the lock to prevent a race condition of the same item being added twice
- async with self._db_add_lock:
- # always try to grab existing item by external_id
- if item.musicbrainz_id:
- match = {"musicbrainz_id": item.musicbrainz_id}
- cur_item = await self.mass.music.database.get_row(self.db_table, match)
- for isrc in item.isrc:
- if search_result := await self.mass.music.database.search(
- self.db_table, isrc, "isrc"
- ):
- cur_item = Track.from_db_row(search_result[0])
- break
- if not cur_item:
- # fallback to matching
- match = {"sort_name": item.sort_name}
- for row in await self.mass.music.database.get_rows(self.db_table, match):
- row_track = Track.from_db_row(row)
- if compare_track(row_track, item):
- cur_item = row_track
- break
- if cur_item:
- # update existing
+ if cur_item := await self.get_db_item_by_prov_mappings(item.provider_mappings):
+ # existing item found: update it
return await self._update_db_item(cur_item.item_id, item)
+ # try matching on musicbrainz_id
+ if item.musicbrainz_id:
+ match = {"musicbrainz_id": item.musicbrainz_id}
+ if db_row := await self.mass.music.database.get_row(self.db_table, match):
+ cur_item = Track.from_db_row(db_row)
+ # existing item found: update it
+ return await self._update_db_item(cur_item.item_id, item)
+ # try matching on isrc
+ for isrc in item.isrc:
+ if search_result := await self.mass.music.database.search(self.db_table, isrc, "isrc"):
+ cur_item = Track.from_db_row(search_result[0])
+ # existing item found: update it
+ return await self._update_db_item(cur_item.item_id, item)
+ # fallback to compare matching
+ match = {"sort_name": item.sort_name}
+ for row in await self.mass.music.database.get_rows(self.db_table, match):
+ row_track = Track.from_db_row(row)
+ if compare_track(row_track, item):
+ cur_item = row_track
+ # existing item found: update it
+ return await self._update_db_item(cur_item.item_id, item)
# no existing match found: insert new item
track_artists = await self._get_artist_mappings(item)
track_albums = await self._get_track_albums(item)
sort_artist = track_artists[0].sort_name if track_artists else ""
sort_album = track_albums[0].sort_name if track_albums else ""
- async with self._db_add_lock:
- new_item = await self.mass.music.database.insert(
- self.db_table,
- {
- **item.to_db_row(),
- "artists": serialize_to_json(track_artists),
- "albums": serialize_to_json(track_albums),
- "sort_artist": sort_artist,
- "sort_album": sort_album,
- "timestamp_added": int(utc_timestamp()),
- "timestamp_modified": int(utc_timestamp()),
- },
- )
- item_id = new_item["item_id"]
+ new_item = await self.mass.music.database.insert(
+ self.db_table,
+ {
+ **item.to_db_row(),
+ "artists": serialize_to_json(track_artists),
+ "albums": serialize_to_json(track_albums),
+ "sort_artist": sort_artist,
+ "sort_album": sort_album,
+ "timestamp_added": int(utc_timestamp()),
+ "timestamp_modified": int(utc_timestamp()),
+ },
+ )
+ db_id = new_item["item_id"]
# update/set provider_mappings table
- await self._set_provider_mappings(item_id, item.provider_mappings)
+ await self._set_provider_mappings(db_id, item.provider_mappings)
# return created object
- self.logger.debug("added %s to database: %s", item.name, item_id)
- return await self.get_db_item(item_id)
+ self.logger.debug("added %s to database: %s", item.name, db_id)
+ # get full created object
+ db_item = await self.get_db_item(db_id)
+ # only signal event if we're not running a sync (to prevent a floodstorm of events)
+ if not self.mass.music.get_running_sync_tasks():
+ self.mass.signal_event(
+ EventType.MEDIA_ITEM_ADDED,
+ db_item.uri,
+ db_item,
+ )
+ # return the full item we just added
+ return db_item
async def _update_db_item(
self, item_id: str | int, item: Track | ItemMapping, overwrite: bool = False
# update/set provider_mappings table
await self._set_provider_mappings(db_id, provider_mappings)
self.logger.debug("updated %s in database: %s", item.name, db_id)
- return await self.get_db_item(db_id)
+        # get full updated object
+ db_item = await self.get_db_item(db_id)
+ # only signal event if we're not running a sync (to prevent a floodstorm of events)
+ if not self.mass.music.get_running_sync_tasks():
+ self.mass.signal_event(
+ EventType.MEDIA_ITEM_UPDATED,
+ db_item.uri,
+ db_item,
+ )
+ # return the full item we just updated
+ return db_item
async def _get_track_albums(
self,
import asyncio
import logging
+import os
import statistics
from itertools import zip_longest
from typing import TYPE_CHECKING
from music_assistant.common.models.media_items import BrowseFolder, MediaItemType, SearchResults
from music_assistant.common.models.provider import SyncTask
from music_assistant.constants import (
- CONF_DB_LIBRARY,
DB_TABLE_ALBUMS,
DB_TABLE_ARTISTS,
DB_TABLE_PLAYLISTS,
DB_TABLE_SETTINGS,
DB_TABLE_TRACK_LOUDNESS,
DB_TABLE_TRACKS,
- DEFAULT_DB_LIBRARY,
ROOT_LOGGER_NAME,
SCHEMA_VERSION,
)
self.radio = RadioController(mass)
self.playlists = PlaylistController(mass)
self.in_progress_syncs: list[SyncTask] = []
+ self._sync_lock = asyncio.Lock()
async def setup(self):
"""Async initialize of module."""
async def close(self) -> None:
"""Cleanup on exit."""
+ await self.database.close()
@property
def providers(self) -> list[MusicProvider]:
if provider.instance_id not in providers:
continue
self._start_provider_sync(provider.instance_id, media_types)
- # trigger metadata scan after provider sync completed
- self.mass.metadata.start_scan()
# reschedule task if needed
def create_sync_task():
@api_command("music/synctasks")
def get_running_sync_tasks(self) -> list[SyncTask]:
- """Return list with providers that are currently syncing."""
+ """Return list with providers that are currently (scheduled for) syncing."""
return self.in_progress_syncs
@api_command("music/search")
)
return
- # we keep track of running sync tasks
provider = self.mass.get_provider(provider_instance)
- task = self.mass.create_task(provider.sync_library(media_types))
+
+ async def run_sync() -> None:
+ # Wrap the provider sync into a lock to prevent
+            # race conditions when multiple providers are syncing at the same time.
+ async with self._sync_lock:
+ await provider.sync_library(media_types)
+
+ # we keep track of running sync tasks
+ task = self.mass.create_task(run_sync())
sync_spec = SyncTask(
provider_domain=provider.domain,
provider_instance=provider.instance_id,
def on_sync_task_done(task: asyncio.Task): # noqa: ARG001
self.in_progress_syncs.remove(sync_spec)
self.mass.signal_event(EventType.SYNC_TASKS_UPDATED, data=self.in_progress_syncs)
+ # trigger metadata scan after provider sync completed
+ self.mass.metadata.start_scan()
task.add_done_callback(on_sync_task_done)
async def _setup_database(self):
"""Initialize database."""
- db_url: str = self.mass.config.get(CONF_DB_LIBRARY, DEFAULT_DB_LIBRARY)
- db_url = db_url.replace("[storage_path]", self.mass.storage_path)
- self.database = DatabaseConnection(db_url)
+ db_path = os.path.join(self.mass.storage_path, "library.db")
+ self.database = DatabaseConnection(db_path)
+ await self.database.setup()
# always create db tables if they don't exist to prevent errors trying to access them later
await self.__create_database_tables()
provider_domain TEXT NOT NULL,
provider_instance TEXT NOT NULL,
provider_item_id TEXT NOT NULL,
- UNIQUE(media_type, item_id, provider_instance,
- provider_item_id, provider_item_id)
+ UNIQUE(media_type, provider_instance, provider_item_id)
);"""
)
"-loglevel",
"warning" if LOGGER.isEnabledFor(logging.DEBUG) else "quiet",
"-ignore_unknown",
+ "-protocol_whitelist",
+ "file,http,https,tcp,tls,crypto,pipe", # support nested protocols (e.g. within playlist)
]
# collect input args
input_args = []
return left_album.musicbrainz_id == right_album.musicbrainz_id
# fallback to comparing
- if not compare_strings(left_album.name, right_album.name, False):
+ if not compare_strings(left_album.name, right_album.name, True):
return False
if not compare_version(left_album.version, right_album.version):
return False
# track artist(s) must match
if not compare_artists(left_track.artists, right_track.artists):
return False
- # track if both tracks are (not) explicit
+ # check if both tracks are (not) explicit
if strict and not compare_explicit(left_track.metadata, right_track.metadata):
return False
# exact albumtrack match = 100% match
and left_track.track_number == right_track.track_number
):
return True
- # exact album match = 100% match
- if left_track.albums and right_track.albums:
+ # check album match
+ if (
+ not (album_match_found := compare_album(left_track.album, right_track.album))
+ and left_track.albums
+ and right_track.albums
+ ):
for left_album in left_track.albums:
for right_album in right_track.albums:
if compare_album(left_album, right_album):
- return True
+ album_match_found = True
+ if (
+ (left_album.disc_number or 1) == (right_album.disc_number or 1)
+ and left_album.track_number
+ and right_album.track_number
+ and left_album.track_number == right_album.track_number
+ ):
+ # exact albumtrack match = 100% match
+ return True
# fallback: exact album match and (near-exact) track duration match
- if abs(left_track.duration - right_track.duration) <= 3 and compare_album(
- left_track.album, right_track.album
- ):
+ if album_match_found and abs(left_track.duration - right_track.duration) <= 3:
return True
return False
from collections.abc import Mapping
from typing import Any
-from databases import Database as Db
-from databases import DatabaseURL
-from sqlalchemy.sql import ClauseElement
+import aiosqlite
class DatabaseConnection:
"""Class that holds the (connection to the) database with some convenience helper functions."""
- def __init__(self, url: DatabaseURL):
+ _db: aiosqlite.Connection
+
+ def __init__(self, db_path: str):
"""Initialize class."""
- self.url = url
- # we maintain one global connection - otherwise we run into (dead)lock issues.
- # https://github.com/encode/databases/issues/456
- self._db = Db(self.url, timeout=360)
+ self.db_path = db_path
async def setup(self) -> None:
"""Perform async initialization."""
- await self._db.connect()
+ self._db = await aiosqlite.connect(self.db_path)
+ self._db.row_factory = aiosqlite.Row
async def close(self) -> None:
"""Close db connection on exit."""
- await self._db.disconnect()
+ await self._db.close()
async def get_rows(
self,
if order_by is not None:
sql_query += f" ORDER BY {order_by}"
sql_query += f" LIMIT {limit} OFFSET {offset}"
- return await self._db.fetch_all(sql_query, match)
+ return await self._db.execute_fetchall(sql_query, match)
async def get_rows_from_query(
self,
) -> list[Mapping]:
"""Get all rows for given custom query."""
query = f"{query} LIMIT {limit} OFFSET {offset}"
- return await self._db.fetch_all(query, params)
+ return await self._db.execute_fetchall(query, params)
async def get_count_from_query(
self,
) -> int:
"""Get row count for given custom query."""
query = f"SELECT count() FROM ({query})"
- if result := await self._db.fetch_one(query, params):
- return result[0]
+ async with self._db.execute(query, params) as cursor:
+ if result := await cursor.fetchone():
+ return result[0]
return 0
async def get_count(
) -> int:
"""Get row count for given table."""
query = f"SELECT count(*) FROM {table}"
- if result := await self._db.fetch_one(query):
- return result[0]
+ async with self._db.execute(query) as cursor:
+ if result := await cursor.fetchone():
+ return result[0]
return 0
async def search(self, table: str, search: str, column: str = "name") -> list[Mapping]:
"""Search table by column."""
sql_query = f"SELECT * FROM {table} WHERE {column} LIKE :search"
params = {"search": f"%{search}%"}
- return await self._db.fetch_all(sql_query, params)
+ return await self._db.execute_fetchall(sql_query, params)
async def get_row(self, table: str, match: dict[str, Any]) -> Mapping | None:
"""Get single row for given table where column matches keys/values."""
sql_query = f"SELECT * FROM {table} WHERE "
sql_query += " AND ".join(f"{x} = :{x}" for x in match)
- return await self._db.fetch_one(sql_query, match)
+ async with self._db.execute(sql_query, match) as cursor:
+ return await cursor.fetchone()
async def insert(
self,
sql_query = f'INSERT INTO {table}({",".join(keys)})'
sql_query += f' VALUES ({",".join((f":{x}" for x in keys))})'
await self.execute(sql_query, values)
+ await self._db.commit()
# return inserted/replaced item
lookup_vals = {key: value for key, value in values.items() if value not in (None, "")}
return await self.get_row(table, lookup_vals)
sql_query = f'UPDATE {table} SET {",".join((f"{x}=:{x}" for x in keys))} WHERE '
sql_query += " AND ".join(f"{x} = :{x}" for x in match)
await self.execute(sql_query, {**match, **values})
+ await self._db.commit()
# return updated item
return await self.get_row(table, match)
sql_query += "WHERE " + query
elif query:
sql_query += query
-
await self.execute(sql_query, match)
+ await self._db.commit()
async def delete_where_query(self, table: str, query: str | None = None) -> None:
"""Delete data in given table using given where clausule."""
sql_query = f"DELETE FROM {table} WHERE {query}"
await self.execute(sql_query)
+ await self._db.commit()
- async def execute(self, query: ClauseElement | str, values: dict = None) -> Any:
+    async def execute(self, query: str, values: dict | None = None) -> Any:
"""Execute command on the database."""
return await self._db.execute(query, values)
data = json.loads(res)
if error := data.get("error"):
raise InvalidDataError(error["string"])
- if not data.get("streams") or data["streams"][0].get("codec_type") == "video":
+ if not data.get("streams"):
raise InvalidDataError("Not an audio file")
tags = AudioTags.parse(data)
del res
controller = self.mass.music.get_controller(media_type)
cur_db_ids = set()
async for prov_item in self._get_library_gen(media_type):
- db_item: MediaItemType
- if not (
- db_item := await controller.get_db_item_by_prov_id(
- prov_item.item_id,
- prov_item.provider,
- )
- ):
+ db_item = await controller.get_db_item_by_prov_mappings(
+ prov_item.provider_mappings,
+ )
+ if not db_item:
# create full db item
+ prov_item.in_library = True
db_item = await controller.add(prov_item, skip_metadata_lookup=True)
elif (
db_item.metadata.checksum and prov_item.metadata.checksum
await controller.set_db_library(db_item.item_id, True)
# process deletions (= no longer in library)
- async for db_item in controller.iter_db_items(True):
- if db_item.item_id in cur_db_ids:
- continue
- for prov_mapping in db_item.provider_mappings:
- provider_domains = {x.provider_domain for x in db_item.provider_mappings}
- if len(provider_domains) > 1:
- continue
- if prov_mapping.provider_instance != self.instance_id:
- continue
- # only mark the item as not in library and leave the metadata in db
- await controller.set_db_library(db_item.item_id, False)
+ cache_key = f"db_items.{media_type}.{self.instance_id}"
+ prev_db_items: list[int] | None
+ if prev_db_items := await self.mass.cache.get(cache_key):
+ for db_id in prev_db_items:
+ if db_id not in cur_db_ids:
+ # only mark the item as not in library and leave the metadata in db
+ await controller.set_db_library(db_id, False)
+ await self.mass.cache.set(cache_key, list(cur_db_ids))
# DO NOT OVERRIDE BELOW
album.barcode.add(album_obj["upc"])
if "label" in album_obj:
album.metadata.label = album_obj["label"]["name"]
- if album_obj.get("released_at"):
+ if (released_at := album_obj.get("released_at")) and released_at != 0:
album.year = datetime.datetime.fromtimestamp(album_obj["released_at"]).year
if album_obj.get("copyright"):
album.metadata.copyright = album_obj["copyright"]
if (
self._auth_token
and os.path.isdir(self._cache_dir)
- and (self._auth_token["expiresAt"] > int(time.time()) + 20)
+ and (self._auth_token["expiresAt"] > int(time.time()) + 600)
):
return self._auth_token
tokeninfo, userinfo = None, self._sp_user
"aiofiles==23.1.0",
"aiorun==2022.11.1",
"coloredlogs==15.0.1",
- "databases==0.7.0",
"aiosqlite==0.19.0",
"python-slugify==8.0.1",
"mashumaro==3.7",
asyncio-throttle==1.0.2
coloredlogs==15.0.1
cryptography==40.0.2
-databases==0.7.0
deezer-python==5.10.0
faust-cchardet>=2.1.18
git+https://github.com/gieljnssns/python-radios.git@main