# merge duplicates using a dict
final_items: Dict[str, Track] = {}
for track in tracks:
- key = f".{track.name}.{track.version}"
- if track.disc_number and track.track_number:
- key += f".{track.disc_number}.{track.track_number}"
-
+ key = f".{track.name}.{track.version}.{track.disc_number}.{track.track_number}"
if key in final_items:
final_items[key].provider_ids.update(track.provider_ids)
else:
track.album = album
final_items[key] = track
+ if album.in_library:
+ final_items[key].in_library = True
return list(final_items.values())
async def versions(
import asyncio
import itertools
-from typing import List, Optional
+from typing import Dict, List, Optional
from databases import Database as Db
self.get_provider_artist_toptracks(item.item_id, item.prov_id)
for item in artist.provider_ids
]
- # use intermediate set to remove (some) duplicates
- return list(set(itertools.chain.from_iterable(await asyncio.gather(*coros))))
+ tracks = itertools.chain.from_iterable(await asyncio.gather(*coros))
+ # merge duplicates using a dict
+ final_items: Dict[str, Track] = {}
+ for track in tracks:
+ key = f".{track.name}.{track.version}"
+ if key in final_items:
+ final_items[key].provider_ids.update(track.provider_ids)
+ else:
+ final_items[key] = track
+ return list(final_items.values())
async def albums(
self,
self.get_provider_artist_albums(item.item_id, item.prov_id)
for item in artist.provider_ids
]
- # use intermediate set to remove (some) duplicates
- return list(set(itertools.chain.from_iterable(await asyncio.gather(*coros))))
+ albums = itertools.chain.from_iterable(await asyncio.gather(*coros))
+ # merge duplicates using a dict
+ final_items: Dict[str, Album] = {}
+ for album in albums:
+ key = f".{album.name}.{album.version}"
+ if key in final_items:
+ final_items[key].provider_ids.update(album.provider_ids)
+ else:
+ final_items[key] = album
+ if album.in_library:
+ final_items[key].in_library = True
+ return list(final_items.values())
async def add(self, item: Artist) -> Artist:
"""Add artist to local db and return the database item."""
result = []
for track in await self.mass.music.tracks.get_db_items(query):
track.album = db_album
+ album_mapping = next(
+ (x for x in track.albums if x.item_id == db_album.item_id), None
+ )
+ track.disc_number = album_mapping.disc_number
+ track.track_number = album_mapping.track_number
result.append(track)
return result
from music_assistant.models.enums import EventType, MediaType, ProviderType
from music_assistant.models.event import MassEvent
from music_assistant.models.media_controller import MediaControllerBase
-from music_assistant.models.media_items import Album, Artist, ItemMapping, Track
+from music_assistant.models.media_items import (
+ Album,
+ Artist,
+ ItemMapping,
+ Track,
+ TrackAlbumMapping,
+)
class TracksController(MediaControllerBase[Track]):
"metadata": json_serializer(metadata),
"provider_ids": json_serializer(provider_ids),
"isrc": track.isrc or cur_item.isrc,
- "disc_number": track.disc_number or cur_item.disc_number,
- "track_number": track.track_number or cur_item.track_number,
},
db=db,
)
base_track: Track,
upd_track: Optional[Track] = None,
db: Optional[Db] = None,
- ) -> List[ItemMapping]:
- """Extract all (unique) artists of track as ItemMapping."""
- track_albums = []
+ ) -> List[TrackAlbumMapping]:
+ """Extract all (unique) albums of track as TrackAlbumMapping."""
+ track_albums: List[TrackAlbumMapping] = []
+ # existing TrackAlbumMappings are starting point
+ if upd_track and upd_track.albums:
+ track_albums = upd_track.albums
+ elif base_track.albums:
+ track_albums = base_track.albums
+ # append update item album if needed
if upd_track and upd_track.album:
- track_albums.append(upd_track.album)
- if base_track.album and base_track.album not in track_albums:
- track_albums.append(base_track.album)
- for item in base_track.albums:
- if item not in track_albums:
- track_albums.append(item)
- if upd_track:
- for item in upd_track.albums:
- if item not in track_albums:
- track_albums.append(item)
- # use intermediate set to clear out duplicates
- return [await self._get_album_mapping(x, db=db) for x in track_albums]
+ mapping = await self._get_album_mapping(upd_track.album, db=db)
+ mapping = TrackAlbumMapping.from_dict(
+ {
+ **mapping.to_dict(),
+ "disc_number": upd_track.disc_number,
+ "track_number": upd_track.track_number,
+ }
+ )
+ if mapping not in track_albums:
+ track_albums.append(mapping)
+ # append base item album if needed
+ elif base_track and base_track.album:
+ mapping = await self._get_album_mapping(base_track.album, db=db)
+ mapping = TrackAlbumMapping.from_dict(
+ {
+ **mapping.to_dict(),
+ "disc_number": base_track.disc_number,
+ "track_number": base_track.track_number,
+ }
+ )
+ if mapping not in track_albums:
+ track_albums.append(mapping)
+
+ return track_albums
async def _get_album_mapping(
self, album: Union[Album, ItemMapping], db: Optional[Db] = None
from music_assistant.mass import MusicAssistant
-SCHEMA_VERSION = 14
+SCHEMA_VERSION = 15
TABLE_TRACK_LOUDNESS = "track_loudness"
TABLE_PLAYLOG = "playlog"
await self.__create_database_tables(db)
if prev_version < 14:
- # album --> albums on track entity
# no more need for prov_mappings table
- await db.execute(f"DROP TABLE IF EXISTS {TABLE_TRACKS}")
await db.execute("DROP TABLE IF EXISTS provider_mappings")
+
+ if prev_version < 15:
+ # album --> albums on track entity
+ await db.execute(f"DROP TABLE IF EXISTS {TABLE_TRACKS}")
await db.execute(f"DROP TABLE IF EXISTS {TABLE_CACHE}")
# recreate missing tables
await self.__create_database_tables(db)
artists json,
albums json,
metadata json,
- disc_number INTEGER NULL,
- track_number INTEGER NULL,
provider_ids json
);"""
)
data BLOB,
UNIQUE(path, size));"""
)
+ # create indexes
+ await db.execute(
+ "CREATE INDEX IF NOT EXISTS artists_in_library_idx on artists(in_library);"
+ )
+ await db.execute(
+ "CREATE INDEX IF NOT EXISTS albums_in_library_idx on albums(in_library);"
+ )
+ await db.execute(
+ "CREATE INDEX IF NOT EXISTS tracks_in_library_idx on tracks(in_library);"
+ )
+ await db.execute(
+ "CREATE INDEX IF NOT EXISTS playlists_in_library_idx on playlists(in_library);"
+ )
+ await db.execute(
+ "CREATE INDEX IF NOT EXISTS radios_in_library_idx on radios(in_library);"
+ )
+ await db.execute(
+ "CREATE INDEX IF NOT EXISTS artists_sort_name_idx on artists(sort_name);"
+ )
+ await db.execute(
+ "CREATE INDEX IF NOT EXISTS albums_sort_name_idx on albums(sort_name);"
+ )
+ await db.execute(
+ "CREATE INDEX IF NOT EXISTS tracks_sort_name_idx on tracks(sort_name);"
+ )
+ await db.execute(
+ "CREATE INDEX IF NOT EXISTS playlists_sort_name_idx on playlists(sort_name);"
+ )
+ await db.execute(
+ "CREATE INDEX IF NOT EXISTS radios_sort_name_idx on radios(sort_name);"
+ )
+ await db.execute(
+ "CREATE INDEX IF NOT EXISTS artists_musicbrainz_id_idx on artists(musicbrainz_id);"
+ )
+ await db.execute(
+ "CREATE INDEX IF NOT EXISTS albums_musicbrainz_id_idx on albums(musicbrainz_id);"
+ )
+ await db.execute(
+ "CREATE INDEX IF NOT EXISTS tracks_musicbrainz_id_idx on tracks(musicbrainz_id);"
+ )
+ await db.execute("CREATE INDEX IF NOT EXISTS tracks_isrc_idx on tracks(isrc);")
+ await db.execute("CREATE INDEX IF NOT EXISTS albums_upc_idx on albums(upc);")
provider_id: Optional[str] = None,
db: Optional[Db] = None,
) -> ItemCls | None:
- """Get the database album for the given prov_id."""
+ """Get the database item for the given prov_id."""
assert provider or provider_id, "provider or provider_id must be supplied"
if provider == ProviderType.DATABASE or provider_id == "database":
return await self.get_db_item(provider_item_id, db=db)
db_row["in_library"] = bool(db_row["in_library"])
if db_row.get("albums"):
db_row["album"] = db_row["albums"][0]
+ db_row["disc_number"] = db_row["albums"][0]["disc_number"]
+ db_row["track_number"] = db_row["albums"][0]["track_number"]
db_row["item_id"] = str(db_row["item_id"])
return cls.from_dict(db_row)
"uri",
"album",
"position",
+ "track_number",
+ "disc_number",
]
}
"""Return timestamp the metadata was last refreshed (0 if full data never retrieved)."""
return self.metadata.last_refresh or 0
+ def __hash__(self):
+ """Return custom hash."""
+ return hash((self.media_type, self.provider, self.item_id))
+
@dataclass(frozen=True)
class ItemMapping(DataClassDictMixin):
return hash((self.provider, self.item_id))
+@dataclass(frozen=True)
+class TrackAlbumMapping(ItemMapping):
+ """Model for a track that is mapped to an album."""
+
+ disc_number: Optional[int] = None
+ track_number: Optional[int] = None
+
+
@dataclass
class Track(MediaItem):
"""Model for a track."""
artists: List[Union[Artist, ItemMapping]] = field(default_factory=list)
# album track only
album: Union[Album, ItemMapping, None] = None
- albums: List[ItemMapping] = field(default_factory=list)
+ albums: List[TrackAlbumMapping] = field(default_factory=list)
disc_number: Optional[int] = None
track_number: Optional[int] = None
# playlist track only
owner: str = ""
is_editable: bool = False
+ def __hash__(self):
+ """Return custom hash."""
+ return hash((self.provider, self.item_id))
+
@dataclass
class Radio(MediaItem):
val.pop("duration", None)
return val
+ def __hash__(self):
+ """Return custom hash."""
+ return hash((self.provider, self.item_id))
+
MediaItemType = Union[Artist, Album, Track, Radio, Playlist]