Youtube Music: Add library add/remove and playlist track add/remove functionality...
authorMarvin Schenkel <marvinschenkel@gmail.com>
Sat, 16 Jul 2022 10:01:54 +0000 (12:01 +0200)
committerGitHub <noreply@github.com>
Sat, 16 Jul 2022 10:01:54 +0000 (12:01 +0200)
* Add library add/remove functions

* Add library add/remove functions

* Add lib add/remove album, playlist and artist

* Add playlist track add/remove

* Add auth for private playlists

* Fix playlist track remove

music_assistant/music_providers/ytmusic/helpers.py
music_assistant/music_providers/ytmusic/ytmusic.py

index 5b987ce777e5b58a35c31a7534ad3cdea64e9904..b1bb0fec571cf7e22b9afda459e2ba821cf0a8b1 100644 (file)
@@ -43,12 +43,13 @@ async def get_album(prov_album_id: str) -> Dict[str, str]:
 
 
 async def get_playlist(
-    prov_playlist_id: str, headers: Dict[str, str]
+    prov_playlist_id: str, headers: Dict[str, str], username: str
 ) -> Dict[str, str]:
     """Async wrapper around the ytmusicapi get_playlist function."""
 
     def _get_playlist():
-        ytm = ytmusicapi.YTMusic(auth=json.dumps(headers))
+        user = username if is_brand_account(username) else None
+        ytm = ytmusicapi.YTMusic(auth=json.dumps(headers), user=user)
         playlist = ytm.get_playlist(playlistId=prov_playlist_id)
         playlist["checksum"] = get_playlist_checksum(playlist)
         return playlist
@@ -89,7 +90,7 @@ async def get_library_artists(headers: Dict[str, str], username: str) -> Dict[st
     def _get_library_artists():
         user = username if is_brand_account(username) else None
         ytm = ytmusicapi.YTMusic(auth=json.dumps(headers), user=user)
-        artists = ytm.get_library_artists(limit=9999)
+        artists = ytm.get_library_subscriptions(limit=9999)
         # Sync properties with uniformal artist object
         for artist in artists:
             artist["id"] = artist["browseId"]
@@ -147,6 +148,84 @@ async def get_library_tracks(headers: Dict[str, str], username: str) -> Dict[str
     return await loop.run_in_executor(None, _get_library_tracks)
 
 
+async def library_add_remove_artist(
+    headers: Dict[str, str], prov_artist_id: str, add: bool = True, username: str = None
+) -> bool:
+    """Add or remove an artist to the user's library."""
+
+    def _library_add_remove_artist():
+        user = username if is_brand_account(username) else None
+        ytm = ytmusicapi.YTMusic(auth=json.dumps(headers), user=user)
+        if add:
+            return "actions" in ytm.subscribe_artists(channelIds=[prov_artist_id])
+        if not add:
+            return "actions" in ytm.unsubscribe_artists(channelIds=[prov_artist_id])
+
+    loop = asyncio.get_running_loop()
+    return await loop.run_in_executor(None, _library_add_remove_artist)
+
+
+async def library_add_remove_album(
+    headers: Dict[str, str], prov_item_id: str, add: bool = True, username: str = None
+) -> bool:
+    """Add or remove an album or playlist to the user's library."""
+    album = await get_album(prov_album_id=prov_item_id)
+
+    def _library_add_remove_album():
+        user = username if is_brand_account(username) else None
+        ytm = ytmusicapi.YTMusic(auth=json.dumps(headers), user=user)
+        playlist_id = album["audioPlaylistId"]
+        if add:
+            return ytm.rate_playlist(playlist_id, "LIKE")
+        if not add:
+            return ytm.rate_playlist(playlist_id, "INDIFFERENT")
+
+    loop = asyncio.get_running_loop()
+    return await loop.run_in_executor(None, _library_add_remove_album)
+
+
+async def library_add_remove_playlist(
+    headers: Dict[str, str], prov_item_id: str, add: bool = True, username: str = None
+) -> bool:
+    """Add or remove an album or playlist to the user's library."""
+
+    def _library_add_remove_playlist():
+        user = username if is_brand_account(username) else None
+        ytm = ytmusicapi.YTMusic(auth=json.dumps(headers), user=user)
+        if add:
+            return "actions" in ytm.rate_playlist(prov_item_id, "LIKE")
+        if not add:
+            return "actions" in ytm.rate_playlist(prov_item_id, "INDIFFERENT")
+
+    loop = asyncio.get_running_loop()
+    return await loop.run_in_executor(None, _library_add_remove_playlist)
+
+
+async def add_remove_playlist_tracks(
+    headers: Dict[str, str],
+    prov_playlist_id: str,
+    prov_track_ids: List[str],
+    add: bool,
+    username: str = None,
+) -> bool:
+    """Async wrapper around adding/removing tracks to a playlist."""
+
+    def _add_playlist_tracks():
+        user = username if is_brand_account(username) else None
+        ytm = ytmusicapi.YTMusic(auth=json.dumps(headers), user=user)
+        if add:
+            return ytm.add_playlist_items(
+                playlistId=prov_playlist_id, videoIds=prov_track_ids
+            )
+        if not add:
+            return ytm.remove_playlist_items(
+                playlistId=prov_playlist_id, videos=prov_track_ids
+            )
+
+    loop = asyncio.get_running_loop()
+    return await loop.run_in_executor(None, _add_playlist_tracks)
+
+
 async def search(query: str, ytm_filter: str = None, limit: int = 20) -> List[Dict]:
     """Async wrapper around the ytmusicapi search function."""
 
index df74edd22250e0022973692d08fdf511074a39c6..fc7184241af72d94e21e1d52310458ff1f127dca 100644 (file)
@@ -30,6 +30,7 @@ from music_assistant.models.media_items import (
 )
 from music_assistant.models.music_provider import MusicProvider
 from music_assistant.music_providers.ytmusic.helpers import (
+    add_remove_playlist_tracks,
     get_album,
     get_artist,
     get_library_albums,
@@ -38,6 +39,9 @@ from music_assistant.music_providers.ytmusic.helpers import (
     get_library_tracks,
     get_playlist,
     get_track,
+    library_add_remove_album,
+    library_add_remove_artist,
+    library_add_remove_playlist,
     search,
 )
 
@@ -198,14 +202,18 @@ class YoutubeMusicProvider(MusicProvider):
     async def get_playlist(self, prov_playlist_id) -> Playlist:
         """Get full playlist details by id."""
         playlist_obj = await get_playlist(
-            prov_playlist_id=prov_playlist_id, headers=self._headers
+            prov_playlist_id=prov_playlist_id,
+            headers=self._headers,
+            username=self.config.username,
         )
         return await self._parse_playlist(playlist_obj)
 
     async def get_playlist_tracks(self, prov_playlist_id) -> List[Track]:
         """Get all playlist tracks for given playlist id."""
         playlist_obj = await get_playlist(
-            prov_playlist_id=prov_playlist_id, headers=self._headers
+            prov_playlist_id=prov_playlist_id,
+            headers=self._headers,
+            username=self.config.username,
         )
         if "tracks" in playlist_obj:
             tracks = []
@@ -249,6 +257,100 @@ class YoutubeMusicProvider(MusicProvider):
             ]
         return []
 
+    async def library_add(self, prov_item_id, media_type: MediaType) -> None:
+        """Add an item to the library."""
+        result = False
+        if media_type == MediaType.ARTIST:
+            result = await library_add_remove_artist(
+                headers=self._headers,
+                prov_artist_id=prov_item_id,
+                add=True,
+                username=self.config.username,
+            )
+        elif media_type == MediaType.ALBUM:
+            result = await library_add_remove_album(
+                headers=self._headers,
+                prov_item_id=prov_item_id,
+                add=True,
+                username=self.config.username,
+            )
+        elif media_type == MediaType.PLAYLIST:
+            result = await library_add_remove_playlist(
+                headers=self._headers,
+                prov_item_id=prov_item_id,
+                add=True,
+                username=self.config.username,
+            )
+        elif media_type == MediaType.TRACK:
+            raise NotImplementedError
+        return result
+
+    async def library_remove(self, prov_item_id, media_type: MediaType):
+        """Remove an item from the library."""
+        result = False
+        if media_type == MediaType.ARTIST:
+            result = await library_add_remove_artist(
+                headers=self._headers,
+                prov_artist_id=prov_item_id,
+                add=False,
+                username=self.config.username,
+            )
+        elif media_type == MediaType.ALBUM:
+            result = await library_add_remove_album(
+                headers=self._headers,
+                prov_item_id=prov_item_id,
+                add=False,
+                username=self.config.username,
+            )
+        elif media_type == MediaType.PLAYLIST:
+            result = await library_add_remove_playlist(
+                headers=self._headers,
+                prov_item_id=prov_item_id,
+                add=False,
+                username=self.config.username,
+            )
+        elif media_type == MediaType.TRACK:
+            raise NotImplementedError
+        return result
+
+    async def add_playlist_tracks(
+        self, prov_playlist_id: str, prov_track_ids: List[str]
+    ) -> None:
+        """Add track(s) to playlist."""
+        return await add_remove_playlist_tracks(
+            headers=self._headers,
+            prov_playlist_id=prov_playlist_id,
+            prov_track_ids=prov_track_ids,
+            add=True,
+            username=self.config.username,
+        )
+
+    async def remove_playlist_tracks(
+        self, prov_playlist_id: str, prov_track_ids: List[str]
+    ) -> None:
+        """Remove track(s) from playlist."""
+        # YT needs both the videoId and de setVideoId in order to remove
+        # the track. Thus, we need to obtain the playlist details and
+        # grab the info from there.
+        playlist_obj = await get_playlist(
+            prov_playlist_id=prov_playlist_id,
+            headers=self._headers,
+            username=self.config.username,
+        )
+        if playlist_obj.get("tracks"):
+            tracks_to_delete = [
+                {"videoId": track["videoId"], "setVideoId": track["setVideoId"]}
+                for track in playlist_obj.get("tracks")
+                if track.get("videoId") in prov_track_ids
+            ]
+            return await add_remove_playlist_tracks(
+                headers=self._headers,
+                prov_playlist_id=prov_playlist_id,
+                prov_track_ids=tracks_to_delete,
+                add=False,
+                username=self.config.username,
+            )
+
     async def get_stream_details(self, item_id: str) -> StreamDetails:
         """Return the content details for the given track when it will be streamed."""
         data = {
@@ -416,6 +518,10 @@ class YoutubeMusicProvider(MusicProvider):
             playlist.metadata.images = await self._parse_thumbnails(
                 playlist_obj["thumbnails"]
             )
+        is_editable = False
+        if playlist_obj.get("privacy") and playlist_obj.get("privacy") == "PRIVATE":
+            is_editable = True
+        playlist.is_editable = is_editable
         playlist.add_provider_id(
             MediaItemProviderId(
                 item_id=playlist_obj["id"], prov_type=self.type, prov_id=self.id