def parse_uri(uri: str) -> tuple[MediaType, str, str]:
"""Try to parse URI to Mass identifiers.
- Returns Tuple: MediaType, provider_domain_or_instance_id, item_id
+ Returns Tuple: MediaType, provider_instance_id_or_domain, item_id
"""
try:
if uri.startswith("https://open."):
# public share URL (e.g. Spotify or Qobuz, not sure about others)
# https://open.spotify.com/playlist/5lH9NjOeJvctAO92ZrKQNB?si=04a63c8234ac413e
- provider_domain_or_instance_id = uri.split(".")[1]
+ provider_instance_id_or_domain = uri.split(".")[1]
media_type_str = uri.split("/")[3]
media_type = MediaType(media_type_str)
item_id = uri.split("/")[4].split("?")[0]
elif uri.startswith("http://") or uri.startswith("https://"):
# Translate a plain URL to the URL provider
- provider_domain_or_instance_id = "url"
+ provider_instance_id_or_domain = "url"
media_type = MediaType.UNKNOWN
item_id = uri
elif "://" in uri:
# music assistant-style uri
# provider://media_type/item_id
- provider_domain_or_instance_id = uri.split("://")[0]
+ provider_instance_id_or_domain = uri.split("://")[0]
media_type_str = uri.split("/")[2]
media_type = MediaType(media_type_str)
item_id = uri.split(f"{media_type_str}/")[1]
elif ":" in uri:
# spotify new-style uri
- provider_domain_or_instance_id, media_type_str, item_id = uri.split(":")
+ provider_instance_id_or_domain, media_type_str, item_id = uri.split(":")
media_type = MediaType(media_type_str)
elif os.path.isfile(uri):
# Translate a local file (which is not from file provider) to the URL provider
- provider_domain_or_instance_id = "url"
+ provider_instance_id_or_domain = "url"
media_type = MediaType.TRACK
item_id = uri
else:
raise KeyError
except (TypeError, AttributeError, ValueError, KeyError) as err:
raise MusicAssistantError(f"Not a valid Music Assistant uri: {uri}") from err
- return (media_type, provider_domain_or_instance_id, item_id)
+ return (media_type, provider_instance_id_or_domain, item_id)
-def create_uri(media_type: MediaType, provider_domain_or_instance_id: str, item_id: str) -> str:
+def create_uri(media_type: MediaType, provider_instance_id_or_domain: str, item_id: str) -> str:
"""Create Music Assistant URI from MediaItem values."""
- return f"{provider_domain_or_instance_id}://{media_type.value}/{item_id}"
+ return f"{provider_instance_id_or_domain}://{media_type.value}/{item_id}"
"""Base representation of a media item."""
item_id: str
- provider: str
+ provider: str # provider instance id or provider domain
name: str
provider_mappings: set[ProviderMapping] = field(default_factory=set)
return hash((self.media_type, self.provider, self.item_id))
-@dataclass(frozen=True)
+@dataclass
class ItemMapping(DataClassDictMixin):
"""Representation of a minimized item object."""
media_type: MediaType
item_id: str
- provider: str
+ provider: str # provider instance id or provider domain
name: str
- sort_name: str
- uri: str
+ sort_name: str | None = None
+ uri: str | None = None
version: str = ""
@classmethod
"""Return custom hash."""
return hash((self.media_type, self.provider, self.item_id))
+ def __post_init__(self):
+ """Call after init."""
+ if not self.uri:
+ self.uri = create_uri(self.media_type, self.provider, self.item_id)
+ if not self.sort_name:
+ self.sort_name = create_sort_name(self.name)
+
@dataclass
class Artist(MediaItem):
barcode: set[str] = field(default_factory=set)
musicbrainz_id: str | None = None # release group id
- @property
- def artist(self) -> Artist | ItemMapping | None:
- """Return (first) artist of album."""
- if self.artists:
- return self.artists[0]
- return None
-
- @artist.setter
- def artist(self, artist: Artist | ItemMapping) -> None:
- """Set (first/only) artist of album."""
- self.artists = [artist]
-
def __hash__(self):
"""Return custom hash."""
return hash((self.provider, self.item_id))
artists: list[ItemMapping] = field(default_factory=list)
-@dataclass(frozen=True)
+@dataclass
class TrackAlbumMapping(ItemMapping):
"""Model for a track that is mapped to an album."""
return getattr(self.album, "image", None)
return None
- @property
- def artist(self) -> Artist | ItemMapping | None:
- """Return (first) artist of track."""
- if self.artists:
- return self.artists[0]
- return None
-
- @artist.setter
- def artist(self, artist: Artist | ItemMapping) -> None:
- """Set (first/only) artist of track."""
- self.artists = [artist]
-
@property
def has_chapters(self) -> bool:
"""
self.mass.register_api_command("music/album", self.get)
self.mass.register_api_command("music/album/tracks", self.tracks)
self.mass.register_api_command("music/album/versions", self.versions)
- self.mass.register_api_command("music/album/update", self.update_db_item)
- self.mass.register_api_command("music/album/delete", self.delete_db_item)
+ self.mass.register_api_command("music/album/update", self._update_db_item)
+ self.mass.register_api_command("music/album/delete", self.delete)
async def get(
self,
item_id: str,
- provider_domain: str | None = None,
- provider_instance: str | None = None,
+ provider_instance_id_or_domain: str,
force_refresh: bool = False,
lazy: bool = True,
details: Album = None,
) -> Album:
"""Return (full) details for a single media item."""
album = await super().get(
- item_id=item_id,
- provider_domain=provider_domain,
- provider_instance=provider_instance,
+ item_id,
+ provider_instance_id_or_domain,
force_refresh=force_refresh,
lazy=lazy,
details=details,
add_to_db=add_to_db,
)
# append full artist details to full album item
- if album.artist:
- album.artist = await self.mass.music.artists.get(
- album.artist.item_id,
- provider_instance=album.artist.provider,
+ album.artists = [
+ await self.mass.music.artists.get(
+ item.item_id,
+ item.provider,
lazy=True,
- details=None if isinstance(album.artist, ItemMapping) else album.artist,
+ details=item,
add_to_db=add_to_db,
)
+ for item in album.artists
+ ]
return album
+ async def add(self, item: Album, skip_metadata_lookup: bool = False) -> Album:
+ """Add album to local db and return the database item."""
+ # resolve any ItemMapping artists
+ item.artists = [
+ await self.mass.music.artists.get_provider_item(artist.item_id, artist.provider)
+ if isinstance(artist, ItemMapping)
+ else artist
+ for artist in item.artists
+ ]
+ # grab additional metadata
+ if not skip_metadata_lookup:
+ await self.mass.metadata.get_album_metadata(item)
+ existing = await self.get_db_item_by_prov_id(item.item_id, item.provider)
+ if existing:
+ db_item = await self._update_db_item(existing.item_id, item)
+ else:
+ db_item = await self._add_db_item(item)
+ # also fetch the same album on all providers
+ if not skip_metadata_lookup:
+ await self._match(db_item)
+ # return final db_item after all match/metadata actions
+ db_item = await self.get_db_item(db_item.item_id)
+ # preload album tracks in db
+ for prov_mapping in db_item.provider_mappings:
+ for track in await self._get_provider_album_tracks(
+ prov_mapping.item_id, prov_mapping.provider_instance
+ ):
+ if not await self.mass.music.tracks.get_db_item_by_prov_id(
+ track.item_id, track.provider
+ ):
+ await self.mass.music.tracks.add(track, skip_metadata_lookup=True)
+ self.mass.signal_event(
+ EventType.MEDIA_ITEM_UPDATED if existing else EventType.MEDIA_ITEM_ADDED,
+ db_item.uri,
+ db_item,
+ )
+ return db_item
+
+ async def delete(self, item_id: int, recursive: bool = False) -> None:
+ """Delete record from the database."""
+ # check album tracks
+ db_rows = await self.mass.music.database.get_rows_from_query(
+ f"SELECT item_id FROM {DB_TABLE_TRACKS} WHERE albums LIKE '%\"{item_id}\"%'",
+ limit=5000,
+ )
+ assert not (db_rows and not recursive), "Tracks attached to album"
+ for db_row in db_rows:
+ with contextlib.suppress(MediaNotFoundError):
+ await self.mass.music.tracks.delete(db_row["item_id"], recursive)
+
+ # delete the album itself from db
+ await super().delete(item_id)
+
async def tracks(
self,
item_id: str,
- provider_domain: str | None = None,
- provider_instance: str | None = None,
+ provider_instance_id_or_domain: str,
) -> list[Track]:
"""Return album tracks for the given provider album id."""
- if "database" in (provider_domain, provider_instance):
+ if provider_instance_id_or_domain == "database":
if db_result := await self._get_db_album_tracks(item_id):
return db_result
# no results in db (yet), grab provider details
if not prov_mapping.available:
continue
return await self._get_provider_album_tracks(
- prov_mapping.item_id, provider_instance=prov_mapping.provider_instance
+ prov_mapping.item_id, prov_mapping.provider_instance
)
# return provider album tracks
- return await self._get_provider_album_tracks(item_id, provider_domain or provider_instance)
+ return await self._get_provider_album_tracks(item_id, provider_instance_id_or_domain)
async def versions(
self,
item_id: str,
- provider_domain: str | None = None,
- provider_instance: str | None = None,
+ provider_instance_id_or_domain: str,
) -> list[Album]:
"""Return all versions of an album we can find on all providers."""
- assert provider_domain or provider_instance, "Provider type or ID must be specified"
- album = await self.get(item_id, provider_domain or provider_instance, add_to_db=False)
+ album = await self.get(item_id, provider_instance_id_or_domain, add_to_db=False)
# perform a search on all provider(types) to collect all versions/variants
- provider_domains = {item.domain for item in self.mass.music.providers}
- search_query = f"{album.artist.name} - {album.name}"
+ search_query = f"{album.artists[0].name} - {album.name}"
all_versions = {
prov_item.item_id: prov_item
for prov_items in await asyncio.gather(
*[
- self.search(search_query, provider_domain)
- for provider_domain in provider_domains
+ self.search(search_query, provider_instance_id)
+ for provider_instance_id in self.mass.music.get_unique_providers()
]
)
for prov_item in prov_items
# return the aggregated result
return all_versions.values()
- async def add(self, item: Album) -> Album:
- """Add album to local db and return the database item."""
- # grab additional metadata
- await self.mass.metadata.get_album_metadata(item)
- existing = await self.get_db_item_by_prov_id(item.item_id, provider_instance=item.provider)
- if existing:
- db_item = await self.update_db_item(existing.item_id, item)
- else:
- db_item = await self.add_db_item(item)
- # also fetch same album on all providers
- await self._match(db_item)
- # return final db_item after all match/metadata actions
- db_item = await self.get_db_item(db_item.item_id)
- # preload album tracks in db
- for prov_mapping in db_item.provider_mappings:
- for track in await self._get_provider_album_tracks(
- prov_mapping.item_id, prov_mapping.provider_instance
- ):
- await self.mass.music.tracks.get(
- track.item_id, provider_instance=track.provider, details=track, add_to_db=True
- )
- self.mass.signal_event(
- EventType.MEDIA_ITEM_UPDATED if existing else EventType.MEDIA_ITEM_ADDED,
- db_item.uri,
- db_item,
- )
- return db_item
-
- async def add_db_item(self, item: Album) -> Album:
+ async def _add_db_item(self, item: Album) -> Album:
"""Add a new record to the database."""
assert item.provider_mappings, "Item is missing provider mapping(s)"
- assert item.artist, f"Album {item.name} is missing artist"
+ assert item.artists, f"Album {item.name} is missing artists"
async with self._db_add_lock:
cur_item = None
# always try to grab existing item by musicbrainz_id
break
if cur_item:
# update existing
- return await self.update_db_item(cur_item.item_id, item)
+ return await self._update_db_item(cur_item.item_id, item)
# insert new item
album_artists = await self._get_album_artists(item, cur_item)
# return created object
return await self.get_db_item(item_id)
- async def update_db_item(
+ async def _update_db_item(
self,
item_id: int,
item: Album,
) -> Album:
"""Update Album record in the database."""
assert item.provider_mappings, "Item is missing provider mapping(s)"
- assert item.artist, f"Album {item.name} is missing artist"
+ assert item.artists, f"Album {item.name} is missing artist"
cur_item = await self.get_db_item(item_id)
is_file_provider = item.provider.startswith("filesystem")
metadata = cur_item.metadata.update(item.metadata, is_file_provider)
self.logger.debug("updated %s in database: %s", item.name, item_id)
return await self.get_db_item(item_id)
- async def delete_db_item(self, item_id: int, recursive: bool = False) -> None:
- """Delete record from the database."""
- # check album tracks
- db_rows = await self.mass.music.database.get_rows_from_query(
- f"SELECT item_id FROM {DB_TABLE_TRACKS} WHERE albums LIKE '%\"{item_id}\"%'",
- limit=5000,
- )
- assert not (db_rows and not recursive), "Tracks attached to album"
- for db_row in db_rows:
- with contextlib.suppress(MediaNotFoundError):
- await self.mass.music.tracks.delete_db_item(db_row["item_id"], recursive)
-
- # delete the album itself from db
- await super().delete_db_item(item_id)
-
async def _get_provider_album_tracks(
- self,
- item_id: str,
- provider_domain: str | None = None,
- provider_instance: str | None = None,
+ self, item_id: str, provider_instance_id_or_domain: str
) -> list[Track]:
"""Return album tracks for the given provider album id."""
- prov = self.mass.get_provider(provider_instance or provider_domain)
+ assert provider_instance_id_or_domain != "database"
+ prov = self.mass.get_provider(provider_instance_id_or_domain)
if prov is None:
return []
- full_album = await self.get_provider_item(item_id, provider_instance or provider_domain)
+ full_album = await self.get_provider_item(item_id, provider_instance_id_or_domain)
# prefer cache items (if any)
cache_key = f"{prov.instance_id}.albumtracks.{item_id}"
cache_checksum = full_album.metadata.checksum
async def _get_provider_dynamic_tracks(
self,
item_id: str,
- provider_domain: str | None = None,
- provider_instance: str | None = None,
+ provider_instance_id_or_domain: str,
limit: int = 25,
):
"""Generate a dynamic list of tracks based on the album content."""
- prov = self.mass.get_provider(provider_instance or provider_domain)
+ assert provider_instance_id_or_domain != "database"
+ prov = self.mass.get_provider(provider_instance_id_or_domain)
if prov is None:
return []
if ProviderFeature.SIMILAR_TRACKS not in prov.supported_features:
return []
album_tracks = await self._get_provider_album_tracks(
- item_id=item_id,
- provider_domain=provider_domain,
- provider_instance=provider_instance,
+ item_id, provider_instance_id_or_domain
)
# Grab a random track from the album that we use to obtain similar tracks for
track = choice(album_tracks)
match_found = False
for search_str in (
db_album.name,
- f"{db_album.artist.name} - {db_album.name}",
- f"{db_album.artist.name} {db_album.name}",
+ f"{db_album.artists[0].name} - {db_album.name}",
+ f"{db_album.artists[0].name} {db_album.name}",
):
if match_found:
break
)
if compare_album(prov_album, db_album):
# 100% match, we can simply update the db with additional provider ids
- await self.update_db_item(db_album.item_id, prov_album)
+ await self._update_db_item(db_album.item_id, prov_album)
match_found = True
return match_found
return ItemMapping.from_item(artist)
if db_artist := await self.mass.music.artists.get_db_item_by_prov_id(
- artist.item_id, provider_instance=artist.provider
+ artist.item_id, artist.provider
):
return ItemMapping.from_item(db_artist)
- db_artist = await self.mass.music.artists.add_db_item(artist)
+ db_artist = await self.mass.music.artists._add_db_item(artist)
return ItemMapping.from_item(db_artist)
DB_TABLE_ARTISTS,
DB_TABLE_TRACKS,
)
-from music_assistant.server.helpers.compare import compare_strings
+from music_assistant.server.helpers.compare import compare_artist, compare_strings
if TYPE_CHECKING:
from music_assistant.server.models.music_provider import MusicProvider
self.mass.register_api_command("music/artist", self.get)
self.mass.register_api_command("music/artist/albums", self.albums)
self.mass.register_api_command("music/artist/tracks", self.tracks)
- self.mass.register_api_command("music/artist/update", self.update_db_item)
- self.mass.register_api_command("music/artist/delete", self.delete_db_item)
+ self.mass.register_api_command("music/artist/update", self._update_db_item)
+ self.mass.register_api_command("music/artist/delete", self.delete)
+
+ async def add(self, item: Artist, skip_metadata_lookup: bool = False) -> Artist:
+ """Add artist to local db and return the database item."""
+ # grab musicbrainz id and additional metadata
+ if not skip_metadata_lookup:
+ await self.mass.metadata.get_artist_metadata(item)
+ existing = await self.get_db_item_by_prov_id(item.item_id, item.provider)
+ if existing:
+ db_item = await self._update_db_item(existing.item_id, item)
+ else:
+ db_item = await self._add_db_item(item)
+ # also fetch same artist on all providers
+ if not skip_metadata_lookup:
+ await self.match_artist(db_item)
+ # return final db_item after all match/metadata actions
+ db_item = await self.get_db_item(db_item.item_id)
+ self.mass.signal_event(
+ EventType.MEDIA_ITEM_UPDATED if existing else EventType.MEDIA_ITEM_ADDED,
+ db_item.uri,
+ db_item,
+ )
+ return db_item
async def album_artists(
self,
async def tracks(
self,
item_id: str | None = None,
- provider_domain: str | None = None,
- provider_instance: str | None = None,
+ provider_instance_id_or_domain: str | None = None,
artist: Artist | None = None,
) -> list[Track]:
"""Return top tracks for an artist."""
if not artist:
- artist = await self.get(item_id, provider_domain, provider_instance, add_to_db=False)
+ artist = await self.get(item_id, provider_instance_id_or_domain, add_to_db=False)
# get results from all providers
coros = [
self.get_provider_artist_toptracks(
prov_mapping.item_id,
- provider_domain=prov_mapping.provider_domain,
- provider_instance=prov_mapping.provider_instance,
+ prov_mapping.provider_instance,
cache_checksum=artist.metadata.checksum,
)
for prov_mapping in artist.provider_mappings
async def albums(
self,
item_id: str | None = None,
- provider_domain: str | None = None,
- provider_instance: str | None = None,
+ provider_instance_id_or_domain: str | None = None,
artist: Artist | None = None,
) -> list[Album]:
"""Return (all/most popular) albums for an artist."""
if not artist:
- artist = await self.get(item_id, provider_domain or provider_instance, add_to_db=False)
+ artist = await self.get(item_id, provider_instance_id_or_domain, add_to_db=False)
# get results from all providers
coros = [
self.get_provider_artist_albums(
)
for item in artist.provider_mappings
]
- albums = itertools.chain.from_iterable(await asyncio.gather(*coros))
+ albums: list[Album] = itertools.chain.from_iterable(await asyncio.gather(*coros))
# merge duplicates using a dict
final_items: dict[str, Album] = {}
for album in albums:
- key = f".{album.name}.{album.version}"
+ key = f".{album.name}.{album.version}.{album.metadata.explicit}"
if key in final_items:
final_items[key].provider_mappings.update(album.provider_mappings)
else:
final_items[key].in_library = True
return list(final_items.values())
- async def add(self, item: Artist) -> Artist:
- """Add artist to local db and return the database item."""
- # grab musicbrainz id and additional metadata
- await self.mass.metadata.get_artist_metadata(item)
- existing = await self.get_db_item_by_prov_id(item.item_id, item.provider)
- if existing:
- db_item = await self.update_db_item(existing.item_id, item)
- else:
- db_item = await self.add_db_item(item)
- # also fetch same artist on all providers
- await self.match_artist(db_item)
- # return final db_item after all match/metadata actions
- db_item = await self.get_db_item(db_item.item_id)
- self.mass.signal_event(
- EventType.MEDIA_ITEM_UPDATED if existing else EventType.MEDIA_ITEM_ADDED,
- db_item.uri,
- db_item,
+ async def delete(self, item_id: int, recursive: bool = False) -> None:
+ """Delete record from the database."""
+ # check artist albums
+ db_rows = await self.mass.music.database.get_rows_from_query(
+ f"SELECT item_id FROM {DB_TABLE_ALBUMS} WHERE artists LIKE '%\"{item_id}\"%'",
+ limit=5000,
)
- return db_item
+ assert not (db_rows and not recursive), "Albums attached to artist"
+ for db_row in db_rows:
+ with contextlib.suppress(MediaNotFoundError):
+ await self.mass.music.albums.delete(db_row["item_id"], recursive)
+
+ # check artist tracks
+ db_rows = await self.mass.music.database.get_rows_from_query(
+ f"SELECT item_id FROM {DB_TABLE_TRACKS} WHERE artists LIKE '%\"{item_id}\"%'",
+ limit=5000,
+ )
+ assert not (db_rows and not recursive), "Tracks attached to artist"
+ for db_row in db_rows:
+ with contextlib.suppress(MediaNotFoundError):
+ await self.mass.music.albums.delete(db_row["item_id"], recursive)
+
+ # delete the artist itself from db
+ await super().delete(item_id)
async def match_artist(self, db_artist: Artist):
"""Try to find matching artists on all providers for the provided (database) item_id.
async def get_provider_artist_toptracks(
self,
item_id: str,
- provider_domain: str | None = None,
- provider_instance: str | None = None,
+ provider_instance_id_or_domain: str | None = None,
cache_checksum: Any = None,
) -> list[Track]:
"""Return top tracks for an artist on given provider."""
- prov = self.mass.get_provider(provider_instance or provider_domain)
+ assert provider_instance_id_or_domain != "database"
+ prov = self.mass.get_provider(provider_instance_id_or_domain)
if prov is None:
return []
# prefer cache items (if any)
items = []
if db_artist := await self.mass.music.artists.get_db_item_by_prov_id(
item_id,
- provider_domain=provider_domain,
- provider_instance=provider_instance,
+ provider_instance_id_or_domain,
):
- prov_id = provider_instance or provider_domain
# TODO: adjust to json query instead of text search?
query = f"SELECT * FROM tracks WHERE artists LIKE '%\"{db_artist.item_id}\"%'"
- query += f" AND provider_mappings LIKE '%\"{prov_id}\"%'"
+ query += f" AND provider_mappings LIKE '%\"{provider_instance_id_or_domain}\"%'"
items = await self.mass.music.tracks.get_db_items_by_query(query)
# store (serializable items) in cache
self.mass.create_task(
async def get_provider_artist_albums(
self,
item_id: str,
- provider_domain: str | None = None,
- provider_instance: str | None = None,
+ provider_instance_id_or_domain: str | None = None,
cache_checksum: Any = None,
) -> list[Album]:
"""Return albums for an artist on given provider."""
- prov = self.mass.get_provider(provider_instance or provider_domain)
+ assert provider_instance_id_or_domain != "database"
+ prov = self.mass.get_provider(provider_instance_id_or_domain)
if prov is None:
return []
# prefer cache items (if any)
# fallback implementation using the db
if db_artist := await self.mass.music.artists.get_db_item_by_prov_id( # noqa: PLR5501
item_id,
- provider_domain=provider_domain,
- provider_instance=provider_instance,
+ provider_instance_id_or_domain,
):
- prov_id = provider_instance or provider_domain
# TODO: adjust to json query instead of text search?
query = f"SELECT * FROM albums WHERE artists LIKE '%\"{db_artist.item_id}\"%'"
- query += f" AND provider_mappings LIKE '%\"{prov_id}\"%'"
+ query += f" AND provider_mappings LIKE '%\"{provider_instance_id_or_domain}\"%'"
items = await self.mass.music.albums.get_db_items_by_query(query)
else:
# edge case
)
return items
- async def add_db_item(self, item: Artist) -> Artist:
+ async def _add_db_item(self, item: Artist) -> Artist:
"""Add a new item record to the database."""
assert isinstance(item, Artist), "Not a full Artist object"
assert item.provider_mappings, "Item is missing provider mapping(s)"
break
if cur_item:
# update existing
- return await self.update_db_item(cur_item.item_id, item)
+ return await self._update_db_item(cur_item.item_id, item)
# insert item
item.timestamp_added = int(utc_timestamp())
# return created object
return await self.get_db_item(item_id)
- async def update_db_item(
+ async def _update_db_item(
self,
item_id: int,
item: Artist,
self.logger.debug("updated %s in database: %s", item.name, item_id)
return await self.get_db_item(item_id)
- async def delete_db_item(self, item_id: int, recursive: bool = False) -> None:
- """Delete record from the database."""
- # check artist albums
- db_rows = await self.mass.music.database.get_rows_from_query(
- f"SELECT item_id FROM {DB_TABLE_ALBUMS} WHERE artists LIKE '%\"{item_id}\"%'",
- limit=5000,
- )
- assert not (db_rows and not recursive), "Albums attached to artist"
- for db_row in db_rows:
- with contextlib.suppress(MediaNotFoundError):
- await self.mass.music.albums.delete_db_item(db_row["item_id"], recursive)
-
- # check artist tracks
- db_rows = await self.mass.music.database.get_rows_from_query(
- f"SELECT item_id FROM {DB_TABLE_TRACKS} WHERE artists LIKE '%\"{item_id}\"%'",
- limit=5000,
- )
- assert not (db_rows and not recursive), "Tracks attached to artist"
- for db_row in db_rows:
- with contextlib.suppress(MediaNotFoundError):
- await self.mass.music.albums.delete_db_item(db_row["item_id"], recursive)
-
- # delete the artist itself from db
- await super().delete_db_item(item_id)
-
async def _get_provider_dynamic_tracks(
self,
item_id: str,
- provider_domain: str | None = None,
- provider_instance: str | None = None,
+ provider_instance_id_or_domain: str,
limit: int = 25,
):
"""Generate a dynamic list of tracks based on the artist's top tracks."""
- prov = self.mass.get_provider(provider_instance or provider_domain)
+ assert provider_instance_id_or_domain != "database"
+ prov = self.mass.get_provider(provider_instance_id_or_domain)
if prov is None:
return []
if ProviderFeature.SIMILAR_TRACKS not in prov.supported_features:
return []
top_tracks = await self.get_provider_artist_toptracks(
- item_id=item_id,
- provider_domain=provider_domain,
- provider_instance=provider_instance,
+ item_id,
+ provider_instance_id_or_domain,
)
# Grab a random track from the album that we use to obtain similar tracks for
track = choice(top_tracks)
prov_artist = await self.get_provider_item(
search_item_artist.item_id, search_item_artist.provider
)
- await self.update_db_item(db_artist.item_id, prov_artist)
+ await self._update_db_item(db_artist.item_id, prov_artist)
return True
# try to get a match with some reference albums of this artist
artist_albums = await self.albums(db_artist.item_id, db_artist.provider, artist=db_artist)
for ref_album in artist_albums:
if ref_album.album_type == AlbumType.COMPILATION:
continue
- if ref_album.artist is None:
+ if not ref_album.artists:
continue
for search_str in (
ref_album.name,
):
search_result = await self.mass.music.albums.search(search_str, provider.domain)
for search_result_item in search_result:
- if search_result_item.artist is None:
+ if not search_result_item.artists:
continue
if search_result_item.sort_name != ref_album.sort_name:
continue
# artist must match 100%
- if search_result_item.artist.sort_name != ref_album.artist.sort_name:
+ if not compare_artist(search_result_item.artists[0], db_artist):
continue
# 100% match
# get full artist details so we have all metadata
prov_artist = await self.get_provider_item(
- search_result_item.artist.item_id,
- search_result_item.artist.provider,
+ search_result_item.artists[0].item_id,
+ search_result_item.artists[0].provider,
)
- await self.update_db_item(db_artist.item_id, prov_artist)
+ await self._update_db_item(db_artist.item_id, prov_artist)
return True
return False
from music_assistant.common.models.enums import EventType, MediaType, ProviderFeature
from music_assistant.common.models.errors import MediaNotFoundError
from music_assistant.common.models.media_items import (
+ ItemMapping,
MediaItemType,
PagedItems,
ProviderMapping,
self._db_add_lock = asyncio.Lock()
@abstractmethod
- async def add(self, item: ItemCls) -> ItemCls:
+ async def add(self, item: ItemCls, skip_metadata_lookup: bool = False) -> ItemCls:
"""Add item to local db and return the database item."""
raise NotImplementedError
- @abstractmethod
- async def add_db_item(self, item: ItemCls, overwrite_existing: bool = False) -> ItemCls:
- """Add a new record for this mediatype to the database."""
- raise NotImplementedError
-
- @abstractmethod
- async def update_db_item(
- self,
- item_id: int,
- item: ItemCls,
- overwrite: bool = False,
- ) -> ItemCls:
- """Update record in the database, merging data."""
- raise NotImplementedError
+ async def delete(self, item_id: int, recursive: bool = False) -> None: # noqa: ARG002
+ """Delete record from the database."""
+ db_item = await self.get_db_item(item_id)
+ assert db_item, f"Item does not exist: {item_id}"
+ # delete item
+ await self.mass.music.database.delete(
+ self.db_table,
+ {"item_id": int(item_id)},
+ )
+ # update provider_mappings table
+ await self.mass.music.database.delete(
+ DB_TABLE_PROVIDER_MAPPINGS,
+ {"media_type": self.media_type.value, "item_id": int(item_id)},
+ )
+ # NOTE: this does not delete any references to this item in other records,
+ # this is handled/overridden in the mediatype specific controllers
+ self.mass.signal_event(EventType.MEDIA_ITEM_DELETED, db_item.uri, db_item)
+ self.logger.debug("deleted item with id %s from database", item_id)
async def db_items(
self,
async def get(
self,
item_id: str,
- provider_domain: str | None = None,
- provider_instance: str | None = None,
+ provider_instance_id_or_domain: str,
force_refresh: bool = False,
lazy: bool = True,
details: ItemCls = None,
add_to_db: bool = True,
) -> ItemCls:
"""Return (full) details for a single media item."""
- assert (
- provider_domain or provider_instance
- ), "provider_domain or provider_instance must be supplied"
- if not add_to_db and "database" in (provider_domain, provider_instance):
+ if not add_to_db and provider_instance_id_or_domain == "database":
return await self.get_db_item(item_id)
- if details and details.provider == "database":
+ if details and (details.provider == "database" or isinstance(details, ItemMapping)):
+ # invalidate details if not (full) provider details for this item
details = None
db_item = await self.get_db_item_by_prov_id(
- item_id=item_id,
- provider_domain=provider_domain,
- provider_instance=provider_instance,
+ item_id,
+ provider_instance_id_or_domain,
)
if db_item and (time() - (db_item.metadata.last_refresh or 0)) > REFRESH_INTERVAL:
# it's been too long since the full metadata was last retrieved (or never at all)
force_refresh = True
if db_item and force_refresh and add_to_db:
# get (first) provider item id belonging to this db item
- provider_instance, item_id = await self.get_provider_mapping(db_item)
+ provider_instance_id_or_domain, item_id = await self.get_provider_mapping(db_item)
elif db_item:
# we have a db item and no refreshing is needed, return the results!
return db_item
- if not details and provider_instance:
+ if not details:
# no details provider nor in db, fetch them from the provider
details = await self.get_provider_item(
- item_id, provider_instance, force_refresh=force_refresh
+ item_id, provider_instance_id_or_domain, force_refresh=force_refresh
)
- if not details and provider_domain:
- # check providers for given provider domain one by one
- for prov in self.mass.music.providers:
- if not prov.available:
- continue
- if prov.domain == provider_domain:
- try:
- details = await self.get_provider_item(
- item_id, prov.domain, force_refresh=force_refresh
- )
- except MediaNotFoundError:
- pass
- else:
- break
if not details:
# we couldn't get a match from any of the providers, raise error
- raise MediaNotFoundError(f"Item not found: {provider_domain or id}/{item_id}")
+ raise MediaNotFoundError(f"Item not found: {provider_instance_id_or_domain}/{item_id}")
if not add_to_db:
return details
# create task to add the item to the db, including matching metadata etc. takes some time
# in 99% of the cases we just return lazy because we want the details as fast as possible
# only if we really need to wait for the result (e.g. to prevent race conditions), we
- # can set lazy to false and we await to job to complete.
+ # can set lazy to false and we await the job to complete.
task_id = f"add_{self.media_type.value}.{details.provider}.{details.item_id}"
add_task = self.mass.create_task(self.add, details, task_id=task_id)
if not lazy:
async def search(
self,
search_query: str,
- provider_domain: str | None = None,
- provider_instance: str | None = None,
+ provider_instance_id_or_domain: str,
limit: int = 25,
) -> list[ItemCls]:
"""Search database or provider with given query."""
# create safe search string
search_query = search_query.replace("/", " ").replace("'", "")
- if "database" in (provider_domain, provider_instance):
+ if provider_instance_id_or_domain == "database":
return [
self.item_cls.from_db_row(db_row)
for db_row in await self.mass.music.database.search(self.db_table, search_query)
]
- prov = self.mass.get_provider(provider_instance or provider_domain)
+ prov = self.mass.get_provider(provider_instance_id_or_domain)
if prov is None:
return []
if ProviderFeature.SEARCH not in prov.supported_features:
async def add_to_library(
self,
item_id: str,
- provider_domain: str | None = None,
- provider_instance: str | None = None,
+ provider_instance_id_or_domain: str,
) -> None:
"""Add an item to the library."""
prov_item = await self.get_db_item_by_prov_id(
item_id,
- provider_domain=provider_domain,
- provider_instance=provider_instance,
+ provider_instance_id_or_domain,
)
if prov_item is None:
- prov_item = await self.get_provider_item(item_id, provider_instance or provider_domain)
+ prov_item = await self.get_provider_item(item_id, provider_instance_id_or_domain)
if prov_item.in_library is True:
return
# mark as favorite/library item on provider(s)
prov_item.in_library = True
await self.set_db_library(prov_item.item_id, True)
- async def remove_from_library(
- self,
- item_id: str,
- provider_domain: str | None = None,
- provider_instance: str | None = None,
- ) -> None:
+ async def remove_from_library(self, item_id: str, provider_instance_id_or_domain: str) -> None:
"""Remove item from the library."""
prov_item = await self.get_db_item_by_prov_id(
item_id,
- provider_domain=provider_domain,
- provider_instance=provider_instance,
+ provider_instance_id_or_domain,
)
if prov_item is None:
- prov_item = await self.get_provider_item(item_id, provider_instance or provider_domain)
+ prov_item = await self.get_provider_item(item_id, provider_instance_id_or_domain)
if prov_item.in_library is False:
return
# unmark as favorite/library item on provider(s)
async def get_db_item_by_prov_id(
self,
item_id: str,
- provider_domain: str | None = None,
- provider_instance: str | None = None,
+ provider_instance_id_or_domain: str,
) -> ItemCls | None:
"""Get the database item for the given provider_instance."""
- assert (
- provider_domain or provider_instance
- ), "provider_domain or provider_instance must be supplied"
- if "database" in (provider_domain, provider_instance):
+ if provider_instance_id_or_domain == "database":
return await self.get_db_item(item_id)
for item in await self.get_db_items_by_prov_id(
- provider_domain=provider_domain,
- provider_instance=provider_instance,
+ provider_instance_id_or_domain,
provider_item_ids=(item_id,),
):
return item
async def get_db_items_by_prov_id(
self,
- provider_domain: str | None = None,
- provider_instance: str | None = None,
+ provider_instance_id_or_domain: str,
provider_item_ids: tuple[str, ...] | None = None,
limit: int = 500,
offset: int = 0,
) -> list[ItemCls]:
"""Fetch all records from database for given provider."""
- assert (
- provider_domain or provider_instance
- ), "provider_domain or provider_instance must be supplied"
- if "database" in (provider_domain, provider_instance):
+ if provider_instance_id_or_domain == "database":
return await self.get_db_items_by_query(limit=limit, offset=offset)
# we use the separate provider_mappings table to perform quick lookups
# from provider id's to database id's because this is faster
# (and more compatible) than querying the provider_mappings json column
subquery = f"SELECT item_id FROM {DB_TABLE_PROVIDER_MAPPINGS} "
- if provider_instance is not None:
- subquery += f"WHERE provider_instance = '{provider_instance}'"
- elif provider_domain is not None:
- subquery += f"WHERE provider_domain = '{provider_domain}'"
+ subquery += f"WHERE (provider_instance = '{provider_instance_id_or_domain}'"
+ subquery += f" OR provider_domain = '{provider_instance_id_or_domain}')"
if provider_item_ids is not None:
prov_ids = str(tuple(provider_item_ids))
if prov_ids.endswith(",)"):
async def iter_db_items_by_prov_id(
self,
- provider_domain: str | None = None,
- provider_instance: str | None = None,
+ provider_instance_id_or_domain: str,
provider_item_ids: tuple[str, ...] | None = None,
limit: int = 500,
offset: int = 0,
offset: int = 0
while True:
next_items = await self.get_db_items_by_prov_id(
- provider_domain=provider_domain,
- provider_instance=provider_instance,
+ provider_instance_id_or_domain,
provider_item_ids=provider_item_ids,
limit=limit,
offset=offset,
self.mass.signal_event(EventType.MEDIA_ITEM_UPDATED, db_item.uri, db_item)
async def get_provider_item(
- self, item_id: str, provider_domain_or_instance_id: str, force_refresh: bool = False
+ self, item_id: str, provider_instance_id_or_domain: str, force_refresh: bool = False
) -> ItemCls:
"""Return item details for the given provider item id."""
cache_key = (
- f"provider_item.{self.media_type.value}.{provider_domain_or_instance_id}.{item_id}"
+ f"provider_item.{self.media_type.value}.{provider_instance_id_or_domain}.{item_id}"
)
- if provider_domain_or_instance_id == "database":
+ if provider_instance_id_or_domain == "database":
return await self.get_db_item(item_id)
if not force_refresh and (cache := await self.mass.cache.get(cache_key)):
return self.item_cls.from_dict(cache)
- if provider := self.mass.get_provider(provider_domain_or_instance_id):
- item = await provider.get_item(self.media_type, item_id)
- await self.mass.cache.set(cache_key, item.to_dict(), 3600)
- return item
+ if provider := self.mass.get_provider(provider_instance_id_or_domain): # noqa: SIM102
+ if item := await provider.get_item(self.media_type, item_id):
+ await self.mass.cache.set(cache_key, item.to_dict())
+ return item
raise MediaNotFoundError(
- f"{self.media_type.value}://{item_id} not found on provider {provider_domain_or_instance_id}" # noqa: E501
+ f"{self.media_type.value}://{item_id} not "
+ "found on provider {provider_instance_id_or_domain}"
)
- async def remove_prov_mapping(self, item_id: int, provider_instance: str) -> None:
+ async def remove_prov_mapping(self, item_id: int, provider_instance_id: str) -> None:
"""Remove provider id(s) from item."""
try:
db_item = await self.get_db_item(item_id)
{
"media_type": self.media_type.value,
"item_id": int(item_id),
- "provider_instance": provider_instance,
+ "provider_instance": provider_instance_id,
},
)
# update the item in db (provider_mappings column only)
db_item.provider_mappings = {
- x for x in db_item.provider_mappings if x.provider_instance != provider_instance
+ x for x in db_item.provider_mappings if x.provider_instance != provider_instance_id
}
match = {"item_id": item_id}
if db_item.provider_mappings:
match,
{"provider_mappings": serialize_to_json(db_item.provider_mappings)},
)
- self.logger.debug("removed provider %s from item id %s", provider_instance, item_id)
+ self.logger.debug("removed provider %s from item id %s", provider_instance_id, item_id)
self.mass.signal_event(EventType.MEDIA_ITEM_UPDATED, db_item.uri, db_item)
else:
# delete item if it has no more providers
with suppress(AssertionError):
- await self.delete_db_item(item_id)
-
- async def delete_db_item(self, item_id: int, recursive: bool = False) -> None: # noqa: ARG002
- """Delete record from the database."""
- db_item = await self.get_db_item(item_id)
- assert db_item, f"Item does not exist: {item_id}"
- # delete item
- await self.mass.music.database.delete(
- self.db_table,
- {"item_id": int(item_id)},
- )
- # update provider_mappings table
- await self.mass.music.database.delete(
- DB_TABLE_PROVIDER_MAPPINGS,
- {"media_type": self.media_type.value, "item_id": int(item_id)},
- )
- # NOTE: this does not delete any references to this item in other records,
- # this is handled/overridden in the mediatype specific controllers
- self.mass.signal_event(EventType.MEDIA_ITEM_DELETED, db_item.uri, db_item)
- self.logger.debug("deleted item with id %s from database", item_id)
+ await self.delete(item_id)
async def dynamic_tracks(
self,
item_id: str,
- provider_domain: str | None = None,
- provider_instance: str | None = None,
+ provider_instance_id_or_domain: str,
limit: int = 25,
) -> list[Track]:
"""Return a dynamic list of tracks based on the given item."""
- ref_item = await self.get(item_id, provider_domain, provider_instance)
+ ref_item = await self.get(item_id, provider_instance_id_or_domain)
for prov_mapping in ref_item.provider_mappings:
prov = self.mass.get_provider(prov_mapping.provider_instance)
if prov is None:
if ProviderFeature.SIMILAR_TRACKS not in prov.supported_features:
continue
return await self._get_provider_dynamic_tracks(
- item_id=prov_mapping.item_id,
- provider_domain=prov_mapping.provider_domain,
- provider_instance=prov_mapping.provider_instance,
+ prov_mapping.item_id,
+ provider_instance_id_or_domain,
limit=limit,
)
# Fallback to the default implementation
async def _get_provider_dynamic_tracks(
self,
item_id: str,
- provider_domain: str | None = None,
- provider_instance: str | None = None,
+ provider_instance_id_or_domain: str,
limit: int = 25,
) -> list[Track]:
"""Generate a dynamic list of tracks based on the item's content."""
import random
from collections.abc import AsyncGenerator
-from time import time
from typing import Any
from music_assistant.common.helpers.datetime import utc_timestamp
self.mass.register_api_command("music/playlist/tracks", self.tracks)
self.mass.register_api_command("music/playlist/tracks/add", self.add_playlist_tracks)
self.mass.register_api_command("music/playlist/tracks/remove", self.remove_playlist_tracks)
- self.mass.register_api_command("music/playlist/update", self.update_db_item)
- self.mass.register_api_command("music/playlist/delete", self.delete_db_item)
+ self.mass.register_api_command("music/playlist/update", self._update_db_item)
+ self.mass.register_api_command("music/playlist/delete", self.delete)
self.mass.register_api_command("music/playlist/create", self.create)
+ async def add(self, item: Playlist, skip_metadata_lookup: bool = False) -> Playlist:
+ """Add playlist to local db and return the new database item."""
+ if not skip_metadata_lookup:
+ await self.mass.metadata.get_playlist_metadata(item)
+ # preload playlist tracks listing (do not load them in the db)
+ async for track in self.tracks(item.item_id, item.provider):
+ if not item.is_editable:
+ continue
+ # only add tracks from owned (editable) playlists to the db to avoid too much clutter
+ if not await self.mass.music.tracks.get_db_item_by_prov_id(
+ track.item_id, track.provider
+ ):
+ await self.mass.music.tracks.add(track, skip_metadata_lookup=True)
+ existing = await self.get_db_item_by_prov_id(item.item_id, item.provider)
+ if existing:
+ db_item = await self._update_db_item(existing.item_id, item)
+ else:
+ db_item = await self._add_db_item(item)
+ self.mass.signal_event(
+ EventType.MEDIA_ITEM_UPDATED if existing else EventType.MEDIA_ITEM_ADDED,
+ db_item.uri,
+ db_item,
+ )
+ return db_item
+
async def tracks(
self,
item_id: str,
- provider_domain: str | None = None,
- provider_instance: str | None = None,
+ provider_instance_id_or_domain: str,
) -> AsyncGenerator[Track, None]:
"""Return playlist tracks for the given provider playlist id."""
- playlist = await self.get(item_id, provider_domain, provider_instance)
+ playlist = await self.get(item_id, provider_instance_id_or_domain)
prov = next(x for x in playlist.provider_mappings)
async for track in self._get_provider_playlist_tracks(
prov.item_id,
- provider_domain=prov.provider_domain,
- provider_instance=prov.provider_instance,
+ prov.provider_instance,
cache_checksum=playlist.metadata.checksum,
):
yield track
- async def add(self, item: Playlist) -> Playlist:
- """Add playlist to local db and return the new database item."""
- item.metadata.last_refresh = int(time())
- await self.mass.metadata.get_playlist_metadata(item)
- existing = await self.get_db_item_by_prov_id(item.item_id, provider_instance=item.provider)
- if existing:
- db_item = await self.update_db_item(existing.item_id, item)
- else:
- db_item = await self.add_db_item(item)
- self.mass.signal_event(
- EventType.MEDIA_ITEM_UPDATED if existing else EventType.MEDIA_ITEM_ADDED,
- db_item.uri,
- db_item,
- )
- return db_item
-
async def create(self, name: str, provider_instance_or_domain: str | None = None) -> Playlist:
"""Create new playlist."""
# if provider is omitted, just pick first provider
# grab all existing track ids in the playlist so we can check for duplicates
cur_playlist_track_ids = set()
count = 0
- async for item in self.tracks(playlist_prov.item_id, playlist_prov.provider_domain):
+ async for item in self.tracks(playlist_prov.item_id, playlist_prov.provider_instance):
count += 1
cur_playlist_track_ids.update(
{
provider = self.mass.get_provider(playlist_prov.provider_instance)
await provider.add_playlist_tracks(playlist_prov.item_id, [track_id_to_add])
# invalidate cache by updating the checksum
- await self.get(db_playlist_id, provider_domain="database", force_refresh=True)
+ await self.get(db_playlist_id, "database", force_refresh=True)
async def remove_playlist_tracks(
self, db_playlist_id: str, positions_to_remove: tuple[int, ...]
# invalidate cache by updating the checksum
await self.get(db_playlist_id, "database", force_refresh=True)
- async def add_db_item(self, item: Playlist) -> Playlist:
+ async def _add_db_item(self, item: Playlist) -> Playlist:
"""Add a new record to the database."""
assert item.provider_mappings, "Item is missing provider mapping(s)"
async with self._db_add_lock:
match = {"name": item.name, "owner": item.owner}
if cur_item := await self.mass.music.database.get_row(self.db_table, match):
# update existing
- return await self.update_db_item(cur_item["item_id"], item)
+ return await self._update_db_item(cur_item["item_id"], item)
# insert new item
item.timestamp_added = int(utc_timestamp())
# return created object
return await self.get_db_item(item_id)
- async def update_db_item(
+ async def _update_db_item(
self,
item_id: int,
item: Playlist,
async def _get_provider_playlist_tracks(
self,
item_id: str,
- provider_domain: str | None = None,
- provider_instance: str | None = None,
+ provider_instance_id_or_domain: str,
cache_checksum: Any = None,
) -> AsyncGenerator[Track, None]:
"""Return album tracks for the given provider album id."""
- provider = self.mass.get_provider(provider_instance or provider_domain)
+ assert provider_instance_id_or_domain != "database"
+ provider = self.mass.get_provider(provider_instance_id_or_domain)
if not provider:
return
# prefer cache items (if any)
async def _get_provider_dynamic_tracks(
self,
item_id: str,
- provider_domain: str | None = None,
- provider_instance: str | None = None,
+ provider_instance_id_or_domain: str,
limit: int = 25,
):
"""Generate a dynamic list of tracks based on the playlist content."""
- provider = self.mass.get_provider(provider_instance or provider_domain)
+ assert provider_instance_id_or_domain != "database"
+ provider = self.mass.get_provider(provider_instance_id_or_domain)
if not provider or ProviderFeature.SIMILAR_TRACKS not in provider.supported_features:
return []
playlist_tracks = [
x
async for x in self._get_provider_playlist_tracks(
- item_id=item_id,
- provider_domain=provider_domain,
- provider_instance=provider_instance,
+ item_id, provider_instance_id_or_domain
)
# filter out unavailable tracks
if x.available
from __future__ import annotations
import asyncio
-from time import time
from music_assistant.common.helpers.datetime import utc_timestamp
from music_assistant.common.helpers.json import serialize_to_json
self.mass.register_api_command("music/radios", self.db_items)
self.mass.register_api_command("music/radio", self.get)
self.mass.register_api_command("music/radio/versions", self.versions)
- self.mass.register_api_command("music/radio/update", self.update_db_item)
- self.mass.register_api_command("music/radio/delete", self.delete_db_item)
+ self.mass.register_api_command("music/radio/update", self._update_db_item)
+ self.mass.register_api_command("music/radio/delete", self.delete)
async def versions(
self,
item_id: str,
- provider_domain: str | None = None,
- provider_instance: str | None = None,
+ provider_instance_id_or_domain: str,
) -> list[Radio]:
"""Return all versions of a radio station we can find on all providers."""
- assert provider_domain or provider_instance, "Provider type or ID must be specified"
- radio = await self.get(item_id, provider_domain, provider_instance)
+ radio = await self.get(item_id, provider_instance_id_or_domain)
# perform a search on all provider(types) to collect all versions/variants
- provider_domains = {prov.domain for prov in self.mass.music.providers}
all_versions = {
prov_item.item_id: prov_item
for prov_items in await asyncio.gather(
- *[self.search(radio.name, provider_domain) for provider_domain in provider_domains]
+ *[
+ self.search(radio.name, provider_domain)
+ for provider_domain in self.mass.music.get_unique_providers()
+ ]
)
for prov_item in prov_items
if loose_compare_strings(radio.name, prov_item.name)
# return the aggregated result
return all_versions.values()
- async def add(self, item: Radio) -> Radio:
+ async def add(self, item: Radio, skip_metadata_lookup: bool = False) -> Radio:
"""Add radio to local db and return the new database item."""
- item.metadata.last_refresh = int(time())
- await self.mass.metadata.get_radio_metadata(item)
- existing = await self.get_db_item_by_prov_id(item.item_id, provider_instance=item.provider)
+ if not skip_metadata_lookup:
+ await self.mass.metadata.get_radio_metadata(item)
+ existing = await self.get_db_item_by_prov_id(item.item_id, item.provider)
if existing:
- db_item = await self.update_db_item(existing.item_id, item)
+ db_item = await self._update_db_item(existing.item_id, item)
else:
- db_item = await self.add_db_item(item)
+ db_item = await self._add_db_item(item)
self.mass.signal_event(
EventType.MEDIA_ITEM_UPDATED if existing else EventType.MEDIA_ITEM_ADDED,
db_item.uri,
)
return db_item
- async def add_db_item(self, item: Radio) -> Radio:
+ async def _add_db_item(self, item: Radio) -> Radio:
"""Add a new item record to the database."""
assert item.provider_mappings, "Item is missing provider mapping(s)"
async with self._db_add_lock:
match = {"name": item.name}
if cur_item := await self.mass.music.database.get_row(self.db_table, match):
# update existing
- return await self.update_db_item(cur_item["item_id"], item)
+ return await self._update_db_item(cur_item["item_id"], item)
# insert new item
item.timestamp_added = int(utc_timestamp())
# return created object
return await self.get_db_item(item_id)
- async def update_db_item(
+ async def _update_db_item(
self,
item_id: int,
item: Radio,
async def _get_provider_dynamic_tracks(
self,
item_id: str,
- provider_domain: str | None = None,
- provider_instance: str | None = None,
+ provider_instance_id_or_domain: str,
limit: int = 25,
) -> list[Track]:
"""Generate a dynamic list of tracks based on the item's content."""
self.mass.register_api_command("music/track", self.get)
self.mass.register_api_command("music/track/versions", self.versions)
self.mass.register_api_command("music/track/albums", self.albums)
- self.mass.register_api_command("music/track/update", self.update_db_item)
- self.mass.register_api_command("music/track/delete", self.delete_db_item)
+ self.mass.register_api_command("music/track/update", self._update_db_item)
+ self.mass.register_api_command("music/track/delete", self.delete)
self.mass.register_api_command("music/track/preview", self.get_preview_url)
async def get(
self,
item_id: str,
- provider_domain: str | None = None,
- provider_instance: str | None = None,
+ provider_instance_id_or_domain: str,
force_refresh: bool = False,
lazy: bool = True,
details: Track = None,
) -> Track:
"""Return (full) details for a single media item."""
track = await super().get(
- item_id=item_id,
- provider_domain=provider_domain,
- provider_instance=provider_instance,
+ item_id,
+ provider_instance_id_or_domain,
force_refresh=force_refresh,
lazy=lazy,
details=details,
elif track.album:
track.album = await self.mass.music.albums.get(
track.album.item_id,
- provider_instance=track.album.provider,
+ track.album.provider,
lazy=True,
details=None if isinstance(track.album, ItemMapping) else track.album,
add_to_db=add_to_db,
full_artists.append(
await self.mass.music.artists.get(
artist.item_id,
- provider_instance=artist.provider,
+ artist.provider,
lazy=True,
details=None if isinstance(artist, ItemMapping) else artist,
add_to_db=add_to_db,
track.artists = full_artists
return track
- async def add(self, item: Track) -> Track:
+ async def add(self, item: Track, skip_metadata_lookup: bool = False) -> Track:
"""Add track to local db and return the new database item."""
- # make sure we have artists
assert item.artists
+ # resolve any ItemMapping artists
+ item.artists = [
+ await self.mass.music.artists.get_provider_item(artist.item_id, artist.provider)
+ if isinstance(artist, ItemMapping)
+ else artist
+ for artist in item.artists
+ ]
+ # resolve ItemMapping album
+ if isinstance(item.album, ItemMapping):
+ item.album = await self.mass.music.albums.get_provider_item(
+ item.album.item_id, item.album.provider
+ )
# grab additional metadata
- await self.mass.metadata.get_track_metadata(item)
- existing = await self.get_db_item_by_prov_id(item.item_id, provider_instance=item.provider)
+ if not skip_metadata_lookup:
+ await self.mass.metadata.get_track_metadata(item)
+ existing = await self.get_db_item_by_prov_id(item.item_id, item.provider)
if existing:
- db_item = await self.update_db_item(existing.item_id, item)
+ db_item = await self._update_db_item(existing.item_id, item)
else:
- db_item = await self.add_db_item(item)
+ db_item = await self._add_db_item(item)
# also fetch same track on all providers (will also get other quality versions)
- await self._match(db_item)
+ if not skip_metadata_lookup:
+ await self._match(db_item)
# return final db_item after all match/metadata actions
db_item = await self.get_db_item(db_item.item_id)
self.mass.signal_event(
async def versions(
self,
item_id: str,
- provider_domain: str | None = None,
- provider_instance: str | None = None,
+ provider_instance_id_or_domain: str,
) -> list[Track]:
"""Return all versions of a track we can find on all providers."""
- assert provider_domain or provider_instance, "Provider type or ID must be specified"
- track = await self.get(item_id, provider_domain or provider_instance, add_to_db=False)
+ track = await self.get(item_id, provider_instance_id_or_domain, add_to_db=False)
# perform a search on all provider(types) to collect all versions/variants
- provider_domains = {prov.domain for prov in self.mass.music.providers}
- search_query = f"{track.artist.name} - {track.name}"
+ search_query = f"{track.artists[0].name} - {track.name}"
all_versions = {
prov_item.item_id: prov_item
for prov_items in await asyncio.gather(
*[
self.search(search_query, provider_domain)
- for provider_domain in provider_domains
+ for provider_domain in self.mass.music.get_unique_providers()
]
)
for prov_item in prov_items
async def albums(
self,
item_id: str,
- provider_domain: str | None = None,
- provider_instance: str | None = None,
+ provider_instance_id_or_domain: str,
) -> list[Album]:
"""Return all albums the track appears on."""
- assert provider_domain or provider_instance, "Provider type or ID must be specified"
- track = await self.get(item_id, provider_domain or provider_instance, add_to_db=False)
+ track = await self.get(item_id, provider_instance_id_or_domain, add_to_db=False)
return await asyncio.gather(
*[
- self.mass.music.albums.get(
- album.item_id, provider_instance=album.provider, add_to_db=False
- )
+ self.mass.music.albums.get(album.item_id, album.provider, add_to_db=False)
for album in track.albums
]
)
- async def get_preview_url(self, provider_domain: str, item_id: str) -> str:
+ async def get_preview_url(self, provider_instance_id_or_domain: str, item_id: str) -> str:
"""Return url to short preview sample."""
- track = await self.get_provider_item(item_id, provider_domain)
+ track = await self.get_provider_item(item_id, provider_instance_id_or_domain)
# prefer provider-provided preview
if preview := track.metadata.preview:
return preview
# fallback to a preview/sample hosted by our own webserver
- return self.mass.streams.get_preview_url(provider_domain, item_id)
+ return self.mass.streams.get_preview_url(provider_instance_id_or_domain, item_id)
async def _match(self, db_track: Track) -> None:
"""Try to find matching track on all providers for the provided (database) track_id.
if compare_track(prov_track, db_track):
# 100% match, we can simply update the db with additional provider ids
match_found = True
- await self.update_db_item(db_track.item_id, search_result_item)
+ await self._update_db_item(db_track.item_id, search_result_item)
if not match_found:
self.logger.debug(
async def _get_provider_dynamic_tracks(
self,
item_id: str,
- provider_domain: str | None = None,
- provider_instance: str | None = None,
+ provider_instance_id_or_domain: str,
limit: int = 25,
):
"""Generate a dynamic list of tracks based on the track."""
- prov = self.mass.get_provider(provider_instance or provider_domain)
+ assert provider_instance_id_or_domain != "database"
+ prov = self.mass.get_provider(provider_instance_id_or_domain)
if prov is None:
return []
if ProviderFeature.SIMILAR_TRACKS not in prov.supported_features:
"No Music Provider found that supports requesting similar tracks."
)
- async def add_db_item(self, item: Track) -> Track:
+ async def _add_db_item(self, item: Track) -> Track:
"""Add a new item record to the database."""
assert isinstance(item, Track), "Not a full Track object"
assert item.artists, "Track is missing artist(s)"
break
if cur_item:
# update existing
- return await self.update_db_item(cur_item.item_id, item)
+ return await self._update_db_item(cur_item.item_id, item)
# no existing match found: insert new item
track_artists = await self._get_track_artists(item)
self.logger.debug("added %s to database: %s", item.name, item_id)
return await self.get_db_item(item_id)
- async def update_db_item(
+ async def _update_db_item(
self,
item_id: int,
item: Track,
return ItemMapping.from_item(album)
if db_album := await self.mass.music.albums.get_db_item_by_prov_id(
- album.item_id, provider_domain=album.provider
+ album.item_id, album.provider
):
return ItemMapping.from_item(db_album)
- db_album = await self.mass.music.albums.add_db_item(album)
+ db_album = await self.mass.music.albums._add_db_item(album)
return ItemMapping.from_item(db_album)
async def _get_artist_mapping(self, artist: Artist | ItemMapping) -> ItemMapping:
return ItemMapping.from_item(artist)
if db_artist := await self.mass.music.artists.get_db_item_by_prov_id(
- artist.item_id, provider_domain=artist.provider
+ artist.item_id, artist.provider
):
return ItemMapping.from_item(db_artist)
- db_artist = await self.mass.music.artists.add_db_item(artist)
+ db_artist = await self.mass.music.artists._add_db_item(artist)
return ItemMapping.from_item(db_artist)
# set timestamp, used to determine when this function was last called
album.metadata.last_refresh = int(time())
# ensure the album has a musicbrainz id or artist
- if not (album.musicbrainz_id or album.artist):
+ if not (album.musicbrainz_id or album.artists):
return
# collect metadata from all providers
for provider in self.providers:
return await self.get_image_url_for_item(media_item.album, img_type, resolve)
# try artist instead for albums
- if media_item.media_type == MediaType.ALBUM and media_item.artist:
- return await self.get_image_url_for_item(media_item.artist, img_type, resolve)
+ if media_item.media_type == MediaType.ALBUM and media_item.artists:
+ return await self.get_image_url_for_item(media_item.artists[0], img_type, resolve)
# last resort: track artist(s)
if media_item.media_type == MediaType.TRACK and media_item.artists:
*[
self.search_provider(
search_query,
+ provider_instance,
media_types,
- provider_instance=provider_instance,
limit=limit,
)
for provider_instance in self.get_unique_providers()
async def search_provider(
self,
search_query: str,
+ provider_instance_id_or_domain: str,
media_types: list[MediaType] = MediaType.ALL,
- provider_domain: str | None = None,
- provider_instance: str | None = None,
limit: int = 10,
) -> SearchResults:
"""Perform search on given provider.
:param search_query: Search query
- :param provider_domain: domain of the provider to perform the search on.
+ :param provider_instance_id_or_domain: instance_id or domain of the provider
+ to perform the search on.
:param provider_instance: instance id of the provider to perform the search on.
:param media_types: A list of media_types to include. All types if None.
:param limit: number of items to return in the search (per type).
"""
- assert provider_domain or provider_instance, "Provider needs to be supplied"
- prov = self.mass.get_provider(provider_instance or provider_domain)
+ prov = self.mass.get_provider(provider_instance_id_or_domain)
if not prov:
return SearchResults()
if ProviderFeature.SEARCH not in prov.supported_features:
self, uri: str, force_refresh: bool = False, lazy: bool = True
) -> MediaItemType:
"""Fetch MediaItem by uri."""
- media_type, provider_domain_or_instance_id, item_id = parse_uri(uri)
- for prov in self.providers:
- if prov.instance_id == provider_domain_or_instance_id:
- provider_instance = prov.instance_id
- provider_domain = prov.domain
- break
- else:
- provider_instance = None
- provider_domain = provider_domain_or_instance_id
-
+ media_type, provider_instance_id_or_domain, item_id = parse_uri(uri)
return await self.get_item(
media_type=media_type,
item_id=item_id,
- provider_domain=provider_domain,
- provider_instance=provider_instance,
+ provider_instance_id_or_domain=provider_instance_id_or_domain,
force_refresh=force_refresh,
lazy=lazy,
)
self,
media_type: MediaType,
item_id: str,
- provider_domain: str | None = None,
- provider_instance: str | None = None,
+ provider_instance_id_or_domain: str,
force_refresh: bool = False,
lazy: bool = True,
add_to_db: bool = False,
) -> MediaItemType:
"""Get single music item by id and media type."""
- assert (
- provider_domain or provider_instance
- ), "provider_domain or provider_instance must be supplied"
- if "url" in (provider_domain, provider_instance):
+ if provider_instance_id_or_domain == "url":
# handle special case of 'URL' MusicProvider which allows us to play regular url's
return await self.mass.get_provider("url").parse_item(item_id)
ctrl = self.get_controller(media_type)
return await ctrl.get(
item_id=item_id,
- provider_domain=provider_domain,
- provider_instance=provider_instance,
+ provider_instance_id_or_domain=provider_instance_id_or_domain,
force_refresh=force_refresh,
lazy=lazy,
add_to_db=add_to_db,
self,
media_type: MediaType,
item_id: str,
- provider_domain: str | None = None,
- provider_instance: str | None = None,
+ provider_instance_id_or_domain: str,
) -> None:
"""Add an item to the library."""
# make sure we have a full db item
full_item = await self.get_item(
media_type,
item_id,
- provider_domain=provider_domain,
- provider_instance=provider_instance,
+ provider_instance_id_or_domain,
lazy=False,
add_to_db=True,
)
ctrl = self.get_controller(media_type)
await ctrl.add_to_library(
full_item.item_id,
- provider_domain=full_item.provider,
+ full_item.provider,
)
@api_command("music/library/add_items")
self.add_to_library(
media_type=item.media_type,
item_id=item.item_id,
- provider_instance=item.provider,
+ provider_instance_id_or_domain=item.provider,
)
)
)
self,
media_type: MediaType,
item_id: str,
- provider_domain: str | None = None,
- provider_instance: str | None = None,
+ provider_instance_id_or_domain: str,
) -> None:
"""Remove item from the library."""
ctrl = self.get_controller(media_type)
await ctrl.remove_from_library(
item_id,
- provider_domain=provider_domain,
- provider_instance=provider_instance,
+ provider_instance_id_or_domain,
)
@api_command("music/library/remove_items")
self.remove_from_library(
media_type=item.media_type,
item_id=item.item_id,
- provider_domain=item.provider,
- provider_instance=item.provider,
+ provider_instance_id_or_domain=item.provider,
)
)
)
await asyncio.gather(*tasks)
- @api_command("music/delete_db_item")
- async def delete_db_item(
+ @api_command("music/delete")
+ async def delete(
self, media_type: MediaType, db_item_id: str | int, recursive: bool = False
) -> None:
"""Remove item from the database."""
ctrl = self.get_controller(media_type)
- await ctrl.delete_db_item(db_item_id, recursive)
+ await ctrl.delete(db_item_id, recursive)
async def refresh_items(self, items: list[MediaItemType]) -> None:
"""Refresh MediaItems to force retrieval of full info and matches.
return await self.get_item(
media_item.media_type,
media_item.item_id,
- provider_instance=media_item.provider,
+ media_item.provider,
force_refresh=True,
lazy=False,
add_to_db=True,
await self.get_item(item.media_type, item.item_id, item.provider, lazy=False)
return None
- async def set_track_loudness(self, item_id: str, provider_domain: str, loudness: int):
+ async def set_track_loudness(
+ self, item_id: str, provider_instance_id_or_domain: str, loudness: int
+ ):
"""List integrated loudness for a track in db."""
await self.database.insert(
DB_TABLE_TRACK_LOUDNESS,
- {"item_id": item_id, "provider": provider_domain, "loudness": loudness},
+ {"item_id": item_id, "provider": provider_instance_id_or_domain, "loudness": loudness},
allow_replace=True,
)
- async def get_track_loudness(self, item_id: str, provider_domain: str) -> float | None:
+ async def get_track_loudness(
+ self, item_id: str, provider_instance_id_or_domain: str
+ ) -> float | None:
"""Get integrated loudness for a track in db."""
if result := await self.database.get_row(
DB_TABLE_TRACK_LOUDNESS,
{
"item_id": item_id,
- "provider": provider_domain,
+ "provider": provider_instance_id_or_domain,
},
):
return result["loudness"]
return None
- async def get_provider_loudness(self, provider_domain: str) -> float | None:
+ async def get_provider_loudness(self, provider_instance_id_or_domain: str) -> float | None:
"""Get average integrated loudness for tracks of given provider."""
all_items = []
- if provider_domain == "url":
+ if provider_instance_id_or_domain == "url":
# this is not a very good idea for random urls
return None
for db_row in await self.database.get_rows(
DB_TABLE_TRACK_LOUDNESS,
{
- "provider": provider_domain,
+ "provider": provider_instance_id_or_domain,
},
):
all_items.append(db_row["loudness"])
return statistics.fmean(all_items)
return None
- async def mark_item_played(self, item_id: str, provider_domain: str):
+ async def mark_item_played(self, item_id: str, provider_instance_id_or_domain: str):
"""Mark item as played in playlog."""
timestamp = utc_timestamp()
await self.database.insert(
DB_TABLE_PLAYLOG,
{
"item_id": item_id,
- "provider": provider_domain,
+ "provider": provider_instance_id_or_domain,
"timestamp": timestamp,
},
allow_replace=True,
self.add_to_library(
media_item.media_type,
media_item.item_id,
- provider_instance=media_item.provider,
+ media_item.provider,
)
)
self.remove_from_library(
media_item.media_type,
media_item.item_id,
- provider_instance=media_item.provider,
+ media_item.provider,
)
)
self.mass.music.albums,
self.mass.music.artists,
):
- prov_items = await ctrl.get_db_items_by_prov_id(provider_instance=provider_instance)
+ prov_items = await ctrl.get_db_items_by_prov_id(provider_instance)
for item in prov_items:
await ctrl.remove_prov_mapping(item.item_id, provider_instance)
if radio_mode:
queue.radio_source.append(media_item)
elif media_item.media_type == MediaType.PLAYLIST:
- async for playlist_track in ctrl.tracks(
- media_item.item_id, provider_domain=media_item.provider
- ):
+ async for playlist_track in ctrl.tracks(media_item.item_id, media_item.provider):
tracks.append(playlist_track)
elif media_item.media_type in (
MediaType.ARTIST,
MediaType.ALBUM,
):
- tracks += await ctrl.tracks(media_item.item_id, provider_domain=media_item.provider)
+ tracks += await ctrl.tracks(media_item.item_id, media_item.provider)
else:
# single track or radio item
tracks += [media_item]
# shuffle the source items, just in case
for radio_item in random.sample(queue.radio_source, len(queue.radio_source)):
ctrl = self.mass.music.get_controller(radio_item.media_type)
- tracks += await ctrl.dynamic_tracks(
- item_id=radio_item.item_id, provider_domain=radio_item.provider
- )
+ tracks += await ctrl.dynamic_tracks(radio_item.item_id, radio_item.provider)
# make sure we do not grab too much items
if len(tracks) >= 50:
break
url = f"{self.mass.webserver.base_url}/stream/{player_id}/{queue_item.queue_item_id}/{stream_job.stream_id}.{fmt}" # noqa: E501
return url
- def get_preview_url(self, provider_domain_or_instance_id: str, track_id: str) -> str:
+ def get_preview_url(self, provider_instance_id_or_domain: str, track_id: str) -> str:
"""Return url to short preview sample."""
enc_track_id = urllib.parse.quote(track_id)
return (
f"{self.mass.webserver.base_url}/stream/preview?"
- f"provider={provider_domain_or_instance_id}&item_id={enc_track_id}"
+ f"provider={provider_instance_id_or_domain}&item_id={enc_track_id}"
)
async def serve_queue_stream(self, request: web.Request) -> web.Response:
async def serve_preview(self, request: web.Request):
"""Serve short preview sample."""
- provider_domain_or_instance_id = request.query["provider"]
+ provider_instance_id_or_domain = request.query["provider"]
item_id = urllib.parse.unquote(request.query["item_id"])
resp = web.StreamResponse(status=200, reason="OK", headers={"Content-Type": "audio/mp3"})
await resp.prepare(request)
- async for chunk in get_preview_stream(self.mass, provider_domain_or_instance_id, item_id):
+ async for chunk in get_preview_stream(self.mass, provider_instance_id_or_domain, item_id):
await resp.write(chunk)
return resp
async def get_preview_stream(
mass: MusicAssistant,
- provider_domain_or_instance_id: str,
+ provider_instance_id_or_domain: str,
track_id: str,
) -> AsyncGenerator[bytes, None]:
"""Create a 30 seconds preview audioclip for the given streamdetails."""
- music_prov = mass.get_provider(provider_domain_or_instance_id)
+ music_prov = mass.get_provider(provider_instance_id_or_domain)
streamdetails = await music_prov.get_stream_details(track_id)
# compare album artist
# Note: Not present on ItemMapping
if (
- hasattr(left_album, "artist")
- and hasattr(right_album, "artist")
- and not compare_artist(left_album.artist, right_album.artist)
+ isinstance(left_album, Album)
+ and isinstance(right_album, Album)
+ and not compare_artists(left_album.artists, right_album.artists, True)
):
return False
return left_album.sort_name == right_album.sort_name
"</DIDL-Lite>"
)
title = _escape_str(queue_item.media_item.name)
- if queue_item.media_item.artist and queue_item.media_item.artist.name:
- artist = _escape_str(queue_item.media_item.artist.name)
+ if queue_item.media_item.artists and queue_item.media_item.artists[0].name:
+ artist = _escape_str(queue_item.media_item.artists[0].name)
else:
artist = ""
if queue_item.media_item.album and queue_item.media_item.album.name:
async def sync_library(self, media_types: tuple[MediaType, ...] | None = None) -> None:
"""Run library sync for this provider."""
- # this reference implementation can be overridden with provider specific approach
- # this logic is aimed at streaming/online providers,
- # which all have more or less the same structure.
- # filesystem implementation(s) just override this.
- if media_types is None:
- media_types = tuple(x for x in MediaType)
+ # this reference implementation can be overridden
+ # with a provider specific approach if needed
+ media_types = tuple(x for x in MediaType)
for media_type in media_types:
if not self.library_supported(media_type):
continue
cur_db_ids = set()
async for prov_item in self._get_library_gen(media_type):
db_item: MediaItemType = await controller.get_db_item_by_prov_id(
- item_id=prov_item.item_id,
- provider_domain=prov_item.provider,
+ prov_item.item_id,
+ prov_item.provider,
)
- if not db_item:
- # dump the item in the db, rich metadata is lazy loaded later
- db_item = await controller.get(prov_item)
+ if not db_item: # noqa: SIM114
+ # create full db item
+ db_item = await controller.add(prov_item, skip_metadata_lookup=True)
elif (
db_item.metadata.checksum and prov_item.metadata.checksum
) and db_item.metadata.checksum != prov_item.metadata.checksum:
# item checksum changed
- db_item = await controller.update_db_item(db_item.item_id, prov_item)
- # add album tracks to the db too
- if prov_item.media_type == MediaType.ALBUM:
- prov_item: Album # noqa: PLW2901
- for track in controller.tracks(prov_item.item_id, prov_item.provider):
- track: Track # noqa: PLW2901
- track.album = db_item
- await self.mass.music.tracks.get(
- track.item_id,
- provider_instance=self.instance_id,
- lazy=False,
- details=track,
- add_to_db=True,
- )
- # preload playlist tracks listing, do not load them in the db
- # because that would make the sync very slow and has not much benefit
- if prov_item.media_type == MediaType.PLAYLIST:
- async for track in controller.tracks(
- prov_item.item_id, provider_instance=self.instance_id
- ):
- pass
+ db_item = await controller.add(prov_item, skip_metadata_lookup=True)
cur_db_ids.add(db_item.item_id)
if not db_item.in_library:
await controller.set_db_library(db_item.item_id, True)
if queue_item.media_item.album
else "",
"songName": queue_item.media_item.name,
- "artist": queue_item.media_item.artist.name if queue_item.media_item.artist else "",
+ "artist": queue_item.media_item.artists[0].name
+ if queue_item.media_item.artists
+ else "",
"title": queue_item.name,
"images": [{"url": queue_item.image_url}] if queue_item.image_url else None,
}
if item.ext in TRACK_EXTENSIONS:
if db_item := await self.mass.music.tracks.get_db_item_by_prov_id(
- item.path, provider_instance=self.instance_id
+ item.path, self.instance_id
):
subitems.append(db_item)
elif track := await self.get_track(item.path):
# make sure that the item exists
# https://github.com/music-assistant/hass-music-assistant/issues/707
- db_item = await self.mass.music.tracks.add_db_item(track)
+ db_item = await self.mass.music.tracks.add(track, skip_metadata_lookup=True)
subitems.append(db_item)
continue
if item.ext in PLAYLIST_EXTENSIONS:
if db_item := await self.mass.music.playlists.get_db_item_by_prov_id(
- item.path, provider_instance=self.instance_id
+ item.path, self.instance_id
):
subitems.append(db_item)
elif playlist := await self.get_playlist(item.path):
# make sure that the item exists
# https://github.com/music-assistant/hass-music-assistant/issues/707
- db_item = await self.mass.music.playlists.add_db_item(playlist)
+ db_item = await self.mass.music.playlists.add(
+ playlist, skip_metadata_lookup=True
+ )
subitems.append(db_item)
continue
if item.ext in TRACK_EXTENSIONS:
# add/update track to db
track = await self._parse_track(item)
- await self.mass.music.tracks.add_db_item(track)
+ await self.mass.music.tracks.add(track, skip_metadata_lookup=True)
elif item.ext in PLAYLIST_EXTENSIONS:
playlist = await self.get_playlist(item.path)
# add/update] playlist to db
playlist.metadata.checksum = item.checksum
# playlist is always in-library
playlist.in_library = True
- await self.mass.music.playlists.add_db_item(playlist)
+ await self.mass.music.playlists.add(playlist, skip_metadata_lookup=True)
except Exception as err: # pylint: disable=broad-except
# we don't want the whole sync to crash on one file so we catch all exceptions here
self.logger.exception("Error processing %s - %s", item.path, str(err))
else:
controller = self.mass.music.get_controller(MediaType.TRACK)
- if db_item := await controller.get_db_item_by_prov_id(
- file_path, provider_instance=self.instance_id
- ):
- await controller.delete_db_item(db_item.item_id, True)
+ if db_item := await controller.get_db_item_by_prov_id(file_path, self.instance_id):
+ await controller.delete(db_item.item_id, True)
async def get_artist(self, prov_artist_id: str) -> Artist:
"""Get full artist details by id."""
db_artist = await self.mass.music.artists.get_db_item_by_prov_id(
- item_id=prov_artist_id, provider_instance=self.instance_id
+ prov_artist_id, self.instance_id
)
if db_artist is None:
raise MediaNotFoundError(f"Artist not found: {prov_artist_id}")
"""Get album tracks for given album id."""
# filesystem items are always stored in db so we can query the database
db_album = await self.mass.music.albums.get_db_item_by_prov_id(
- prov_album_id, provider_instance=self.instance_id
+ prov_album_id, self.instance_id
)
if db_album is None:
raise MediaNotFoundError(f"Album not found: {prov_album_id}")
filename = f"{name}.m3u"
await self.write_file_content(filename, b"")
playlist = await self.get_playlist(filename)
- db_playlist = await self.mass.music.playlists.add_db_item(playlist)
+ db_playlist = await self.mass.music.playlists.add(playlist, skip_metadata_lookup=True)
return db_playlist
async def get_stream_details(self, item_id: str) -> StreamDetails:
"""Return the content details for the given track when it will be streamed."""
- db_item = await self.mass.music.tracks.get_db_item_by_prov_id(
- item_id=item_id, provider_instance=self.instance_id
- )
+ db_item = await self.mass.music.tracks.get_db_item_by_prov_id(item_id, self.instance_id)
if db_item is None:
raise MediaNotFoundError(f"Item not found: {item_id}")
if musicbrainz_id := info.get("musicbrainzreleasegroupid"):
album.musicbrainz_id = musicbrainz_id
if mb_artist_id := info.get("musicbrainzalbumartistid"): # noqa: SIM102
- if album.artist and not album.artist.musicbrainz_id:
- album.artist.musicbrainz_id = mb_artist_id
+ if album.artists and not album.artists[0].musicbrainz_id:
+ album.artists[0].musicbrainz_id = mb_artist_id
if description := info.get("review"):
album.metadata.description = description
if year := info.get("year"):
def playlist_item_from_mass(queue_item: QueueItem, index: int = 0) -> PlaylistItem:
"""Parse PlaylistItem for the Json RPC interface from MA QueueItem."""
if queue_item.media_item and queue_item.media_type == MediaType.TRACK:
- artist = queue_item.media_item.artist.name if queue_item.media_item.artist else ""
+ artist = queue_item.media_item.artists[0].name if queue_item.media_item.artists else ""
album = queue_item.media_item.album.name if queue_item.media_item.album else ""
title = queue_item.media_item.name
elif queue_item.streamdetails and queue_item.streamdetails.stream_title:
"""Plex musicprovider support for MusicAssistant."""
+import logging
from asyncio import TaskGroup
from collections.abc import AsyncGenerator, Callable, Coroutine
from plexapi.myplex import MyPlexAccount
from plexapi.server import PlexServer
-from music_assistant.common.helpers.uri import create_uri
-from music_assistant.common.helpers.util import create_sort_name
from music_assistant.common.models.config_entries import ConfigEntry, ProviderConfig
from music_assistant.common.models.enums import (
ConfigEntryType,
async def handle_setup(self) -> None:
"""Set up the music provider by connecting to the server."""
+ # silence urllib logger
+ logging.getLogger("urllib3.connectionpool").setLevel(logging.INFO)
def connect():
plex_account = MyPlexAccount(token=self.config.get_value(CONF_AUTH_TOKEN))
key,
self.instance_id,
name,
- create_uri(media_type, self.instance_id, key),
- create_sort_name(self.name),
)
async def _parse(self, plex_media) -> MediaItem | None:
if plex_album.summary:
album.metadata.description = plex_album.summary
- album.artist = self._get_item_mapping(
- MediaType.ARTIST, plex_album.parentKey, plex_album.parentTitle
+ album.artists.append(
+ self._get_item_mapping(MediaType.ARTIST, plex_album.parentKey, plex_album.parentTitle)
)
album.add_provider_mapping(
track = Track(item_id=plex_track.key, provider=self.instance_id, name=plex_track.title)
if plex_track.grandparentKey:
- track.artist = self._get_item_mapping(
- MediaType.ARTIST, plex_track.grandparentKey, plex_track.grandparentTitle
+ track.artists.append(
+ self._get_item_mapping(
+ MediaType.ARTIST, plex_track.grandparentKey, plex_track.grandparentTitle
+ )
)
if thumb := plex_track.firstAttr("thumb", "parentThumb", "grandparentThumb"):
track.metadata.images = [MediaItemImage(ImageType.THUMB, thumb, self.instance_id)]
)
)
- album.artist = await self._parse_artist(artist_obj or album_obj["artist"])
+ album.artists.append(await self._parse_artist(artist_obj or album_obj["artist"]))
if (
album_obj.get("product_type", "") == "single"
or album_obj.get("release_type", "") == "single"
):
album.album_type = AlbumType.SINGLE
- elif album_obj.get("product_type", "") == "compilation" or "Various" in album.artist.name:
+ elif (
+ album_obj.get("product_type", "") == "compilation" or "Various" in album.artists[0].name
+ ):
album.album_type = AlbumType.COMPILATION
elif (
album_obj.get("product_type", "") == "album"
album.metadata.copyright = album_obj["copyright"]
if album_obj.get("description"):
album.metadata.description = album_obj["description"]
+ if album_obj.get("parental_warning"):
+ album.metadata.explicit = True
return album
async def _parse_track(self, track_obj: dict):
result = await self._get_data("album-mb.php", i=album.musicbrainz_id)
if result and result.get("album"):
adb_album = result["album"][0]
- elif album.artist:
+ elif album.artists:
# lookup by name
- result = await self._get_data("searchalbum.php", s=album.artist.name, a=album.name)
+ artist = album.artists[0]
+ result = await self._get_data("searchalbum.php", s=artist.name, a=album.name)
if result and result.get("album"):
for item in result["album"]:
- assert isinstance(album.artist, Artist)
- if album.artist.musicbrainz_id:
- if album.artist.musicbrainz_id != item["strMusicBrainzArtistID"]:
+ assert isinstance(artist, Artist)
+ if artist.musicbrainz_id:
+ if artist.musicbrainz_id != item["strMusicBrainzArtistID"]:
continue
- elif not compare_strings(album.artist.name, item["strArtistStripped"]):
+ elif not compare_strings(artist.name, item["strArtistStripped"]):
continue
if compare_strings(album.name, item["strAlbumStripped"]):
adb_album = item
album.year = int(adb_album.get("intYearReleased", "0"))
if not album.musicbrainz_id:
album.musicbrainz_id = adb_album["strMusicBrainzID"]
- assert isinstance(album.artist, Artist)
- if album.artist and not album.artist.musicbrainz_id:
- album.artist.musicbrainz_id = adb_album["strMusicBrainzArtistID"]
+ assert isinstance(album.artists[0], Artist)
+ if album.artists and not album.artists[0].musicbrainz_id:
+ album.artists[0].musicbrainz_id = adb_album["strMusicBrainzArtistID"]
if album.album_type == AlbumType.UNKNOWN:
album.album_type = ALBUMTYPE_MAPPING.get(
adb_album.get("strReleaseFormat"), AlbumType.UNKNOWN
# found match - update album metadata too while we're here
if not ref_album.musicbrainz_id:
ref_album.metadata = self.__parse_album(item)
- await self.mass.music.albums.add_db_item(ref_album)
+ await self.mass.music.albums.add(ref_album, skip_metadata_lookup=True)
musicbrainz_id = item["strMusicBrainzArtistID"]
return musicbrainz_id