From: Marvin Schenkel Date: Fri, 8 Jul 2022 23:47:39 +0000 (+0200) Subject: Improve Youtube Music provider (#402) X-Git-Url: https://git.kitaultman.com/?a=commitdiff_plain;h=ab8afa25273b0aead23036408be8c3ecd5238155;p=music-assistant-server.git Improve Youtube Music provider (#402) * Add new YTM parser * Fix get_track * Fix parsing bugs * Fix artist albums without artist * Increase limit for people with a music addiction :-) * Update music_assistant/music_providers/ytmusic/ytmusic.py Co-authored-by: Marcel van der Veldt * Only return highest quality thumb * Fix missing artist in album search results * Add duration to album tracks * ignore albumartists without id Co-authored-by: Marcel van der Veldt --- diff --git a/music_assistant/music_providers/ytmusic.py b/music_assistant/music_providers/ytmusic.py deleted file mode 100644 index 45d862b6..00000000 --- a/music_assistant/music_providers/ytmusic.py +++ /dev/null @@ -1,721 +0,0 @@ -"""Youtube Music support for MusicAssistant.""" -import re -from datetime import date -from typing import AsyncGenerator, Dict, List, Optional -from urllib.parse import unquote - -import pytube -import ytmusicapi - -from music_assistant.helpers.audio import get_http_stream -from music_assistant.models.enums import ProviderType -from music_assistant.models.errors import LoginFailed, MediaNotFoundError -from music_assistant.models.media_items import ( - Album, - AlbumType, - Artist, - ContentType, - ImageType, - MediaItemImage, - MediaItemProviderId, - MediaItemType, - MediaType, - Playlist, - StreamDetails, - Track, -) -from music_assistant.models.music_provider import MusicProvider - -YTM_DOMAIN = "https://music.youtube.com" -YTM_BASE_URL = f"{YTM_DOMAIN}/youtubei/v1/" - - -class YoutubeMusicProvider(MusicProvider): - """Provider for Youtube Music.""" - - _attr_type = ProviderType.YTMUSIC - _attr_name = "Youtube Music" - _attr_supported_mediatypes = [ - MediaType.ARTIST, - MediaType.ALBUM, - MediaType.TRACK, - MediaType.PLAYLIST, - ] - _headers = None - _context = None - _cookies = None - - async def setup(self) -> bool: - """Set up the YTMusic provider.""" - if not self.config.enabled: - return False - if not self.config.username or not self.config.password: - raise LoginFailed("Invalid login credentials") - await self._initialize_headers(cookie=self.config.password) - await self._initialize_context() - self._cookies = {"CONSENT": "YES+1"} - return True - - async def search( - self, search_query: str, media_types=Optional[List[MediaType]], limit: int = 5 - ) -> List[MediaItemType]: - """ - Perform search on musicprovider. - - :param search_query: Search query. - :param media_types: A list of media_types to include. All types if None. - :param limit: Number of items to return in the search (per type). - """ - data = {"query": search_query} - ytm_filter = None - if len(media_types) == 1: - # YTM does not support multiple searchtypes, falls back to all if no type given - if media_types[0] == MediaType.ARTIST: - ytm_filter = "artists" - if media_types[0] == MediaType.ALBUM: - ytm_filter = "albums" - if media_types[0] == MediaType.TRACK: - ytm_filter = "songs" - if media_types[0] == MediaType.PLAYLIST: - ytm_filter = "playlists" - params = ytmusicapi.parsers.search_params.get_search_params( - filter=ytm_filter, scope=None, ignore_spelling=False - ) - data["params"] = params - search_results = await self._post_data(endpoint="search", data=data) - if "tabbedSearchResultsRenderer" in search_results["contents"]: - results = search_results["contents"]["tabbedSearchResultsRenderer"]["tabs"][ - 0 - ]["tabRenderer"]["content"] - else: - results = search_results["contents"] - parsed_results = [] - for result in results["sectionListRenderer"]["contents"]: - if "musicShelfRenderer" in result: - category = result["musicShelfRenderer"]["title"]["runs"][0]["text"] - if category == "Artists": - for artist in result["musicShelfRenderer"]["contents"]: - artist_id = artist["musicResponsiveListItemRenderer"][ - "navigationEndpoint" - ]["browseEndpoint"]["browseId"] - parsed_results.append(await self.get_artist(artist_id)) - elif category == "Albums": - for album in result["musicShelfRenderer"]["contents"]: - album_id = album["musicResponsiveListItemRenderer"][ - "navigationEndpoint" - ]["browseEndpoint"]["browseId"] - parsed_results.append(await self.get_album(album_id)) - elif category == "Songs": - for song in result["musicShelfRenderer"]["contents"]: - song_id = song["musicResponsiveListItemRenderer"][ - "playlistItemData" - ]["videoId"] - parsed_results.append(await self.get_track(song_id)) - else: - print(category) - return parsed_results - - async def get_library_artists(self) -> AsyncGenerator[Artist, None]: - """Retrieve all library artists from Youtube Music.""" - data = {"browseId": "FEmusic_library_corpus_track_artists"} - response = await self._post_data(endpoint="browse", data=data) - response_artist = response["contents"]["singleColumnBrowseResultsRenderer"][ - "tabs" - ][0]["tabRenderer"]["content"]["sectionListRenderer"]["contents"][1][ - "itemSectionRenderer" - ][ - "contents" - ][ - 0 - ][ - "musicShelfRenderer" - ][ - "contents" - ] - parsed_artists = ytmusicapi.parsers.library.parse_artists(response_artist) - for item in parsed_artists: - artist = Artist( - item_id=item["browseId"], provider=self.type, name=item["artist"] - ) - artist.metadata.images = [ - MediaItemImage(ImageType.THUMB, thumb["url"]) - for thumb in item["thumbnails"] - ] - artist.add_provider_id( - MediaItemProviderId( - item_id=str(item["browseId"]), prov_type=self.type, prov_id=self.id - ) - ) - yield artist - - async def get_library_albums(self) -> AsyncGenerator[Album, None]: - """Retrieve all library albums from Youtube Music.""" - data = {"browseId": "FEmusic_liked_albums"} - response = await self._post_data(endpoint="browse", data=data) - response_albums = response["contents"]["singleColumnBrowseResultsRenderer"][ - "tabs" - ][0]["tabRenderer"]["content"]["sectionListRenderer"]["contents"][1][ - "itemSectionRenderer" - ][ - "contents" - ][ - 0 - ][ - "gridRenderer" - ][ - "items" - ] - parsed_albums = ytmusicapi.parsers.library.parse_albums(response_albums) - for item in parsed_albums: - if item["type"] == "Single": - album_type = AlbumType.SINGLE - elif item["type"] == "EP": - album_type = AlbumType.EP - elif item["type"] == "Album": - album_type = AlbumType.ALBUM - else: - album_type = AlbumType.UNKNOWN - album = Album( - item_id=item["browseId"], - name=item["title"], - album_type=album_type, - provider=self.type, - ) - if item["year"].isdigit(): - album.year = item["year"] - artists = [] - for artist in item["artists"]: - artist_id = artist["id"] - if not artist_id: - artist_id = "ytm_va" - album_artist = Artist( - item_id=artist_id, name=artist["name"], provider=self.type - ) - album_artist.add_provider_id( - MediaItemProviderId( - item_id=str(artist["id"]), prov_type=self.type, prov_id=self.id - ) - ) - artists.append(album_artist) - album.artists = artists - album.metadata.images = [ - MediaItemImage(ImageType.THUMB, thumb["url"]) - for thumb in item["thumbnails"] - ] - album.add_provider_id( - MediaItemProviderId( - item_id=str(item["browseId"]), prov_type=self.type, prov_id=self.id - ) - ) - yield album - - async def get_library_playlists(self) -> AsyncGenerator[Playlist, None]: - """Retrieve all library playlists from the provider.""" - data = {"browseId": "FEmusic_liked_playlists"} - response = await self._post_data(endpoint="browse", data=data) - response_playlists = response["contents"]["singleColumnBrowseResultsRenderer"][ - "tabs" - ][0]["tabRenderer"]["content"]["sectionListRenderer"]["contents"][1][ - "itemSectionRenderer" - ][ - "contents" - ][ - 0 - ][ - "gridRenderer" - ] - playlists = ytmusicapi.parsers.browsing.parse_content_list( - response_playlists["items"][1:], ytmusicapi.parsers.browsing.parse_playlist - ) - for item in playlists: - playlist = Playlist( - item_id=str(item["playlistId"]), provider=self.type, name=item["title"] - ) - playlist.metadata.description = item["description"] - playlist.metadata.images = [ - MediaItemImage(ImageType.THUMB, thumb["url"]) - for thumb in item["thumbnails"] - ] - playlist.add_provider_id( - MediaItemProviderId( - item_id=str(item["playlistId"]), - prov_type=self.type, - prov_id=self.id, - ) - ) - yield playlist - - async def get_library_tracks(self) -> AsyncGenerator[Track, None]: - """Retrieve library tracks from Youtube Music.""" - data = {"browseId": "FEmusic_liked_videos"} - response = await self._post_data(endpoint="browse", data=data) - parsed_tracks = ytmusicapi.parsers.library.parse_library_songs(response)[ - "parsed" - ] - for item in parsed_tracks: - track = await self.get_track(item["videoId"]) - album = await self.get_album(item["album"]["id"]) - track.album = album - track.metadata.explicit = item["isExplicit"] - track.add_provider_id( - MediaItemProviderId( - item_id=str(item["videoId"]), - prov_type=self.type, - prov_id=self.id, - ) - ) - yield track - - async def get_album(self, prov_album_id) -> Album: - """Get full album details by id.""" - data = {"browseId": prov_album_id} - album_obj = await self._post_data(endpoint="browse", data=data) - return ( - await self._parse_album(album_obj=album_obj, album_id=prov_album_id) - if album_obj - else None - ) - - async def get_album_tracks(self, prov_album_id: str) -> List[Track]: - """Get album tracks for given album id.""" - data = {"browseId": prov_album_id} - album_obj = await self._post_data(endpoint="browse", data=data) - parsed_album = ytmusicapi.parsers.albums.parse_album_header(album_obj) - album_playlist_id = parsed_album["audioPlaylistId"] - return await self.get_playlist_tracks(album_playlist_id) - - async def get_artist(self, prov_artist_id) -> Artist: - """Get full artist details by id.""" - data = {"browseId": prov_artist_id} - artist_obj = await self._post_data(endpoint="browse", data=data) - return ( - await self._parse_artist(artist_obj=artist_obj, artist_id=prov_artist_id) - if artist_obj - else None - ) - - async def get_track(self, prov_track_id) -> Track: - """Get full track details by id.""" - signature_timestamp = (date.today() - date.fromtimestamp(0)).days - 1 - data = { - "playbackContext": { - "contentPlaybackContext": {"signatureTimestamp": signature_timestamp} - }, - "video_id": prov_track_id, - } - track_obj = await self._post_data("player", data=data) - return await self._parse_track(track_obj) if track_obj else None - - async def get_playlist(self, prov_playlist_id) -> Playlist: - """Get full playlist details by id.""" - browse_id = ( - "VL" + prov_playlist_id - if not prov_playlist_id.startswith("VL") - else prov_playlist_id - ) - data = {"browseId": browse_id} - playlist_response = await self._post_data("browse", data=data) - return await self._parse_playlist(playlist_response) - - async def get_playlist_tracks(self, prov_playlist_id) -> List[Track]: - """Get all playlist tracks for given playlist id.""" - browse_id = ( - "VL" + prov_playlist_id - if not prov_playlist_id.startswith("VL") - else prov_playlist_id - ) - data = {"browseId": browse_id} - playlist_obj = await self._post_data("browse", data=data) - tracks = playlist_obj["contents"]["singleColumnBrowseResultsRenderer"]["tabs"][ - 0 - ]["tabRenderer"]["content"]["sectionListRenderer"]["contents"][0][ - "musicPlaylistShelfRenderer" - ][ - "contents" - ] - return [ - await self.get_track( - track["musicResponsiveListItemRenderer"]["playlistItemData"]["videoId"] - ) - for track in tracks - if "playlistItemData" in track["musicResponsiveListItemRenderer"] - ] - - async def get_artist_albums(self, prov_artist_id) -> List[Album]: - """Get a list of albums for the given artist.""" - data = {"browseId": prov_artist_id} - response = await self._post_data("browse", data=data) - album_response = response["contents"]["singleColumnBrowseResultsRenderer"][ - "tabs" - ][0]["tabRenderer"]["content"]["sectionListRenderer"]["contents"][1][ - "musicCarouselShelfRenderer" - ][ - "contents" - ] - albums = [] - for item in album_response: - album_id = item["musicTwoRowItemRenderer"]["navigationEndpoint"][ - "browseEndpoint" - ]["browseId"] - album_name = item["musicTwoRowItemRenderer"]["title"]["runs"][0]["text"] - thumbnails = item["musicTwoRowItemRenderer"]["thumbnailRenderer"][ - "musicThumbnailRenderer" - ]["thumbnail"]["thumbnails"] - album = album = Album( - item_id=album_id, - name=album_name, - provider=self.type, - ) - album.metadata.images = [ - MediaItemImage(ImageType.THUMB, thumb["url"]) for thumb in thumbnails - ] - album.add_provider_id( - MediaItemProviderId( - item_id=str(album_id), prov_type=self.type, prov_id=self.id - ) - ) - albums.append(album) - return albums - - async def get_artist_toptracks(self, prov_artist_id) -> List[Track]: - """Get a list of 5 most popular tracks for the given artist.""" - data = {"browseId": prov_artist_id} - response = await self._post_data("browse", data=data) - # Check if we are dealing with an actual artist with songs, rather than a user - if ( - "musicShelfRenderer" - in response["contents"]["singleColumnBrowseResultsRenderer"]["tabs"][0][ - "tabRenderer" - ]["content"]["sectionListRenderer"]["contents"][0] - ): - songs_response = response["contents"]["singleColumnBrowseResultsRenderer"][ - "tabs" - ][0]["tabRenderer"]["content"]["sectionListRenderer"]["contents"][0][ - "musicShelfRenderer" - ][ - "contents" - ] - return [ - await self.get_track( - prov_track_id=song["musicResponsiveListItemRenderer"]["overlay"][ - "musicItemThumbnailOverlayRenderer" - ]["content"]["musicPlayButtonRenderer"]["playNavigationEndpoint"][ - "watchEndpoint" - ][ - "videoId" - ] - ) - for song in songs_response - ] - return [] - - async def get_stream_details(self, item_id: str) -> StreamDetails: - """Return the content details for the given track when it will be streamed.""" - signature_timestamp = await self._get_signature_timestamp() - data = { - "playbackContext": { - "contentPlaybackContext": {"signatureTimestamp": signature_timestamp} - }, - "video_id": item_id, - } - track_obj = await self._post_data("player", data=data) - stream_format = await self._parse_stream_format(track_obj) - url = await self._parse_stream_url(stream_format=stream_format, item_id=item_id) - return StreamDetails( - provider=self.type, - item_id=item_id, - data=url, - content_type=ContentType.try_parse(stream_format["mimeType"]), - ) - - async def get_audio_stream( - self, streamdetails: StreamDetails, seek_position: int = 0 - ) -> AsyncGenerator[bytes, None]: - """Return the audio stream for the provider item.""" - async for chunk in get_http_stream( - self.mass, streamdetails.data, streamdetails, seek_position - ): - yield chunk - - async def _post_data(self, endpoint: str, data: Dict[str, str], **kwargs): - url = f"{YTM_BASE_URL}{endpoint}" - data.update(self._context) - async with self.mass.http_session.post( - url, - headers=self._headers, - json=data, - verify_ssl=False, - cookies=self._cookies, - ) as response: - return await response.json() - - async def _get_data(self, url: str, params: Dict = None): - async with self.mass.http_session.get( - url, headers=self._headers, params=params, cookies=self._cookies - ) as response: - return await response.text() - - async def _initialize_headers(self, cookie: str) -> Dict[str, str]: - """Return headers to include in the requests.""" - headers = { - "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:72.0) Gecko/20100101 Firefox/72.0", - "Accept": "*/*", - "Accept-Language": "en-US,en;q=0.5", - "Content-Type": "application/json", - "X-Goog-AuthUser": "0", - "x-origin": "https://music.youtube.com", - "Cookie": cookie, - } - sapisid = ytmusicapi.helpers.sapisid_from_cookie(cookie) - origin = headers.get("origin", headers.get("x-origin")) - headers["Authorization"] = ytmusicapi.helpers.get_authorization( - sapisid + " " + origin - ) - self._headers = headers - - async def _initialize_context(self) -> Dict[str, str]: - """Return a dict to use as a context in requests.""" - self._context = { - "context": { - "client": {"clientName": "WEB_REMIX", "clientVersion": "0.1"}, - "user": {}, - } - } - - async def _parse_album(self, album_obj: dict, album_id: str) -> Album: - """Parse a YT Album response to an Album model object.""" - parsed_album = ytmusicapi.parsers.albums.parse_album_header(album_obj) - album = Album( - item_id=album_id, - name=parsed_album["title"], - album_type=AlbumType.ALBUM, - provider=self.type, - ) - if parsed_album["year"].isdigit(): - album.year = parsed_album["year"] - images = [] - for thumb in parsed_album["thumbnails"]: - images.append(MediaItemImage(ImageType.THUMB, thumb["url"])) - album.metadata.images = images - if "description" in parsed_album: - album.metadata.description = unquote(parsed_album["description"]) - artists = [] - for parsed_artist in parsed_album["artists"]: - if parsed_artist["id"]: - artist_id = parsed_artist["id"] - if not artist_id: - artist_id = "ytm_va" - artist = Artist( - item_id=artist_id, - provider=self.type, - name=parsed_artist["name"], - ) - artist.add_provider_id( - MediaItemProviderId( - item_id=str(parsed_artist["id"]), - prov_type=self.type, - prov_id=self.id, - ) - ) - artists.append(artist) - album.artists = artists - album.add_provider_id( - MediaItemProviderId( - item_id=str(album_id), prov_type=self.type, prov_id=self.id - ) - ) - return album - - async def _parse_artist(self, artist_obj: dict, artist_id: str) -> Artist: - """Parse a YT Artist response to Artist model object.""" - if "musicImmersiveHeaderRenderer" in artist_obj["header"]: - # Actual artist - name = artist_obj["header"]["musicImmersiveHeaderRenderer"]["title"][ - "runs" - ][0]["text"] - artist_id = artist_obj["header"]["musicImmersiveHeaderRenderer"][ - "subscriptionButton" - ]["subscribeButtonRenderer"]["channelId"] - artist = Artist(item_id=str(artist_id), provider=self.type, name=name) - if "description" in artist_obj["header"]["musicImmersiveHeaderRenderer"]: - artist.metadata.description = unquote( - artist_obj["header"]["musicImmersiveHeaderRenderer"]["description"][ - "runs" - ][0]["text"] - ) - images = [] - if "thumbnail" in artist_obj["header"]["musicImmersiveHeaderRenderer"]: - for thumb in artist_obj["header"]["musicImmersiveHeaderRenderer"][ - "thumbnail" - ]["musicThumbnailRenderer"]["thumbnail"]["thumbnails"]: - images.append(MediaItemImage(ImageType.THUMB, thumb["url"])) - artist.metadata.images = images - artist.add_provider_id( - MediaItemProviderId( - item_id=str(artist_id), prov_type=self.type, prov_id=self.id - ) - ) - return artist - if "musicVisualHeaderRenderer" in artist_obj["header"]: - # Artist that is actually a user - name = artist_obj["header"]["musicVisualHeaderRenderer"]["title"]["runs"][ - 0 - ]["text"] - artist = Artist(item_id=str(artist_id), name=name, provider=self.type) - if "thumbnail" in artist_obj["header"]["musicVisualHeaderRenderer"]: - thumbnails = artist_obj["header"]["musicVisualHeaderRenderer"][ - "thumbnail" - ]["musicThumbnailRenderer"]["thumbnail"]["thumbnails"] - elif ( - "foregroundThumbnail" - in artist_obj["header"]["musicVisualHeaderRenderer"] - ): - thumbnails = artist_obj["header"]["musicVisualHeaderRenderer"][ - "foregroundThumbnail" - ]["musicThumbnailRenderer"]["thumbnail"]["thumbnails"] - artist.metadata.images = [ - MediaItemImage(ImageType.THUMB, thumb["url"]) for thumb in thumbnails - ] - artist.add_provider_id( - MediaItemProviderId( - item_id=str(artist_id), prov_type=self.type, prov_id=self.id - ) - ) - return artist - - async def _parse_playlist(self, playlist_response: dict) -> Playlist: - """Parse a YT Playlist response to a Playlist object.""" - playlist_id = playlist_response["contents"][ - "singleColumnBrowseResultsRenderer" - ]["tabs"][0]["tabRenderer"]["content"]["sectionListRenderer"]["contents"][0][ - "musicPlaylistShelfRenderer" - ][ - "playlistId" - ] - own_playlist = ( - "musicEditablePlaylistDetailHeaderRenderer" in playlist_response["header"] - ) - if own_playlist: - playlist_response = playlist_response["header"][ - "musicEditablePlaylistDetailHeaderRenderer" - ] - title = playlist_response["header"]["musicDetailHeaderRenderer"]["title"][ - "runs" - ][0]["text"] - thumbnails = playlist_response["header"]["musicDetailHeaderRenderer"][ - "thumbnail" - ]["croppedSquareThumbnailRenderer"]["thumbnail"]["thumbnails"] - playlist = Playlist(item_id=str(playlist_id), provider=self.type, name=title) - if "description" in playlist_response["header"]["musicDetailHeaderRenderer"]: - playlist.metadata.description = playlist_response["header"][ - "musicDetailHeaderRenderer" - ]["description"]["runs"][0]["text"] - playlist.metadata.images = [ - MediaItemImage(ImageType.THUMB, thumb["url"]) for thumb in thumbnails - ] - playlist.add_provider_id( - MediaItemProviderId( - item_id=str(playlist_id), prov_type=self.type, prov_id=self.id - ) - ) - return playlist - - async def _parse_track(self, track_obj: dict) -> Track: - """Parse a YT Track response to a Track model object.""" - track = Track( - item_id=track_obj["videoDetails"]["videoId"], - provider=self.type, - name=track_obj["videoDetails"]["title"], - duration=track_obj["videoDetails"]["lengthSeconds"], - ) - artist_id = track_obj["microformat"]["microformatDataRenderer"][ - "pageOwnerDetails" - ]["externalChannelId"] - if artist_id == "UCUTXlgdcKU5vfzFqHOWIvkA": - artist = Artist( - item_id=artist_id, name="Various Artists", provider=self.type - ) - artist.add_provider_id( - MediaItemProviderId( - item_id=str(artist_id), prov_type=self.type, prov_id=self.id - ) - ) - else: - artist = await self.get_artist(artist_id) - track.artists = [artist] - track.metadata.images = [ - MediaItemImage(ImageType.THUMB, thumb["url"]) - for thumb in track_obj["microformat"]["microformatDataRenderer"][ - "thumbnail" - ]["thumbnails"] - ] - available = False - if track_obj["playabilityStatus"]["status"] == "OK": - available = True - track.add_provider_id( - MediaItemProviderId( - item_id=str(track_obj["videoDetails"]["videoId"]), - prov_type=self.type, - prov_id=self.id, - available=available, - ) - ) - return track - - async def _get_signature_timestamp(self): - """Get a signature timestamp required to generate valid stream URLs.""" - response = await self._get_data(url=YTM_DOMAIN) - match = re.search(r'jsUrl"\s*:\s*"([^"]+)"', response) - if match is None: - raise Exception("Could not identify the URL for base.js player.") - url = YTM_DOMAIN + match.group(1) - response = await self._get_data(url=url) - match = re.search(r"signatureTimestamp[:=](\d+)", response) - if match is None: - raise Exception("Unable to identify the signatureTimestamp.") - return int(match.group(1)) - - async def _parse_stream_url(self, stream_format: dict, item_id: str) -> str: - """Figure out the stream URL to use based on the YT track object.""" - cipher_parts = {} - for part in stream_format["signatureCipher"].split("&"): - key, val = part.split("=", maxsplit=1) - cipher_parts[key] = unquote(val) - signature = await self._decipher_signature( - ciphered_signature=cipher_parts["s"], item_id=item_id - ) - url = cipher_parts["url"] + "&sig=" + signature - return url - - @classmethod - async def _parse_stream_format(cls, track_obj: dict) -> dict: - """Grab the highes available audio stream from available streams.""" - stream_format = {} - quality_mapper = { - "AUDIO_QUALITY_LOW": 1, - "AUDIO_QUALITY_MEDIUM": 2, - "AUDIO_QUALITY_HIGH": 3, - } - for adaptive_format in track_obj["streamingData"]["adaptiveFormats"]: - if adaptive_format["mimeType"].startswith("audio") and ( - not stream_format - or quality_mapper.get(adaptive_format["audioQuality"], 0) - > quality_mapper.get(stream_format["audioQuality"], 0) - ): - stream_format = adaptive_format - if stream_format is None: - raise MediaNotFoundError("No stream found for this track") - return stream_format - - async def _decipher_signature(self, ciphered_signature: str, item_id: str): - """Decipher the signature, required to build the Stream URL.""" - - def _decipher(): - embed_url = f"https://www.youtube.com/embed/{item_id}" - embed_html = pytube.request.get(embed_url) - js_url = pytube.extract.js_url(embed_html) - ytm_js = pytube.request.get(js_url) - cipher = pytube.cipher.Cipher(js=ytm_js) - return cipher.get_signature(ciphered_signature) - - return await self.mass.loop.run_in_executor(None, _decipher) diff --git a/music_assistant/music_providers/ytmusic/__init__.py b/music_assistant/music_providers/ytmusic/__init__.py new file mode 100644 index 00000000..dedf90ba --- /dev/null +++ b/music_assistant/music_providers/ytmusic/__init__.py @@ -0,0 +1,3 @@ +"""Package with Youtube Music provider.""" + +from .ytmusic import YoutubeMusicProvider # noqa diff --git a/music_assistant/music_providers/ytmusic/helpers.py b/music_assistant/music_providers/ytmusic/helpers.py new file mode 100644 index 00000000..4acbe795 --- /dev/null +++ b/music_assistant/music_providers/ytmusic/helpers.py @@ -0,0 +1,163 @@ +""" +Helper module for parsing the Youtube Music API. + +This helpers file is an async wrapper around the excellent ytmusicapi package. +While the ytmusicapi package does an excellent job at parsing the Youtube Music results, +it is unfortunately not async, which is required for Music Assistant to run smoothly. +This also nicely separates the parsing logic from the Youtube Music provider logic. +""" + +import asyncio +import json +from typing import Dict, List + +import ytmusicapi + + +async def get_artist(prov_artist_id: str) -> Dict[str, str]: + """Async wrapper around the ytmusicapi get_artist function.""" + + def _get_artist(): + ytm = ytmusicapi.YTMusic() + try: + artist = ytm.get_artist(channelId=prov_artist_id) + except KeyError: + user = ytm.get_user(channelId=prov_artist_id) + artist = {"channelId": prov_artist_id, "name": user["name"]} + return artist + + loop = asyncio.get_running_loop() + return await loop.run_in_executor(None, _get_artist) + + +async def get_album(prov_album_id: str) -> Dict[str, str]: + """Async wrapper around the ytmusicapi get_album function.""" + + def _get_album(): + ytm = ytmusicapi.YTMusic() + return ytm.get_album(browseId=prov_album_id) + + loop = asyncio.get_running_loop() + return await loop.run_in_executor(None, _get_album) + + +async def get_playlist( + prov_playlist_id: str, headers: Dict[str, str] +) -> Dict[str, str]: + """Async wrapper around the ytmusicapi get_playlist function.""" + + def _get_playlist(): + ytm = ytmusicapi.YTMusic(auth=json.dumps(headers)) + return ytm.get_playlist(playlistId=prov_playlist_id) + + loop = asyncio.get_running_loop() + return await loop.run_in_executor(None, _get_playlist) + + +async def get_track(prov_track_id: str) -> Dict[str, str]: + """Async wrapper around the ytmusicapi get_playlist function.""" + + def _get_song(): + ytm = ytmusicapi.YTMusic() + track_obj = ytm.get_song(videoId=prov_track_id) + track = {} + track["videoId"] = track_obj["videoDetails"]["videoId"] + track["title"] = track_obj["videoDetails"]["title"] + track["artists"] = [ + { + "channelId": track_obj["videoDetails"]["channelId"], + "name": track_obj["videoDetails"]["author"], + } + ] + track["duration"] = track_obj["videoDetails"]["lengthSeconds"] + track["thumbnails"] = track_obj["microformat"]["microformatDataRenderer"][ + "thumbnail" + ]["thumbnails"] + track["isAvailable"] = track_obj["playabilityStatus"]["status"] == "OK" + return track + + loop = asyncio.get_running_loop() + return await loop.run_in_executor(None, _get_song) + + +async def get_library_artists(headers: Dict[str, str]) -> Dict[str, str]: + """Async wrapper around the ytmusicapi get_library_artists function.""" + + def _get_library_artists(): + ytm = ytmusicapi.YTMusic(auth=json.dumps(headers)) + artists = ytm.get_library_artists(limit=9999) + # Sync properties with uniformal artist object + for artist in artists: + artist["id"] = artist["browseId"] + artist["name"] = artist["artist"] + del artist["browseId"] + del artist["artist"] + return artists + + loop = asyncio.get_running_loop() + return await loop.run_in_executor(None, _get_library_artists) + + +async def get_library_albums(headers: Dict[str, str]) -> Dict[str, str]: + """Async wrapper around the ytmusicapi get_library_albums function.""" + + def _get_library_albums(): + ytm = ytmusicapi.YTMusic(auth=json.dumps(headers)) + return ytm.get_library_albums(limit=9999) + + loop = asyncio.get_running_loop() + return await loop.run_in_executor(None, _get_library_albums) + + +async def get_library_playlists(headers: Dict[str, str]) -> Dict[str, str]: + """Async wrapper around the ytmusicapi get_library_playlists function.""" + + def _get_library_playlists(): + ytm = ytmusicapi.YTMusic(auth=json.dumps(headers)) + playlists = ytm.get_library_playlists(limit=9999) + # Sync properties with uniformal playlist object + for playlist in playlists: + playlist["id"] = playlist["playlistId"] + del playlist["playlistId"] + return playlists + + loop = asyncio.get_running_loop() + return await loop.run_in_executor(None, _get_library_playlists) + + +async def get_library_tracks(headers: Dict[str, str]) -> Dict[str, str]: + """Async wrapper around the ytmusicapi get_library_tracks function.""" + + def _get_library_tracks(): + ytm = ytmusicapi.YTMusic(auth=json.dumps(headers)) + tracks = ytm.get_library_songs(limit=9999) + return tracks + + loop = asyncio.get_running_loop() + return await loop.run_in_executor(None, _get_library_tracks) + + +async def search(query: str, ytm_filter: str = None, limit: int = 20) -> List[Dict]: + """Async wrapper around the ytmusicapi search function.""" + + def _search(): + ytm = ytmusicapi.YTMusic() + results = ytm.search(query=query, filter=ytm_filter, limit=limit) + # Sync result properties with uniformal objects + for result in results: + if result["resultType"] == "artist": + result["id"] = result["browseId"] + result["name"] = result["artist"] + del result["browseId"] + del result["artist"] + elif result["resultType"] == "playlist": + if "playlistId" in result: + result["id"] = result["playlistId"] + del result["playlistId"] + elif "browseId" in result: + result["id"] = result["browseId"] + del result["browseId"] + return results + + loop = asyncio.get_running_loop() + return await loop.run_in_executor(None, _search) diff --git a/music_assistant/music_providers/ytmusic/ytmusic.py b/music_assistant/music_providers/ytmusic/ytmusic.py new file mode 100644 index 00000000..ed4910a4 --- /dev/null +++ b/music_assistant/music_providers/ytmusic/ytmusic.py @@ -0,0 +1,500 @@ +"""Youtube Music support for MusicAssistant.""" +import re +from operator import itemgetter +from typing import AsyncGenerator, Dict, List, Optional +from urllib.parse import unquote + +import pytube +import ytmusicapi + +from music_assistant.helpers.audio import get_http_stream +from music_assistant.models.enums import ProviderType +from music_assistant.models.errors import ( + InvalidDataError, + LoginFailed, + MediaNotFoundError, +) +from music_assistant.models.media_items import ( + Album, + AlbumType, + Artist, + ContentType, + ImageType, + MediaItemImage, + MediaItemProviderId, + MediaItemType, + MediaType, + Playlist, + StreamDetails, + Track, +) +from music_assistant.models.music_provider import MusicProvider +from music_assistant.music_providers.ytmusic.helpers import ( + get_album, + get_artist, + get_library_albums, + get_library_artists, + get_library_playlists, + get_library_tracks, + get_playlist, + get_track, + search, +) + +YTM_DOMAIN = "https://music.youtube.com" +YTM_BASE_URL = f"{YTM_DOMAIN}/youtubei/v1/" + + +class YoutubeMusicProvider(MusicProvider): + """Provider for Youtube Music.""" + + _attr_type = ProviderType.YTMUSIC + _attr_name = "Youtube Music" + _attr_supported_mediatypes = [ + MediaType.ARTIST, + MediaType.ALBUM, + MediaType.TRACK, + MediaType.PLAYLIST, + ] + _headers = None + _context = None + _cookies = None + + async def setup(self) -> bool: + """Set up the YTMusic provider.""" + if not self.config.enabled: + return False + if not self.config.username or not self.config.password: + raise LoginFailed("Invalid login credentials") + await self._initialize_headers(cookie=self.config.password) + await self._initialize_context() + self._cookies = {"CONSENT": "YES+1"} + return True + + async def search( + self, search_query: str, media_types=Optional[List[MediaType]], limit: int = 5 + ) -> List[MediaItemType]: + """ + Perform search on musicprovider. + + :param search_query: Search query. + :param media_types: A list of media_types to include. All types if None. + :param limit: Number of items to return in the search (per type). + """ + ytm_filter = None + if len(media_types) == 1: + # YTM does not support multiple searchtypes, falls back to all if no type given + if media_types[0] == MediaType.ARTIST: + ytm_filter = "artists" + if media_types[0] == MediaType.ALBUM: + ytm_filter = "albums" + if media_types[0] == MediaType.TRACK: + ytm_filter = "songs" + if media_types[0] == MediaType.PLAYLIST: + ytm_filter = "playlists" + results = await search(query=search_query, ytm_filter=ytm_filter, limit=limit) + parsed_results = [] + for result in results: + if result["resultType"] == "artist": + parsed_results.append(await self._parse_artist(result)) + elif result["resultType"] == "album": + parsed_results.append( + # Search result for albums contain invalid artists + # Use a get_album to get full details + await self.get_album(result["browseId"]) + ) + elif result["resultType"] == "playlist": + parsed_results.append(await self._parse_playlist(result)) + elif result["resultType"] == "song": + # Tracks from search results sometimes do not have a valid artist id + # In that case, call the API for track details based on track id + try: + track = await self._parse_track(result) + if track: + parsed_results.append(track) + except InvalidDataError: + track = await self.get_track(result["videoId"]) + if track: + parsed_results.append(track) + return parsed_results + + async def get_library_artists(self) -> AsyncGenerator[Artist, None]: + """Retrieve all library artists from Youtube Music.""" + artists_obj = await get_library_artists(headers=self._headers) + for artist in artists_obj: + yield await self._parse_artist(artist) + + async def get_library_albums(self) -> AsyncGenerator[Album, None]: + """Retrieve all library albums from Youtube Music.""" + albums_obj = await get_library_albums(headers=self._headers) + for album in albums_obj: + yield await self._parse_album(album, album["browseId"]) + + async def get_library_playlists(self) -> AsyncGenerator[Playlist, None]: + """Retrieve all library playlists from the provider.""" + playlists_obj = await get_library_playlists(headers=self._headers) + for playlist in playlists_obj: + yield await self._parse_playlist(playlist) + + async def get_library_tracks(self) -> AsyncGenerator[Track, None]: + """Retrieve library tracks from Youtube Music.""" + tracks_obj = await get_library_tracks(headers=self._headers) + for track in tracks_obj: + # Library tracks sometimes do not have a valid artist id + # In that case, call the API for track details based on track id + try: + yield await self._parse_track(track) + except InvalidDataError: + track = await self.get_track(track["videoId"]) + yield track + + async def get_album(self, prov_album_id) -> Album: + """Get full album details by id.""" + album_obj = await get_album(prov_album_id=prov_album_id) + return ( + await self._parse_album(album_obj=album_obj, album_id=prov_album_id) + if album_obj + else None + ) + + async def get_album_tracks(self, prov_album_id: str) -> List[Track]: + """Get album tracks for given album id.""" + album_obj = await get_album(prov_album_id=prov_album_id) + return [ + await self._parse_track(track) + for track in album_obj["tracks"] + if "tracks" in album_obj + ] + + async def get_artist(self, prov_artist_id) -> Artist: + """Get full artist details by id.""" + artist_obj = await get_artist(prov_artist_id=prov_artist_id) + return await self._parse_artist(artist_obj=artist_obj) if artist_obj else None + + async def get_track(self, prov_track_id) -> Track: + """Get full track details by id.""" + track_obj = await get_track(prov_track_id=prov_track_id) + return await self._parse_track(track_obj) + + async def get_playlist(self, prov_playlist_id) -> Playlist: + """Get full playlist details by id.""" + playlist_obj = await get_playlist( + prov_playlist_id=prov_playlist_id, headers=self._headers + ) + return await self._parse_playlist(playlist_obj) + + async def get_playlist_tracks(self, prov_playlist_id) -> List[Track]: + """Get all playlist tracks for given playlist id.""" + playlist_obj = await get_playlist( + prov_playlist_id=prov_playlist_id, headers=self._headers + ) + if "tracks" in playlist_obj: + tracks = [] + for track in playlist_obj["tracks"]: + if track["isAvailable"]: + # Playlist tracks sometimes do not have a valid artist id + # In that case, call the API for track details based on track id + try: + track = await self._parse_track(track) + if track: + tracks.append(track) + except InvalidDataError: + track = await self.get_track(track["videoId"]) + if track: + tracks.append(track) + return tracks + return [] + + async def get_artist_albums(self, prov_artist_id) -> List[Album]: + """Get a list of albums for the given artist.""" + artist_obj = await get_artist(prov_artist_id=prov_artist_id) + if "albums" in artist_obj and "results" in artist_obj["albums"]: + albums = [] + for album_obj in artist_obj["albums"]["results"]: + if "artists" not in album_obj: + album_obj["artists"] = [ + {"id": artist_obj["channelId"], "name": artist_obj["name"]} + ] + albums.append(await self._parse_album(album_obj, album_obj["browseId"])) + return albums + return [] + + async def get_artist_toptracks(self, prov_artist_id) -> List[Track]: + """Get a list of 5 most popular tracks for the given artist.""" + artist_obj = await get_artist(prov_artist_id=prov_artist_id) + if "songs" in artist_obj and "results" in artist_obj["songs"]: + return [ + await self._parse_track(track) + for track in artist_obj["songs"]["results"] + ] + return [] + + async def get_stream_details(self, item_id: str) -> StreamDetails: + """Return the content details for the given track when it will be streamed.""" + signature_timestamp = await self._get_signature_timestamp() + data = { + "playbackContext": { + "contentPlaybackContext": {"signatureTimestamp": signature_timestamp} + }, + "video_id": item_id, + } + track_obj = await self._post_data("player", data=data) + stream_format = await self._parse_stream_format(track_obj) + url = await self._parse_stream_url(stream_format=stream_format, item_id=item_id) + return StreamDetails( + provider=self.type, + item_id=item_id, + data=url, + content_type=ContentType.try_parse(stream_format["mimeType"]), + ) + + async def get_audio_stream( + self, streamdetails: StreamDetails, seek_position: int = 0 + ) -> AsyncGenerator[bytes, None]: + """Return the audio stream for the provider item.""" + async for chunk in get_http_stream( + self.mass, streamdetails.data, streamdetails, seek_position + ): + yield chunk + + async def _post_data(self, endpoint: str, data: Dict[str, str], **kwargs): + url = f"{YTM_BASE_URL}{endpoint}" + data.update(self._context) + async with self.mass.http_session.post( + url, + headers=self._headers, + json=data, + verify_ssl=False, + cookies=self._cookies, + ) as response: + return await response.json() + + async def _get_data(self, url: str, params: Dict = None): + async with self.mass.http_session.get( + url, headers=self._headers, params=params, cookies=self._cookies + ) as response: + return await response.text() + + async def _initialize_headers(self, cookie: str) -> Dict[str, str]: + """Return headers to include in the requests.""" + headers = { + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:72.0) Gecko/20100101 Firefox/72.0", + "Accept": "*/*", + "Accept-Language": "en-US,en;q=0.5", + "Content-Type": "application/json", + "X-Goog-AuthUser": "0", + "x-origin": "https://music.youtube.com", + "Cookie": cookie, + } + sapisid = ytmusicapi.helpers.sapisid_from_cookie(cookie) + origin = headers.get("origin", headers.get("x-origin")) + headers["Authorization"] = ytmusicapi.helpers.get_authorization( + sapisid + " " + origin + ) + self._headers = headers + + async def _initialize_context(self) -> Dict[str, str]: + """Return a dict to use as a context in requests.""" + self._context = { + "context": { + "client": {"clientName": "WEB_REMIX", "clientVersion": "0.1"}, + "user": {}, + } + } + + async def _parse_album(self, album_obj: dict, album_id: str) -> Album: + """Parse a YT Album response to an Album model object.""" + if "title" in album_obj: + name = album_obj["title"] + elif "name" in album_obj: + name = album_obj["name"] + album = Album( + item_id=album_id, + name=name, + provider=self.type, + ) + if "year" in album_obj and album_obj["year"].isdigit(): + album.year = album_obj["year"] + if "thumbnails" in album_obj: + album.metadata.images = await self._parse_thumbnails( + album_obj["thumbnails"] + ) + if "description" in album_obj: + album.metadata.description = unquote(album_obj["description"]) + if "artists" in album_obj: + album.artists = [ + await self._parse_artist(artist) + for artist in album_obj["artists"] + if artist.get("id") + ] + if "type" in album_obj: + if album_obj["type"] == "Single": + album_type = AlbumType.SINGLE + elif album_obj["type"] == "EP": + album_type = AlbumType.EP + elif album_obj["type"] == "Album": + album_type = AlbumType.ALBUM + else: + album_type = AlbumType.UNKNOWN + album.album_type = album_type + album.add_provider_id( + MediaItemProviderId( + item_id=str(album_id), prov_type=self.type, prov_id=self.id + ) + ) + return album + + async def _parse_artist(self, artist_obj: dict) -> Artist: + """Parse a YT Artist response to Artist model object.""" + artist_id = None + if "channelId" in artist_obj: + artist_id = artist_obj["channelId"] + elif "id" in artist_obj and artist_obj["id"]: + artist_id = artist_obj["id"] + elif artist_obj["name"] == "Various Artists": + artist_id = "UCUTXlgdcKU5vfzFqHOWIvkA" + if not artist_id: + raise InvalidDataError("Artist does not have a valid ID") + artist = Artist(item_id=artist_id, name=artist_obj["name"], provider=self.type) + if "description" in artist_obj: + artist.metadata.description = artist_obj["description"] + if "thumbnails" in artist_obj and artist_obj["thumbnails"]: + artist.metadata.images = await self._parse_thumbnails( + artist_obj["thumbnails"] + ) + artist.add_provider_id( + MediaItemProviderId( + item_id=str(artist_id), + prov_type=self.type, + prov_id=self.id, + url=f"https://music.youtube.com/channel/{artist_id}", + ) + ) + return artist + + async def _parse_playlist(self, playlist_obj: dict) -> Playlist: + """Parse a YT Playlist response to a Playlist object.""" + playlist = Playlist( + item_id=playlist_obj["id"], provider=self.type, name=playlist_obj["title"] + ) + if "description" in playlist_obj: + playlist.metadata.description = playlist_obj["description"] + if "thumbnails" in playlist_obj and playlist_obj["thumbnails"]: + playlist.metadata.images = await self._parse_thumbnails( + playlist_obj["thumbnails"] + ) + playlist.add_provider_id( + MediaItemProviderId( + item_id=playlist_obj["id"], prov_type=self.type, prov_id=self.id + ) + ) + return playlist + + async def _parse_track(self, track_obj: dict) -> Track: + """Parse a YT Track response to a Track model object.""" + track = Track( + item_id=track_obj["videoId"], provider=self.type, name=track_obj["title"] + ) + if "artists" in track_obj: + track.artists = [ + await self._parse_artist(artist) for artist in track_obj["artists"] + ] + if "thumbnails" in track_obj and track_obj["thumbnails"]: + track.metadata.images = await self._parse_thumbnails( + track_obj["thumbnails"] + ) + if ( + "album" in track_obj + and track_obj["album"] + and "id" in track_obj["album"] + and track_obj["album"]["id"] + and "artists" in track_obj + ): + album = track_obj["album"] + album["artists"] = track_obj["artists"] + track.album = await self._parse_album(album, album["id"]) + if "isExplicit" in track_obj: + track.metadata.explicit = track_obj["isExplicit"] + if "duration" in track_obj and track_obj["duration"].isdigit(): + track.duration = track_obj["duration"] + elif ( + "duration_seconds" in track_obj + and str(track_obj["duration_seconds"]).isdigit() + ): + track.duration = track_obj["duration_seconds"] + available = True + if "isAvailable" in track_obj: + available = track_obj["isAvailable"] + track.add_provider_id( + MediaItemProviderId( + item_id=str(track_obj["videoId"]), + prov_type=self.type, + prov_id=self.id, + available=available, + ) + ) + return track + + async def _get_signature_timestamp(self): + """Get a signature timestamp required to generate valid stream URLs.""" + response = await self._get_data(url=YTM_DOMAIN) + match = re.search(r'jsUrl"\s*:\s*"([^"]+)"', response) + if match is None: + raise Exception("Could not identify the URL for base.js player.") + url = YTM_DOMAIN + match.group(1) + response = await self._get_data(url=url) + match = re.search(r"signatureTimestamp[:=](\d+)", response) + if match is None: + raise Exception("Unable to identify the signatureTimestamp.") + return int(match.group(1)) + + async def _parse_stream_url(self, stream_format: dict, item_id: str) -> str: + """Figure out the stream URL to use based on the YT track object.""" + cipher_parts = {} + for part in stream_format["signatureCipher"].split("&"): + key, val = part.split("=", maxsplit=1) + cipher_parts[key] = unquote(val) + signature = await self._decipher_signature( + ciphered_signature=cipher_parts["s"], item_id=item_id + ) + url = cipher_parts["url"] + "&sig=" + signature + return url + + @classmethod + async def _parse_thumbnails(cls, thumbnails_obj: dict) -> List[MediaItemImage]: + """Parse and sort a list of thumbnails and return the highest quality.""" + thumb = sorted(thumbnails_obj, key=itemgetter("width"), reverse=True)[0] + return [MediaItemImage(ImageType.THUMB, thumb["url"])] + + @classmethod + async def _parse_stream_format(cls, track_obj: dict) -> dict: + """Grab the highest available audio stream from available streams.""" + stream_format = {} + quality_mapper = { + "AUDIO_QUALITY_LOW": 1, + "AUDIO_QUALITY_MEDIUM": 2, + "AUDIO_QUALITY_HIGH": 3, + } + for adaptive_format in track_obj["streamingData"]["adaptiveFormats"]: + if adaptive_format["mimeType"].startswith("audio") and ( + not stream_format + or quality_mapper.get(adaptive_format["audioQuality"], 0) + > quality_mapper.get(stream_format["audioQuality"], 0) + ): + stream_format = adaptive_format + if stream_format is None: + raise MediaNotFoundError("No stream found for this track") + return stream_format + + async def _decipher_signature(self, ciphered_signature: str, item_id: str): + """Decipher the signature, required to build the Stream URL.""" + + def _decipher(): + embed_url = f"https://www.youtube.com/embed/{item_id}" + embed_html = pytube.request.get(embed_url) + js_url = pytube.extract.js_url(embed_html) + ytm_js = pytube.request.get(js_url) + cipher = pytube.cipher.Cipher(js=ytm_js) + return cipher.get_signature(ciphered_signature) + + return await self.mass.loop.run_in_executor(None, _decipher)