Add get_album_tracks and get_playlist_tracks
authorMarvin Schenkel <marvinschenkel@gmail.com>
Sun, 3 Jul 2022 10:27:00 +0000 (12:27 +0200)
committerMarvin Schenkel <marvinschenkel@gmail.com>
Wed, 6 Jul 2022 14:05:25 +0000 (16:05 +0200)
examples/ytmusic.py
music_assistant/controllers/music/__init__.py
music_assistant/music_providers/ytmusic.py

index d5b8279935425aee9ea16b5382716880aaf7cf0d..2195eff7d582c21ba60b3de75ef0817a3ca22bf3 100644 (file)
@@ -38,11 +38,8 @@ mass_conf = MassConfig(
     database_url=f"sqlite:///{db_file}",
 )
 
-mass_conf.providers.append(
-    MusicProviderConfig(
-        ProviderType.YTMUSIC
-    )
-)
+mass_conf.providers.append(MusicProviderConfig(ProviderType.YTMUSIC))
+
 
 class TestPlayer(Player):
     """Demonstatration player implementation."""
@@ -98,6 +95,7 @@ class TestPlayer(Player):
         self._attr_volume_level = volume_level
         self.update_state()
 
+
 async def main():
     """Handle main execution."""
 
@@ -106,20 +104,21 @@ async def main():
     async with MusicAssistant(mass_conf) as mass:
         # get some data
         yt = mass.music.get_provider(ProviderType.YTMUSIC)
-        track = await yt.get_track("pE3ju1qS848")      
-        await yt.get_album("MPREb_AYetWMZunqA")
-        artist = await yt.get_artist("UC-T-wUsgssr7Vy-BiJD4-2g")
-        #print(artist)
-        for album in await yt.search("dark side of the moon", [MediaType.ALBUM]):
-            print(album)
-            print("**********")
         
-        #test_player1 = TestPlayer("test1")
-        #await mass.players.register_player(test_player1)
-        #await test_player1.active_queue.play_media(track)
+        results = await yt.search("dark side of the moon", [MediaType.ALBUM])
+        tracks = await yt.get_album_tracks(results[0].item_id)
 
-        #await asyncio.sleep(3600)
-        
+        stream_details = await yt.get_stream_details(tracks[0].item_id)
+        print(stream_details.data)
+
+        #artist = await yt.get_artist("UC-T-wUsgssr7Vy-BiJD4-2g")
+        # print(artist)
+
+        # test_player1 = TestPlayer("test1")
+        # await mass.players.register_player(test_player1)
+        # await test_player1.active_queue.play_media(track)
+
+        # await asyncio.sleep(3600)
 
 
 if __name__ == "__main__":
index 26b344d1f31a3fd476a1dce3722ea3dcf7d47a75..d3f080b54d87bb0c6e14bb659f771671f43f59e6 100755 (executable)
@@ -42,7 +42,7 @@ PROV_MAP = {
     ProviderType.SPOTIFY: SpotifyProvider,
     ProviderType.QOBUZ: QobuzProvider,
     ProviderType.TUNEIN: TuneInProvider,
-    ProviderType.YTMUSIC: YTMusic
+    ProviderType.YTMUSIC: YTMusic,
 }
 
 
index 56cd4ce71abaa1ee3e24d6a7c9a88b818a5f2f4c..9d3802e6143607672dff9057a1ea3faa37189d6a 100644 (file)
@@ -1,17 +1,13 @@
 """YT Music support for MusicAssistant"""
+from email.utils import parseaddr
 import json
-import requests
 import re
 from requests.structures import CaseInsensitiveDict
 from typing import AsyncGenerator, Dict, List, Optional
 from urllib.parse import unquote
+from yaml import parse
 
 import ytmusicapi
-from ytmusicapi.navigation import (
-    nav,
-    SINGLE_COLUMN_TAB,
-    SECTION_LIST
-)
 import pytube
 
 from music_assistant.models.enums import ProviderType
@@ -23,9 +19,7 @@ from music_assistant.models.media_items import (
     ContentType,
     ImageType,
     MediaItemImage,
-    MediaItemProviderId,
     MediaItemType,
-    MediaQuality,
     MediaType,
     Playlist,
     StreamDetails,
@@ -57,7 +51,7 @@ class YTMusic(MusicProvider):
         """Sets up the YTMusic provider"""
         self._headers = await self._initialize_headers()
         self._context = await self._initialize_context()
-        self._cookies = {'CONSENT': 'YES+1'}
+        self._cookies = {"CONSENT": "YES+1"}
         return True
 
     async def search(
@@ -69,7 +63,7 @@ class YTMusic(MusicProvider):
             :param media_types: A list of media_types to include. All types if None.
             :param limit: Number of items to return in the search (per type).
         """
-        data = {'query': search_query}
+        data = {"query": search_query}
         filter = None
         if len(media_types) == 1:
             # YTM does not support multiple searchtypes, falls back to all if no type given
@@ -81,86 +75,98 @@ class YTMusic(MusicProvider):
                 filter = "songs"
             if media_types[0] == MediaType.PLAYLIST:
                 filter = "playlists"
-        params = ytmusicapi.parsers.search_params.get_search_params(filter=filter, scope=None, ignore_spelling=False)
+        params = ytmusicapi.parsers.search_params.get_search_params(
+            filter=filter, scope=None, ignore_spelling=False
+        )
         data["params"] = params
         search_results = await self._post_data(endpoint="search", data=data)
-        if 'tabbedSearchResultsRenderer' in search_results['contents']:
-            results = search_results['contents']['tabbedSearchResultsRenderer']['tabs'][0][
-                'tabRenderer']['content']
+        if "tabbedSearchResultsRenderer" in search_results["contents"]:
+            results = search_results["contents"]["tabbedSearchResultsRenderer"]["tabs"][
+                0
+            ]["tabRenderer"]["content"]
         else:
-            results = search_results['contents']
+            results = search_results["contents"]
         parsed_results = []
         for result in results["sectionListRenderer"]["contents"]:
             if "musicShelfRenderer" in result:
-                category = result["musicShelfRenderer"]["title"]["runs"][0]["text"]                
+                category = result["musicShelfRenderer"]["title"]["runs"][0]["text"]
                 if category == "Artists":
-                    for artist in result['musicShelfRenderer']['contents']:
-                        artist_id = artist["musicResponsiveListItemRenderer"]["navigationEndpoint"]["browseEndpoint"]["browseId"]
+                    for artist in result["musicShelfRenderer"]["contents"]:
+                        artist_id = artist["musicResponsiveListItemRenderer"][
+                            "navigationEndpoint"
+                        ]["browseEndpoint"]["browseId"]
                         parsed_results.append(await self.get_artist(artist_id))
                 elif category == "Albums":
-                    for album in result['musicShelfRenderer']['contents']:
-                        album_id = album["musicResponsiveListItemRenderer"]["navigationEndpoint"]["browseEndpoint"]["browseId"]
+                    for album in result["musicShelfRenderer"]["contents"]:
+                        album_id = album["musicResponsiveListItemRenderer"][
+                            "navigationEndpoint"
+                        ]["browseEndpoint"]["browseId"]
                         parsed_results.append(await self.get_album(album_id))
-
+                elif category == "Songs":
+                    for song in result["musicShelfRenderer"]["contents"]:
+                        song_id = song["musicResponsiveListItemRenderer"]["playlistItemData"]["videoId"]
+                        parsed_results.append(await self.get_track(song_id))
                 else:
                     print(category)
         return parsed_results
 
-
     async def get_album(self, prov_album_id) -> Album:
         """Get full album details by id."""
         data = {"browseId": prov_album_id}
         album_obj = await self._post_data(endpoint="browse", data=data)
-        return (
-            await self._parse_album(album_obj=album_obj)
-            if album_obj
-            else None
-        )
+        return await self._parse_album(album_obj=album_obj, album_id=prov_album_id) if album_obj else None
+
+    async def get_album_tracks(self, prov_album_id: str) -> List[Track]:
+        """Get album tracks for given album id."""
+        data = {"browseId": prov_album_id}
+        album_obj = await self._post_data(endpoint="browse", data=data)        
+        parsed_album = ytmusicapi.parsers.albums.parse_album_header(album_obj)
+        album_playlist_id = parsed_album["audioPlaylistId"]
+        return await self.get_playlist_tracks(album_playlist_id)
 
     async def get_artist(self, prov_artist_id) -> Artist:
         """Get full artist details by id"""
         data = {"browseId": prov_artist_id}
         artist_obj = await self._post_data(endpoint="browse", data=data)
-        return (
-            await self._parse_artist(artist_obj=artist_obj)
-            if artist_obj
-            else None
-        )
+        return await self._parse_artist(artist_obj=artist_obj) if artist_obj else None
 
     async def get_track(self, prov_track_id) -> Track:
         """Get full track details by id."""
         signature_timestamp = ytmusicapi.mixins._utils.get_datestamp() - 1
         data = {
             "playbackContext": {
-                "contentPlaybackContext": {
-                    "signatureTimestamp": signature_timestamp
-                }
+                "contentPlaybackContext": {"signatureTimestamp": signature_timestamp}
             },
-            "video_id": prov_track_id
+            "video_id": prov_track_id,
         }
         track_obj = await self._post_data("player", data=data)
         return await self._parse_track(track_obj) if track_obj else None
 
+    async def get_playlist_tracks(self, prov_playlist_id) -> List[Track]:
+        """Get all playlist tracks for given playlist id."""
+        browseId = "VL" + prov_playlist_id if not prov_playlist_id.startswith("VL") else prov_playlist_id
+        data = {'browseId': browseId}
+        playlist_obj = await self._post_data("browse", data=data)
+        tracks = playlist_obj["contents"]["singleColumnBrowseResultsRenderer"]["tabs"][0]["tabRenderer"]["content"]["sectionListRenderer"]["contents"][0]["musicPlaylistShelfRenderer"]["contents"]
+        return [
+            await self.get_track(track["musicResponsiveListItemRenderer"]["playlistItemData"]["videoId"])
+            for track in tracks
+        ]
+
     async def get_stream_details(self, item_id: str) -> StreamDetails:
         """Return the content details for the given track when it will be streamed."""
         signature_timestamp = await self._get_signature_timestamp()
         data = {
             "playbackContext": {
-                "contentPlaybackContext": {
-                    "signatureTimestamp": signature_timestamp
-                }
+                "contentPlaybackContext": {"signatureTimestamp": signature_timestamp}
             },
-            "video_id": item_id
+            "video_id": item_id,
         }
         track_obj = await self._post_data("player", data=data)
         stream_format = await self._parse_stream_format(track_obj)
         url = await self._parse_stream_url(stream_format=stream_format, item_id=item_id)
-
         return StreamDetails(
-            provider=self.type,
-            item_id=item_id,
-            data=url,
-            content_type=ContentType.AAC
+            provider=self.type, item_id=item_id, data=url, content_type=ContentType.AAC
         )
 
     async def get_audio_stream(
@@ -175,9 +181,12 @@ class YTMusic(MusicProvider):
     async def _post_data(self, endpoint: str, data: Dict[str, str], **kwargs):
         url = f"{YTM_BASE_URL}{endpoint}"
         data.update(self._context)
-
         async with self.mass.http_session.post(
-            url, headers=self._headers, json=data, verify_ssl=False, cookies=self._cookies
+            url,
+            headers=self._headers,
+            json=data,
+            verify_ssl=False,
+            cookies=self._cookies,
         ) as response:
             return await response.json()
 
@@ -196,8 +205,10 @@ class YTMusic(MusicProvider):
             headers = CaseInsensitiveDict(json.load(json_file))
         cookie = headers.get("cookie")
         sapisid = ytmusicapi.helpers.sapisid_from_cookie(cookie)
-        origin = headers.get('origin', headers.get('x-origin'))
-        headers["Authorization"] = ytmusicapi.helpers.get_authorization(sapisid + ' ' + origin)
+        origin = headers.get("origin", headers.get("x-origin"))
+        headers["Authorization"] = ytmusicapi.helpers.get_authorization(
+            sapisid + " " + origin
+        )
 
         return headers
 
@@ -205,23 +216,20 @@ class YTMusic(MusicProvider):
         """Returns a dict to use as a context in requests"""
         return {
             "context": {
-                "client": {
-                    "clientName": "WEB_REMIX",
-                    "clientVersion": "0.1"
-                },
-                "user": {}
+                "client": {"clientName": "WEB_REMIX", "clientVersion": "0.1"},
+                "user": {},
             }
         }
 
-    async def _parse_album(self, album_obj: dict) -> Album:
+    async def _parse_album(self, album_obj: dict, album_id: str) -> Album:
         """Parses a YT Album response to an Album model object"""
         parsed_album = ytmusicapi.parsers.albums.parse_album_header(album_obj)
         album = Album(
-            item_id = parsed_album["audioPlaylistId"],
-            name = parsed_album["title"],
-            year = parsed_album["year"],
-            album_type = AlbumType.ALBUM,
-            provider = self.type
+            item_id=album_id,
+            name=parsed_album["title"],
+            year=parsed_album["year"],
+            album_type=AlbumType.ALBUM,
+            provider=self.type,
         )
         images = []
         for thumb in parsed_album["thumbnails"]:
@@ -231,22 +239,30 @@ class YTMusic(MusicProvider):
             album.metadata.description = unquote(parsed_album["description"])
         artists = []
         for artist in parsed_album["artists"]:
-            artists.append(await self.get_artist(artist["id"]))
+            if artist["id"]:
+                artists.append(await self.get_artist(artist["id"]))
         album.artists = artists
         return album
 
     async def _parse_artist(self, artist_obj: dict) -> Artist:
         """Parse a YT Artist response to Artist model object"""
-        print(json.dumps(artist_obj))
-        name = artist_obj["header"]["musicImmersiveHeaderRenderer"]["title"]["runs"][0]["text"]
-        id = artist_obj["header"]["musicImmersiveHeaderRenderer"]["subscriptionButton"]["subscribeButtonRenderer"]["channelId"]
-        artist = Artist(
-           item_id=str(id), provider=self.type, name=name
-        )
+        name = artist_obj["header"]["musicImmersiveHeaderRenderer"]["title"]["runs"][0][
+            "text"
+        ]
+        artist_id = artist_obj["header"]["musicImmersiveHeaderRenderer"]["subscriptionButton"][
+            "subscribeButtonRenderer"
+        ]["channelId"]
+        artist = Artist(item_id=str(artist_id), provider=self.type, name=name)
         if "description" in artist_obj["header"]["musicImmersiveHeaderRenderer"]:
-            artist.metadata.description = unquote(artist_obj["header"]["musicImmersiveHeaderRenderer"]["description"]["runs"][0]["text"])
+            artist.metadata.description = unquote(
+                artist_obj["header"]["musicImmersiveHeaderRenderer"]["description"][
+                    "runs"
+                ][0]["text"]
+            )
         images = []
-        for thumb in artist_obj["header"]["musicImmersiveHeaderRenderer"]["thumbnail"]["musicThumbnailRenderer"]["thumbnail"]["thumbnails"]:
+        for thumb in artist_obj["header"]["musicImmersiveHeaderRenderer"]["thumbnail"][
+            "musicThumbnailRenderer"
+        ]["thumbnail"]["thumbnails"]:
             images.append(MediaItemImage(ImageType.THUMB, thumb["url"]))
         artist.metadata.images = images
         return artist
@@ -257,12 +273,18 @@ class YTMusic(MusicProvider):
             item_id=track_obj["videoDetails"]["videoId"],
             provider=self.type,
             name=track_obj["videoDetails"]["title"],
-            duration=track_obj["videoDetails"]["lengthSeconds"]
+            duration=track_obj["videoDetails"]["lengthSeconds"],
+        )
+        artist = await self.get_artist(
+            track_obj["microformat"]["microformatDataRenderer"]["pageOwnerDetails"][
+                "externalChannelId"
+            ]
         )
-        artist = await self.get_artist(track_obj["microformat"]["microformatDataRenderer"]["pageOwnerDetails"]["externalChannelId"])
         track.artists = [artist]
         images = []
-        for thumb in track_obj["microformat"]["microformatDataRenderer"]["thumbnail"]["thumbnails"]:
+        for thumb in track_obj["microformat"]["microformatDataRenderer"]["thumbnail"][
+            "thumbnails"
+        ]:
             images.append(MediaItemImage(ImageType.THUMB, thumb["url"]))
         return track
 
@@ -270,23 +292,26 @@ class YTMusic(MusicProvider):
         """Grabs the highes available audio stream from available streams"""
         stream_format = None
         for format in track_obj["streamingData"]["adaptiveFormats"]:
-            if format["mimeType"].startswith("audio") and format["audioQuality"] == "AUDIO_QUALITY_HIGH":
-                stream_format = format        
+            if (
+                format["mimeType"].startswith("audio")
+                and format["audioQuality"] == "AUDIO_QUALITY_HIGH"
+            ):
+                stream_format = format
         if stream_format is None:
             raise MediaNotFoundError("No stream found for this track")
         return stream_format
 
     async def _parse_stream_url(self, stream_format: dict, item_id: str) -> str:
         """Figures out the stream URL to use based on the YT track object"""
-        cipherParts = dict()
+        cipher_parts = dict()
         for part in stream_format["signatureCipher"].split("&"):
             k, v = part.split("=", maxsplit=1)
-            cipherParts[k] = unquote(v)
-
-        signature = await self._decipher_signature(ciphered_signature=cipherParts["s"], item_id=item_id)        
-        url = cipherParts["url"] + "&sig=" + signature
-        
-        return url        
+            cipher_parts[k] = unquote(v)
+        signature = await self._decipher_signature(
+            ciphered_signature=cipher_parts["s"], item_id=item_id
+        )
+        url = cipher_parts["url"] + "&sig=" + signature
+        return url
 
     async def _decipher_signature(self, ciphered_signature: str, item_id: str):
         """Decipher the signature, required to build the Stream URL"""
@@ -303,11 +328,9 @@ class YTMusic(MusicProvider):
         match = re.search(r'jsUrl"\s*:\s*"([^"]+)"', response)
         if match is None:
             raise Exception("Could not identify the URL for base.js player.")
-
         url = YTM_DOMAIN + match.group(1)
         response = await self._get_data(url=url)
         match = re.search(r"signatureTimestamp[:=](\d+)", response)
         if match is None:
             raise Exception("Unable to identify the signatureTimestamp.")
-
         return int(match.group(1))