Add YoutubeMusic MusicProvider (#397)
authorMarvin Schenkel <marvinschenkel@gmail.com>
Thu, 7 Jul 2022 20:23:11 +0000 (22:23 +0200)
committerGitHub <noreply@github.com>
Thu, 7 Jul 2022 20:23:11 +0000 (22:23 +0200)
Co-authored-by: Marvin Schenkel <marvin@MacBook-Pro.local>
Co-authored-by: Marcel van der Veldt <m.vanderveldt@outlook.com>
.vscode/settings.json
examples/full.py
music_assistant/controllers/metadata/__init__.py
music_assistant/controllers/music/__init__.py
music_assistant/controllers/music/artists.py
music_assistant/models/enums.py
music_assistant/models/music_provider.py
music_assistant/music_providers/ytmusic.py [new file with mode: 0644]
requirements.txt

index d63891a61df84fa04c043faee19c018c21b40c11..0460b1ae76c66f7bb1d1988d67fa61f26f787eb4 100644 (file)
@@ -6,4 +6,9 @@
     "python.linting.flake8Enabled": true,
     "python.linting.flake8Args": ["--config=${workspaceFolder}/setup.cfg"],
     "python.linting.mypyEnabled": false,
+    "python.testing.pytestArgs": [
+        "tests"
+    ],
+    "python.testing.unittestEnabled": false,
+    "python.testing.pytestEnabled": true,
 }
index af14765c9e0654c53afaebbbd107631395b32a19..c4d62e699026f694acb32b2a521db78b38f1ca55 100644 (file)
@@ -48,6 +48,16 @@ parser.add_argument(
     required=False,
     help="Directory on disk for local music library",
 )
+parser.add_argument(
+    "--ytmusic-username",
+    required=False,
+    help="YoutubeMusic username",
+)
+parser.add_argument(
+    "--ytmusic-cookie",
+    required=False,
+    help="YoutubeMusic cookie",
+)
 parser.add_argument(
     "--debug",
     action="store_true",
@@ -102,6 +112,15 @@ if args.tunein_username:
             username=args.tunein_username,
         )
     )
+
+if args.ytmusic_username and args.ytmusic_cookie:
+    mass_conf.providers.append(
+        MusicProviderConfig(
+            ProviderType.YTMUSIC,
+            username=args.ytmusic_username,
+            password=args.ytmusic_cookie,
+        )
+    )
 if args.musicdir:
     mass_conf.providers.append(
         MusicProviderConfig(type=ProviderType.FILESYSTEM_LOCAL, path=args.musicdir)
@@ -188,8 +207,8 @@ async def main():
         print(f"Got {track_count} tracks ({track_count_lib} in library)")
         radio_count = await mass.music.radio.count(True)
         print(f"Got {radio_count} radio stations in library")
-        playlist_count = await mass.music.playlists.db_items(True)
-        print(f"Got {len(playlist_count)} playlists in library")
+        playlists = await mass.music.playlists.db_items(True)
+        print(f"Got {len(playlists)} playlists in library")
         # register a player
         test_player1 = TestPlayer("test1")
         test_player2 = TestPlayer("test2")
@@ -204,8 +223,8 @@ async def main():
         # we can also send an uri, such as spotify://track/abcdfefgh
         # or database://playlist/1
         # or a list of items
-        artist = await mass.music.artists.get("2", ProviderType.DATABASE)
-        await test_player1.active_queue.play_media(artist)
+        if len(playlists) > 0:
+            await test_player1.active_queue.play_media(playlists[0])
 
         await asyncio.sleep(3600)
 
index 1097286e3f3c9c315a913be2a8f36061a0ddaa4d..39afe618c255846373542e93770077982f6633d5 100755 (executable)
@@ -104,7 +104,7 @@ class MetaDataController:
         ):
             if track.metadata.genres:
                 playlist.metadata.genres.update(track.metadata.genres)
-            elif track.album.metadata.genres:
+            elif track.album and track.album.metadata.genres:
                 playlist.metadata.genres.update(track.album.metadata.genres)
         # TODO: create mosaic thumb/fanart from playlist tracks
         playlist.metadata.last_refresh = int(time())
index 092469e6a46a0aaf7a064fbc0ebc4e61a6f353b8..81073cdefb426a23ae3deb4bdc63b61f8d08db4e 100755 (executable)
@@ -37,6 +37,7 @@ from music_assistant.music_providers.spotify import SpotifyProvider
 from music_assistant.music_providers.tunein import TuneInProvider
 from music_assistant.music_providers.url import PROVIDER_CONFIG as URL_CONFIG
 from music_assistant.music_providers.url import URLProvider
+from music_assistant.music_providers.ytmusic import YoutubeMusicProvider
 
 if TYPE_CHECKING:
     from music_assistant.mass import MusicAssistant
@@ -46,6 +47,7 @@ PROV_MAP = {
     ProviderType.SPOTIFY: SpotifyProvider,
     ProviderType.QOBUZ: QobuzProvider,
     ProviderType.TUNEIN: TuneInProvider,
+    ProviderType.YTMUSIC: YoutubeMusicProvider,
 }
 
 
index 2a1fb7b5219c05d3bd3c74420c42804008ecdf74..be390e02cc91cf7ac1ebacaa5c247cdf5c5c5a3d 100644 (file)
@@ -176,7 +176,7 @@ class ArtistsController(MediaControllerBase[Artist]):
         self, item: Artist, overwrite_existing: bool = False
     ) -> Artist:
         """Add a new item record to the database."""
-        assert item.provider_ids, "Album is missing provider id(s)"
+        assert item.provider_ids, "Artist is missing provider id(s)"
         # always try to grab existing item by musicbrainz_id
         cur_item = None
         if item.musicbrainz_id:
index 5312f82d4609cbca077e407bd4aa553806d533e0..a8fb7a0c93a0319987f5c230bcb4e78e2abf1e9c 100644 (file)
@@ -68,6 +68,7 @@ class AlbumType(Enum):
     ALBUM = "album"
     SINGLE = "single"
     COMPILATION = "compilation"
+    EP = "ep"
     UNKNOWN = "unknown"
 
 
@@ -96,17 +97,20 @@ class ContentType(Enum):
     def try_parse(cls: "ContentType", string: str) -> "ContentType":
         """Try to parse ContentType from (url)string/extension."""
         tempstr = string.lower()
-        if "." in tempstr:
-            tempstr = tempstr.split(".")[-1]
-        if "," in tempstr:
-            for val in tempstr.split(","):
-                try:
-                    return cls(val.strip())
-                except ValueError:
-                    pass
+        if "audio/" in tempstr:
+            tempstr = tempstr.split("/")[1]
+        for splitter in (".", ","):
+            if splitter in tempstr:
+                for val in tempstr.split(splitter):
+                    try:
+                        return cls(val.strip())
+                    except ValueError:
+                        pass
 
         tempstr = tempstr.split("?")[0]
         tempstr = tempstr.split("&")[0]
+        tempstr = tempstr.split(";")[0]
+        tempstr = tempstr.replace("mp4", "m4a")
         try:
             return cls(tempstr)
         except ValueError:
@@ -209,6 +213,7 @@ class ProviderType(Enum):
     SPOTIFY = "spotify"
     QOBUZ = "qobuz"
     TUNEIN = "tunein"
+    YTMUSIC = "ytmusic"
     DATABASE = "database"  # internal only
     URL = "url"  # internal only
 
index 936e24da7c04b1db78cc41122feac418b6480a33..b4f8f7cb6801d49b64bdd790c7a3803142c91603 100644 (file)
@@ -321,6 +321,7 @@ class MusicProvider:
                 )
                 if not db_item:
                     # dump the item in the db, rich metadata is lazy loaded later
+                    print(prov_item)
                     db_item = await controller.add_db_item(prov_item)
                 elif (
                     db_item.metadata.checksum and prov_item.metadata.checksum
diff --git a/music_assistant/music_providers/ytmusic.py b/music_assistant/music_providers/ytmusic.py
new file mode 100644 (file)
index 0000000..d976efa
--- /dev/null
@@ -0,0 +1,721 @@
+"""Youtube Music support for MusicAssistant."""
+import re
+from datetime import date
+from typing import AsyncGenerator, Dict, List, Optional
+from urllib.parse import unquote
+
+import pytube
+import ytmusicapi
+
+from music_assistant.helpers.audio import get_http_stream
+from music_assistant.models.enums import ProviderType
+from music_assistant.models.errors import LoginFailed, MediaNotFoundError
+from music_assistant.models.media_items import (
+    Album,
+    AlbumType,
+    Artist,
+    ContentType,
+    ImageType,
+    MediaItemImage,
+    MediaItemProviderId,
+    MediaItemType,
+    MediaType,
+    Playlist,
+    StreamDetails,
+    Track,
+)
+from music_assistant.models.music_provider import MusicProvider
+
+YTM_DOMAIN = "https://music.youtube.com"
+YTM_BASE_URL = f"{YTM_DOMAIN}/youtubei/v1/"
+
+
+class YoutubeMusicProvider(MusicProvider):
+    """Provider for Youtube Music."""
+
+    _attr_type = ProviderType.YTMUSIC
+    _attr_name = "YTMusic"
+    _attr_supported_mediatypes = [
+        MediaType.ARTIST,
+        MediaType.ALBUM,
+        MediaType.TRACK,
+        MediaType.PLAYLIST,
+    ]
+    _headers = None
+    _context = None
+    _cookies = None
+
+    async def setup(self) -> bool:
+        """Set up the YTMusic provider."""
+        if not self.config.enabled:
+            return False
+        if not self.config.username or not self.config.password:
+            raise LoginFailed("Invalid login credentials")
+        await self._initialize_headers(cookie=self.config.password)
+        await self._initialize_context()
+        self._cookies = {"CONSENT": "YES+1"}
+        return True
+
+    async def search(
+        self, search_query: str, media_types=Optional[List[MediaType]], limit: int = 5
+    ) -> List[MediaItemType]:
+        """
+        Perform search on musicprovider.
+
+            :param search_query: Search query.
+            :param media_types: A list of media_types to include. All types if None.
+            :param limit: Number of items to return in the search (per type).
+        """
+        data = {"query": search_query}
+        ytm_filter = None
+        if len(media_types) == 1:
+            # YTM does not support multiple searchtypes, falls back to all if no type given
+            if media_types[0] == MediaType.ARTIST:
+                ytm_filter = "artists"
+            if media_types[0] == MediaType.ALBUM:
+                ytm_filter = "albums"
+            if media_types[0] == MediaType.TRACK:
+                ytm_filter = "songs"
+            if media_types[0] == MediaType.PLAYLIST:
+                ytm_filter = "playlists"
+        params = ytmusicapi.parsers.search_params.get_search_params(
+            filter=ytm_filter, scope=None, ignore_spelling=False
+        )
+        data["params"] = params
+        search_results = await self._post_data(endpoint="search", data=data)
+        if "tabbedSearchResultsRenderer" in search_results["contents"]:
+            results = search_results["contents"]["tabbedSearchResultsRenderer"]["tabs"][
+                0
+            ]["tabRenderer"]["content"]
+        else:
+            results = search_results["contents"]
+        parsed_results = []
+        for result in results["sectionListRenderer"]["contents"]:
+            if "musicShelfRenderer" in result:
+                category = result["musicShelfRenderer"]["title"]["runs"][0]["text"]
+                if category == "Artists":
+                    for artist in result["musicShelfRenderer"]["contents"]:
+                        artist_id = artist["musicResponsiveListItemRenderer"][
+                            "navigationEndpoint"
+                        ]["browseEndpoint"]["browseId"]
+                        parsed_results.append(await self.get_artist(artist_id))
+                elif category == "Albums":
+                    for album in result["musicShelfRenderer"]["contents"]:
+                        album_id = album["musicResponsiveListItemRenderer"][
+                            "navigationEndpoint"
+                        ]["browseEndpoint"]["browseId"]
+                        parsed_results.append(await self.get_album(album_id))
+                elif category == "Songs":
+                    for song in result["musicShelfRenderer"]["contents"]:
+                        song_id = song["musicResponsiveListItemRenderer"][
+                            "playlistItemData"
+                        ]["videoId"]
+                        parsed_results.append(await self.get_track(song_id))
+                else:
+                    print(category)
+        return parsed_results
+
+    async def get_library_artists(self) -> AsyncGenerator[Artist, None]:
+        """Retrieve all library artists from Youtube Music."""
+        data = {"browseId": "FEmusic_library_corpus_track_artists"}
+        response = await self._post_data(endpoint="browse", data=data)
+        response_artist = response["contents"]["singleColumnBrowseResultsRenderer"][
+            "tabs"
+        ][0]["tabRenderer"]["content"]["sectionListRenderer"]["contents"][1][
+            "itemSectionRenderer"
+        ][
+            "contents"
+        ][
+            0
+        ][
+            "musicShelfRenderer"
+        ][
+            "contents"
+        ]
+        parsed_artists = ytmusicapi.parsers.library.parse_artists(response_artist)
+        for item in parsed_artists:
+            artist = Artist(
+                item_id=item["browseId"], provider=self.type, name=item["artist"]
+            )
+            artist.metadata.images = [
+                MediaItemImage(ImageType.THUMB, thumb["url"])
+                for thumb in item["thumbnails"]
+            ]
+            artist.add_provider_id(
+                MediaItemProviderId(
+                    item_id=str(item["browseId"]), prov_type=self.type, prov_id=self.id
+                )
+            )
+            yield artist
+
+    async def get_library_albums(self) -> AsyncGenerator[Album, None]:
+        """Retrieve all library albums from Youtube Music."""
+        data = {"browseId": "FEmusic_liked_albums"}
+        response = await self._post_data(endpoint="browse", data=data)
+        response_albums = response["contents"]["singleColumnBrowseResultsRenderer"][
+            "tabs"
+        ][0]["tabRenderer"]["content"]["sectionListRenderer"]["contents"][1][
+            "itemSectionRenderer"
+        ][
+            "contents"
+        ][
+            0
+        ][
+            "gridRenderer"
+        ][
+            "items"
+        ]
+        parsed_albums = ytmusicapi.parsers.library.parse_albums(response_albums)
+        for item in parsed_albums:
+            if item["type"] == "Single":
+                album_type = AlbumType.SINGLE
+            elif item["type"] == "EP":
+                album_type = AlbumType.EP
+            elif item["type"] == "Album":
+                album_type = AlbumType.ALBUM
+            else:
+                album_type = AlbumType.UNKNOWN
+            album = Album(
+                item_id=item["browseId"],
+                name=item["title"],
+                album_type=album_type,
+                provider=self.type,
+            )
+            if item["year"].isdigit():
+                album.year = item["year"]
+            artists = []
+            for artist in item["artists"]:
+                artist_id = artist["id"]
+                if not artist_id:
+                    artist_id = "ytm_va"
+                album_artist = Artist(
+                    item_id=artist_id, name=artist["name"], provider=self.type
+                )
+                album_artist.add_provider_id(
+                    MediaItemProviderId(
+                        item_id=str(artist["id"]), prov_type=self.type, prov_id=self.id
+                    )
+                )
+                artists.append(album_artist)
+            album.artists = artists
+            album.metadata.images = [
+                MediaItemImage(ImageType.THUMB, thumb["url"])
+                for thumb in item["thumbnails"]
+            ]
+            album.add_provider_id(
+                MediaItemProviderId(
+                    item_id=str(item["browseId"]), prov_type=self.type, prov_id=self.id
+                )
+            )
+            yield album
+
+    async def get_library_playlists(self) -> AsyncGenerator[Playlist, None]:
+        """Retrieve all library playlists from the provider."""
+        data = {"browseId": "FEmusic_liked_playlists"}
+        response = await self._post_data(endpoint="browse", data=data)
+        response_playlists = response["contents"]["singleColumnBrowseResultsRenderer"][
+            "tabs"
+        ][0]["tabRenderer"]["content"]["sectionListRenderer"]["contents"][1][
+            "itemSectionRenderer"
+        ][
+            "contents"
+        ][
+            0
+        ][
+            "gridRenderer"
+        ]
+        playlists = ytmusicapi.parsers.browsing.parse_content_list(
+            response_playlists["items"][1:], ytmusicapi.parsers.browsing.parse_playlist
+        )
+        for item in playlists:
+            playlist = Playlist(
+                item_id=str(item["playlistId"]), provider=self.type, name=item["title"]
+            )
+            playlist.metadata.description = item["description"]
+            playlist.metadata.images = [
+                MediaItemImage(ImageType.THUMB, thumb["url"])
+                for thumb in item["thumbnails"]
+            ]
+            playlist.add_provider_id(
+                MediaItemProviderId(
+                    item_id=str(item["playlistId"]),
+                    prov_type=self.type,
+                    prov_id=self.id,
+                )
+            )
+            yield playlist
+
+    async def get_library_tracks(self) -> AsyncGenerator[Track, None]:
+        """Retrieve library tracks from Youtube Music."""
+        data = {"browseId": "FEmusic_liked_videos"}
+        response = await self._post_data(endpoint="browse", data=data)
+        parsed_tracks = ytmusicapi.parsers.library.parse_library_songs(response)[
+            "parsed"
+        ]
+        for item in parsed_tracks:
+            track = await self.get_track(item["videoId"])
+            album = await self.get_album(item["album"]["id"])
+            track.album = album
+            track.metadata.explicit = item["isExplicit"]
+            track.add_provider_id(
+                MediaItemProviderId(
+                    item_id=str(item["videoId"]),
+                    prov_type=self.type,
+                    prov_id=self.id,
+                )
+            )
+            yield track
+
+    async def get_album(self, prov_album_id) -> Album:
+        """Get full album details by id."""
+        data = {"browseId": prov_album_id}
+        album_obj = await self._post_data(endpoint="browse", data=data)
+        return (
+            await self._parse_album(album_obj=album_obj, album_id=prov_album_id)
+            if album_obj
+            else None
+        )
+
+    async def get_album_tracks(self, prov_album_id: str) -> List[Track]:
+        """Get album tracks for given album id."""
+        data = {"browseId": prov_album_id}
+        album_obj = await self._post_data(endpoint="browse", data=data)
+        parsed_album = ytmusicapi.parsers.albums.parse_album_header(album_obj)
+        album_playlist_id = parsed_album["audioPlaylistId"]
+        return await self.get_playlist_tracks(album_playlist_id)
+
+    async def get_artist(self, prov_artist_id) -> Artist:
+        """Get full artist details by id."""
+        data = {"browseId": prov_artist_id}
+        artist_obj = await self._post_data(endpoint="browse", data=data)
+        return (
+            await self._parse_artist(artist_obj=artist_obj, artist_id=prov_artist_id)
+            if artist_obj
+            else None
+        )
+
+    async def get_track(self, prov_track_id) -> Track:
+        """Get full track details by id."""
+        signature_timestamp = (date.today() - date.fromtimestamp(0)).days - 1
+        data = {
+            "playbackContext": {
+                "contentPlaybackContext": {"signatureTimestamp": signature_timestamp}
+            },
+            "video_id": prov_track_id,
+        }
+        track_obj = await self._post_data("player", data=data)
+        return await self._parse_track(track_obj) if track_obj else None
+
+    async def get_playlist(self, prov_playlist_id) -> Playlist:
+        """Get full playlist details by id."""
+        browse_id = (
+            "VL" + prov_playlist_id
+            if not prov_playlist_id.startswith("VL")
+            else prov_playlist_id
+        )
+        data = {"browseId": browse_id}
+        playlist_response = await self._post_data("browse", data=data)
+        return await self._parse_playlist(playlist_response)
+
+    async def get_playlist_tracks(self, prov_playlist_id) -> List[Track]:
+        """Get all playlist tracks for given playlist id."""
+        browse_id = (
+            "VL" + prov_playlist_id
+            if not prov_playlist_id.startswith("VL")
+            else prov_playlist_id
+        )
+        data = {"browseId": browse_id}
+        playlist_obj = await self._post_data("browse", data=data)
+        tracks = playlist_obj["contents"]["singleColumnBrowseResultsRenderer"]["tabs"][
+            0
+        ]["tabRenderer"]["content"]["sectionListRenderer"]["contents"][0][
+            "musicPlaylistShelfRenderer"
+        ][
+            "contents"
+        ]
+        return [
+            await self.get_track(
+                track["musicResponsiveListItemRenderer"]["playlistItemData"]["videoId"]
+            )
+            for track in tracks
+            if "playlistItemData" in track["musicResponsiveListItemRenderer"]
+        ]
+
+    async def get_artist_albums(self, prov_artist_id) -> List[Album]:
+        """Get a list of albums for the given artist."""
+        data = {"browseId": prov_artist_id}
+        response = await self._post_data("browse", data=data)
+        album_response = response["contents"]["singleColumnBrowseResultsRenderer"][
+            "tabs"
+        ][0]["tabRenderer"]["content"]["sectionListRenderer"]["contents"][1][
+            "musicCarouselShelfRenderer"
+        ][
+            "contents"
+        ]
+        albums = []
+        for item in album_response:
+            album_id = item["musicTwoRowItemRenderer"]["navigationEndpoint"][
+                "browseEndpoint"
+            ]["browseId"]
+            album_name = item["musicTwoRowItemRenderer"]["title"]["runs"][0]["text"]
+            thumbnails = item["musicTwoRowItemRenderer"]["thumbnailRenderer"][
+                "musicThumbnailRenderer"
+            ]["thumbnail"]["thumbnails"]
+            album = album = Album(
+                item_id=album_id,
+                name=album_name,
+                provider=self.type,
+            )
+            album.metadata.images = [
+                MediaItemImage(ImageType.THUMB, thumb["url"]) for thumb in thumbnails
+            ]
+            album.add_provider_id(
+                MediaItemProviderId(
+                    item_id=str(album_id), prov_type=self.type, prov_id=self.id
+                )
+            )
+            albums.append(album)
+        return albums
+
+    async def get_artist_toptracks(self, prov_artist_id) -> List[Track]:
+        """Get a list of 5 most popular tracks for the given artist."""
+        data = {"browseId": prov_artist_id}
+        response = await self._post_data("browse", data=data)
+        # Check if we are dealing with an actual artist with songs, rather than a user
+        if (
+            "musicShelfRenderer"
+            in response["contents"]["singleColumnBrowseResultsRenderer"]["tabs"][0][
+                "tabRenderer"
+            ]["content"]["sectionListRenderer"]["contents"][0]
+        ):
+            songs_response = response["contents"]["singleColumnBrowseResultsRenderer"][
+                "tabs"
+            ][0]["tabRenderer"]["content"]["sectionListRenderer"]["contents"][0][
+                "musicShelfRenderer"
+            ][
+                "contents"
+            ]
+            return [
+                await self.get_track(
+                    prov_track_id=song["musicResponsiveListItemRenderer"]["overlay"][
+                        "musicItemThumbnailOverlayRenderer"
+                    ]["content"]["musicPlayButtonRenderer"]["playNavigationEndpoint"][
+                        "watchEndpoint"
+                    ][
+                        "videoId"
+                    ]
+                )
+                for song in songs_response
+            ]
+        return []
+
+    async def get_stream_details(self, item_id: str) -> StreamDetails:
+        """Return the content details for the given track when it will be streamed."""
+        signature_timestamp = await self._get_signature_timestamp()
+        data = {
+            "playbackContext": {
+                "contentPlaybackContext": {"signatureTimestamp": signature_timestamp}
+            },
+            "video_id": item_id,
+        }
+        track_obj = await self._post_data("player", data=data)
+        stream_format = await self._parse_stream_format(track_obj)
+        url = await self._parse_stream_url(stream_format=stream_format, item_id=item_id)
+        return StreamDetails(
+            provider=self.type,
+            item_id=item_id,
+            data=url,
+            content_type=ContentType.try_parse(stream_format["mimeType"]),
+        )
+
+    async def get_audio_stream(
+        self, streamdetails: StreamDetails, seek_position: int = 0
+    ) -> AsyncGenerator[bytes, None]:
+        """Return the audio stream for the provider item."""
+        async for chunk in get_http_stream(
+            self.mass, streamdetails.data, streamdetails, seek_position
+        ):
+            yield chunk
+
+    async def _post_data(self, endpoint: str, data: Dict[str, str], **kwargs):
+        url = f"{YTM_BASE_URL}{endpoint}"
+        data.update(self._context)
+        async with self.mass.http_session.post(
+            url,
+            headers=self._headers,
+            json=data,
+            verify_ssl=False,
+            cookies=self._cookies,
+        ) as response:
+            return await response.json()
+
+    async def _get_data(self, url: str, params: Dict = None):
+        async with self.mass.http_session.get(
+            url, headers=self._headers, params=params, cookies=self._cookies
+        ) as response:
+            return await response.text()
+
+    async def _initialize_headers(self, cookie: str) -> Dict[str, str]:
+        """Return headers to include in the requests."""
+        headers = {
+            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:72.0) Gecko/20100101 Firefox/72.0",
+            "Accept": "*/*",
+            "Accept-Language": "en-US,en;q=0.5",
+            "Content-Type": "application/json",
+            "X-Goog-AuthUser": "0",
+            "x-origin": "https://music.youtube.com",
+            "Cookie": cookie,
+        }
+        sapisid = ytmusicapi.helpers.sapisid_from_cookie(cookie)
+        origin = headers.get("origin", headers.get("x-origin"))
+        headers["Authorization"] = ytmusicapi.helpers.get_authorization(
+            sapisid + " " + origin
+        )
+        self._headers = headers
+
+    async def _initialize_context(self) -> Dict[str, str]:
+        """Return a dict to use as a context in requests."""
+        self._context = {
+            "context": {
+                "client": {"clientName": "WEB_REMIX", "clientVersion": "0.1"},
+                "user": {},
+            }
+        }
+
+    async def _parse_album(self, album_obj: dict, album_id: str) -> Album:
+        """Parse a YT Album response to an Album model object."""
+        parsed_album = ytmusicapi.parsers.albums.parse_album_header(album_obj)
+        album = Album(
+            item_id=album_id,
+            name=parsed_album["title"],
+            album_type=AlbumType.ALBUM,
+            provider=self.type,
+        )
+        if parsed_album["year"].isdigit():
+            album.year = parsed_album["year"]
+        images = []
+        for thumb in parsed_album["thumbnails"]:
+            images.append(MediaItemImage(ImageType.THUMB, thumb["url"]))
+        album.metadata.images = images
+        if "description" in parsed_album:
+            album.metadata.description = unquote(parsed_album["description"])
+        artists = []
+        for parsed_artist in parsed_album["artists"]:
+            if parsed_artist["id"]:
+                artist_id = parsed_artist["id"]
+                if not artist_id:
+                    artist_id = "ytm_va"
+                artist = Artist(
+                    item_id=artist_id,
+                    provider=self.type,
+                    name=parsed_artist["name"],
+                )
+                artist.add_provider_id(
+                    MediaItemProviderId(
+                        item_id=str(parsed_artist["id"]),
+                        prov_type=self.type,
+                        prov_id=self.id,
+                    )
+                )
+                artists.append(artist)
+        album.artists = artists
+        album.add_provider_id(
+            MediaItemProviderId(
+                item_id=str(album_id), prov_type=self.type, prov_id=self.id
+            )
+        )
+        return album
+
+    async def _parse_artist(self, artist_obj: dict, artist_id: str) -> Artist:
+        """Parse a YT Artist response to Artist model object."""
+        if "musicImmersiveHeaderRenderer" in artist_obj["header"]:
+            # Actual artist
+            name = artist_obj["header"]["musicImmersiveHeaderRenderer"]["title"][
+                "runs"
+            ][0]["text"]
+            artist_id = artist_obj["header"]["musicImmersiveHeaderRenderer"][
+                "subscriptionButton"
+            ]["subscribeButtonRenderer"]["channelId"]
+            artist = Artist(item_id=str(artist_id), provider=self.type, name=name)
+            if "description" in artist_obj["header"]["musicImmersiveHeaderRenderer"]:
+                artist.metadata.description = unquote(
+                    artist_obj["header"]["musicImmersiveHeaderRenderer"]["description"][
+                        "runs"
+                    ][0]["text"]
+                )
+            images = []
+            if "thumbnail" in artist_obj["header"]["musicImmersiveHeaderRenderer"]:
+                for thumb in artist_obj["header"]["musicImmersiveHeaderRenderer"][
+                    "thumbnail"
+                ]["musicThumbnailRenderer"]["thumbnail"]["thumbnails"]:
+                    images.append(MediaItemImage(ImageType.THUMB, thumb["url"]))
+                artist.metadata.images = images
+            artist.add_provider_id(
+                MediaItemProviderId(
+                    item_id=str(artist_id), prov_type=self.type, prov_id=self.id
+                )
+            )
+            return artist
+        if "musicVisualHeaderRenderer" in artist_obj["header"]:
+            # Artist that is actually a user
+            name = artist_obj["header"]["musicVisualHeaderRenderer"]["title"]["runs"][
+                0
+            ]["text"]
+            artist = Artist(item_id=str(artist_id), name=name, provider=self.type)
+            if "thumbnail" in artist_obj["header"]["musicVisualHeaderRenderer"]:
+                thumbnails = artist_obj["header"]["musicVisualHeaderRenderer"][
+                    "thumbnail"
+                ]["musicThumbnailRenderer"]["thumbnail"]["thumbnails"]
+            elif (
+                "foregroundThumbnail"
+                in artist_obj["header"]["musicVisualHeaderRenderer"]
+            ):
+                thumbnails = artist_obj["header"]["musicVisualHeaderRenderer"][
+                    "foregroundThumbnail"
+                ]["musicThumbnailRenderer"]["thumbnail"]["thumbnails"]
+            artist.metadata.images = [
+                MediaItemImage(ImageType.THUMB, thumb["url"]) for thumb in thumbnails
+            ]
+            artist.add_provider_id(
+                MediaItemProviderId(
+                    item_id=str(artist_id), prov_type=self.type, prov_id=self.id
+                )
+            )
+            return artist
+
+    async def _parse_playlist(self, playlist_response: dict) -> Playlist:
+        """Parse a YT Playlist response to a Playlist object."""
+        playlist_id = playlist_response["contents"][
+            "singleColumnBrowseResultsRenderer"
+        ]["tabs"][0]["tabRenderer"]["content"]["sectionListRenderer"]["contents"][0][
+            "musicPlaylistShelfRenderer"
+        ][
+            "playlistId"
+        ]
+        own_playlist = (
+            "musicEditablePlaylistDetailHeaderRenderer" in playlist_response["header"]
+        )
+        if own_playlist:
+            playlist_response = playlist_response["header"][
+                "musicEditablePlaylistDetailHeaderRenderer"
+            ]
+        title = playlist_response["header"]["musicDetailHeaderRenderer"]["title"][
+            "runs"
+        ][0]["text"]
+        thumbnails = playlist_response["header"]["musicDetailHeaderRenderer"][
+            "thumbnail"
+        ]["croppedSquareThumbnailRenderer"]["thumbnail"]["thumbnails"]
+        playlist = Playlist(item_id=str(playlist_id), provider=self.type, name=title)
+        if "description" in playlist_response["header"]["musicDetailHeaderRenderer"]:
+            playlist.metadata.description = playlist_response["header"][
+                "musicDetailHeaderRenderer"
+            ]["description"]["runs"][0]["text"]
+        playlist.metadata.images = [
+            MediaItemImage(ImageType.THUMB, thumb["url"]) for thumb in thumbnails
+        ]
+        playlist.add_provider_id(
+            MediaItemProviderId(
+                item_id=str(playlist_id), prov_type=self.type, prov_id=self.id
+            )
+        )
+        return playlist
+
+    async def _parse_track(self, track_obj: dict) -> Track:
+        """Parse a YT Track response to a Track model object."""
+        track = Track(
+            item_id=track_obj["videoDetails"]["videoId"],
+            provider=self.type,
+            name=track_obj["videoDetails"]["title"],
+            duration=track_obj["videoDetails"]["lengthSeconds"],
+        )
+        artist_id = track_obj["microformat"]["microformatDataRenderer"][
+            "pageOwnerDetails"
+        ]["externalChannelId"]
+        if artist_id == "UCUTXlgdcKU5vfzFqHOWIvkA":
+            artist = Artist(
+                item_id=artist_id, name="Various Artists", provider=self.type
+            )
+            artist.add_provider_id(
+                MediaItemProviderId(
+                    item_id=str(artist_id), prov_type=self.type, prov_id=self.id
+                )
+            )
+        else:
+            artist = await self.get_artist(artist_id)
+        track.artists = [artist]
+        track.metadata.images = [
+            MediaItemImage(ImageType.THUMB, thumb["url"])
+            for thumb in track_obj["microformat"]["microformatDataRenderer"][
+                "thumbnail"
+            ]["thumbnails"]
+        ]
+        available = False
+        if track_obj["playabilityStatus"]["status"] == "OK":
+            available = True
+        track.add_provider_id(
+            MediaItemProviderId(
+                item_id=str(track_obj["videoDetails"]["videoId"]),
+                prov_type=self.type,
+                prov_id=self.id,
+                available=available,
+            )
+        )
+        return track
+
+    async def _get_signature_timestamp(self):
+        """Get a signature timestamp required to generate valid stream URLs."""
+        response = await self._get_data(url=YTM_DOMAIN)
+        match = re.search(r'jsUrl"\s*:\s*"([^"]+)"', response)
+        if match is None:
+            raise Exception("Could not identify the URL for base.js player.")
+        url = YTM_DOMAIN + match.group(1)
+        response = await self._get_data(url=url)
+        match = re.search(r"signatureTimestamp[:=](\d+)", response)
+        if match is None:
+            raise Exception("Unable to identify the signatureTimestamp.")
+        return int(match.group(1))
+
+    async def _parse_stream_url(self, stream_format: dict, item_id: str) -> str:
+        """Figure out the stream URL to use based on the YT track object."""
+        cipher_parts = {}
+        for part in stream_format["signatureCipher"].split("&"):
+            key, val = part.split("=", maxsplit=1)
+            cipher_parts[key] = unquote(val)
+        signature = await self._decipher_signature(
+            ciphered_signature=cipher_parts["s"], item_id=item_id
+        )
+        url = cipher_parts["url"] + "&sig=" + signature
+        return url
+
+    @classmethod
+    async def _parse_stream_format(cls, track_obj: dict) -> dict:
+        """Grab the highes available audio stream from available streams."""
+        stream_format = {}
+        quality_mapper = {
+            "AUDIO_QUALITY_LOW": 1,
+            "AUDIO_QUALITY_MEDIUM": 2,
+            "AUDIO_QUALITY_HIGH": 3,
+        }
+        for adaptive_format in track_obj["streamingData"]["adaptiveFormats"]:
+            if adaptive_format["mimeType"].startswith("audio") and (
+                not stream_format
+                or quality_mapper.get(adaptive_format["audioQuality"], 0)
+                > quality_mapper.get(stream_format["audioQuality"], 0)
+            ):
+                stream_format = adaptive_format
+        if stream_format is None:
+            raise MediaNotFoundError("No stream found for this track")
+        return stream_format
+
+    async def _decipher_signature(self, ciphered_signature: str, item_id: str):
+        """Decipher the signature, required to build the Stream URL."""
+
+        def _decipher():
+            embed_url = f"https://www.youtube.com/embed/{item_id}"
+            embed_html = pytube.request.get(embed_url)
+            js_url = pytube.extract.js_url(embed_html)
+            ytm_js = pytube.request.get(js_url)
+            cipher = pytube.cipher.Cipher(js=ytm_js)
+            return cipher.get_signature(ciphered_signature)
+
+        return await self.mass.loop.run_in_executor(None, _decipher)
index 29218d4a26c7ad24d04fd784dfd3697e58013a4d..323c5866bdb1994bd33b4478a774645612b68eda 100644 (file)
@@ -10,3 +10,5 @@ pillow>=8.0,<=9.2.0
 unidecode>=1.0,<=1.3.4
 mashumaro>=3.0,<=3.1
 xmltodict>=0.12.0,<=0.13.0
+ytmusicapi>=0.22.0,<=0.23.0
+pytube>=12.1.0,<=12.2.0