Apple Music: Add remaining favourite parsing + custom music token config (#2609)
authorMarvin Schenkel <marvinschenkel@gmail.com>
Sat, 8 Nov 2025 11:49:19 +0000 (12:49 +0100)
committerGitHub <noreply@github.com>
Sat, 8 Nov 2025 11:49:19 +0000 (12:49 +0100)
music_assistant/providers/apple_music/__init__.py

index e64be99a2ea1b79b636019a967439de7cf8844ff..48ff6127ee85390ceecf701271d4574f062d75ba 100644 (file)
@@ -107,6 +107,7 @@ UNKNOWN_PLAYLIST_NAME = "Unknown Apple Music Playlist"
 
 CONF_MUSIC_APP_TOKEN = "music_app_token"
 CONF_MUSIC_USER_TOKEN = "music_user_token"
+CONF_MUSIC_USER_MANUAL_TOKEN = "music_user_manual_token"
 CONF_MUSIC_USER_TOKEN_TIMESTAMP = "music_user_token_timestamp"
 CACHE_CATEGORY_DECRYPT_KEY = 1
 
@@ -236,7 +237,7 @@ async def get_config_entries(
             key=CONF_MUSIC_USER_TOKEN,
             type=ConfigEntryType.SECURE_STRING,
             label="Music User Token",
-            required=True,
+            required=False,
             action="CONF_ACTION_AUTH",
             description="Authenticate with Apple Music to retrieve a valid music user token.",
             action_label="Authenticate with Apple Music",
@@ -250,6 +251,19 @@ async def get_config_entries(
             )
             else None,
         ),
+        ConfigEntry(
+            key=CONF_MUSIC_USER_MANUAL_TOKEN,
+            type=ConfigEntryType.SECURE_STRING,
+            label="Manual Music User Token",
+            required=False,
+            category="advanced",
+            description=(
+                "Authenticate with a manual Music User Token in case the Authentication flow"
+                " is unsupported (e.g. when using child accounts)."
+            ),
+            help_link="https://www.music-assistant.io/music-providers/apple-music/",
+            value=values.get(CONF_MUSIC_USER_MANUAL_TOKEN),
+        ),
         ConfigEntry(
             key=CONF_MUSIC_USER_TOKEN_TIMESTAMP,
             type=ConfigEntryType.INTEGER,
@@ -278,7 +292,9 @@ class AppleMusicProvider(MusicProvider):
 
     async def handle_async_init(self) -> None:
         """Handle async initialization of the provider."""
-        self._music_user_token = self.config.get_value(CONF_MUSIC_USER_TOKEN)
+        self._music_user_token = self.config.get_value(
+            CONF_MUSIC_USER_MANUAL_TOKEN
+        ) or self.config.get_value(CONF_MUSIC_USER_TOKEN)
         self._music_app_token = self.config.get_value(CONF_MUSIC_APP_TOKEN)
         self._storefront = await self._get_user_storefront()
         # create random session id to use for decryption keys
@@ -349,11 +365,29 @@ class AppleMusicProvider(MusicProvider):
     async def get_library_albums(self) -> AsyncGenerator[Album, None]:
         """Retrieve library albums from the provider."""
         endpoint = "me/library/albums"
-        for item in await self._get_all_items(
+        album_items = await self._get_all_items(
             endpoint, include="catalog,artists", extend="editorialNotes"
-        ):
+        )
+        album_catalog_item_ids = [
+            item["id"]
+            for item in album_items
+            if item and item["id"] and not self.is_library_id(item["id"])
+        ]
+        album_library_item_ids = [
+            item["id"]
+            for item in album_items
+            if item and item["id"] and self.is_library_id(item["id"])
+        ]
+        rating_catalog_response = await self._get_ratings(album_catalog_item_ids, MediaType.ALBUM)
+        rating_library_response = await self._get_ratings(album_library_item_ids, MediaType.ALBUM)
+        for item in album_items:
             if item and item["id"]:
-                album = self._parse_album(item)
+                is_favourite = (
+                    rating_catalog_response.get(item["id"])
+                    if not self.is_library_id(item["id"])
+                    else rating_library_response.get(item["id"])
+                )
+                album = self._parse_album(item, is_favourite)
                 if album:
                     yield album
 
@@ -384,18 +418,33 @@ class AppleMusicProvider(MusicProvider):
                 track = self._parse_track(item, is_favourite)
                 yield track
         # Yield library-only tracks using their library metadata
+        library_ids = [item["id"] for item in library_only_tracks if item and item["id"]]
+        library_rating_response = await self._get_ratings(library_ids, MediaType.TRACK)
         for item in library_only_tracks:
-            yield self._parse_track(item)
+            is_favourite = library_rating_response.get(item["id"])
+            yield self._parse_track(item, is_favourite)
 
     async def get_library_playlists(self) -> AsyncGenerator[Playlist, None]:
         """Retrieve playlists from the provider."""
         endpoint = "me/library/playlists"
-        for item in await self._get_all_items(endpoint):
+        playlist_items = await self._get_all_items(endpoint)
+        playlist_library_item_ids = [
+            item["id"]
+            for item in playlist_items
+            if item and item["id"] and self.is_library_id(item["id"])
+        ]
+        rating_library_response = await self._get_ratings(
+            playlist_library_item_ids, MediaType.PLAYLIST
+        )
+        for item in playlist_items:
+            is_favourite = rating_library_response.get(item["id"])
             # Prefer catalog information over library information in case of public playlists
             if item["attributes"]["hasCatalog"]:
-                yield await self.get_playlist(item["attributes"]["playParams"]["globalId"])
+                yield await self.get_playlist(
+                    item["attributes"]["playParams"]["globalId"], is_favourite
+                )
             elif item and item["id"]:
-                yield self._parse_playlist(item)
+                yield self._parse_playlist(item, is_favourite)
 
     @use_cache()
     async def get_artist(self, prov_artist_id) -> Artist:
@@ -409,7 +458,9 @@ class AppleMusicProvider(MusicProvider):
         """Get full album details by id."""
         endpoint = f"catalog/{self._storefront}/albums/{prov_album_id}"
         response = await self._get_data(endpoint, include="artists")
-        return self._parse_album(response["data"][0])
+        rating_response = await self._get_ratings([prov_album_id], MediaType.ALBUM)
+        is_favourite = rating_response.get(prov_album_id)
+        return self._parse_album(response["data"][0], is_favourite)
 
     @use_cache()
     async def get_track(self, prov_track_id) -> Track:
@@ -421,15 +472,15 @@ class AppleMusicProvider(MusicProvider):
         return self._parse_track(response["data"][0], is_favourite)
 
     @use_cache()
-    async def get_playlist(self, prov_playlist_id) -> Playlist:
+    async def get_playlist(self, prov_playlist_id, is_favourite: bool = False) -> Playlist:
         """Get full playlist details by id."""
-        if self._is_catalog_id(prov_playlist_id):
+        if not self.is_library_id(prov_playlist_id):
             endpoint = f"catalog/{self._storefront}/playlists/{prov_playlist_id}"
         else:
             endpoint = f"me/library/playlists/{prov_playlist_id}"
         endpoint = f"catalog/{self._storefront}/playlists/{prov_playlist_id}"
         response = await self._get_data(endpoint)
-        return self._parse_playlist(response["data"][0])
+        return self._parse_playlist(response["data"][0], is_favourite)
 
     @use_cache()
     async def get_album_tracks(self, prov_album_id) -> list[Track]:
@@ -464,9 +515,12 @@ class AppleMusicProvider(MusicProvider):
         )
         if not response or "data" not in response:
             return result
+        playlist_track_ids = [track["id"] for track in response["data"] if track and track["id"]]
+        rating_response = await self._get_ratings(playlist_track_ids, MediaType.TRACK)
         for index, track in enumerate(response["data"]):
             if track and track["id"]:
-                parsed_track = self._parse_track(track)
+                is_favourite = rating_response.get(track["id"])
+                parsed_track = self._parse_track(track, is_favourite)
                 parsed_track.position = offset + index + 1
                 result.append(parsed_track)
         return result
@@ -481,7 +535,17 @@ class AppleMusicProvider(MusicProvider):
             # Some artists do not have albums, return empty list
             self.logger.info("No albums found for artist %s", prov_artist_id)
             return []
-        return [self._parse_album(album) for album in response if album["id"]]
+        album_ids = [album["id"] for album in response if album["id"]]
+        rating_response = await self._get_ratings(album_ids, MediaType.ALBUM)
+        albums = []
+        for album in response:
+            if not album["id"]:
+                continue
+            is_favourite = rating_response.get(album["id"])
+            parsed_album = self._parse_album(album, is_favourite)
+            if parsed_album:
+                albums.append(parsed_album)
+        return albums
 
     @use_cache(3600 * 24 * 7)  # cache for 7 days
     async def get_artist_toptracks(self, prov_artist_id) -> list[Track]:
@@ -493,7 +557,15 @@ class AppleMusicProvider(MusicProvider):
             # Some artists do not have top tracks, return empty list
             self.logger.info("No top tracks found for artist %s", prov_artist_id)
             return []
-        return [self._parse_track(track) for track in response["data"] if track["id"]]
+        track_ids = [track["id"] for track in response["data"] if track["id"]]
+        rating_response = await self._get_ratings(track_ids, MediaType.TRACK)
+        tracks = []
+        for track in response["data"]:
+            if not track["id"]:
+                continue
+            is_favourite = rating_response.get(track["id"])
+            tracks.append(self._parse_track(track, is_favourite))
+        return tracks
 
     async def library_add(self, item: MediaItemType) -> None:
         """Add item to library."""
@@ -548,9 +620,12 @@ class AppleMusicProvider(MusicProvider):
             response = await self._post_data(endpoint, include="artists")
             if not response or "data" not in response:
                 break
+            track_ids = [track["id"] for track in response["data"] if track and track["id"]]
+            rating_response = await self._get_ratings(track_ids, MediaType.TRACK)
             for track in response["data"]:
                 if track and track["id"]:
-                    found_tracks.append(self._parse_track(track))
+                    is_favourite = rating_response.get(track["id"])
+                    found_tracks.append(self._parse_track(track, is_favourite))
         return found_tracks
 
     async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails:
@@ -655,7 +730,9 @@ class AppleMusicProvider(MusicProvider):
             artist.metadata.description = notes.get("standard") or notes.get("short")
         return artist
 
-    def _parse_album(self, album_obj: dict) -> Album | ItemMapping | None:
+    def _parse_album(
+        self, album_obj: dict, is_favourite: bool | None = None
+    ) -> Album | ItemMapping | None:
         """Parse album object to generic layout."""
         relationships = album_obj.get("relationships", {})
         response_type = album_obj.get("type")
@@ -747,7 +824,7 @@ class AppleMusicProvider(MusicProvider):
         inferred_type = infer_album_type(album.name, "")
         if inferred_type in (AlbumType.SOUNDTRACK, AlbumType.LIVE):
             album.album_type = inferred_type
-
+        album.favorite = is_favourite or False
         return album
 
     def _parse_track(
@@ -827,7 +904,9 @@ class AppleMusicProvider(MusicProvider):
         track.favorite = is_favourite or False
         return track
 
-    def _parse_playlist(self, playlist_obj: dict[str, Any]) -> Playlist:
+    def _parse_playlist(
+        self, playlist_obj: dict[str, Any], is_favourite: bool | None = None
+    ) -> Playlist:
         """Parse Apple Music playlist object to generic layout."""
         attributes = playlist_obj["attributes"]
         playlist_id = attributes["playParams"].get("globalId") or playlist_obj["id"]
@@ -861,6 +940,7 @@ class AppleMusicProvider(MusicProvider):
             )
         if description := attributes.get("description"):
             playlist.metadata.description = description.get("standard")
+        playlist.favorite = is_favourite or False
         return playlist
 
     async def _get_all_items(self, endpoint, key="data", **kwargs) -> list[dict]:
@@ -994,7 +1074,10 @@ class AppleMusicProvider(MusicProvider):
             raise NotImplementedError(
                 "Ratings are not available for artist in the Apple Music API."
             )
-        endpoint = self._translate_media_type_to_apple_type(media_type)
+        if len(item_ids) == 0:
+            return {}
+        apple_type = self._translate_media_type_to_apple_type(media_type)
+        endpoint = apple_type if not self.is_library_id(item_ids[0]) else f"library-{apple_type}"
         # Apple Music limits to 200 ids per request
         max_ids_per_request = 200
         results = {}