CONF_MUSIC_APP_TOKEN = "music_app_token"
CONF_MUSIC_USER_TOKEN = "music_user_token"
+CONF_MUSIC_USER_MANUAL_TOKEN = "music_user_manual_token"
CONF_MUSIC_USER_TOKEN_TIMESTAMP = "music_user_token_timestamp"
CACHE_CATEGORY_DECRYPT_KEY = 1
key=CONF_MUSIC_USER_TOKEN,
type=ConfigEntryType.SECURE_STRING,
label="Music User Token",
- required=True,
+ required=False,
action="CONF_ACTION_AUTH",
description="Authenticate with Apple Music to retrieve a valid music user token.",
action_label="Authenticate with Apple Music",
)
else None,
),
+ ConfigEntry(
+ key=CONF_MUSIC_USER_MANUAL_TOKEN,
+ type=ConfigEntryType.SECURE_STRING,
+ label="Manual Music User Token",
+ required=False,
+ category="advanced",
+ description=(
+ "Authenticate with a manual Music User Token in case the Authentication flow"
+ " is unsupported (e.g. when using child accounts)."
+ ),
+ help_link="https://www.music-assistant.io/music-providers/apple-music/",
+ value=values.get(CONF_MUSIC_USER_MANUAL_TOKEN),
+ ),
ConfigEntry(
key=CONF_MUSIC_USER_TOKEN_TIMESTAMP,
type=ConfigEntryType.INTEGER,
async def handle_async_init(self) -> None:
"""Handle async initialization of the provider."""
- self._music_user_token = self.config.get_value(CONF_MUSIC_USER_TOKEN)
+ self._music_user_token = self.config.get_value(
+ CONF_MUSIC_USER_MANUAL_TOKEN
+ ) or self.config.get_value(CONF_MUSIC_USER_TOKEN)
self._music_app_token = self.config.get_value(CONF_MUSIC_APP_TOKEN)
self._storefront = await self._get_user_storefront()
# create random session id to use for decryption keys
async def get_library_albums(self) -> AsyncGenerator[Album, None]:
"""Retrieve library albums from the provider."""
endpoint = "me/library/albums"
- for item in await self._get_all_items(
+ album_items = await self._get_all_items(
endpoint, include="catalog,artists", extend="editorialNotes"
- ):
+ )
+ album_catalog_item_ids = [
+ item["id"]
+ for item in album_items
+ if item and item["id"] and not self.is_library_id(item["id"])
+ ]
+ album_library_item_ids = [
+ item["id"]
+ for item in album_items
+ if item and item["id"] and self.is_library_id(item["id"])
+ ]
+ rating_catalog_response = await self._get_ratings(album_catalog_item_ids, MediaType.ALBUM)
+ rating_library_response = await self._get_ratings(album_library_item_ids, MediaType.ALBUM)
+ for item in album_items:
if item and item["id"]:
- album = self._parse_album(item)
+ is_favourite = (
+ rating_catalog_response.get(item["id"])
+ if not self.is_library_id(item["id"])
+ else rating_library_response.get(item["id"])
+ )
+ album = self._parse_album(item, is_favourite)
if album:
yield album
track = self._parse_track(item, is_favourite)
yield track
# Yield library-only tracks using their library metadata
+ library_ids = [item["id"] for item in library_only_tracks if item and item["id"]]
+ library_rating_response = await self._get_ratings(library_ids, MediaType.TRACK)
for item in library_only_tracks:
- yield self._parse_track(item)
+ is_favourite = library_rating_response.get(item["id"])
+ yield self._parse_track(item, is_favourite)
async def get_library_playlists(self) -> AsyncGenerator[Playlist, None]:
"""Retrieve playlists from the provider."""
endpoint = "me/library/playlists"
- for item in await self._get_all_items(endpoint):
+ playlist_items = await self._get_all_items(endpoint)
+ playlist_library_item_ids = [
+ item["id"]
+ for item in playlist_items
+ if item and item["id"] and self.is_library_id(item["id"])
+ ]
+ rating_library_response = await self._get_ratings(
+ playlist_library_item_ids, MediaType.PLAYLIST
+ )
+ for item in playlist_items:
+ is_favourite = rating_library_response.get(item["id"])
# Prefer catalog information over library information in case of public playlists
if item["attributes"]["hasCatalog"]:
- yield await self.get_playlist(item["attributes"]["playParams"]["globalId"])
+ yield await self.get_playlist(
+ item["attributes"]["playParams"]["globalId"], is_favourite
+ )
elif item and item["id"]:
- yield self._parse_playlist(item)
+ yield self._parse_playlist(item, is_favourite)
@use_cache()
async def get_artist(self, prov_artist_id) -> Artist:
"""Get full album details by id."""
endpoint = f"catalog/{self._storefront}/albums/{prov_album_id}"
response = await self._get_data(endpoint, include="artists")
- return self._parse_album(response["data"][0])
+ rating_response = await self._get_ratings([prov_album_id], MediaType.ALBUM)
+ is_favourite = rating_response.get(prov_album_id)
+ return self._parse_album(response["data"][0], is_favourite)
@use_cache()
async def get_track(self, prov_track_id) -> Track:
return self._parse_track(response["data"][0], is_favourite)
@use_cache()
- async def get_playlist(self, prov_playlist_id) -> Playlist:
+ async def get_playlist(self, prov_playlist_id, is_favourite: bool = False) -> Playlist:
"""Get full playlist details by id."""
- if self._is_catalog_id(prov_playlist_id):
+ if not self.is_library_id(prov_playlist_id):
endpoint = f"catalog/{self._storefront}/playlists/{prov_playlist_id}"
else:
endpoint = f"me/library/playlists/{prov_playlist_id}"
endpoint = f"catalog/{self._storefront}/playlists/{prov_playlist_id}"
response = await self._get_data(endpoint)
- return self._parse_playlist(response["data"][0])
+ return self._parse_playlist(response["data"][0], is_favourite)
@use_cache()
async def get_album_tracks(self, prov_album_id) -> list[Track]:
)
if not response or "data" not in response:
return result
+ playlist_track_ids = [track["id"] for track in response["data"] if track and track["id"]]
+ rating_response = await self._get_ratings(playlist_track_ids, MediaType.TRACK)
for index, track in enumerate(response["data"]):
if track and track["id"]:
- parsed_track = self._parse_track(track)
+ is_favourite = rating_response.get(track["id"])
+ parsed_track = self._parse_track(track, is_favourite)
parsed_track.position = offset + index + 1
result.append(parsed_track)
return result
# Some artists do not have albums, return empty list
self.logger.info("No albums found for artist %s", prov_artist_id)
return []
- return [self._parse_album(album) for album in response if album["id"]]
+ album_ids = [album["id"] for album in response if album["id"]]
+ rating_response = await self._get_ratings(album_ids, MediaType.ALBUM)
+ albums = []
+ for album in response:
+ if not album["id"]:
+ continue
+ is_favourite = rating_response.get(album["id"])
+ parsed_album = self._parse_album(album, is_favourite)
+ if parsed_album:
+ albums.append(parsed_album)
+ return albums
@use_cache(3600 * 24 * 7) # cache for 7 days
async def get_artist_toptracks(self, prov_artist_id) -> list[Track]:
# Some artists do not have top tracks, return empty list
self.logger.info("No top tracks found for artist %s", prov_artist_id)
return []
- return [self._parse_track(track) for track in response["data"] if track["id"]]
+ track_ids = [track["id"] for track in response["data"] if track["id"]]
+ rating_response = await self._get_ratings(track_ids, MediaType.TRACK)
+ tracks = []
+ for track in response["data"]:
+ if not track["id"]:
+ continue
+ is_favourite = rating_response.get(track["id"])
+ tracks.append(self._parse_track(track, is_favourite))
+ return tracks
async def library_add(self, item: MediaItemType) -> None:
"""Add item to library."""
response = await self._post_data(endpoint, include="artists")
if not response or "data" not in response:
break
+ track_ids = [track["id"] for track in response["data"] if track and track["id"]]
+ rating_response = await self._get_ratings(track_ids, MediaType.TRACK)
for track in response["data"]:
if track and track["id"]:
- found_tracks.append(self._parse_track(track))
+ is_favourite = rating_response.get(track["id"])
+ found_tracks.append(self._parse_track(track, is_favourite))
return found_tracks
async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails:
artist.metadata.description = notes.get("standard") or notes.get("short")
return artist
- def _parse_album(self, album_obj: dict) -> Album | ItemMapping | None:
+ def _parse_album(
+ self, album_obj: dict, is_favourite: bool | None = None
+ ) -> Album | ItemMapping | None:
"""Parse album object to generic layout."""
relationships = album_obj.get("relationships", {})
response_type = album_obj.get("type")
inferred_type = infer_album_type(album.name, "")
if inferred_type in (AlbumType.SOUNDTRACK, AlbumType.LIVE):
album.album_type = inferred_type
-
+ album.favorite = is_favourite or False
return album
def _parse_track(
track.favorite = is_favourite or False
return track
- def _parse_playlist(self, playlist_obj: dict[str, Any]) -> Playlist:
+ def _parse_playlist(
+ self, playlist_obj: dict[str, Any], is_favourite: bool | None = None
+ ) -> Playlist:
"""Parse Apple Music playlist object to generic layout."""
attributes = playlist_obj["attributes"]
playlist_id = attributes["playParams"].get("globalId") or playlist_obj["id"]
)
if description := attributes.get("description"):
playlist.metadata.description = description.get("standard")
+ playlist.favorite = is_favourite or False
return playlist
async def _get_all_items(self, endpoint, key="data", **kwargs) -> list[dict]:
raise NotImplementedError(
"Ratings are not available for artist in the Apple Music API."
)
- endpoint = self._translate_media_type_to_apple_type(media_type)
+ if len(item_ids) == 0:
+ return {}
+ apple_type = self._translate_media_type_to_apple_type(media_type)
+ endpoint = apple_type if not self.is_library_id(item_ids[0]) else f"library-{apple_type}"
# Apple Music limits to 200 ids per request
max_ids_per_request = 200
results = {}