from __future__ import annotations
import asyncio
-from typing import List, Optional
+import itertools
+from typing import Dict, List, Optional
from music_assistant.helpers.compare import compare_album, compare_strings
from music_assistant.helpers.database import TABLE_ALBUMS
) -> List[Track]:
"""Return album tracks for the given provider album id."""
album = await self.get(item_id, provider, provider_id)
- # simply return the tracks from the first provider
- for prov in album.provider_ids:
- if tracks := await self.get_provider_album_tracks(
- prov.item_id, prov.prov_id
- ):
- return tracks
- return []
+ # get results from all providers
+ coros = [
+ self.get_provider_album_tracks(item.item_id, item.prov_id)
+ for item in album.provider_ids
+ ]
+ tracks = itertools.chain.from_iterable(await asyncio.gather(*coros))
+ # merge duplicates using a dict
+ final_items: Dict[str, Track] = {}
+ for track in tracks:
+ key = f"{track.disc_number}.{track.track_number}"
+ if key in final_items:
+ final_items[key].provider_ids.update(track.provider_ids)
+ else:
+ track.album = album
+ final_items[key] = track
+ return list(final_items.values())
async def versions(
self,
*[self.search(search_query, prov_type) for prov_type in prov_types]
)
for prov_item in prov_items
- if compare_strings(prov_item.artist.name, album.artist.name)
+ if prov_item.sort_name == album.sort_name
+ and compare_strings(prov_item.artist.name, album.artist.name)
]
async def add(self, item: Album) -> Album:
self.get_provider_artist_toptracks(item.item_id, item.prov_id)
for item in artist.provider_ids
]
- if provider == ProviderType.DATABASE:
- coros.append(self.get_database_artist_tracks(item_id, provider))
- return itertools.chain.from_iterable(await asyncio.gather(*coros))
+ # use intermediate set to remove duplicates
+ return list(set(itertools.chain.from_iterable(await asyncio.gather(*coros))))
async def albums(
self,
self.get_provider_artist_albums(item.item_id, item.prov_id)
for item in artist.provider_ids
]
- if provider == ProviderType.DATABASE:
- coros.append(self.get_database_artist_albums(item_id, provider))
- return itertools.chain.from_iterable(await asyncio.gather(*coros))
+ # use intermediate set to remove duplicates
+ return list(set(itertools.chain.from_iterable(await asyncio.gather(*coros))))
async def add(self, item: Artist) -> Artist:
"""Add artist to local db and return the database item."""
return []
return await provider.get_artist_toptracks(item_id)
- async def get_database_artist_tracks(
- self, artist_id: str, provider: ProviderType
- ) -> List[Track]:
- """Return tracks for an artist in database."""
- query = f"SELECT * FROM tracks WHERE artists LIKE '%\"{artist_id}\"%'"
- query += f" and artists LIKE '%\"{provider.value}\"%'"
- return await self.mass.music.tracks.get_db_items(query)
-
- async def get_database_artist_albums(
- self, artist_id: str, provider: ProviderType
- ) -> List[Track]:
- """Return tracks for an artist in database."""
- query = f"SELECT * FROM albums WHERE artist LIKE '%\"{artist_id}\"%'"
- query += f" and artist LIKE '%\"{provider.value}\"%'"
- return await self.mass.music.albums.get_db_items(query)
-
async def get_provider_artist_albums(
self, item_id: str, provider_id: str
) -> List[Album]:
async def get_artist(self, prov_artist_id: str) -> Artist:
"""Get full artist details by id."""
- itempath = await self.get_filepath(prov_artist_id)
+ itempath = await self.get_filepath(MediaType.ARTIST, prov_artist_id)
return await self._parse_artist(artist_path=itempath)
async def get_album(self, prov_album_id: str) -> Album:
"""Get full album details by id."""
- itempath = await self.get_filepath(prov_album_id)
+ itempath = await self.get_filepath(MediaType.ALBUM, prov_album_id)
return await self._parse_album(album_path=itempath)
async def get_track(self, prov_track_id: str) -> Track:
"""Get full track details by id."""
- itempath = await self.get_filepath(prov_track_id)
+ itempath = await self.get_filepath(MediaType.TRACK, prov_track_id)
return await self._parse_track(itempath)
async def get_playlist(self, prov_playlist_id: str) -> Playlist:
"""Get full playlist details by id."""
- itempath = await self.get_filepath(prov_playlist_id)
+ itempath = await self.get_filepath(MediaType.PLAYLIST, prov_playlist_id)
return await self._parse_playlist(itempath)
async def get_album_tracks(self, prov_album_id: str) -> List[Track]:
"""Get album tracks for given album id."""
- itempath = await self.get_filepath(prov_album_id)
- if not self.exists(itempath):
- query = f"SELECT * FROM tracks WHERE album LIKE '%\"{prov_album_id}\"%'"
- query += f" and album LIKE '%\"{self.type.value}\"%'"
- return await self.mass.music.tracks.get_db_items(query)
- result = []
- for entry in scantree(itempath):
- # mtime is used as file checksum
- checksum = int(entry.stat().st_mtime)
- if track := await self._parse_track(entry.path, checksum):
- result.append(track)
- return result
+ # filesystem items are always stored in db so we can query the database
+ query = f"SELECT * FROM tracks WHERE (album LIKE '%{prov_album_id}%'"
+ query += f" AND album LIKE '%{self.type.value}%')"
+ db_id = await self.mass.music.get_provider_mapping(
+ MediaType.ALBUM, provider=self.type, provider_item_id=prov_album_id
+ )
+ if db_id is not None:
+ query += f" OR (album LIKE '%{db_id}%' AND album LIKE '%database%')"
+ return await self.mass.music.tracks.get_db_items(query)
async def get_playlist_tracks(self, prov_playlist_id: str) -> List[Track]:
"""Get playlist tracks for given playlist id."""
result = []
- playlist_path = await self.get_filepath(prov_playlist_id)
+ playlist_path = await self.get_filepath(MediaType.PLAYLIST, prov_playlist_id)
if not self.exists(playlist_path):
raise MediaNotFoundError(f"Playlist path does not exist: {playlist_path}")
checksum = self._get_checksum(playlist_path)
async def get_artist_albums(self, prov_artist_id: str) -> List[Album]:
"""Get a list of albums for the given artist."""
- itempath = await self.get_filepath(prov_artist_id)
- if not self.exists(itempath):
- return await self.mass.music.artists.get_database_artist_albums(
- prov_artist_id, self.type
- )
- result = []
- for entry in os.scandir(itempath):
- if entry.is_dir(follow_symlinks=False):
- if album := await self._parse_album(entry.path):
- result.append(album)
- return result
+ # filesystem items are always stored in db so we can query the database
+ query = f"SELECT * FROM albums WHERE (artist LIKE '%\"{prov_artist_id}\"%'"
+ query += f" AND artist LIKE '%\"{self.type.value}\"%')"
+ db_id = await self.mass.music.get_provider_mapping(
+ MediaType.ARTIST, provider=self.type, provider_item_id=prov_artist_id
+ )
+ if db_id is not None:
+ query += f" OR (artist LIKE '%{db_id}%' AND artist LIKE '%database%')"
+ return await self.mass.music.albums.get_db_items(query)
async def get_artist_toptracks(self, prov_artist_id: str) -> List[Track]:
"""Get a list of all tracks as we have no clue about preference."""
- itempath = await self.get_filepath(prov_artist_id)
- if not self.exists(itempath):
- return await self.mass.music.artists.get_database_artist_tracks(
- prov_artist_id, self.type
- )
- result = []
- for entry in scantree(itempath):
- # mtime is used as file checksum
- checksum = int(entry.stat().st_mtime)
- if track := await self._parse_track(entry.path, checksum):
- result.append(track)
- return result
+ # filesystem items are always stored in db so we can query the database
+ query = f"SELECT * FROM tracks WHERE (artists LIKE '%{prov_artist_id}%'"
+ query += f" AND artists LIKE '%{self.type.value}%')"
+ db_id = await self.mass.music.get_provider_mapping(
+ MediaType.ARTIST, provider=self.type, provider_item_id=prov_artist_id
+ )
+ if db_id is not None:
+ query += f" OR (artists LIKE '%{db_id}%' AND artists LIKE '%database%')"
+ return await self.mass.music.tracks.get_db_items(query)
async def library_add(self, *args, **kwargs) -> bool:
"""Add item to provider's library. Return true on succes."""
self, prov_playlist_id: str, prov_track_ids: List[str]
) -> None:
"""Add track(s) to playlist."""
- itempath = await self.get_filepath(prov_playlist_id)
+ itempath = await self.get_filepath(MediaType.PLAYLIST, prov_playlist_id)
if not self.exists(itempath):
raise MediaNotFoundError(f"Playlist path does not exist: {itempath}")
async with self.open_file(itempath, "r") as _file:
self, prov_playlist_id: str, prov_track_ids: List[str]
) -> None:
"""Remove track(s) from playlist."""
- itempath = await self.get_filepath(prov_playlist_id)
+ itempath = await self.get_filepath(MediaType.PLAYLIST, prov_playlist_id)
if not self.exists(itempath):
raise MediaNotFoundError(f"Playlist path does not exist: {itempath}")
cur_lines = []
async def get_stream_details(self, item_id: str) -> StreamDetails:
"""Return the content details for the given track when it will be streamed."""
- itempath = await self.get_filepath(item_id)
+ itempath = await self.get_filepath(MediaType.TRACK, item_id)
if not self.exists(itempath):
raise MediaNotFoundError(f"Track path does not exist: {itempath}")
self, track_path: str, checksum: Optional[int] = None
) -> Track | None:
"""Try to parse a track from a filename by reading its tags."""
- track_item_id = self._get_item_id(track_path)
if not self.exists(track_path):
raise MediaNotFoundError(f"Track path does not exist: {track_path}")
+ track_item_id = self._get_item_id(track_path)
+
# reading file/tags is slow so we keep a cache and checksum
checksum = checksum or self._get_checksum(track_path)
cache_key = f"{self.id}_tracks_{track_item_id}"
# create fake path
album_path = os.path.join(self.config.path, artist.name, name)
elif not album_path:
- album_path = os.path.join(self.config.path, name)
+ album_path = os.path.join("Albums", name)
album_item_id = self._get_item_id(album_path)
if not name:
playlist.metadata.checksum = checksum
return playlist
- async def _parse_track_from_uri(self, uri):
+ async def _parse_track_from_uri(self, uri: str):
"""Try to parse a track from an uri found in playlist."""
if "://" in uri:
# track is uri from external provider?
return await self.mass.loop.run_in_executor(None, _get_data)
- async def get_filepath(self, item_id: str) -> str | None:
+ async def get_filepath(self, media_type: MediaType, item_id: str) -> str | None:
"""Get full filepath on disk for item_id."""
if item_id is None:
return None # guard
file_path = await self.mass.music.get_provider_mapping(
- provider_id=self.id, provider_item_id=item_id, return_key="url"
+ media_type, provider_id=self.id, provider_item_id=item_id, return_key="url"
)
if file_path is not None:
# ensure we have a full path and not relative