Track,
)
from music_assistant.common.models.streamdetails import StreamDetails
-from music_assistant.constants import MASS_LOGO, VARIOUS_ARTISTS_FANART
+from music_assistant.constants import DB_SCHEMA_VERSION, MASS_LOGO, VARIOUS_ARTISTS_FANART
from music_assistant.server.helpers.tags import AudioTags, parse_tags
from music_assistant.server.models.music_provider import MusicProvider
images=[DEFAULT_THUMB]
if prov_playlist_id in COLLAGE_IMAGE_PLAYLISTS
else [DEFAULT_THUMB, DEFAULT_FANART],
- cache_checksum=str(time.time()),
+ cache_checksum=str(int(time.time())),
),
)
# user created universal playlist
owner="Music Assistant",
is_editable=True,
)
- playlist.metadata.cache_checksum = str(stored_item.get("last_updated", 0))
+ playlist.metadata.cache_checksum = f"{DB_SCHEMA_VERSION}.{stored_item.get('last_updated')}"
if image_url := stored_item.get("image_url"):
playlist.metadata.images = [
MediaItemImage(
return await self._get_builtin_playlist_tracks(prov_playlist_id)
# user created universal playlist
result: list[Track] = []
- playlist_items = await self._read_playlist_file_items(prov_playlist_id)
- for index, uri in enumerate(playlist_items[offset:limit]):
+ playlist_items = await self._read_playlist_file_items(prov_playlist_id, offset, limit)
+ for index, uri in enumerate(playlist_items):
try:
- # get the provider item and not the full track from a regular 'get' call
- # as we only need basic track info here
media_type, provider_instance_id_or_domain, item_id = await parse_uri(uri)
media_controller = self.mass.music.get_controller(media_type)
- track = await media_controller.get_provider_item(
+ # prefer item already in the db
+ track = await media_controller.get_library_item_by_prov_id(
item_id, provider_instance_id_or_domain
)
+ if track is None:
+ # get the provider item and not the full track from a regular 'get' call
+ # as we only need basic track info here
+ track = await media_controller.get_provider_item(
+ item_id, provider_instance_id_or_domain
+ )
track.position = offset + index
result.append(track)
except (MediaNotFoundError, InvalidDataError, ProviderUnavailableError) as err:
return result
return result
- async def _read_playlist_file_items(self, playlist_id: str) -> list[str]:
+ async def _read_playlist_file_items(
+ self, playlist_id: str, offset: int = 0, limit: int = 100000
+ ) -> list[str]:
"""Return lines of a playlist file."""
playlist_file = os.path.join(self._playlists_dir, playlist_id)
if not await asyncio.to_thread(os.path.isfile, playlist_file):
aiofiles.open(playlist_file, "r", encoding="utf-8") as _file,
):
lines = await _file.readlines()
- return [x.strip() for x in lines]
+ return [x.strip() for x in lines[offset : offset + limit]]
async def _write_playlist_file_items(self, playlist_id: str, lines: list[str]) -> None:
"""Return lines of a playlist file."""