From a211eaeb4761e52c5f158ccfd2652f631d7d8c4e Mon Sep 17 00:00:00 2001 From: Marvin Schenkel Date: Thu, 7 Jul 2022 22:23:11 +0200 Subject: [PATCH] Add YoutubeMusic MusicProvider (#397) Co-authored-by: Marvin Schenkel Co-authored-by: Marcel van der Veldt --- .vscode/settings.json | 5 + examples/full.py | 27 +- .../controllers/metadata/__init__.py | 2 +- music_assistant/controllers/music/__init__.py | 2 + music_assistant/controllers/music/artists.py | 2 +- music_assistant/models/enums.py | 21 +- music_assistant/models/music_provider.py | 1 + music_assistant/music_providers/ytmusic.py | 721 ++++++++++++++++++ requirements.txt | 2 + 9 files changed, 769 insertions(+), 14 deletions(-) create mode 100644 music_assistant/music_providers/ytmusic.py diff --git a/.vscode/settings.json b/.vscode/settings.json index d63891a6..0460b1ae 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -6,4 +6,9 @@ "python.linting.flake8Enabled": true, "python.linting.flake8Args": ["--config=${workspaceFolder}/setup.cfg"], "python.linting.mypyEnabled": false, + "python.testing.pytestArgs": [ + "tests" + ], + "python.testing.unittestEnabled": false, + "python.testing.pytestEnabled": true, } diff --git a/examples/full.py b/examples/full.py index af14765c..c4d62e69 100644 --- a/examples/full.py +++ b/examples/full.py @@ -48,6 +48,16 @@ parser.add_argument( required=False, help="Directory on disk for local music library", ) +parser.add_argument( + "--ytmusic-username", + required=False, + help="YoutubeMusic username", +) +parser.add_argument( + "--ytmusic-cookie", + required=False, + help="YoutubeMusic cookie", +) parser.add_argument( "--debug", action="store_true", @@ -102,6 +112,15 @@ if args.tunein_username: username=args.tunein_username, ) ) + +if args.ytmusic_username and args.ytmusic_cookie: + mass_conf.providers.append( + MusicProviderConfig( + ProviderType.YTMUSIC, + username=args.ytmusic_username, + password=args.ytmusic_cookie, + ) + ) if args.musicdir: mass_conf.providers.append( MusicProviderConfig(type=ProviderType.FILESYSTEM_LOCAL, path=args.musicdir) @@ -188,8 +207,8 @@ async def main(): print(f"Got {track_count} tracks ({track_count_lib} in library)") radio_count = await mass.music.radio.count(True) print(f"Got {radio_count} radio stations in library") - playlist_count = await mass.music.playlists.db_items(True) - print(f"Got {len(playlist_count)} playlists in library") + playlists = await mass.music.playlists.db_items(True) + print(f"Got {len(playlists)} playlists in library") # register a player test_player1 = TestPlayer("test1") test_player2 = TestPlayer("test2") @@ -204,8 +223,8 @@ async def main(): # we can also send an uri, such as spotify://track/abcdfefgh # or database://playlist/1 # or a list of items - artist = await mass.music.artists.get("2", ProviderType.DATABASE) - await test_player1.active_queue.play_media(artist) + if len(playlists) > 0: + await test_player1.active_queue.play_media(playlists[0]) await asyncio.sleep(3600) diff --git a/music_assistant/controllers/metadata/__init__.py b/music_assistant/controllers/metadata/__init__.py index 1097286e..39afe618 100755 --- a/music_assistant/controllers/metadata/__init__.py +++ b/music_assistant/controllers/metadata/__init__.py @@ -104,7 +104,7 @@ class MetaDataController: ): if track.metadata.genres: playlist.metadata.genres.update(track.metadata.genres) - elif track.album.metadata.genres: + elif track.album and track.album.metadata.genres: playlist.metadata.genres.update(track.album.metadata.genres) # TODO: create mosaic thumb/fanart from playlist tracks playlist.metadata.last_refresh = int(time()) diff --git a/music_assistant/controllers/music/__init__.py b/music_assistant/controllers/music/__init__.py index 092469e6..81073cde 100755 --- a/music_assistant/controllers/music/__init__.py +++ b/music_assistant/controllers/music/__init__.py @@ -37,6 +37,7 @@ from music_assistant.music_providers.spotify import SpotifyProvider from music_assistant.music_providers.tunein import TuneInProvider from music_assistant.music_providers.url import PROVIDER_CONFIG as URL_CONFIG from music_assistant.music_providers.url import URLProvider +from music_assistant.music_providers.ytmusic import YoutubeMusicProvider if TYPE_CHECKING: from music_assistant.mass import MusicAssistant @@ -46,6 +47,7 @@ PROV_MAP = { ProviderType.SPOTIFY: SpotifyProvider, ProviderType.QOBUZ: QobuzProvider, ProviderType.TUNEIN: TuneInProvider, + ProviderType.YTMUSIC: YoutubeMusicProvider, } diff --git a/music_assistant/controllers/music/artists.py b/music_assistant/controllers/music/artists.py index 2a1fb7b5..be390e02 100644 --- a/music_assistant/controllers/music/artists.py +++ b/music_assistant/controllers/music/artists.py @@ -176,7 +176,7 @@ class ArtistsController(MediaControllerBase[Artist]): self, item: Artist, overwrite_existing: bool = False ) -> Artist: """Add a new item record to the database.""" - assert item.provider_ids, "Album is missing provider id(s)" + assert item.provider_ids, "Artist is missing provider id(s)" # always try to grab existing item by musicbrainz_id cur_item = None if item.musicbrainz_id: diff --git a/music_assistant/models/enums.py b/music_assistant/models/enums.py index 5312f82d..a8fb7a0c 100644 --- a/music_assistant/models/enums.py +++ b/music_assistant/models/enums.py @@ -68,6 +68,7 @@ class AlbumType(Enum): ALBUM = "album" SINGLE = "single" COMPILATION = "compilation" + EP = "ep" UNKNOWN = "unknown" @@ -96,17 +97,20 @@ class ContentType(Enum): def try_parse(cls: "ContentType", string: str) -> "ContentType": """Try to parse ContentType from (url)string/extension.""" tempstr = string.lower() - if "." in tempstr: - tempstr = tempstr.split(".")[-1] - if "," in tempstr: - for val in tempstr.split(","): - try: - return cls(val.strip()) - except ValueError: - pass + if "audio/" in tempstr: + tempstr = tempstr.split("/")[1] + for splitter in (".", ","): + if splitter in tempstr: + for val in tempstr.split(splitter): + try: + return cls(val.strip()) + except ValueError: + pass tempstr = tempstr.split("?")[0] tempstr = tempstr.split("&")[0] + tempstr = tempstr.split(";")[0] + tempstr = tempstr.replace("mp4", "m4a") try: return cls(tempstr) except ValueError: @@ -209,6 +213,7 @@ class ProviderType(Enum): SPOTIFY = "spotify" QOBUZ = "qobuz" TUNEIN = "tunein" + YTMUSIC = "ytmusic" DATABASE = "database" # internal only URL = "url" # internal only diff --git a/music_assistant/models/music_provider.py b/music_assistant/models/music_provider.py index 936e24da..b4f8f7cb 100644 --- a/music_assistant/models/music_provider.py +++ b/music_assistant/models/music_provider.py @@ -321,6 +321,7 @@ class MusicProvider: ) if not db_item: # dump the item in the db, rich metadata is lazy loaded later + print(prov_item) db_item = await controller.add_db_item(prov_item) elif ( db_item.metadata.checksum and prov_item.metadata.checksum diff --git a/music_assistant/music_providers/ytmusic.py b/music_assistant/music_providers/ytmusic.py new file mode 100644 index 00000000..d976efa5 --- /dev/null +++ b/music_assistant/music_providers/ytmusic.py @@ -0,0 +1,721 @@ +"""Youtube Music support for MusicAssistant.""" +import re +from datetime import date +from typing import AsyncGenerator, Dict, List, Optional +from urllib.parse import unquote + +import pytube +import ytmusicapi + +from music_assistant.helpers.audio import get_http_stream +from music_assistant.models.enums import ProviderType +from music_assistant.models.errors import LoginFailed, MediaNotFoundError +from music_assistant.models.media_items import ( + Album, + AlbumType, + Artist, + ContentType, + ImageType, + MediaItemImage, + MediaItemProviderId, + MediaItemType, + MediaType, + Playlist, + StreamDetails, + Track, +) +from music_assistant.models.music_provider import MusicProvider + +YTM_DOMAIN = "https://music.youtube.com" +YTM_BASE_URL = f"{YTM_DOMAIN}/youtubei/v1/" + + +class YoutubeMusicProvider(MusicProvider): + """Provider for Youtube Music.""" + + _attr_type = ProviderType.YTMUSIC + _attr_name = "YTMusic" + _attr_supported_mediatypes = [ + MediaType.ARTIST, + MediaType.ALBUM, + MediaType.TRACK, + MediaType.PLAYLIST, + ] + _headers = None + _context = None + _cookies = None + + async def setup(self) -> bool: + """Set up the YTMusic provider.""" + if not self.config.enabled: + return False + if not self.config.username or not self.config.password: + raise LoginFailed("Invalid login credentials") + await self._initialize_headers(cookie=self.config.password) + await self._initialize_context() + self._cookies = {"CONSENT": "YES+1"} + return True + + async def search( + self, search_query: str, media_types=Optional[List[MediaType]], limit: int = 5 + ) -> List[MediaItemType]: + """ + Perform search on musicprovider. + + :param search_query: Search query. + :param media_types: A list of media_types to include. All types if None. + :param limit: Number of items to return in the search (per type). + """ + data = {"query": search_query} + ytm_filter = None + if len(media_types) == 1: + # YTM does not support multiple searchtypes, falls back to all if no type given + if media_types[0] == MediaType.ARTIST: + ytm_filter = "artists" + if media_types[0] == MediaType.ALBUM: + ytm_filter = "albums" + if media_types[0] == MediaType.TRACK: + ytm_filter = "songs" + if media_types[0] == MediaType.PLAYLIST: + ytm_filter = "playlists" + params = ytmusicapi.parsers.search_params.get_search_params( + filter=ytm_filter, scope=None, ignore_spelling=False + ) + data["params"] = params + search_results = await self._post_data(endpoint="search", data=data) + if "tabbedSearchResultsRenderer" in search_results["contents"]: + results = search_results["contents"]["tabbedSearchResultsRenderer"]["tabs"][ + 0 + ]["tabRenderer"]["content"] + else: + results = search_results["contents"] + parsed_results = [] + for result in results["sectionListRenderer"]["contents"]: + if "musicShelfRenderer" in result: + category = result["musicShelfRenderer"]["title"]["runs"][0]["text"] + if category == "Artists": + for artist in result["musicShelfRenderer"]["contents"]: + artist_id = artist["musicResponsiveListItemRenderer"][ + "navigationEndpoint" + ]["browseEndpoint"]["browseId"] + parsed_results.append(await self.get_artist(artist_id)) + elif category == "Albums": + for album in result["musicShelfRenderer"]["contents"]: + album_id = album["musicResponsiveListItemRenderer"][ + "navigationEndpoint" + ]["browseEndpoint"]["browseId"] + parsed_results.append(await self.get_album(album_id)) + elif category == "Songs": + for song in result["musicShelfRenderer"]["contents"]: + song_id = song["musicResponsiveListItemRenderer"][ + "playlistItemData" + ]["videoId"] + parsed_results.append(await self.get_track(song_id)) + else: + print(category) + return parsed_results + + async def get_library_artists(self) -> AsyncGenerator[Artist, None]: + """Retrieve all library artists from Youtube Music.""" + data = {"browseId": "FEmusic_library_corpus_track_artists"} + response = await self._post_data(endpoint="browse", data=data) + response_artist = response["contents"]["singleColumnBrowseResultsRenderer"][ + "tabs" + ][0]["tabRenderer"]["content"]["sectionListRenderer"]["contents"][1][ + "itemSectionRenderer" + ][ + "contents" + ][ + 0 + ][ + "musicShelfRenderer" + ][ + "contents" + ] + parsed_artists = ytmusicapi.parsers.library.parse_artists(response_artist) + for item in parsed_artists: + artist = Artist( + item_id=item["browseId"], provider=self.type, name=item["artist"] + ) + artist.metadata.images = [ + MediaItemImage(ImageType.THUMB, thumb["url"]) + for thumb in item["thumbnails"] + ] + artist.add_provider_id( + MediaItemProviderId( + item_id=str(item["browseId"]), prov_type=self.type, prov_id=self.id + ) + ) + yield artist + + async def get_library_albums(self) -> AsyncGenerator[Album, None]: + """Retrieve all library albums from Youtube Music.""" + data = {"browseId": "FEmusic_liked_albums"} + response = await self._post_data(endpoint="browse", data=data) + response_albums = response["contents"]["singleColumnBrowseResultsRenderer"][ + "tabs" + ][0]["tabRenderer"]["content"]["sectionListRenderer"]["contents"][1][ + "itemSectionRenderer" + ][ + "contents" + ][ + 0 + ][ + "gridRenderer" + ][ + "items" + ] + parsed_albums = ytmusicapi.parsers.library.parse_albums(response_albums) + for item in parsed_albums: + if item["type"] == "Single": + album_type = AlbumType.SINGLE + elif item["type"] == "EP": + album_type = AlbumType.EP + elif item["type"] == "Album": + album_type = AlbumType.ALBUM + else: + album_type = AlbumType.UNKNOWN + album = Album( + item_id=item["browseId"], + name=item["title"], + album_type=album_type, + provider=self.type, + ) + if item["year"].isdigit(): + album.year = item["year"] + artists = [] + for artist in item["artists"]: + artist_id = artist["id"] + if not artist_id: + artist_id = "ytm_va" + album_artist = Artist( + item_id=artist_id, name=artist["name"], provider=self.type + ) + album_artist.add_provider_id( + MediaItemProviderId( + item_id=str(artist["id"]), prov_type=self.type, prov_id=self.id + ) + ) + artists.append(album_artist) + album.artists = artists + album.metadata.images = [ + MediaItemImage(ImageType.THUMB, thumb["url"]) + for thumb in item["thumbnails"] + ] + album.add_provider_id( + MediaItemProviderId( + item_id=str(item["browseId"]), prov_type=self.type, prov_id=self.id + ) + ) + yield album + + async def get_library_playlists(self) -> AsyncGenerator[Playlist, None]: + """Retrieve all library playlists from the provider.""" + data = {"browseId": "FEmusic_liked_playlists"} + response = await self._post_data(endpoint="browse", data=data) + response_playlists = response["contents"]["singleColumnBrowseResultsRenderer"][ + "tabs" + ][0]["tabRenderer"]["content"]["sectionListRenderer"]["contents"][1][ + "itemSectionRenderer" + ][ + "contents" + ][ + 0 + ][ + "gridRenderer" + ] + playlists = ytmusicapi.parsers.browsing.parse_content_list( + response_playlists["items"][1:], ytmusicapi.parsers.browsing.parse_playlist + ) + for item in playlists: + playlist = Playlist( + item_id=str(item["playlistId"]), provider=self.type, name=item["title"] + ) + playlist.metadata.description = item["description"] + playlist.metadata.images = [ + MediaItemImage(ImageType.THUMB, thumb["url"]) + for thumb in item["thumbnails"] + ] + playlist.add_provider_id( + MediaItemProviderId( + item_id=str(item["playlistId"]), + prov_type=self.type, + prov_id=self.id, + ) + ) + yield playlist + + async def get_library_tracks(self) -> AsyncGenerator[Track, None]: + """Retrieve library tracks from Youtube Music.""" + data = {"browseId": "FEmusic_liked_videos"} + response = await self._post_data(endpoint="browse", data=data) + parsed_tracks = ytmusicapi.parsers.library.parse_library_songs(response)[ + "parsed" + ] + for item in parsed_tracks: + track = await self.get_track(item["videoId"]) + album = await self.get_album(item["album"]["id"]) + track.album = album + track.metadata.explicit = item["isExplicit"] + track.add_provider_id( + MediaItemProviderId( + item_id=str(item["videoId"]), + prov_type=self.type, + prov_id=self.id, + ) + ) + yield track + + async def get_album(self, prov_album_id) -> Album: + """Get full album details by id.""" + data = {"browseId": prov_album_id} + album_obj = await self._post_data(endpoint="browse", data=data) + return ( + await self._parse_album(album_obj=album_obj, album_id=prov_album_id) + if album_obj + else None + ) + + async def get_album_tracks(self, prov_album_id: str) -> List[Track]: + """Get album tracks for given album id.""" + data = {"browseId": prov_album_id} + album_obj = await self._post_data(endpoint="browse", data=data) + parsed_album = ytmusicapi.parsers.albums.parse_album_header(album_obj) + album_playlist_id = parsed_album["audioPlaylistId"] + return await self.get_playlist_tracks(album_playlist_id) + + async def get_artist(self, prov_artist_id) -> Artist: + """Get full artist details by id.""" + data = {"browseId": prov_artist_id} + artist_obj = await self._post_data(endpoint="browse", data=data) + return ( + await self._parse_artist(artist_obj=artist_obj, artist_id=prov_artist_id) + if artist_obj + else None + ) + + async def get_track(self, prov_track_id) -> Track: + """Get full track details by id.""" + signature_timestamp = (date.today() - date.fromtimestamp(0)).days - 1 + data = { + "playbackContext": { + "contentPlaybackContext": {"signatureTimestamp": signature_timestamp} + }, + "video_id": prov_track_id, + } + track_obj = await self._post_data("player", data=data) + return await self._parse_track(track_obj) if track_obj else None + + async def get_playlist(self, prov_playlist_id) -> Playlist: + """Get full playlist details by id.""" + browse_id = ( + "VL" + prov_playlist_id + if not prov_playlist_id.startswith("VL") + else prov_playlist_id + ) + data = {"browseId": browse_id} + playlist_response = await self._post_data("browse", data=data) + return await self._parse_playlist(playlist_response) + + async def get_playlist_tracks(self, prov_playlist_id) -> List[Track]: + """Get all playlist tracks for given playlist id.""" + browse_id = ( + "VL" + prov_playlist_id + if not prov_playlist_id.startswith("VL") + else prov_playlist_id + ) + data = {"browseId": browse_id} + playlist_obj = await self._post_data("browse", data=data) + tracks = playlist_obj["contents"]["singleColumnBrowseResultsRenderer"]["tabs"][ + 0 + ]["tabRenderer"]["content"]["sectionListRenderer"]["contents"][0][ + "musicPlaylistShelfRenderer" + ][ + "contents" + ] + return [ + await self.get_track( + track["musicResponsiveListItemRenderer"]["playlistItemData"]["videoId"] + ) + for track in tracks + if "playlistItemData" in track["musicResponsiveListItemRenderer"] + ] + + async def get_artist_albums(self, prov_artist_id) -> List[Album]: + """Get a list of albums for the given artist.""" + data = {"browseId": prov_artist_id} + response = await self._post_data("browse", data=data) + album_response = response["contents"]["singleColumnBrowseResultsRenderer"][ + "tabs" + ][0]["tabRenderer"]["content"]["sectionListRenderer"]["contents"][1][ + "musicCarouselShelfRenderer" + ][ + "contents" + ] + albums = [] + for item in album_response: + album_id = item["musicTwoRowItemRenderer"]["navigationEndpoint"][ + "browseEndpoint" + ]["browseId"] + album_name = item["musicTwoRowItemRenderer"]["title"]["runs"][0]["text"] + thumbnails = item["musicTwoRowItemRenderer"]["thumbnailRenderer"][ + "musicThumbnailRenderer" + ]["thumbnail"]["thumbnails"] + album = album = Album( + item_id=album_id, + name=album_name, + provider=self.type, + ) + album.metadata.images = [ + MediaItemImage(ImageType.THUMB, thumb["url"]) for thumb in thumbnails + ] + album.add_provider_id( + MediaItemProviderId( + item_id=str(album_id), prov_type=self.type, prov_id=self.id + ) + ) + albums.append(album) + return albums + + async def get_artist_toptracks(self, prov_artist_id) -> List[Track]: + """Get a list of 5 most popular tracks for the given artist.""" + data = {"browseId": prov_artist_id} + response = await self._post_data("browse", data=data) + # Check if we are dealing with an actual artist with songs, rather than a user + if ( + "musicShelfRenderer" + in response["contents"]["singleColumnBrowseResultsRenderer"]["tabs"][0][ + "tabRenderer" + ]["content"]["sectionListRenderer"]["contents"][0] + ): + songs_response = response["contents"]["singleColumnBrowseResultsRenderer"][ + "tabs" + ][0]["tabRenderer"]["content"]["sectionListRenderer"]["contents"][0][ + "musicShelfRenderer" + ][ + "contents" + ] + return [ + await self.get_track( + prov_track_id=song["musicResponsiveListItemRenderer"]["overlay"][ + "musicItemThumbnailOverlayRenderer" + ]["content"]["musicPlayButtonRenderer"]["playNavigationEndpoint"][ + "watchEndpoint" + ][ + "videoId" + ] + ) + for song in songs_response + ] + return [] + + async def get_stream_details(self, item_id: str) -> StreamDetails: + """Return the content details for the given track when it will be streamed.""" + signature_timestamp = await self._get_signature_timestamp() + data = { + "playbackContext": { + "contentPlaybackContext": {"signatureTimestamp": signature_timestamp} + }, + "video_id": item_id, + } + track_obj = await self._post_data("player", data=data) + stream_format = await self._parse_stream_format(track_obj) + url = await self._parse_stream_url(stream_format=stream_format, item_id=item_id) + return StreamDetails( + provider=self.type, + item_id=item_id, + data=url, + content_type=ContentType.try_parse(stream_format["mimeType"]), + ) + + async def get_audio_stream( + self, streamdetails: StreamDetails, seek_position: int = 0 + ) -> AsyncGenerator[bytes, None]: + """Return the audio stream for the provider item.""" + async for chunk in get_http_stream( + self.mass, streamdetails.data, streamdetails, seek_position + ): + yield chunk + + async def _post_data(self, endpoint: str, data: Dict[str, str], **kwargs): + url = f"{YTM_BASE_URL}{endpoint}" + data.update(self._context) + async with self.mass.http_session.post( + url, + headers=self._headers, + json=data, + verify_ssl=False, + cookies=self._cookies, + ) as response: + return await response.json() + + async def _get_data(self, url: str, params: Dict = None): + async with self.mass.http_session.get( + url, headers=self._headers, params=params, cookies=self._cookies + ) as response: + return await response.text() + + async def _initialize_headers(self, cookie: str) -> Dict[str, str]: + """Return headers to include in the requests.""" + headers = { + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:72.0) Gecko/20100101 Firefox/72.0", + "Accept": "*/*", + "Accept-Language": "en-US,en;q=0.5", + "Content-Type": "application/json", + "X-Goog-AuthUser": "0", + "x-origin": "https://music.youtube.com", + "Cookie": cookie, + } + sapisid = ytmusicapi.helpers.sapisid_from_cookie(cookie) + origin = headers.get("origin", headers.get("x-origin")) + headers["Authorization"] = ytmusicapi.helpers.get_authorization( + sapisid + " " + origin + ) + self._headers = headers + + async def _initialize_context(self) -> Dict[str, str]: + """Return a dict to use as a context in requests.""" + self._context = { + "context": { + "client": {"clientName": "WEB_REMIX", "clientVersion": "0.1"}, + "user": {}, + } + } + + async def _parse_album(self, album_obj: dict, album_id: str) -> Album: + """Parse a YT Album response to an Album model object.""" + parsed_album = ytmusicapi.parsers.albums.parse_album_header(album_obj) + album = Album( + item_id=album_id, + name=parsed_album["title"], + album_type=AlbumType.ALBUM, + provider=self.type, + ) + if parsed_album["year"].isdigit(): + album.year = parsed_album["year"] + images = [] + for thumb in parsed_album["thumbnails"]: + images.append(MediaItemImage(ImageType.THUMB, thumb["url"])) + album.metadata.images = images + if "description" in parsed_album: + album.metadata.description = unquote(parsed_album["description"]) + artists = [] + for parsed_artist in parsed_album["artists"]: + if parsed_artist["id"]: + artist_id = parsed_artist["id"] + if not artist_id: + artist_id = "ytm_va" + artist = Artist( + item_id=artist_id, + provider=self.type, + name=parsed_artist["name"], + ) + artist.add_provider_id( + MediaItemProviderId( + item_id=str(parsed_artist["id"]), + prov_type=self.type, + prov_id=self.id, + ) + ) + artists.append(artist) + album.artists = artists + album.add_provider_id( + MediaItemProviderId( + item_id=str(album_id), prov_type=self.type, prov_id=self.id + ) + ) + return album + + async def _parse_artist(self, artist_obj: dict, artist_id: str) -> Artist: + """Parse a YT Artist response to Artist model object.""" + if "musicImmersiveHeaderRenderer" in artist_obj["header"]: + # Actual artist + name = artist_obj["header"]["musicImmersiveHeaderRenderer"]["title"][ + "runs" + ][0]["text"] + artist_id = artist_obj["header"]["musicImmersiveHeaderRenderer"][ + "subscriptionButton" + ]["subscribeButtonRenderer"]["channelId"] + artist = Artist(item_id=str(artist_id), provider=self.type, name=name) + if "description" in artist_obj["header"]["musicImmersiveHeaderRenderer"]: + artist.metadata.description = unquote( + artist_obj["header"]["musicImmersiveHeaderRenderer"]["description"][ + "runs" + ][0]["text"] + ) + images = [] + if "thumbnail" in artist_obj["header"]["musicImmersiveHeaderRenderer"]: + for thumb in artist_obj["header"]["musicImmersiveHeaderRenderer"][ + "thumbnail" + ]["musicThumbnailRenderer"]["thumbnail"]["thumbnails"]: + images.append(MediaItemImage(ImageType.THUMB, thumb["url"])) + artist.metadata.images = images + artist.add_provider_id( + MediaItemProviderId( + item_id=str(artist_id), prov_type=self.type, prov_id=self.id + ) + ) + return artist + if "musicVisualHeaderRenderer" in artist_obj["header"]: + # Artist that is actually a user + name = artist_obj["header"]["musicVisualHeaderRenderer"]["title"]["runs"][ + 0 + ]["text"] + artist = Artist(item_id=str(artist_id), name=name, provider=self.type) + if "thumbnail" in artist_obj["header"]["musicVisualHeaderRenderer"]: + thumbnails = artist_obj["header"]["musicVisualHeaderRenderer"][ + "thumbnail" + ]["musicThumbnailRenderer"]["thumbnail"]["thumbnails"] + elif ( + "foregroundThumbnail" + in artist_obj["header"]["musicVisualHeaderRenderer"] + ): + thumbnails = artist_obj["header"]["musicVisualHeaderRenderer"][ + "foregroundThumbnail" + ]["musicThumbnailRenderer"]["thumbnail"]["thumbnails"] + artist.metadata.images = [ + MediaItemImage(ImageType.THUMB, thumb["url"]) for thumb in thumbnails + ] + artist.add_provider_id( + MediaItemProviderId( + item_id=str(artist_id), prov_type=self.type, prov_id=self.id + ) + ) + return artist + + async def _parse_playlist(self, playlist_response: dict) -> Playlist: + """Parse a YT Playlist response to a Playlist object.""" + playlist_id = playlist_response["contents"][ + "singleColumnBrowseResultsRenderer" + ]["tabs"][0]["tabRenderer"]["content"]["sectionListRenderer"]["contents"][0][ + "musicPlaylistShelfRenderer" + ][ + "playlistId" + ] + own_playlist = ( + "musicEditablePlaylistDetailHeaderRenderer" in playlist_response["header"] + ) + if own_playlist: + playlist_response = playlist_response["header"][ + "musicEditablePlaylistDetailHeaderRenderer" + ] + title = playlist_response["header"]["musicDetailHeaderRenderer"]["title"][ + "runs" + ][0]["text"] + thumbnails = playlist_response["header"]["musicDetailHeaderRenderer"][ + "thumbnail" + ]["croppedSquareThumbnailRenderer"]["thumbnail"]["thumbnails"] + playlist = Playlist(item_id=str(playlist_id), provider=self.type, name=title) + if "description" in playlist_response["header"]["musicDetailHeaderRenderer"]: + playlist.metadata.description = playlist_response["header"][ + "musicDetailHeaderRenderer" + ]["description"]["runs"][0]["text"] + playlist.metadata.images = [ + MediaItemImage(ImageType.THUMB, thumb["url"]) for thumb in thumbnails + ] + playlist.add_provider_id( + MediaItemProviderId( + item_id=str(playlist_id), prov_type=self.type, prov_id=self.id + ) + ) + return playlist + + async def _parse_track(self, track_obj: dict) -> Track: + """Parse a YT Track response to a Track model object.""" + track = Track( + item_id=track_obj["videoDetails"]["videoId"], + provider=self.type, + name=track_obj["videoDetails"]["title"], + duration=track_obj["videoDetails"]["lengthSeconds"], + ) + artist_id = track_obj["microformat"]["microformatDataRenderer"][ + "pageOwnerDetails" + ]["externalChannelId"] + if artist_id == "UCUTXlgdcKU5vfzFqHOWIvkA": + artist = Artist( + item_id=artist_id, name="Various Artists", provider=self.type + ) + artist.add_provider_id( + MediaItemProviderId( + item_id=str(artist_id), prov_type=self.type, prov_id=self.id + ) + ) + else: + artist = await self.get_artist(artist_id) + track.artists = [artist] + track.metadata.images = [ + MediaItemImage(ImageType.THUMB, thumb["url"]) + for thumb in track_obj["microformat"]["microformatDataRenderer"][ + "thumbnail" + ]["thumbnails"] + ] + available = False + if track_obj["playabilityStatus"]["status"] == "OK": + available = True + track.add_provider_id( + MediaItemProviderId( + item_id=str(track_obj["videoDetails"]["videoId"]), + prov_type=self.type, + prov_id=self.id, + available=available, + ) + ) + return track + + async def _get_signature_timestamp(self): + """Get a signature timestamp required to generate valid stream URLs.""" + response = await self._get_data(url=YTM_DOMAIN) + match = re.search(r'jsUrl"\s*:\s*"([^"]+)"', response) + if match is None: + raise Exception("Could not identify the URL for base.js player.") + url = YTM_DOMAIN + match.group(1) + response = await self._get_data(url=url) + match = re.search(r"signatureTimestamp[:=](\d+)", response) + if match is None: + raise Exception("Unable to identify the signatureTimestamp.") + return int(match.group(1)) + + async def _parse_stream_url(self, stream_format: dict, item_id: str) -> str: + """Figure out the stream URL to use based on the YT track object.""" + cipher_parts = {} + for part in stream_format["signatureCipher"].split("&"): + key, val = part.split("=", maxsplit=1) + cipher_parts[key] = unquote(val) + signature = await self._decipher_signature( + ciphered_signature=cipher_parts["s"], item_id=item_id + ) + url = cipher_parts["url"] + "&sig=" + signature + return url + + @classmethod + async def _parse_stream_format(cls, track_obj: dict) -> dict: + """Grab the highes available audio stream from available streams.""" + stream_format = {} + quality_mapper = { + "AUDIO_QUALITY_LOW": 1, + "AUDIO_QUALITY_MEDIUM": 2, + "AUDIO_QUALITY_HIGH": 3, + } + for adaptive_format in track_obj["streamingData"]["adaptiveFormats"]: + if adaptive_format["mimeType"].startswith("audio") and ( + not stream_format + or quality_mapper.get(adaptive_format["audioQuality"], 0) + > quality_mapper.get(stream_format["audioQuality"], 0) + ): + stream_format = adaptive_format + if stream_format is None: + raise MediaNotFoundError("No stream found for this track") + return stream_format + + async def _decipher_signature(self, ciphered_signature: str, item_id: str): + """Decipher the signature, required to build the Stream URL.""" + + def _decipher(): + embed_url = f"https://www.youtube.com/embed/{item_id}" + embed_html = pytube.request.get(embed_url) + js_url = pytube.extract.js_url(embed_html) + ytm_js = pytube.request.get(js_url) + cipher = pytube.cipher.Cipher(js=ytm_js) + return cipher.get_signature(ciphered_signature) + + return await self.mass.loop.run_in_executor(None, _decipher) diff --git a/requirements.txt b/requirements.txt index 29218d4a..323c5866 100644 --- a/requirements.txt +++ b/requirements.txt @@ -10,3 +10,5 @@ pillow>=8.0,<=9.2.0 unidecode>=1.0,<=1.3.4 mashumaro>=3.0,<=3.1 xmltodict>=0.12.0,<=0.13.0 +ytmusicapi>=0.22.0,<=0.23.0 +pytube>=12.1.0,<=12.2.0 -- 2.34.1