ProviderMapping,
Radio,
Track,
+ UniqueList,
)
from music_assistant.common.models.streamdetails import StreamDetails
from music_assistant.constants import DB_SCHEMA_VERSION, MASS_LOGO, VARIOUS_ARTISTS_FANART
async def get_track(self, prov_track_id: str) -> Track:
"""Get full track details by id."""
parsed_item = await self.parse_item(prov_track_id)
+ assert isinstance(parsed_item, Track)
stored_items: list[StoredItem] = self.mass.config.get(CONF_KEY_TRACKS, [])
if stored_item := next((x for x in stored_items if x["item_id"] == prov_track_id), None):
# always prefer the stored info, such as the name
parsed_item.name = stored_item["name"]
if image_url := stored_item.get("image_url"):
- parsed_item.metadata.images = [
- MediaItemImage(
- type=ImageType.THUMB,
- path=image_url,
- provider=self.instance_id,
- remotely_accessible=image_url.startswith("http"),
- )
- ]
+ parsed_item.metadata.images = UniqueList(
+ [
+ MediaItemImage(
+ type=ImageType.THUMB,
+ path=image_url,
+ provider=self.instance_id,
+ remotely_accessible=image_url.startswith("http"),
+ )
+ ]
+ )
return parsed_item
async def get_radio(self, prov_radio_id: str) -> Radio:
"""Get full radio details by id."""
parsed_item = await self.parse_item(prov_radio_id, force_radio=True)
+ assert isinstance(parsed_item, Radio)
stored_items: list[StoredItem] = self.mass.config.get(CONF_KEY_RADIOS, [])
if stored_item := next((x for x in stored_items if x["item_id"] == prov_radio_id), None):
# always prefer the stored info, such as the name
parsed_item.name = stored_item["name"]
if image_url := stored_item.get("image_url"):
- parsed_item.metadata.images = [
- MediaItemImage(
- type=ImageType.THUMB,
- path=image_url,
- provider=self.instance_id,
- remotely_accessible=image_url.startswith("http"),
- )
- ]
+ parsed_item.metadata.images = UniqueList(
+ [
+ MediaItemImage(
+ type=ImageType.THUMB,
+ path=image_url,
+ provider=self.instance_id,
+ remotely_accessible=image_url.startswith("http"),
+ )
+ ]
+ )
return parsed_item
- async def get_artist(self, prov_artist_id: str) -> Track:
+ async def get_artist(self, prov_artist_id: str) -> Artist:
"""Get full artist details by id."""
artist = prov_artist_id
# this is here for compatibility reasons only
owner="Music Assistant",
is_editable=False,
metadata=MediaItemMetadata(
- images=[DEFAULT_THUMB]
+ images=UniqueList([DEFAULT_THUMB])
if prov_playlist_id in COLLAGE_IMAGE_PLAYLISTS
- else [DEFAULT_THUMB, DEFAULT_FANART],
+ else UniqueList([DEFAULT_THUMB, DEFAULT_FANART]),
cache_checksum=str(int(time.time())),
),
)
)
playlist.metadata.cache_checksum = f"{DB_SCHEMA_VERSION}.{stored_item.get('last_updated')}"
if image_url := stored_item.get("image_url"):
- playlist.metadata.images = [
- MediaItemImage(
- type=ImageType.THUMB,
- path=image_url,
- provider=self.instance_id,
- remotely_accessible=image_url.startswith("http"),
- )
- ]
+ playlist.metadata.images = UniqueList(
+ [
+ MediaItemImage(
+ type=ImageType.THUMB,
+ path=image_url,
+ provider=self.instance_id,
+ remotely_accessible=image_url.startswith("http"),
+ )
+ ]
+ )
return playlist
async def get_item(self, media_type: MediaType, prov_item_id: str) -> MediaItemType:
track = await media_controller.get_provider_item(
item_id, provider_instance_id_or_domain
)
+ assert isinstance(track, Track)
track.position = offset + index
result.append(track)
except (MediaNotFoundError, InvalidDataError, ProviderUnavailableError) as err:
# mark last_updated on playlist object
stored_items: list[StoredItem] = self.mass.config.get(CONF_KEY_PLAYLISTS, [])
stored_item = next((x for x in stored_items if x["item_id"] == prov_playlist_id), None)
- stored_item["last_updated"] = int(time.time())
- self.mass.config.set(CONF_KEY_PLAYLISTS, stored_items)
+ if stored_item:
+ stored_item["last_updated"] = int(time.time())
+ self.mass.config.set(CONF_KEY_PLAYLISTS, stored_items)
async def remove_playlist_tracks(
self, prov_playlist_id: str, positions_to_remove: tuple[int, ...]
# mark last_updated on playlist object
stored_items: list[StoredItem] = self.mass.config.get(CONF_KEY_PLAYLISTS, [])
stored_item = next((x for x in stored_items if x["item_id"] == prov_playlist_id), None)
- stored_item["last_updated"] = int(time.time())
- self.mass.config.set(CONF_KEY_PLAYLISTS, stored_items)
+ if stored_item:
+ stored_item["last_updated"] = int(time.time())
+ self.mass.config.set(CONF_KEY_PLAYLISTS, stored_items)
- async def create_playlist(self, name: str) -> Playlist: # type: ignore[return]
+ async def create_playlist(self, name: str) -> Playlist:
"""Create a new playlist on provider with given name."""
item_id = shortuuid.random(8)
stored_item = StoredItem(item_id=item_id, name=name)
),
)
}
+ media_item: Track | Radio
if is_radio or force_radio:
# treat as radio
media_item = Radio(
provider=self.domain,
name=media_info.title or url,
duration=int(media_info.duration or 0),
- artists=[await self.get_artist(artist) for artist in media_info.artists],
+ artists=UniqueList(
+ [await self.get_artist(artist) for artist in media_info.artists]
+ ),
provider_mappings=provider_mappings,
)
if media_info.has_cover_image:
- media_item.metadata.images = [
- MediaItemImage(
- type=ImageType.THUMB,
- path=url,
- provider=self.instance_id,
- remotely_accessible=False,
- )
- ]
+ media_item.metadata.images = UniqueList(
+ [
+ MediaItemImage(
+ type=ImageType.THUMB,
+ path=url,
+ provider=self.instance_id,
+ remotely_accessible=False,
+ )
+ ]
+ )
return media_item
async def _get_media_info(self, url: str, force_refresh: bool = False) -> AudioTags:
can_seek=not is_radio,
)
- async def _get_builtin_playlist_tracks(
- self, builtin_playlist_id: str
- ) -> AsyncGenerator[Track, None]:
- """Get all playlist tracks for given builtin playlist id."""
+ async def _get_builtin_playlist_random_favorite_tracks(self) -> list[Track]:
result: list[Track] = []
- if builtin_playlist_id == ALL_FAVORITE_TRACKS:
- res = await self.mass.music.tracks.library_items(
- favorite=True, limit=250000, order_by="random"
+ res = await self.mass.music.tracks.library_items(
+ favorite=True, limit=250000, order_by="random"
+ )
+ for idx, item in enumerate(res, 1):
+ item.position = idx
+ result.append(item)
+ return result
+
+ async def _get_builtin_playlist_random_tracks(self) -> list[Track]:
+ result: list[Track] = []
+ res = await self.mass.music.tracks.library_items(limit=500, order_by="random_fast")
+ for idx, item in enumerate(res, 1):
+ item.position = idx
+ result.append(item)
+ return result
+
+ async def _get_builtin_playlist_random_album(self) -> list[Track]:
+ result: list[Track] = []
+ for random_album in await self.mass.music.albums.library_items(
+ limit=1, order_by="random_fast"
+ ):
+ tracks = await self.mass.music.albums.tracks(
+ random_album.item_id, random_album.provider
+ )
+ for idx, track in enumerate(tracks, 1):
+ track.position = idx
+ result.append(track)
+ return result
+
+ async def _get_builtin_playlist_random_artist(self) -> list[Track]:
+ result: list[Track] = []
+ for random_artist in await self.mass.music.artists.library_items(
+ limit=1, order_by="random_fast"
+ ):
+ tracks = await self.mass.music.artists.tracks(
+ random_artist.item_id, random_artist.provider
)
- for idx, item in enumerate(res, 1):
- item.position = idx
- result.append(item)
- return result
- if builtin_playlist_id == RANDOM_TRACKS:
- res = await self.mass.music.tracks.library_items(limit=500, order_by="random_fast")
- for idx, item in enumerate(res, 1):
- item.position = idx
- result.append(item)
- return result
- if builtin_playlist_id == RANDOM_ALBUM:
- for random_album in await self.mass.music.albums.library_items(
- limit=1, order_by="random_fast"
- ):
- # use the function specified in the queue controller as that
- # already handles unwrapping an album by user preference
- tracks = await self.mass.music.albums.tracks(
- random_album.item_id, random_album.provider
- )
- for idx, track in enumerate(tracks, 1):
- track.position = idx
- result.append(track)
- return result
- if builtin_playlist_id == RANDOM_ARTIST:
- for random_artist in await self.mass.music.artists.library_items(
- limit=1, order_by="random_fast"
- ):
- # use the function specified in the queue controller as that
- # already handles unwrapping an artist by user preference
- tracks = await self.mass.music.artists.tracks(
- random_artist.item_id, random_artist.provider
- )
- for idx, track in enumerate(tracks, 1):
- track.position = idx
- result.append(track)
- return result
- if builtin_playlist_id == RECENTLY_PLAYED:
- tracks = await self.mass.music.recently_played(100, [MediaType.TRACK])
for idx, track in enumerate(tracks, 1):
track.position = idx
result.append(track)
- return result
return result
+ async def _get_builtin_playlist_recently_played(self) -> list[Track]:
+ result: list[Track] = []
+ recent_tracks = await self.mass.music.recently_played(100, [MediaType.TRACK])
+ for idx, track in enumerate(recent_tracks, 1):
+ assert isinstance(track, Track)
+ track.position = idx
+ result.append(track)
+ return result
+
+ async def _get_builtin_playlist_tracks(self, builtin_playlist_id: str) -> list[Track]:
+ """Get all playlist tracks for given builtin playlist id."""
+ try:
+ return await {
+ ALL_FAVORITE_TRACKS: self._get_builtin_playlist_random_favorite_tracks,
+ RANDOM_TRACKS: self._get_builtin_playlist_random_tracks,
+ RANDOM_ALBUM: self._get_builtin_playlist_random_album,
+ RANDOM_ARTIST: self._get_builtin_playlist_random_artist,
+ RECENTLY_PLAYED: self._get_builtin_playlist_recently_played,
+ }[builtin_playlist_id]()
+ except KeyError:
+ raise MediaNotFoundError(f"No built in playlist: {builtin_playlist_id}")
+
async def _read_playlist_file_items(
self, playlist_id: str, offset: int = 0, limit: int = 100000
) -> list[str]: