async def get_artist_metadata(self, artist: Artist) -> None:
"""Get/update rich metadata for an artist."""
+ # set timestamp, used to determine when this function was last called
+ artist.metadata.last_refresh = int(time())
+
if not artist.musicbrainz_id:
artist.musicbrainz_id = await self.get_artist_musicbrainz_id(artist)
if metadata := await self.audiodb.get_artist_metadata(artist):
artist.metadata.update(metadata)
- artist.metadata.last_refresh = int(time())
-
async def get_album_metadata(self, album: Album) -> None:
"""Get/update rich metadata for an album."""
+ # set timestamp, used to determine when this function was last called
+ album.metadata.last_refresh = int(time())
+
if not (album.musicbrainz_id or album.artist):
return
if metadata := await self.audiodb.get_album_metadata(album):
if metadata := await self.fanarttv.get_album_metadata(album):
album.metadata.update(metadata)
- album.metadata.last_refresh = int(time())
-
async def get_track_metadata(self, track: Track) -> None:
"""Get/update rich metadata for a track."""
+ # set timestamp, used to determine when this function was last called
+ track.metadata.last_refresh = int(time())
+
if not (track.album and track.artists):
return
if metadata := await self.audiodb.get_track_metadata(track):
track.metadata.update(metadata)
- track.metadata.last_refresh = int(time())
-
async def get_playlist_metadata(self, playlist: Playlist) -> None:
"""Get/update rich metadata for a playlist."""
+ # set timestamp, used to determine when this function was last called
+ playlist.metadata.last_refresh = int(time())
# retrieve genres from tracks
# TODO: retrieve style/mood ?
playlist.metadata.genres = set()
elif track.album and track.album.metadata.genres:
playlist.metadata.genres.update(track.album.metadata.genres)
# TODO: create mosaic thumb/fanart from playlist tracks
- playlist.metadata.last_refresh = int(time())
async def get_radio_metadata(self, radio: Radio) -> None:
"""Get/update rich metadata for a radio station."""
provider_id: Optional[str] = None,
) -> List[Album]:
"""Return all versions of an album we can find on all providers."""
+ assert provider or provider_id, "Provider type or ID must be specified"
album = await self.get(item_id, provider, provider_id)
+ # perform a search on all provider(types) to collect all versions/variants
prov_types = {item.type for item in self.mass.music.providers}
- return [
- prov_item
+ search_query = f"{album.artist.name} - {album.name}"
+ all_versions = {
+ prov_item.item_id: prov_item
for prov_items in await asyncio.gather(
- *[self.search(album.name, prov_type) for prov_type in prov_types]
+ *[self.search(search_query, prov_type) for prov_type in prov_types]
)
for prov_item in prov_items
- if prov_item.sort_name == album.sort_name
+ if (
+ (prov_item.sort_name in album.sort_name)
+ or (album.sort_name in prov_item.sort_name)
+ )
and compare_artist(prov_item.artist, album.artist)
- ]
+ }
+ # make sure that the 'base' version is included
+ for prov_version in album.provider_ids:
+ if prov_version.item_id in all_versions:
+ continue
+ album_copy = Album.from_dict(album.to_dict())
+ album_copy.item_id = prov_version.item_id
+ album_copy.provider = prov_version.prov_type
+ album_copy.provider_ids = {prov_version}
+ all_versions[prov_version.item_id] = album_copy
+
+ # return the aggregated result
+ return all_versions.values()
async def add(self, item: Album, overwrite_existing: bool = False) -> Album:
"""Add album to local db and return the database item."""
# also fetch same album on all providers
await self._match(db_item)
db_item = await self.get_db_item(db_item.item_id)
+ # add the album's tracks to the db
+ for prov in item.provider_ids:
+ for track in await self._get_provider_album_tracks(
+ prov.item_id, prov.prov_type, prov.prov_id
+ ):
+ await self.mass.music.tracks.add_db_item(track)
return db_item
async def _get_provider_album_tracks(
prov = self.mass.music.get_provider(provider_id or provider)
if not prov:
return []
- full_album = await self.get(item_id, provider, provider_id)
+ full_album = await self.get_provider_item(item_id, provider_id or provider)
# prefer cache items (if any)
cache_key = f"{prov.type.value}.albumtracks.{item_id}"
cache_checksum = full_album.metadata.checksum
item_id: str,
) -> List[Track]:
"""Return in-database album tracks for the given database album."""
- album_tracks = []
db_album = await self.get_db_item(item_id)
- # combine the info we have in the db with the full listing from a streaming provider
- for prov in db_album.provider_ids:
- for prov_track in await self._get_provider_album_tracks(
- prov.item_id, prov.prov_type, prov.prov_id
+ # simply grab all tracks in the db that are linked to this album
+ # TODO: adjust to json query instead of text search?
+ query = f"SELECT * FROM tracks WHERE albums LIKE '%\"{item_id}\"%'"
+ result = []
+ for track in await self.mass.music.tracks.get_db_items_by_query(query):
+ if album_mapping := next(
+ (x for x in track.albums if x.item_id == db_album.item_id), None
):
- if db_track := await self.mass.music.tracks.get_db_item_by_prov_id(
- prov_track.item_id, prov_track.provider
- ):
- if album_mapping := next(
- (x for x in db_track.albums if x.item_id == db_album.item_id),
- None,
- ):
- db_track.disc_number = album_mapping.disc_number
- db_track.track_number = album_mapping.track_number
- prov_track = db_track
- # make sure that the (db) album is stored on the tracks
- prov_track.album = db_album
- prov_track.metadata.images = db_album.metadata.images
- album_tracks.append(prov_track)
- # once we have the details from one streaming provider,
- # there is no need to iterate them all (if there are multiple)
- # for the same album
- if not prov.prov_type.is_file():
- break
-
- return album_tracks
+ # make sure that the full album is set on the track and prefer the album's images
+ track.album = db_album
+ if db_album.metadata.images:
+ track.metadata.images = db_album.metadata.images
+ # apply the disc and track number from the mapping
+ track.disc_number = album_mapping.disc_number
+ track.track_number = album_mapping.track_number
+ result.append(track)
+ return sorted(result, key=lambda x: (x.disc_number or 0, x.track_number or 0))
async def add_db_item(self, item: Album, overwrite_existing: bool = False) -> Album:
"""Add a new record to the database."""
- assert isinstance(item, Album), "Not a full Album object"
assert item.provider_ids, f"Album {item.name} is missing provider id(s)"
assert item.artist, f"Album {item.name} is missing artist"
async with self._db_add_lock:
self, artist: Union[Artist, ItemMapping], overwrite: bool = False
) -> ItemMapping:
"""Extract (database) track artist as ItemMapping."""
-
- if artist.provider == ProviderType.DATABASE:
- if isinstance(artist, ItemMapping):
- return artist
- return ItemMapping.from_item(artist)
-
if overwrite:
artist = await self.mass.music.artists.add_db_item(
artist, overwrite_existing=True
)
+ if artist.provider == ProviderType.DATABASE:
+ if isinstance(artist, ItemMapping):
+ return artist
+ return ItemMapping.from_item(artist)
if db_artist := await self.mass.music.artists.get_db_item_by_prov_id(
artist.item_id, provider=artist.provider
"""Manage MediaItems of type Radio."""
from __future__ import annotations
+import asyncio
from time import time
+from typing import List, Optional
from music_assistant.helpers.database import TABLE_RADIOS
from music_assistant.helpers.json import json_serializer
-from music_assistant.models.enums import EventType, MediaType
+from music_assistant.models.enums import EventType, MediaType, ProviderType
from music_assistant.models.event import MassEvent
from music_assistant.models.media_controller import MediaControllerBase
from music_assistant.models.media_items import Radio
"""Get in-library radio by name."""
return await self.mass.database.get_row(self.db_table, {"name": name})
+ async def versions(
+ self,
+ item_id: str,
+ provider: Optional[ProviderType] = None,
+ provider_id: Optional[str] = None,
+ ) -> List[Radio]:
+ """Return all versions of a radio station we can find on all providers."""
+ assert provider or provider_id, "Provider type or ID must be specified"
+ radio = await self.get(item_id, provider, provider_id)
+ # perform a search on all provider(types) to collect all versions/variants
+ prov_types = {item.type for item in self.mass.music.providers}
+ all_versions = {
+ prov_item.item_id: prov_item
+ for prov_items in await asyncio.gather(
+ *[self.search(radio.name, prov_type) for prov_type in prov_types]
+ )
+ for prov_item in prov_items
+ if (
+ (prov_item.name in radio.name)
+ or (radio.name in prov_item.name)
+ or (prov_item.sort_name in radio.sort_name)
+ or (radio.sort_name in prov_item.sort_name)
+ )
+ }
+ # make sure that the 'base' version is included
+ for prov_version in radio.provider_ids:
+ if prov_version.item_id in all_versions:
+ continue
+ radio_copy = Radio.from_dict(radio.to_dict())
+ radio_copy.item_id = prov_version.item_id
+ radio_copy.provider = prov_version.prov_type
+ radio_copy.provider_ids = {prov_version}
+ all_versions[prov_version.item_id] = radio_copy
+
+ # return the aggregated result
+ return all_versions.values()
+
async def add(self, item: Radio, overwrite_existing: bool = False) -> Radio:
"""Add radio to local db and return the new database item."""
item.metadata.last_refresh = int(time())
provider_id: Optional[str] = None,
) -> List[Track]:
"""Return all versions of a track we can find on all providers."""
+ assert provider or provider_id, "Provider type or ID must be specified"
track = await self.get(item_id, provider, provider_id)
+ # perform a search on all provider(types) to collect all versions/variants
prov_types = {item.type for item in self.mass.music.providers}
- return [
- prov_item
+ search_query = f"{track.artist.name} - {track.name}"
+ all_versions = {
+ prov_item.item_id: prov_item
for prov_items in await asyncio.gather(
- *[self.search(track.name, prov_type) for prov_type in prov_types]
+ *[self.search(search_query, prov_type) for prov_type in prov_types]
)
for prov_item in prov_items
- if compare_artists(prov_item.artists, track.artists)
- ]
+ if (
+ (prov_item.sort_name in track.sort_name)
+ or (track.sort_name in prov_item.sort_name)
+ )
+ and compare_artists(prov_item.artists, track.artists, any_match=True)
+ }
+ # make sure that the 'base' version is included
+ for prov_version in track.provider_ids:
+ if prov_version.item_id in all_versions:
+ continue
+ track_copy = Track.from_dict(track.to_dict())
+ track_copy.item_id = prov_version.item_id
+ track_copy.provider = prov_version.prov_type
+ track_copy.provider_ids = {prov_version}
+ all_versions[prov_version.item_id] = track_copy
+
+ # return the aggregated result
+ return all_versions.values()
async def _match(self, db_track: Track) -> None:
"""
def compare_artists(
left_artists: List[Union[Artist, ItemMapping]],
right_artists: List[Union[Artist, ItemMapping]],
+ any_match: bool = False,
) -> bool:
"""Compare two lists of artist and return True if both lists match (exactly)."""
matches = 0
for left_artist in left_artists:
for right_artist in right_artists:
if compare_artist(left_artist, right_artist):
+ if any_match:
+ return True
matches += 1
return len(left_artists) == matches
prov = self.mass.music.get_provider(provider_id or provider)
if not prov or MusicProviderFeature.SEARCH not in prov.supported_features:
return []
+ if not prov.library_supported(self.media_type):
+ # assume library supported also means that this mediatype is supported
+ return []
# prefer cache items (if any)
cache_key = (
await self.set_db_library(prov_item.item_id, False)
async def get_provider_id(self, item: ItemCls) -> Tuple[str, str]:
- """Return provider and item id."""
+ """Return (first) provider and item id."""
if item.provider == ProviderType.DATABASE:
# make sure we have a full object
item = await self.get_db_item(item.item_id)
match = {"item_id": int(item_id)}
if db_row := await self.mass.database.get_row(self.db_table, match):
return self.item_cls.from_db_row(db_row)
- return None
+ raise MediaNotFoundError(f"Album not found in database: {item_id}")
async def get_db_item_by_prov_id(
self,
return tuple()
return tuple(self.isrc.split(";"))
+ @property
+ def artist(self) -> Artist | ItemMapping | None:
+ """Return (first) artist of track."""
+ if self.artists:
+ return self.artists[0]
+ return None
+
+ @artist.setter
+ def artist(self, artist: Union[Artist, ItemMapping]) -> None:
+ """Set (first/only) artist of track."""
+ self.artists = [artist]
+
@dataclass
class Playlist(MediaItem):
import pytube
import ytmusicapi
-from music_assistant.models.enums import MusicProviderFeature, ProviderType
+from music_assistant.models.enums import (
+ MediaQuality,
+ MusicProviderFeature,
+ ProviderType,
+)
from music_assistant.models.errors import (
InvalidDataError,
LoginFailed,
prov_type=self.type,
prov_id=self.id,
available=available,
+ quality=MediaQuality.LOSSY_M4A,
)
)
return track