Add library playlist, library songs and artist albums
authorMarvin Schenkel <marvinschenkel@gmail.com>
Wed, 6 Jul 2022 12:57:46 +0000 (14:57 +0200)
committerMarvin Schenkel <marvinschenkel@gmail.com>
Wed, 6 Jul 2022 12:57:46 +0000 (14:57 +0200)
examples/ytmusic.py
music_assistant/music_providers/ytmusic.py

index 60b6408a12e379cac99e7c9f16fd39526f98cff9..e6247f48a867930758360febe9291f3055262c44 100644 (file)
@@ -126,13 +126,22 @@ async def main():
         # ytm = mass.music.get_provider(ProviderType.YTMUSIC)
         # track = await ytm.get_track("pE3ju1qS848")
         # album = await ytm.get_album("MPREb_AYetWMZunqA")
-        # print(album)
+        # await ytm.get_artist_albums("UCFpQsnvFBNlXi7rwQo6IW7g")
+        # albums = await ytm.get_artist_albums("UCFpQsnvFBNlXi7rwQo6IW7g")*")
         # start sync
-        await mass.music.start_sync(schedule=3)
+        await mass.music.start_sync(schedule=3)
         artists = await mass.music.artists.count()
         print(f"Got {artists} artists in library")
         albums = await mass.music.albums.count()
         print(f"Got {albums} albums in library")
+        tracks = await mass.music.tracks.count()
+        print(f"Got {tracks} tracks in library")
+        playlists = await mass.music.playlists.library()
+        print(f"Got {len(playlists)} playlists in library")
+        # artists = await mass.music.artists.count()
+        # print(f"Got {artists} artists in library")
+        # albums = await mass.music.albums.count()
+        # print(f"Got {albums} albums in library")
         # sd = await yt.get_stream_details(track.item_id)
         # print(sd.data)
 
index dbeb99c1bad6716a533249458d07eddc7835f463..2f7c2cb750f3d54555e9b13752e7ed1c5c3e5b00 100644 (file)
@@ -21,6 +21,7 @@ from music_assistant.models.media_items import (
     MediaItemProviderId,
     MediaItemType,
     MediaType,
+    Playlist,
     StreamDetails,
     Track,
 )
@@ -205,6 +206,96 @@ class YTMusic(MusicProvider):
             )
             yield album
 
+    async def get_library_playlists(self) -> AsyncGenerator[Playlist, None]:
+        """Retrieve all library playlists from the provider."""
+        data = {"browseId": "FEmusic_liked_playlists"}
+        response = await self._post_data(endpoint="browse", data=data)
+        response_playlists = response["contents"]["singleColumnBrowseResultsRenderer"][
+            "tabs"
+        ][0]["tabRenderer"]["content"]["sectionListRenderer"]["contents"][1][
+            "itemSectionRenderer"
+        ][
+            "contents"
+        ][
+            0
+        ][
+            "gridRenderer"
+        ]
+        playlists = ytmusicapi.parsers.browsing.parse_content_list(
+            response_playlists["items"][1:], ytmusicapi.parsers.browsing.parse_playlist
+        )
+        for item in playlists:
+            playlist = Playlist(
+                item_id=str(item["playlistId"]), provider=self.type, name=item["title"]
+            )
+            playlist.metadata.description = item["description"]
+            playlist.metadata.images = [
+                MediaItemImage(ImageType.THUMB, thumb["url"])
+                for thumb in item["thumbnails"]
+            ]
+            playlist.add_provider_id(
+                MediaItemProviderId(
+                    item_id=str(item["playlistId"]),
+                    prov_type=self.type,
+                    prov_id=self.id,
+                )
+            )
+            yield playlist
+
+    async def get_library_tracks(self) -> AsyncGenerator[Track, None]:
+        """Retrieve library tracks from Youtube Music."""
+        data = {"browseId": "FEmusic_liked_videos"}
+        response = await self._post_data(endpoint="browse", data=data)
+        parsed_tracks = ytmusicapi.parsers.library.parse_library_songs(response)[
+            "parsed"
+        ]
+        for item in parsed_tracks:
+            track = Track(
+                item_id=str(item["videoId"]),
+                provider=self.type,
+                name=item["title"],
+                duration=item["duration"],
+            )
+            artists = []
+            for artist in item["artists"]:
+                album_artist = Artist(
+                    item_id=artist["id"], name=artist["name"], provider=self.type
+                )
+                album_artist.add_provider_id(
+                    MediaItemProviderId(
+                        item_id=str(artist["id"]), prov_type=self.type, prov_id=self.id
+                    )
+                )
+                artists.append(album_artist)
+            track.artists = artists
+            track.metadata.images = [
+                MediaItemImage(ImageType.THUMB, thumb["url"])
+                for thumb in item["thumbnails"]
+            ]
+            album = Album(
+                item_id=str(item["album"]["id"]),
+                name=item["album"]["name"],
+                provider=self.type,
+            )
+            album.add_provider_id(
+                MediaItemProviderId(
+                    item_id=str(item["album"]["id"]),
+                    prov_type=self.type,
+                    prov_id=self.id,
+                )
+            )
+            track.album = album
+            track.metadata.explicit = item["isExplicit"]
+            track.add_provider_id(
+                MediaItemProviderId(
+                    item_id=str(item["videoId"]),
+                    prov_type=self.type,
+                    prov_id=self.id,
+                    available=item["isAvailable"],
+                )
+            )
+            yield track
+
     async def get_album(self, prov_album_id) -> Album:
         """Get full album details by id."""
         data = {"browseId": prov_album_id}
@@ -241,6 +332,17 @@ class YTMusic(MusicProvider):
         track_obj = await self._post_data("player", data=data)
         return await self._parse_track(track_obj) if track_obj else None
 
+    async def get_playlist(self, prov_playlist_id) -> Playlist:
+        """Get full playlist details by id."""
+        browse_id = (
+            "VL" + prov_playlist_id
+            if not prov_playlist_id.startswith("VL")
+            else prov_playlist_id
+        )
+        data = {"browseId": browse_id}
+        playlist_response = await self._post_data("browse", data=data)
+        return await self._parse_playlist(playlist_response)
+
     async def get_playlist_tracks(self, prov_playlist_id) -> List[Track]:
         """Get all playlist tracks for given playlist id."""
         browse_id = (
@@ -264,6 +366,42 @@ class YTMusic(MusicProvider):
             for track in tracks
         ]
 
+    async def get_artist_albums(self, prov_artist_id) -> List[Album]:
+        """Get a list of albums for the given artist."""
+        data = {"browseId": prov_artist_id}
+        response = await self._post_data("browse", data=data)
+        album_response = response["contents"]["singleColumnBrowseResultsRenderer"][
+            "tabs"
+        ][0]["tabRenderer"]["content"]["sectionListRenderer"]["contents"][1][
+            "musicCarouselShelfRenderer"
+        ][
+            "contents"
+        ]
+        albums = []
+        for item in album_response:
+            album_id = item["musicTwoRowItemRenderer"]["navigationEndpoint"][
+                "browseEndpoint"
+            ]["browseId"]
+            album_name = item["musicTwoRowItemRenderer"]["title"]["runs"][0]["text"]
+            thumbnails = item["musicTwoRowItemRenderer"]["thumbnailRenderer"][
+                "musicThumbnailRenderer"
+            ]["thumbnail"]["thumbnails"]
+            album = album = Album(
+                item_id=album_id,
+                name=album_name,
+                provider=self.type,
+            )
+            album.metadata.images = [
+                MediaItemImage(ImageType.THUMB, thumb["url"]) for thumb in thumbnails
+            ]
+            album.add_provider_id(
+                MediaItemProviderId(
+                    item_id=str(album_id), prov_type=self.type, prov_id=self.id
+                )
+            )
+            albums.append(album)
+        return albums
+
     async def get_stream_details(self, item_id: str) -> StreamDetails:
         """Return the content details for the given track when it will be streamed."""
         signature_timestamp = await self._get_signature_timestamp()
@@ -402,6 +540,36 @@ class YTMusic(MusicProvider):
         )
         return artist
 
+    async def _parse_playlist(self, playlist_response: dict) -> Playlist:
+        """Parse a YT Playlist response to a Playlist object."""
+        playlist_id = playlist_response["contents"][
+            "singleColumnBrowseResultsRenderer"
+        ]["tabs"][0]["tabRenderer"]["content"]["sectionListRenderer"]["contents"][0][
+            "musicPlaylistShelfRenderer"
+        ][
+            "playlistId"
+        ]
+        title = playlist_response["header"]["musicDetailHeaderRenderer"]["title"][
+            "runs"
+        ][0]["text"]
+        description = playlist_response["header"]["musicDetailHeaderRenderer"][
+            "description"
+        ]["runs"][0]["text"]
+        thumbnails = playlist_response["header"]["musicDetailHeaderRenderer"][
+            "thumbnail"
+        ]["croppedSquareThumbnailRenderer"]["thumbnail"]["thumbnails"]
+        playlist = Playlist(item_id=str(playlist_id), provider=self.type, name=title)
+        playlist.metadata.description = description
+        playlist.metadata.images = [
+            MediaItemImage(ImageType.THUMB, thumb["url"]) for thumb in thumbnails
+        ]
+        playlist.add_provider_id(
+            MediaItemProviderId(
+                item_id=str(playlist_id), prov_type=self.type, prov_id=self.id
+            )
+        )
+        return playlist
+
     async def _parse_track(self, track_obj: dict) -> Track:
         """Parse a YT Track response to a Track model object."""
         track = Track(