from music_assistant.common.models.enums import ImageType, MediaType
from music_assistant.common.models.media_items import (
Album,
- AlbumTrack,
Artist,
ItemMapping,
MediaItemImage,
item_id: str,
provider_instance_id_or_domain: str,
in_library_only: bool = False,
- ) -> list[AlbumTrack]:
+ ) -> list[Track]:
"""Get tracks for given album."""
return [
- AlbumTrack.from_dict(item)
+ Track.from_dict(item)
for item in await self.client.send_command(
"music/albums/album_tracks",
item_id=item_id,
duration: int = 0
version: str = ""
artists: UniqueList[Artist | ItemMapping] = field(default_factory=UniqueList)
- album: Album | ItemMapping | None = None # optional
- disc_number: int | None = None # required for album tracks
- track_number: int | None = None # required for album tracks
+ album: Album | ItemMapping | None = None # required for album tracks
+ disc_number: int = 0 # required for album tracks
+ track_number: int = 0 # required for album tracks
@property
def has_chapters(self) -> bool:
return "/".join(x.name for x in self.artists)
-@dataclass(kw_only=True)
-class AlbumTrack(Track):
- """
- Model for a track on an album.
-
- Same as regular Track but with explicit and required definitions of
- album, disc_number and track_number
- """
-
- __hash__ = _MediaItemBase.__hash__
- __eq__ = _MediaItemBase.__eq__
-
- album: Album | ItemMapping
- disc_number: int
- track_number: int
-
- @classmethod
- def from_track(
- cls,
- track: Track,
- album: Album | None = None,
- disc_number: int | None = None,
- track_number: int | None = None,
- ) -> AlbumTrack:
- """Cast Track to AlbumTrack."""
- album_track = track.to_dict()
- if album_track["album"] is None:
- if not album:
- raise InvalidDataError("AlbumTrack requires an album")
- album_track["album"] = album.to_dict()
- if album_track["disc_number"] is None:
- if disc_number is None:
- raise InvalidDataError("AlbumTrack requires a disc_number")
- album_track["disc_number"] = disc_number
- if album_track["track_number"] is None:
- if track_number is None:
- raise InvalidDataError("AlbumTrack requires a track_number")
- album_track["track_number"] = track_number
- # let mushmumaro instantiate a new object - this will ensure that valididation takes place
- return AlbumTrack.from_dict(album_track)
-
-
@dataclass(kw_only=True)
class PlaylistTrack(Track):
"""
)
-MediaItemType = (
- Artist | Album | PlaylistTrack | AlbumTrack | Track | Radio | Playlist | BrowseFolder
-)
+MediaItemType = Artist | Album | PlaylistTrack | Track | Radio | Playlist | BrowseFolder
@dataclass(kw_only=True)
import contextlib
from collections.abc import Iterable
from random import choice, random
-from typing import TYPE_CHECKING, cast
+from typing import TYPE_CHECKING
from music_assistant.common.helpers.global_cache import get_global_cache_value
from music_assistant.common.helpers.json import serialize_to_json
)
from music_assistant.common.models.media_items import (
Album,
- AlbumTrack,
AlbumType,
Artist,
ItemMapping,
item_id: str,
provider_instance_id_or_domain: str,
in_library_only: bool = False,
- ) -> UniqueList[AlbumTrack]:
+ ) -> UniqueList[Track]:
"""Return album tracks for the given provider album id."""
# always check if we have a library item for this album
library_album = await self.get_library_item_by_prov_id(
if not library_album:
return await self._get_provider_album_tracks(item_id, provider_instance_id_or_domain)
db_items = await self.get_library_album_tracks(library_album.item_id)
- result: UniqueList[AlbumTrack] = UniqueList(db_items)
+ result: UniqueList[Track] = UniqueList(db_items)
if in_library_only:
# return in-library items only
return sorted(db_items, key=lambda x: (x.disc_number, x.track_number))
+
# return all (unique) items from all providers
- unique_ids: set[str] = {f"{x.disc_number or 1}.{x.track_number}" for x in db_items}
+ # because we are returning the items from all providers combined,
+ # we need to make sure that we don't return duplicates
+ unique_ids: set[str] = {f"{x.disc_number}.{x.track_number}" for x in db_items}
+ unique_ids.update({f"{x.name.lower()}.{x.version.lower()}" for x in db_items})
for db_item in db_items:
unique_ids.add(x.item_id for x in db_item.provider_mappings)
for provider_mapping in library_album.provider_mappings:
for provider_track in provider_tracks:
if provider_track.item_id in unique_ids:
continue
- unique_id = f"{provider_track.disc_number or 1}.{provider_track.track_number}"
+ unique_id = f"{provider_track.disc_number}.{provider_track.track_number}"
+ if unique_id in unique_ids:
+ continue
+ unique_id = f"{provider_track.name.lower()}.{provider_track.version.lower()}"
if unique_id in unique_ids:
continue
unique_ids.add(unique_id)
- result.append(AlbumTrack.from_track(provider_track, library_album))
+ provider_track.album = library_album
+ result.append(provider_track)
# NOTE: we need to return the results sorted on disc/track here
# to ensure the correct order at playback
return sorted(result, key=lambda x: (x.disc_number, x.track_number))
async def get_library_album_tracks(
self,
item_id: str | int,
- ) -> list[AlbumTrack]:
+ ) -> list[Track]:
"""Return in-database album tracks for the given database album."""
query = f"WHERE {DB_TABLE_ALBUM_TRACKS}.album_id = {item_id}"
- result = await self.mass.music.tracks._get_library_items_by_query(extra_query=query)
- if TYPE_CHECKING:
- return cast(list[AlbumTrack], result)
- return result
+ return await self.mass.music.tracks._get_library_items_by_query(extra_query=query)
async def _add_library_item(self, item: Album) -> int:
"""Add a new record to the database."""
'version', {DB_TABLE_ALBUMS}.version,
'images', json_extract({DB_TABLE_ALBUMS}.metadata, '$.images'),
'media_type', 'album') END as album,
- {DB_TABLE_ALBUM_TRACKS}.disc_number,
- {DB_TABLE_ALBUM_TRACKS}.track_number
+ CASE WHEN {DB_TABLE_ALBUM_TRACKS}.disc_number IS NULL THEN 0 ELSE {DB_TABLE_ALBUM_TRACKS}.disc_number END as disc_number,
+ CASE WHEN {DB_TABLE_ALBUM_TRACKS}.track_number IS NULL THEN 0 ELSE {DB_TABLE_ALBUM_TRACKS}.track_number END as track_number
FROM {self.db_table}
LEFT JOIN {DB_TABLE_ALBUM_TRACKS} on {DB_TABLE_ALBUM_TRACKS}.track_id = {self.db_table}.item_id
LEFT JOIN {DB_TABLE_ALBUMS} on {DB_TABLE_ALBUMS}.item_id = {DB_TABLE_ALBUM_TRACKS}.album_id
await self._set_track_album(
db_id=db_id,
album=item.album,
- disc_number=getattr(item, "disc_number", None) or 0,
- track_number=getattr(item, "track_number", None) or 0,
+ disc_number=getattr(item, "disc_number", 0),
+ track_number=getattr(item, "track_number", 0),
)
self.logger.debug("added %s to database (id: %s)", item.name, db_id)
return db_id
await self._set_track_album(
db_id=db_id,
album=update.album,
- disc_number=getattr(update, "disc_number", None) or 0,
- track_number=getattr(update, "track_number", None) or 1,
+ disc_number=update.disc_number or cur_item.disc_number,
+ track_number=update.track_number or cur_item.track_number,
overwrite=overwrite,
)
self.logger.debug("updated %s in database: (id %s)", update.name, db_id)
PlayerUnavailableError,
QueueEmpty,
)
-from music_assistant.common.models.media_items import AlbumTrack, MediaItemType, media_from_dict
+from music_assistant.common.models.media_items import MediaItemType, media_from_dict
from music_assistant.common.models.player import PlayerMedia
from music_assistant.common.models.player_queue import PlayerQueue
from music_assistant.common.models.queue_item import QueueItem
return []
- async def get_album_tracks(self, album: Album) -> list[AlbumTrack]:
+ async def get_album_tracks(self, album: Album) -> list[Track]:
"""Return tracks for given album, based on user preference."""
album_items_conf = self.mass.config.get_raw_core_config_value(
self.domain,
ExternalID.DISCOGS,
ExternalID.ACOUSTID,
ExternalID.TADB,
- # make sure to check isrc before musicbrainz
+ # make sure to check musicbrainz before isrc
# https://github.com/music-assistant/hass-music-assistant/issues/2316
ExternalID.ISRC,
ExternalID.ASIN,
# track version must match
if strict and not compare_version(base_item.version, compare_item.version):
return False
- if not strict and (isinstance(base_item, ItemMapping) or isinstance(compare_item, ItemMapping)):
- return True
- # for strict matching we REQUIRE both items to be a real track object
- assert isinstance(base_item, Track)
- assert isinstance(compare_item, Track)
# check if both tracks are (not) explicit
if base_item.metadata.explicit is None and isinstance(base_item.album, Album):
base_item.metadata.explicit = base_item.album.metadata.explicit
if strict and compare_explicit(base_item.metadata, compare_item.metadata) is False:
return False
if not strict and not (base_item.album or track_albums):
- # in non-strict mode, the album does not have to match
- return abs(base_item.duration - compare_item.duration) <= 3
+ # in non-strict mode, the album does not have to match (but duration needs to)
+ return abs(base_item.duration - compare_item.duration) <= 2
# exact albumtrack match = 100% match
if (
base_item.album
and compare_item.album
and compare_album(base_item.album, compare_item.album, False)
+ and base_item.disc_number == compare_item.disc_number
and base_item.track_number == compare_item.track_number
):
return True
LOGGER = logging.getLogger(f"{MASS_LOGGER_NAME}.tags")
+# silence the eyed3 logger because it is too verbose
+logging.getLogger("eyed3").setLevel(logging.WARNING)
+
+
# the only multi-item splitter we accept is the semicolon,
# which is also the default in Musicbrainz Picard.
# the slash is also a common splitter but causes collisions with
-# artists actually containing a slash in the name, such as ACDC
+# artists actually containing a slash in the name, such as AC/DC
TAG_SPLITTER = ";"
track=deezer_track,
user_country=self.gw_client.user_country,
# TODO: doesn't Deezer have disc and track number in the api ?
- position=count,
+ position=0,
)
- for count, deezer_track in enumerate(await album.get_tracks(), 1)
+ for deezer_track in await album.get_tracks()
]
async def get_playlist_tracks(
metadata=self.parse_metadata_track(track=track),
track_number=position,
position=position,
- disc_number=getattr(track, "disk_number", 1),
+ disc_number=getattr(track, "disk_number", 0),
)
if isrc := getattr(track, "isrc", None):
item.external_ids.add((ExternalID.ISRC, isrc))
# parse other info
track.duration = tags.duration or 0
track.metadata.genres = set(tags.genres)
- track.disc_number = tags.disc
- track.track_number = tags.track
+ if tags.disc:
+ track.disc_number = tags.disc
+ if tags.track:
+ track.track_number = tags.track
track.metadata.copyright = tags.get("copyright")
track.metadata.lyrics = tags.lyrics
explicit_tag = tags.get("itunesadvisory")
parentdir = os.path.dirname(track_path)
dirname = parentdir.rsplit(os.sep)[-1]
dirname_lower = dirname.lower()
- if disc_number is not None and compare_strings(f"disc {disc_number}", dirname, False):
+ if disc_number and compare_strings(f"disc {disc_number}", dirname, False):
return parentdir
if dirname_lower.startswith(album_name.lower()) and "disc" in dirname_lower:
return parentdir
url=plex_track.getWebURL(self._baseurl),
)
},
+ disc_number=plex_track.parentIndex or 0,
+ track_number=plex_track.trackNumber or 0,
)
# Only add 5-star rated tracks to Favorites. userRating will be 10.0 for those.
# TODO: Let user set threshold?
for plex_chapter in plex_track.chapters
]
- available = False
- content = None
-
return track
async def search(
"""Get album tracks for given album id."""
plex_album: PlexAlbum = await self._get_data(prov_album_id, PlexAlbum)
tracks = []
- for idx, plex_track in enumerate(await self._run_async(plex_album.tracks), 1):
+ for plex_track in await self._run_async(plex_album.tracks):
track = await self._parse_track(
plex_track,
)
- track.disc_number = plex_track.parentIndex
- track.track_number = plex_track.trackNumber or idx
tracks.append(track)
return tracks
url=f'https://open.qobuz.com/track/{track_obj["id"]}',
)
},
- disc_number=track_obj.get("media_number"),
- track_number=track_obj.get("track_number"),
+ disc_number=track_obj.get("media_number", 0),
+ track_number=track_obj.get("track_number", 0),
)
if isrc := track_obj.get("isrc"):
track.external_ids.add((ExternalID.ISRC, isrc))
available=not track_obj["is_local"] and track_obj["is_playable"],
)
},
- disc_number=track_obj.get("disc_number"),
- track_number=track_obj.get("track_number"),
+ disc_number=track_obj.get("disc_number", 0),
+ track_number=track_obj.get("track_number", 0),
)
if isrc := track_obj.get("external_ids", {}).get("isrc"):
track.external_ids.add((ExternalID.ISRC, isrc))
available=track_obj.available,
)
},
- disc_number=track_obj.volume_num,
- track_number=track_obj.track_num,
+ disc_number=track_obj.volume_num or 0,
+ track_number=track_obj.track_num or 0,
)
if track_obj.isrc:
track.external_ids.add((ExternalID.ISRC, track_obj.isrc))
if not album_obj.get("tracks"):
return []
tracks = []
- for idx, track_obj in enumerate(album_obj["tracks"], 1):
+ for track_obj in album_obj["tracks"]:
try:
track = self._parse_track(track_obj=track_obj)
- track.disc_number = 0
- track.track_number = track_obj.get("trackNumber", idx)
except InvalidDataError:
continue
tracks.append(track)
),
)
},
+ disc_number=0, # not supported on YTM?
+ track_number=track_obj.get("trackNumber", 0),
)
if track_obj.get("artists"):