album, disc_number and track_number
"""
- album: Album
+ album: Album | ItemMapping
disc_number: int
track_number: int
@classmethod
- def from_track(cls: Self, track: Track, album: Album | None = None) -> Self:
+ def from_track(
+ cls: Self,
+ track: Track,
+ album: Album | None = None,
+ disc_number: int | None = None,
+ track_number: int | None = None,
+ ) -> Self:
"""Cast Track to AlbumTrack."""
- if album:
- track.album = album
- assert isinstance(track.album, Album)
- assert track.disc_number is not None
- assert track.track_number is not None
- return cast(AlbumTrack, track)
+ if album is None:
+ album = track.album
+ if disc_number is None:
+ disc_number = track.disc_number
+ if track_number is None:
+ track_number = track.track_number
+ # let mushmumaro instantiate a new object - this will ensure that valididation takes place
+ return AlbumTrack.from_dict(
+ {
+ **track.to_dict(),
+ "album": album.to_dict(),
+ "disc_number": disc_number,
+ "track_number": track_number,
+ }
+ )
@dataclass(kw_only=True)
position: int
@classmethod
- def from_track(cls: Self, track: Track) -> Self:
+ def from_track(cls: Self, track: Track, position: int | None = None) -> Self:
"""Cast Track to PlaylistTrack."""
- assert track.position is not None
- return cast(AlbumTrack, track)
+ if position is None:
+ position = track.position
+ # let mushmumaro instantiate a new object - this will ensure that valididation takes place
+ return PlaylistTrack.from_dict(
+ {
+ **track.to_dict(),
+ "position": position,
+ }
+ )
@dataclass(kw_only=True)
full_album = await self.get_provider_item(item_id, provider_instance_id_or_domain)
# prefer cache items (if any) for streaming providers only
cache_key = f"{prov.lookup_key}.albumtracks.{item_id}"
- if prov.is_streaming_provider and (cache := await self.mass.cache.get(cache_key)):
+ if (
+ prov.is_streaming_provider
+ and (cache := await self.mass.cache.get(cache_key)) is not None
+ ):
return [AlbumTrack.from_dict(x) for x in cache]
# no items in cache - get listing from provider
items = []
return []
# prefer cache items (if any) - for streaming providers
cache_key = f"{prov.lookup_key}.artist_toptracks.{item_id}"
- if prov.is_streaming_provider and (cache := await self.mass.cache.get(cache_key)):
+ if (
+ prov.is_streaming_provider
+ and (cache := await self.mass.cache.get(cache_key)) is not None
+ ):
return [Track.from_dict(x) for x in cache]
# no items in cache - get listing from provider
if ProviderFeature.ARTIST_TOPTRACKS in prov.supported_features:
item_id,
provider_instance_id_or_domain,
):
- # TODO: adjust to json query instead of text search?
- query = f"WHERE tracks.artists LIKE '%\"{db_artist.item_id}\"%'"
- query += (
- f" AND tracks.provider_mappings LIKE '%\"{provider_instance_id_or_domain}\"%'"
+ query = (
+ f"WHERE trackartists.artist_id = {db_artist.item_id} AND "
+ f'(provider_mappings.provider_domain = "{provider_instance_id_or_domain}" OR '
+ f'provider_mappings.provider_instance = "{provider_instance_id_or_domain}")'
)
paged_list = await self.mass.music.tracks.library_items(extra_query=query)
return paged_list.items
return []
# prefer cache items (if any)
cache_key = f"{prov.lookup_key}.artist_albums.{item_id}"
- if prov.is_streaming_provider and (cache := await self.mass.cache.get(cache_key)):
+ if (
+ prov.is_streaming_provider
+ and (cache := await self.mass.cache.get(cache_key)) is not None
+ ):
return [Album.from_dict(x) for x in cache]
# no items in cache - get listing from provider
if ProviderFeature.ARTIST_ALBUMS in prov.supported_features:
item_id,
provider_instance_id_or_domain,
):
- # TODO: adjust to json query instead of text search?
- query = f"WHERE albums.artists LIKE '%\"{db_artist.item_id}\"%'"
- query += (
- f" AND albums.provider_mappings LIKE '%\"{provider_instance_id_or_domain}\"%'"
+ query = (
+ f"WHERE albumartists.artist_id = {db_artist.item_id} AND "
+ f'(provider_mappings.provider_domain = "{provider_instance_id_or_domain}" OR '
+ f'provider_mappings.provider_instance = "{provider_instance_id_or_domain}")'
)
paged_list = await self.mass.music.albums.library_items(extra_query=query)
return paged_list.items
# prefer cache items (if any)
cache_key = f"{prov.lookup_key}.search.{self.media_type.value}.{search_query}.{limit}"
- if cache := await self.mass.cache.get(cache_key):
+ cache_key = cache_key.lower().replace("", "")
+ if (cache := await self.mass.cache.get(cache_key)) is not None:
return [media_from_dict(x) for x in cache]
# no items in cache - get listing from provider
searchresult = await prov.search(
return
# prefer cache items (if any)
cache_key = f"{provider.lookup_key}.playlist.{item_id}.tracks"
- if cache := await self.mass.cache.get(cache_key, checksum=cache_checksum):
+ if (cache := await self.mass.cache.get(cache_key, checksum=cache_checksum)) is not None:
for track_dict in cache:
yield PlaylistTrack.from_dict(track_dict)
return
version TEXT,
favorite BOOLEAN DEFAULT 0,
metadata json NOT NULL,
- provider_mappings json NOT NULL,
external_ids json NOT NULL,
timestamp_added INTEGER NOT NULL,
timestamp_modified INTEGER NOT NULL
sort_name TEXT NOT NULL,
favorite BOOLEAN DEFAULT 0,
metadata json NOT NULL,
- provider_mappings json NOT NULL,
external_ids json NOT NULL,
timestamp_added INTEGER NOT NULL,
timestamp_modified INTEGER NOT NULL
duration INTEGER,
favorite BOOLEAN DEFAULT 0,
metadata json NOT NULL,
- provider_mappings json NOT NULL,
external_ids json NOT NULL,
timestamp_added INTEGER NOT NULL,
timestamp_modified INTEGER NOT NULL
is_editable BOOLEAN NOT NULL,
favorite BOOLEAN DEFAULT 0,
metadata json,
- provider_mappings json,
external_ids json NOT NULL,
timestamp_added INTEGER NOT NULL,
timestamp_modified INTEGER NOT NULL
sort_name TEXT NOT NULL,
favorite BOOLEAN DEFAULT 0,
metadata json,
- provider_mappings json,
external_ids json NOT NULL,
timestamp_added INTEGER NOT NULL,
timestamp_modified INTEGER NOT NULL
raise MediaNotFoundError(msg)
album_tracks = await self.mass.music.albums.tracks(db_album.item_id, db_album.provider)
return [
- track
+ AlbumTrack.from_track(track, db_album)
for track in album_tracks
if any(x.provider_instance == self.instance_id for x in track.provider_mappings)
]
playlist_lines = parse_pls(playlist_data)
for line_no, playlist_line in enumerate(playlist_lines, 0):
- if media_item := await self._parse_playlist_line(
- playlist_line.path, os.path.dirname(prov_playlist_id), line_no
+ if track := await self._parse_playlist_line(
+ playlist_line.path, os.path.dirname(prov_playlist_id)
):
- yield media_item
+ yield PlaylistTrack.from_track(track, line_no)
except Exception as err: # pylint: disable=broad-except
self.logger.warning(
exc_info=err if self.logger.isEnabledFor(10) else None,
)
- async def _parse_playlist_line(
- self, line: str, playlist_path: str, position: int
- ) -> PlaylistTrack | None:
+ async def _parse_playlist_line(self, line: str, playlist_path: str) -> Track | None:
"""Try to parse a track from a playlist line."""
try:
# if a relative path was given in an upper level from the playlist,
for filename in (line, os.path.join(playlist_path, line)):
with contextlib.suppress(FileNotFoundError):
item = await self.resolve(filename)
- return await self._parse_track(item, playlist_position=position)
+ return await self._parse_track(item)
except MusicAssistantError as err:
self.logger.warning("Could not parse uri/file %s to track: %s", line, str(err))
file_item = await self.resolve(path)
return file_item.local_path or self.read_file_content(file_item.absolute_path)
- async def _parse_track(
- self, file_item: FileSystemItem, playlist_position: int | None = None
- ) -> Track | AlbumTrack | PlaylistTrack:
+ async def _parse_track(self, file_item: FileSystemItem) -> Track:
"""Get full track details by id."""
# ruff: noqa: PLR0915, PLR0912
input_file = file_item.local_path or self.read_file_content(file_item.absolute_path)
tags = await parse_tags(input_file, file_item.file_size)
name, version = parse_title_and_version(tags.title, tags.version)
- base_details = {
- "item_id": file_item.path,
- "provider": self.instance_id,
- "name": name,
- "version": version,
- "provider_mappings": {
+ track = Track(
+ item_id=file_item.path,
+ provider=self.instance_id,
+ name=name,
+ sort_name=tags.title_sort,
+ version=version,
+ provider_mappings={
ProviderMapping(
item_id=file_item.path,
provider_domain=self.domain,
),
)
},
- }
- if playlist_position is not None:
- track = PlaylistTrack(
- **base_details,
- position=playlist_position,
- )
- elif tags.album and tags.disc and tags.track:
- track = AlbumTrack( # pylint: disable=missing-kwoa
- **base_details,
- disc_number=tags.disc,
- track_number=tags.track,
- )
- else:
- track = Track(
- **base_details,
- )
+ disc_number=tags.disc,
+ track_number=tags.track,
+ )
if isrc_tags := tags.isrc:
for isrsc in isrc_tags: