From: Marvin Schenkel Date: Sun, 3 Jul 2022 10:27:00 +0000 (+0200) Subject: Add get_album_tracks and get_playlist_tracks X-Git-Url: https://git.kitaultman.com/?a=commitdiff_plain;h=b6a34483127e9de26f097e4744d2c309685f6f4d;p=music-assistant-server.git Add get_album_tracks and get_playlist_tracks --- diff --git a/examples/ytmusic.py b/examples/ytmusic.py index d5b82799..2195eff7 100644 --- a/examples/ytmusic.py +++ b/examples/ytmusic.py @@ -38,11 +38,8 @@ mass_conf = MassConfig( database_url=f"sqlite:///{db_file}", ) -mass_conf.providers.append( - MusicProviderConfig( - ProviderType.YTMUSIC - ) -) +mass_conf.providers.append(MusicProviderConfig(ProviderType.YTMUSIC)) + class TestPlayer(Player): """Demonstatration player implementation.""" @@ -98,6 +95,7 @@ class TestPlayer(Player): self._attr_volume_level = volume_level self.update_state() + async def main(): """Handle main execution.""" @@ -106,20 +104,21 @@ async def main(): async with MusicAssistant(mass_conf) as mass: # get some data yt = mass.music.get_provider(ProviderType.YTMUSIC) - track = await yt.get_track("pE3ju1qS848") - await yt.get_album("MPREb_AYetWMZunqA") - artist = await yt.get_artist("UC-T-wUsgssr7Vy-BiJD4-2g") - #print(artist) - for album in await yt.search("dark side of the moon", [MediaType.ALBUM]): - print(album) - print("**********") - #test_player1 = TestPlayer("test1") - #await mass.players.register_player(test_player1) - #await test_player1.active_queue.play_media(track) + results = await yt.search("dark side of the moon", [MediaType.ALBUM]) + tracks = await yt.get_album_tracks(results[0].item_id) - #await asyncio.sleep(3600) - + stream_details = await yt.get_stream_details(tracks[0].item_id) + print(stream_details.data) + + #artist = await yt.get_artist("UC-T-wUsgssr7Vy-BiJD4-2g") + # print(artist) + + # test_player1 = TestPlayer("test1") + # await mass.players.register_player(test_player1) + # await test_player1.active_queue.play_media(track) + + # await asyncio.sleep(3600) if __name__ == "__main__": diff --git a/music_assistant/controllers/music/__init__.py b/music_assistant/controllers/music/__init__.py index 26b344d1..d3f080b5 100755 --- a/music_assistant/controllers/music/__init__.py +++ b/music_assistant/controllers/music/__init__.py @@ -42,7 +42,7 @@ PROV_MAP = { ProviderType.SPOTIFY: SpotifyProvider, ProviderType.QOBUZ: QobuzProvider, ProviderType.TUNEIN: TuneInProvider, - ProviderType.YTMUSIC: YTMusic + ProviderType.YTMUSIC: YTMusic, } diff --git a/music_assistant/music_providers/ytmusic.py b/music_assistant/music_providers/ytmusic.py index 56cd4ce7..9d3802e6 100644 --- a/music_assistant/music_providers/ytmusic.py +++ b/music_assistant/music_providers/ytmusic.py @@ -1,17 +1,13 @@ """YT Music support for MusicAssistant""" +from email.utils import parseaddr import json -import requests import re from requests.structures import CaseInsensitiveDict from typing import AsyncGenerator, Dict, List, Optional from urllib.parse import unquote +from yaml import parse import ytmusicapi -from ytmusicapi.navigation import ( - nav, - SINGLE_COLUMN_TAB, - SECTION_LIST -) import pytube from music_assistant.models.enums import ProviderType @@ -23,9 +19,7 @@ from music_assistant.models.media_items import ( ContentType, ImageType, MediaItemImage, - MediaItemProviderId, MediaItemType, - MediaQuality, MediaType, Playlist, StreamDetails, @@ -57,7 +51,7 @@ class YTMusic(MusicProvider): """Sets up the YTMusic provider""" self._headers = await self._initialize_headers() self._context = await self._initialize_context() - self._cookies = {'CONSENT': 'YES+1'} + self._cookies = {"CONSENT": "YES+1"} return True async def search( @@ -69,7 +63,7 @@ class YTMusic(MusicProvider): :param media_types: A list of media_types to include. All types if None. :param limit: Number of items to return in the search (per type). """ - data = {'query': search_query} + data = {"query": search_query} filter = None if len(media_types) == 1: # YTM does not support multiple searchtypes, falls back to all if no type given @@ -81,86 +75,98 @@ class YTMusic(MusicProvider): filter = "songs" if media_types[0] == MediaType.PLAYLIST: filter = "playlists" - params = ytmusicapi.parsers.search_params.get_search_params(filter=filter, scope=None, ignore_spelling=False) + params = ytmusicapi.parsers.search_params.get_search_params( + filter=filter, scope=None, ignore_spelling=False + ) data["params"] = params search_results = await self._post_data(endpoint="search", data=data) - if 'tabbedSearchResultsRenderer' in search_results['contents']: - results = search_results['contents']['tabbedSearchResultsRenderer']['tabs'][0][ - 'tabRenderer']['content'] + if "tabbedSearchResultsRenderer" in search_results["contents"]: + results = search_results["contents"]["tabbedSearchResultsRenderer"]["tabs"][ + 0 + ]["tabRenderer"]["content"] else: - results = search_results['contents'] + results = search_results["contents"] parsed_results = [] for result in results["sectionListRenderer"]["contents"]: if "musicShelfRenderer" in result: - category = result["musicShelfRenderer"]["title"]["runs"][0]["text"] + category = result["musicShelfRenderer"]["title"]["runs"][0]["text"] if category == "Artists": - for artist in result['musicShelfRenderer']['contents']: - artist_id = artist["musicResponsiveListItemRenderer"]["navigationEndpoint"]["browseEndpoint"]["browseId"] + for artist in result["musicShelfRenderer"]["contents"]: + artist_id = artist["musicResponsiveListItemRenderer"][ + "navigationEndpoint" + ]["browseEndpoint"]["browseId"] parsed_results.append(await self.get_artist(artist_id)) elif category == "Albums": - for album in result['musicShelfRenderer']['contents']: - album_id = album["musicResponsiveListItemRenderer"]["navigationEndpoint"]["browseEndpoint"]["browseId"] + for album in result["musicShelfRenderer"]["contents"]: + album_id = album["musicResponsiveListItemRenderer"][ + "navigationEndpoint" + ]["browseEndpoint"]["browseId"] parsed_results.append(await self.get_album(album_id)) - + elif category == "Songs": + for song in result["musicShelfRenderer"]["contents"]: + song_id = song["musicResponsiveListItemRenderer"]["playlistItemData"]["videoId"] + parsed_results.append(await self.get_track(song_id)) else: print(category) return parsed_results - async def get_album(self, prov_album_id) -> Album: """Get full album details by id.""" data = {"browseId": prov_album_id} album_obj = await self._post_data(endpoint="browse", data=data) - return ( - await self._parse_album(album_obj=album_obj) - if album_obj - else None - ) + return await self._parse_album(album_obj=album_obj, album_id=prov_album_id) if album_obj else None + + async def get_album_tracks(self, prov_album_id: str) -> List[Track]: + """Get album tracks for given album id.""" + data = {"browseId": prov_album_id} + album_obj = await self._post_data(endpoint="browse", data=data) + parsed_album = ytmusicapi.parsers.albums.parse_album_header(album_obj) + album_playlist_id = parsed_album["audioPlaylistId"] + return await self.get_playlist_tracks(album_playlist_id) async def get_artist(self, prov_artist_id) -> Artist: """Get full artist details by id""" data = {"browseId": prov_artist_id} artist_obj = await self._post_data(endpoint="browse", data=data) - return ( - await self._parse_artist(artist_obj=artist_obj) - if artist_obj - else None - ) + return await self._parse_artist(artist_obj=artist_obj) if artist_obj else None async def get_track(self, prov_track_id) -> Track: """Get full track details by id.""" signature_timestamp = ytmusicapi.mixins._utils.get_datestamp() - 1 data = { "playbackContext": { - "contentPlaybackContext": { - "signatureTimestamp": signature_timestamp - } + "contentPlaybackContext": {"signatureTimestamp": signature_timestamp} }, - "video_id": prov_track_id + "video_id": prov_track_id, } track_obj = await self._post_data("player", data=data) return await self._parse_track(track_obj) if track_obj else None + async def get_playlist_tracks(self, prov_playlist_id) -> List[Track]: + """Get all playlist tracks for given playlist id.""" + browseId = "VL" + prov_playlist_id if not prov_playlist_id.startswith("VL") else prov_playlist_id + data = {'browseId': browseId} + playlist_obj = await self._post_data("browse", data=data) + tracks = playlist_obj["contents"]["singleColumnBrowseResultsRenderer"]["tabs"][0]["tabRenderer"]["content"]["sectionListRenderer"]["contents"][0]["musicPlaylistShelfRenderer"]["contents"] + return [ + await self.get_track(track["musicResponsiveListItemRenderer"]["playlistItemData"]["videoId"]) + for track in tracks + ] + async def get_stream_details(self, item_id: str) -> StreamDetails: """Return the content details for the given track when it will be streamed.""" signature_timestamp = await self._get_signature_timestamp() data = { "playbackContext": { - "contentPlaybackContext": { - "signatureTimestamp": signature_timestamp - } + "contentPlaybackContext": {"signatureTimestamp": signature_timestamp} }, - "video_id": item_id + "video_id": item_id, } track_obj = await self._post_data("player", data=data) stream_format = await self._parse_stream_format(track_obj) url = await self._parse_stream_url(stream_format=stream_format, item_id=item_id) - return StreamDetails( - provider=self.type, - item_id=item_id, - data=url, - content_type=ContentType.AAC + provider=self.type, item_id=item_id, data=url, content_type=ContentType.AAC ) async def get_audio_stream( @@ -175,9 +181,12 @@ class YTMusic(MusicProvider): async def _post_data(self, endpoint: str, data: Dict[str, str], **kwargs): url = f"{YTM_BASE_URL}{endpoint}" data.update(self._context) - async with self.mass.http_session.post( - url, headers=self._headers, json=data, verify_ssl=False, cookies=self._cookies + url, + headers=self._headers, + json=data, + verify_ssl=False, + cookies=self._cookies, ) as response: return await response.json() @@ -196,8 +205,10 @@ class YTMusic(MusicProvider): headers = CaseInsensitiveDict(json.load(json_file)) cookie = headers.get("cookie") sapisid = ytmusicapi.helpers.sapisid_from_cookie(cookie) - origin = headers.get('origin', headers.get('x-origin')) - headers["Authorization"] = ytmusicapi.helpers.get_authorization(sapisid + ' ' + origin) + origin = headers.get("origin", headers.get("x-origin")) + headers["Authorization"] = ytmusicapi.helpers.get_authorization( + sapisid + " " + origin + ) return headers @@ -205,23 +216,20 @@ class YTMusic(MusicProvider): """Returns a dict to use as a context in requests""" return { "context": { - "client": { - "clientName": "WEB_REMIX", - "clientVersion": "0.1" - }, - "user": {} + "client": {"clientName": "WEB_REMIX", "clientVersion": "0.1"}, + "user": {}, } } - async def _parse_album(self, album_obj: dict) -> Album: + async def _parse_album(self, album_obj: dict, album_id: str) -> Album: """Parses a YT Album response to an Album model object""" parsed_album = ytmusicapi.parsers.albums.parse_album_header(album_obj) album = Album( - item_id = parsed_album["audioPlaylistId"], - name = parsed_album["title"], - year = parsed_album["year"], - album_type = AlbumType.ALBUM, - provider = self.type + item_id=album_id, + name=parsed_album["title"], + year=parsed_album["year"], + album_type=AlbumType.ALBUM, + provider=self.type, ) images = [] for thumb in parsed_album["thumbnails"]: @@ -231,22 +239,30 @@ class YTMusic(MusicProvider): album.metadata.description = unquote(parsed_album["description"]) artists = [] for artist in parsed_album["artists"]: - artists.append(await self.get_artist(artist["id"])) + if artist["id"]: + artists.append(await self.get_artist(artist["id"])) album.artists = artists return album async def _parse_artist(self, artist_obj: dict) -> Artist: """Parse a YT Artist response to Artist model object""" - print(json.dumps(artist_obj)) - name = artist_obj["header"]["musicImmersiveHeaderRenderer"]["title"]["runs"][0]["text"] - id = artist_obj["header"]["musicImmersiveHeaderRenderer"]["subscriptionButton"]["subscribeButtonRenderer"]["channelId"] - artist = Artist( - item_id=str(id), provider=self.type, name=name - ) + name = artist_obj["header"]["musicImmersiveHeaderRenderer"]["title"]["runs"][0][ + "text" + ] + artist_id = artist_obj["header"]["musicImmersiveHeaderRenderer"]["subscriptionButton"][ + "subscribeButtonRenderer" + ]["channelId"] + artist = Artist(item_id=str(artist_id), provider=self.type, name=name) if "description" in artist_obj["header"]["musicImmersiveHeaderRenderer"]: - artist.metadata.description = unquote(artist_obj["header"]["musicImmersiveHeaderRenderer"]["description"]["runs"][0]["text"]) + artist.metadata.description = unquote( + artist_obj["header"]["musicImmersiveHeaderRenderer"]["description"][ + "runs" + ][0]["text"] + ) images = [] - for thumb in artist_obj["header"]["musicImmersiveHeaderRenderer"]["thumbnail"]["musicThumbnailRenderer"]["thumbnail"]["thumbnails"]: + for thumb in artist_obj["header"]["musicImmersiveHeaderRenderer"]["thumbnail"][ + "musicThumbnailRenderer" + ]["thumbnail"]["thumbnails"]: images.append(MediaItemImage(ImageType.THUMB, thumb["url"])) artist.metadata.images = images return artist @@ -257,12 +273,18 @@ class YTMusic(MusicProvider): item_id=track_obj["videoDetails"]["videoId"], provider=self.type, name=track_obj["videoDetails"]["title"], - duration=track_obj["videoDetails"]["lengthSeconds"] + duration=track_obj["videoDetails"]["lengthSeconds"], + ) + artist = await self.get_artist( + track_obj["microformat"]["microformatDataRenderer"]["pageOwnerDetails"][ + "externalChannelId" + ] ) - artist = await self.get_artist(track_obj["microformat"]["microformatDataRenderer"]["pageOwnerDetails"]["externalChannelId"]) track.artists = [artist] images = [] - for thumb in track_obj["microformat"]["microformatDataRenderer"]["thumbnail"]["thumbnails"]: + for thumb in track_obj["microformat"]["microformatDataRenderer"]["thumbnail"][ + "thumbnails" + ]: images.append(MediaItemImage(ImageType.THUMB, thumb["url"])) return track @@ -270,23 +292,26 @@ class YTMusic(MusicProvider): """Grabs the highes available audio stream from available streams""" stream_format = None for format in track_obj["streamingData"]["adaptiveFormats"]: - if format["mimeType"].startswith("audio") and format["audioQuality"] == "AUDIO_QUALITY_HIGH": - stream_format = format + if ( + format["mimeType"].startswith("audio") + and format["audioQuality"] == "AUDIO_QUALITY_HIGH" + ): + stream_format = format if stream_format is None: raise MediaNotFoundError("No stream found for this track") return stream_format async def _parse_stream_url(self, stream_format: dict, item_id: str) -> str: """Figures out the stream URL to use based on the YT track object""" - cipherParts = dict() + cipher_parts = dict() for part in stream_format["signatureCipher"].split("&"): k, v = part.split("=", maxsplit=1) - cipherParts[k] = unquote(v) - - signature = await self._decipher_signature(ciphered_signature=cipherParts["s"], item_id=item_id) - url = cipherParts["url"] + "&sig=" + signature - - return url + cipher_parts[k] = unquote(v) + signature = await self._decipher_signature( + ciphered_signature=cipher_parts["s"], item_id=item_id + ) + url = cipher_parts["url"] + "&sig=" + signature + return url async def _decipher_signature(self, ciphered_signature: str, item_id: str): """Decipher the signature, required to build the Stream URL""" @@ -303,11 +328,9 @@ class YTMusic(MusicProvider): match = re.search(r'jsUrl"\s*:\s*"([^"]+)"', response) if match is None: raise Exception("Could not identify the URL for base.js player.") - url = YTM_DOMAIN + match.group(1) response = await self._get_data(url=url) match = re.search(r"signatureTimestamp[:=](\d+)", response) if match is None: raise Exception("Unable to identify the signatureTimestamp.") - return int(match.group(1))