Add new YTM parser
authorMarvin Schenkel <marvinschenkel@gmail.com>
Fri, 8 Jul 2022 15:07:57 +0000 (17:07 +0200)
committerMarvin Schenkel <marvinschenkel@gmail.com>
Fri, 8 Jul 2022 15:07:57 +0000 (17:07 +0200)
music_assistant/controllers/music/__init__.py
music_assistant/music_providers/ytmusic.py [deleted file]
music_assistant/music_providers/ytmusic/__init__.py [new file with mode: 0644]
music_assistant/music_providers/ytmusic/helpers.py [new file with mode: 0644]
music_assistant/music_providers/ytmusic/ytmusic.py [new file with mode: 0644]

index 81073cdefb426a23ae3deb4bdc63b61f8d08db4e..7dcce7b0e85f7e73c651e41f9a57e26950c76eab 100755 (executable)
@@ -37,7 +37,7 @@ from music_assistant.music_providers.spotify import SpotifyProvider
 from music_assistant.music_providers.tunein import TuneInProvider
 from music_assistant.music_providers.url import PROVIDER_CONFIG as URL_CONFIG
 from music_assistant.music_providers.url import URLProvider
-from music_assistant.music_providers.ytmusic import YoutubeMusicProvider
+from music_assistant.music_providers.ytmusic.ytmusic import YoutubeMusicProvider
 
 if TYPE_CHECKING:
     from music_assistant.mass import MusicAssistant
diff --git a/music_assistant/music_providers/ytmusic.py b/music_assistant/music_providers/ytmusic.py
deleted file mode 100644 (file)
index 45d862b..0000000
+++ /dev/null
@@ -1,721 +0,0 @@
-"""Youtube Music support for MusicAssistant."""
-import re
-from datetime import date
-from typing import AsyncGenerator, Dict, List, Optional
-from urllib.parse import unquote
-
-import pytube
-import ytmusicapi
-
-from music_assistant.helpers.audio import get_http_stream
-from music_assistant.models.enums import ProviderType
-from music_assistant.models.errors import LoginFailed, MediaNotFoundError
-from music_assistant.models.media_items import (
-    Album,
-    AlbumType,
-    Artist,
-    ContentType,
-    ImageType,
-    MediaItemImage,
-    MediaItemProviderId,
-    MediaItemType,
-    MediaType,
-    Playlist,
-    StreamDetails,
-    Track,
-)
-from music_assistant.models.music_provider import MusicProvider
-
-YTM_DOMAIN = "https://music.youtube.com"
-YTM_BASE_URL = f"{YTM_DOMAIN}/youtubei/v1/"
-
-
-class YoutubeMusicProvider(MusicProvider):
-    """Provider for Youtube Music."""
-
-    _attr_type = ProviderType.YTMUSIC
-    _attr_name = "Youtube Music"
-    _attr_supported_mediatypes = [
-        MediaType.ARTIST,
-        MediaType.ALBUM,
-        MediaType.TRACK,
-        MediaType.PLAYLIST,
-    ]
-    _headers = None
-    _context = None
-    _cookies = None
-
-    async def setup(self) -> bool:
-        """Set up the YTMusic provider."""
-        if not self.config.enabled:
-            return False
-        if not self.config.username or not self.config.password:
-            raise LoginFailed("Invalid login credentials")
-        await self._initialize_headers(cookie=self.config.password)
-        await self._initialize_context()
-        self._cookies = {"CONSENT": "YES+1"}
-        return True
-
-    async def search(
-        self, search_query: str, media_types=Optional[List[MediaType]], limit: int = 5
-    ) -> List[MediaItemType]:
-        """
-        Perform search on musicprovider.
-
-            :param search_query: Search query.
-            :param media_types: A list of media_types to include. All types if None.
-            :param limit: Number of items to return in the search (per type).
-        """
-        data = {"query": search_query}
-        ytm_filter = None
-        if len(media_types) == 1:
-            # YTM does not support multiple searchtypes, falls back to all if no type given
-            if media_types[0] == MediaType.ARTIST:
-                ytm_filter = "artists"
-            if media_types[0] == MediaType.ALBUM:
-                ytm_filter = "albums"
-            if media_types[0] == MediaType.TRACK:
-                ytm_filter = "songs"
-            if media_types[0] == MediaType.PLAYLIST:
-                ytm_filter = "playlists"
-        params = ytmusicapi.parsers.search_params.get_search_params(
-            filter=ytm_filter, scope=None, ignore_spelling=False
-        )
-        data["params"] = params
-        search_results = await self._post_data(endpoint="search", data=data)
-        if "tabbedSearchResultsRenderer" in search_results["contents"]:
-            results = search_results["contents"]["tabbedSearchResultsRenderer"]["tabs"][
-                0
-            ]["tabRenderer"]["content"]
-        else:
-            results = search_results["contents"]
-        parsed_results = []
-        for result in results["sectionListRenderer"]["contents"]:
-            if "musicShelfRenderer" in result:
-                category = result["musicShelfRenderer"]["title"]["runs"][0]["text"]
-                if category == "Artists":
-                    for artist in result["musicShelfRenderer"]["contents"]:
-                        artist_id = artist["musicResponsiveListItemRenderer"][
-                            "navigationEndpoint"
-                        ]["browseEndpoint"]["browseId"]
-                        parsed_results.append(await self.get_artist(artist_id))
-                elif category == "Albums":
-                    for album in result["musicShelfRenderer"]["contents"]:
-                        album_id = album["musicResponsiveListItemRenderer"][
-                            "navigationEndpoint"
-                        ]["browseEndpoint"]["browseId"]
-                        parsed_results.append(await self.get_album(album_id))
-                elif category == "Songs":
-                    for song in result["musicShelfRenderer"]["contents"]:
-                        song_id = song["musicResponsiveListItemRenderer"][
-                            "playlistItemData"
-                        ]["videoId"]
-                        parsed_results.append(await self.get_track(song_id))
-                else:
-                    print(category)
-        return parsed_results
-
-    async def get_library_artists(self) -> AsyncGenerator[Artist, None]:
-        """Retrieve all library artists from Youtube Music."""
-        data = {"browseId": "FEmusic_library_corpus_track_artists"}
-        response = await self._post_data(endpoint="browse", data=data)
-        response_artist = response["contents"]["singleColumnBrowseResultsRenderer"][
-            "tabs"
-        ][0]["tabRenderer"]["content"]["sectionListRenderer"]["contents"][1][
-            "itemSectionRenderer"
-        ][
-            "contents"
-        ][
-            0
-        ][
-            "musicShelfRenderer"
-        ][
-            "contents"
-        ]
-        parsed_artists = ytmusicapi.parsers.library.parse_artists(response_artist)
-        for item in parsed_artists:
-            artist = Artist(
-                item_id=item["browseId"], provider=self.type, name=item["artist"]
-            )
-            artist.metadata.images = [
-                MediaItemImage(ImageType.THUMB, thumb["url"])
-                for thumb in item["thumbnails"]
-            ]
-            artist.add_provider_id(
-                MediaItemProviderId(
-                    item_id=str(item["browseId"]), prov_type=self.type, prov_id=self.id
-                )
-            )
-            yield artist
-
-    async def get_library_albums(self) -> AsyncGenerator[Album, None]:
-        """Retrieve all library albums from Youtube Music."""
-        data = {"browseId": "FEmusic_liked_albums"}
-        response = await self._post_data(endpoint="browse", data=data)
-        response_albums = response["contents"]["singleColumnBrowseResultsRenderer"][
-            "tabs"
-        ][0]["tabRenderer"]["content"]["sectionListRenderer"]["contents"][1][
-            "itemSectionRenderer"
-        ][
-            "contents"
-        ][
-            0
-        ][
-            "gridRenderer"
-        ][
-            "items"
-        ]
-        parsed_albums = ytmusicapi.parsers.library.parse_albums(response_albums)
-        for item in parsed_albums:
-            if item["type"] == "Single":
-                album_type = AlbumType.SINGLE
-            elif item["type"] == "EP":
-                album_type = AlbumType.EP
-            elif item["type"] == "Album":
-                album_type = AlbumType.ALBUM
-            else:
-                album_type = AlbumType.UNKNOWN
-            album = Album(
-                item_id=item["browseId"],
-                name=item["title"],
-                album_type=album_type,
-                provider=self.type,
-            )
-            if item["year"].isdigit():
-                album.year = item["year"]
-            artists = []
-            for artist in item["artists"]:
-                artist_id = artist["id"]
-                if not artist_id:
-                    artist_id = "ytm_va"
-                album_artist = Artist(
-                    item_id=artist_id, name=artist["name"], provider=self.type
-                )
-                album_artist.add_provider_id(
-                    MediaItemProviderId(
-                        item_id=str(artist["id"]), prov_type=self.type, prov_id=self.id
-                    )
-                )
-                artists.append(album_artist)
-            album.artists = artists
-            album.metadata.images = [
-                MediaItemImage(ImageType.THUMB, thumb["url"])
-                for thumb in item["thumbnails"]
-            ]
-            album.add_provider_id(
-                MediaItemProviderId(
-                    item_id=str(item["browseId"]), prov_type=self.type, prov_id=self.id
-                )
-            )
-            yield album
-
-    async def get_library_playlists(self) -> AsyncGenerator[Playlist, None]:
-        """Retrieve all library playlists from the provider."""
-        data = {"browseId": "FEmusic_liked_playlists"}
-        response = await self._post_data(endpoint="browse", data=data)
-        response_playlists = response["contents"]["singleColumnBrowseResultsRenderer"][
-            "tabs"
-        ][0]["tabRenderer"]["content"]["sectionListRenderer"]["contents"][1][
-            "itemSectionRenderer"
-        ][
-            "contents"
-        ][
-            0
-        ][
-            "gridRenderer"
-        ]
-        playlists = ytmusicapi.parsers.browsing.parse_content_list(
-            response_playlists["items"][1:], ytmusicapi.parsers.browsing.parse_playlist
-        )
-        for item in playlists:
-            playlist = Playlist(
-                item_id=str(item["playlistId"]), provider=self.type, name=item["title"]
-            )
-            playlist.metadata.description = item["description"]
-            playlist.metadata.images = [
-                MediaItemImage(ImageType.THUMB, thumb["url"])
-                for thumb in item["thumbnails"]
-            ]
-            playlist.add_provider_id(
-                MediaItemProviderId(
-                    item_id=str(item["playlistId"]),
-                    prov_type=self.type,
-                    prov_id=self.id,
-                )
-            )
-            yield playlist
-
-    async def get_library_tracks(self) -> AsyncGenerator[Track, None]:
-        """Retrieve library tracks from Youtube Music."""
-        data = {"browseId": "FEmusic_liked_videos"}
-        response = await self._post_data(endpoint="browse", data=data)
-        parsed_tracks = ytmusicapi.parsers.library.parse_library_songs(response)[
-            "parsed"
-        ]
-        for item in parsed_tracks:
-            track = await self.get_track(item["videoId"])
-            album = await self.get_album(item["album"]["id"])
-            track.album = album
-            track.metadata.explicit = item["isExplicit"]
-            track.add_provider_id(
-                MediaItemProviderId(
-                    item_id=str(item["videoId"]),
-                    prov_type=self.type,
-                    prov_id=self.id,
-                )
-            )
-            yield track
-
-    async def get_album(self, prov_album_id) -> Album:
-        """Get full album details by id."""
-        data = {"browseId": prov_album_id}
-        album_obj = await self._post_data(endpoint="browse", data=data)
-        return (
-            await self._parse_album(album_obj=album_obj, album_id=prov_album_id)
-            if album_obj
-            else None
-        )
-
-    async def get_album_tracks(self, prov_album_id: str) -> List[Track]:
-        """Get album tracks for given album id."""
-        data = {"browseId": prov_album_id}
-        album_obj = await self._post_data(endpoint="browse", data=data)
-        parsed_album = ytmusicapi.parsers.albums.parse_album_header(album_obj)
-        album_playlist_id = parsed_album["audioPlaylistId"]
-        return await self.get_playlist_tracks(album_playlist_id)
-
-    async def get_artist(self, prov_artist_id) -> Artist:
-        """Get full artist details by id."""
-        data = {"browseId": prov_artist_id}
-        artist_obj = await self._post_data(endpoint="browse", data=data)
-        return (
-            await self._parse_artist(artist_obj=artist_obj, artist_id=prov_artist_id)
-            if artist_obj
-            else None
-        )
-
-    async def get_track(self, prov_track_id) -> Track:
-        """Get full track details by id."""
-        signature_timestamp = (date.today() - date.fromtimestamp(0)).days - 1
-        data = {
-            "playbackContext": {
-                "contentPlaybackContext": {"signatureTimestamp": signature_timestamp}
-            },
-            "video_id": prov_track_id,
-        }
-        track_obj = await self._post_data("player", data=data)
-        return await self._parse_track(track_obj) if track_obj else None
-
-    async def get_playlist(self, prov_playlist_id) -> Playlist:
-        """Get full playlist details by id."""
-        browse_id = (
-            "VL" + prov_playlist_id
-            if not prov_playlist_id.startswith("VL")
-            else prov_playlist_id
-        )
-        data = {"browseId": browse_id}
-        playlist_response = await self._post_data("browse", data=data)
-        return await self._parse_playlist(playlist_response)
-
-    async def get_playlist_tracks(self, prov_playlist_id) -> List[Track]:
-        """Get all playlist tracks for given playlist id."""
-        browse_id = (
-            "VL" + prov_playlist_id
-            if not prov_playlist_id.startswith("VL")
-            else prov_playlist_id
-        )
-        data = {"browseId": browse_id}
-        playlist_obj = await self._post_data("browse", data=data)
-        tracks = playlist_obj["contents"]["singleColumnBrowseResultsRenderer"]["tabs"][
-            0
-        ]["tabRenderer"]["content"]["sectionListRenderer"]["contents"][0][
-            "musicPlaylistShelfRenderer"
-        ][
-            "contents"
-        ]
-        return [
-            await self.get_track(
-                track["musicResponsiveListItemRenderer"]["playlistItemData"]["videoId"]
-            )
-            for track in tracks
-            if "playlistItemData" in track["musicResponsiveListItemRenderer"]
-        ]
-
-    async def get_artist_albums(self, prov_artist_id) -> List[Album]:
-        """Get a list of albums for the given artist."""
-        data = {"browseId": prov_artist_id}
-        response = await self._post_data("browse", data=data)
-        album_response = response["contents"]["singleColumnBrowseResultsRenderer"][
-            "tabs"
-        ][0]["tabRenderer"]["content"]["sectionListRenderer"]["contents"][1][
-            "musicCarouselShelfRenderer"
-        ][
-            "contents"
-        ]
-        albums = []
-        for item in album_response:
-            album_id = item["musicTwoRowItemRenderer"]["navigationEndpoint"][
-                "browseEndpoint"
-            ]["browseId"]
-            album_name = item["musicTwoRowItemRenderer"]["title"]["runs"][0]["text"]
-            thumbnails = item["musicTwoRowItemRenderer"]["thumbnailRenderer"][
-                "musicThumbnailRenderer"
-            ]["thumbnail"]["thumbnails"]
-            album = album = Album(
-                item_id=album_id,
-                name=album_name,
-                provider=self.type,
-            )
-            album.metadata.images = [
-                MediaItemImage(ImageType.THUMB, thumb["url"]) for thumb in thumbnails
-            ]
-            album.add_provider_id(
-                MediaItemProviderId(
-                    item_id=str(album_id), prov_type=self.type, prov_id=self.id
-                )
-            )
-            albums.append(album)
-        return albums
-
-    async def get_artist_toptracks(self, prov_artist_id) -> List[Track]:
-        """Get a list of 5 most popular tracks for the given artist."""
-        data = {"browseId": prov_artist_id}
-        response = await self._post_data("browse", data=data)
-        # Check if we are dealing with an actual artist with songs, rather than a user
-        if (
-            "musicShelfRenderer"
-            in response["contents"]["singleColumnBrowseResultsRenderer"]["tabs"][0][
-                "tabRenderer"
-            ]["content"]["sectionListRenderer"]["contents"][0]
-        ):
-            songs_response = response["contents"]["singleColumnBrowseResultsRenderer"][
-                "tabs"
-            ][0]["tabRenderer"]["content"]["sectionListRenderer"]["contents"][0][
-                "musicShelfRenderer"
-            ][
-                "contents"
-            ]
-            return [
-                await self.get_track(
-                    prov_track_id=song["musicResponsiveListItemRenderer"]["overlay"][
-                        "musicItemThumbnailOverlayRenderer"
-                    ]["content"]["musicPlayButtonRenderer"]["playNavigationEndpoint"][
-                        "watchEndpoint"
-                    ][
-                        "videoId"
-                    ]
-                )
-                for song in songs_response
-            ]
-        return []
-
-    async def get_stream_details(self, item_id: str) -> StreamDetails:
-        """Return the content details for the given track when it will be streamed."""
-        signature_timestamp = await self._get_signature_timestamp()
-        data = {
-            "playbackContext": {
-                "contentPlaybackContext": {"signatureTimestamp": signature_timestamp}
-            },
-            "video_id": item_id,
-        }
-        track_obj = await self._post_data("player", data=data)
-        stream_format = await self._parse_stream_format(track_obj)
-        url = await self._parse_stream_url(stream_format=stream_format, item_id=item_id)
-        return StreamDetails(
-            provider=self.type,
-            item_id=item_id,
-            data=url,
-            content_type=ContentType.try_parse(stream_format["mimeType"]),
-        )
-
-    async def get_audio_stream(
-        self, streamdetails: StreamDetails, seek_position: int = 0
-    ) -> AsyncGenerator[bytes, None]:
-        """Return the audio stream for the provider item."""
-        async for chunk in get_http_stream(
-            self.mass, streamdetails.data, streamdetails, seek_position
-        ):
-            yield chunk
-
-    async def _post_data(self, endpoint: str, data: Dict[str, str], **kwargs):
-        url = f"{YTM_BASE_URL}{endpoint}"
-        data.update(self._context)
-        async with self.mass.http_session.post(
-            url,
-            headers=self._headers,
-            json=data,
-            verify_ssl=False,
-            cookies=self._cookies,
-        ) as response:
-            return await response.json()
-
-    async def _get_data(self, url: str, params: Dict = None):
-        async with self.mass.http_session.get(
-            url, headers=self._headers, params=params, cookies=self._cookies
-        ) as response:
-            return await response.text()
-
-    async def _initialize_headers(self, cookie: str) -> Dict[str, str]:
-        """Return headers to include in the requests."""
-        headers = {
-            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:72.0) Gecko/20100101 Firefox/72.0",
-            "Accept": "*/*",
-            "Accept-Language": "en-US,en;q=0.5",
-            "Content-Type": "application/json",
-            "X-Goog-AuthUser": "0",
-            "x-origin": "https://music.youtube.com",
-            "Cookie": cookie,
-        }
-        sapisid = ytmusicapi.helpers.sapisid_from_cookie(cookie)
-        origin = headers.get("origin", headers.get("x-origin"))
-        headers["Authorization"] = ytmusicapi.helpers.get_authorization(
-            sapisid + " " + origin
-        )
-        self._headers = headers
-
-    async def _initialize_context(self) -> Dict[str, str]:
-        """Return a dict to use as a context in requests."""
-        self._context = {
-            "context": {
-                "client": {"clientName": "WEB_REMIX", "clientVersion": "0.1"},
-                "user": {},
-            }
-        }
-
-    async def _parse_album(self, album_obj: dict, album_id: str) -> Album:
-        """Parse a YT Album response to an Album model object."""
-        parsed_album = ytmusicapi.parsers.albums.parse_album_header(album_obj)
-        album = Album(
-            item_id=album_id,
-            name=parsed_album["title"],
-            album_type=AlbumType.ALBUM,
-            provider=self.type,
-        )
-        if parsed_album["year"].isdigit():
-            album.year = parsed_album["year"]
-        images = []
-        for thumb in parsed_album["thumbnails"]:
-            images.append(MediaItemImage(ImageType.THUMB, thumb["url"]))
-        album.metadata.images = images
-        if "description" in parsed_album:
-            album.metadata.description = unquote(parsed_album["description"])
-        artists = []
-        for parsed_artist in parsed_album["artists"]:
-            if parsed_artist["id"]:
-                artist_id = parsed_artist["id"]
-                if not artist_id:
-                    artist_id = "ytm_va"
-                artist = Artist(
-                    item_id=artist_id,
-                    provider=self.type,
-                    name=parsed_artist["name"],
-                )
-                artist.add_provider_id(
-                    MediaItemProviderId(
-                        item_id=str(parsed_artist["id"]),
-                        prov_type=self.type,
-                        prov_id=self.id,
-                    )
-                )
-                artists.append(artist)
-        album.artists = artists
-        album.add_provider_id(
-            MediaItemProviderId(
-                item_id=str(album_id), prov_type=self.type, prov_id=self.id
-            )
-        )
-        return album
-
-    async def _parse_artist(self, artist_obj: dict, artist_id: str) -> Artist:
-        """Parse a YT Artist response to Artist model object."""
-        if "musicImmersiveHeaderRenderer" in artist_obj["header"]:
-            # Actual artist
-            name = artist_obj["header"]["musicImmersiveHeaderRenderer"]["title"][
-                "runs"
-            ][0]["text"]
-            artist_id = artist_obj["header"]["musicImmersiveHeaderRenderer"][
-                "subscriptionButton"
-            ]["subscribeButtonRenderer"]["channelId"]
-            artist = Artist(item_id=str(artist_id), provider=self.type, name=name)
-            if "description" in artist_obj["header"]["musicImmersiveHeaderRenderer"]:
-                artist.metadata.description = unquote(
-                    artist_obj["header"]["musicImmersiveHeaderRenderer"]["description"][
-                        "runs"
-                    ][0]["text"]
-                )
-            images = []
-            if "thumbnail" in artist_obj["header"]["musicImmersiveHeaderRenderer"]:
-                for thumb in artist_obj["header"]["musicImmersiveHeaderRenderer"][
-                    "thumbnail"
-                ]["musicThumbnailRenderer"]["thumbnail"]["thumbnails"]:
-                    images.append(MediaItemImage(ImageType.THUMB, thumb["url"]))
-                artist.metadata.images = images
-            artist.add_provider_id(
-                MediaItemProviderId(
-                    item_id=str(artist_id), prov_type=self.type, prov_id=self.id
-                )
-            )
-            return artist
-        if "musicVisualHeaderRenderer" in artist_obj["header"]:
-            # Artist that is actually a user
-            name = artist_obj["header"]["musicVisualHeaderRenderer"]["title"]["runs"][
-                0
-            ]["text"]
-            artist = Artist(item_id=str(artist_id), name=name, provider=self.type)
-            if "thumbnail" in artist_obj["header"]["musicVisualHeaderRenderer"]:
-                thumbnails = artist_obj["header"]["musicVisualHeaderRenderer"][
-                    "thumbnail"
-                ]["musicThumbnailRenderer"]["thumbnail"]["thumbnails"]
-            elif (
-                "foregroundThumbnail"
-                in artist_obj["header"]["musicVisualHeaderRenderer"]
-            ):
-                thumbnails = artist_obj["header"]["musicVisualHeaderRenderer"][
-                    "foregroundThumbnail"
-                ]["musicThumbnailRenderer"]["thumbnail"]["thumbnails"]
-            artist.metadata.images = [
-                MediaItemImage(ImageType.THUMB, thumb["url"]) for thumb in thumbnails
-            ]
-            artist.add_provider_id(
-                MediaItemProviderId(
-                    item_id=str(artist_id), prov_type=self.type, prov_id=self.id
-                )
-            )
-            return artist
-
-    async def _parse_playlist(self, playlist_response: dict) -> Playlist:
-        """Parse a YT Playlist response to a Playlist object."""
-        playlist_id = playlist_response["contents"][
-            "singleColumnBrowseResultsRenderer"
-        ]["tabs"][0]["tabRenderer"]["content"]["sectionListRenderer"]["contents"][0][
-            "musicPlaylistShelfRenderer"
-        ][
-            "playlistId"
-        ]
-        own_playlist = (
-            "musicEditablePlaylistDetailHeaderRenderer" in playlist_response["header"]
-        )
-        if own_playlist:
-            playlist_response = playlist_response["header"][
-                "musicEditablePlaylistDetailHeaderRenderer"
-            ]
-        title = playlist_response["header"]["musicDetailHeaderRenderer"]["title"][
-            "runs"
-        ][0]["text"]
-        thumbnails = playlist_response["header"]["musicDetailHeaderRenderer"][
-            "thumbnail"
-        ]["croppedSquareThumbnailRenderer"]["thumbnail"]["thumbnails"]
-        playlist = Playlist(item_id=str(playlist_id), provider=self.type, name=title)
-        if "description" in playlist_response["header"]["musicDetailHeaderRenderer"]:
-            playlist.metadata.description = playlist_response["header"][
-                "musicDetailHeaderRenderer"
-            ]["description"]["runs"][0]["text"]
-        playlist.metadata.images = [
-            MediaItemImage(ImageType.THUMB, thumb["url"]) for thumb in thumbnails
-        ]
-        playlist.add_provider_id(
-            MediaItemProviderId(
-                item_id=str(playlist_id), prov_type=self.type, prov_id=self.id
-            )
-        )
-        return playlist
-
-    async def _parse_track(self, track_obj: dict) -> Track:
-        """Parse a YT Track response to a Track model object."""
-        track = Track(
-            item_id=track_obj["videoDetails"]["videoId"],
-            provider=self.type,
-            name=track_obj["videoDetails"]["title"],
-            duration=track_obj["videoDetails"]["lengthSeconds"],
-        )
-        artist_id = track_obj["microformat"]["microformatDataRenderer"][
-            "pageOwnerDetails"
-        ]["externalChannelId"]
-        if artist_id == "UCUTXlgdcKU5vfzFqHOWIvkA":
-            artist = Artist(
-                item_id=artist_id, name="Various Artists", provider=self.type
-            )
-            artist.add_provider_id(
-                MediaItemProviderId(
-                    item_id=str(artist_id), prov_type=self.type, prov_id=self.id
-                )
-            )
-        else:
-            artist = await self.get_artist(artist_id)
-        track.artists = [artist]
-        track.metadata.images = [
-            MediaItemImage(ImageType.THUMB, thumb["url"])
-            for thumb in track_obj["microformat"]["microformatDataRenderer"][
-                "thumbnail"
-            ]["thumbnails"]
-        ]
-        available = False
-        if track_obj["playabilityStatus"]["status"] == "OK":
-            available = True
-        track.add_provider_id(
-            MediaItemProviderId(
-                item_id=str(track_obj["videoDetails"]["videoId"]),
-                prov_type=self.type,
-                prov_id=self.id,
-                available=available,
-            )
-        )
-        return track
-
-    async def _get_signature_timestamp(self):
-        """Get a signature timestamp required to generate valid stream URLs."""
-        response = await self._get_data(url=YTM_DOMAIN)
-        match = re.search(r'jsUrl"\s*:\s*"([^"]+)"', response)
-        if match is None:
-            raise Exception("Could not identify the URL for base.js player.")
-        url = YTM_DOMAIN + match.group(1)
-        response = await self._get_data(url=url)
-        match = re.search(r"signatureTimestamp[:=](\d+)", response)
-        if match is None:
-            raise Exception("Unable to identify the signatureTimestamp.")
-        return int(match.group(1))
-
-    async def _parse_stream_url(self, stream_format: dict, item_id: str) -> str:
-        """Figure out the stream URL to use based on the YT track object."""
-        cipher_parts = {}
-        for part in stream_format["signatureCipher"].split("&"):
-            key, val = part.split("=", maxsplit=1)
-            cipher_parts[key] = unquote(val)
-        signature = await self._decipher_signature(
-            ciphered_signature=cipher_parts["s"], item_id=item_id
-        )
-        url = cipher_parts["url"] + "&sig=" + signature
-        return url
-
-    @classmethod
-    async def _parse_stream_format(cls, track_obj: dict) -> dict:
-        """Grab the highes available audio stream from available streams."""
-        stream_format = {}
-        quality_mapper = {
-            "AUDIO_QUALITY_LOW": 1,
-            "AUDIO_QUALITY_MEDIUM": 2,
-            "AUDIO_QUALITY_HIGH": 3,
-        }
-        for adaptive_format in track_obj["streamingData"]["adaptiveFormats"]:
-            if adaptive_format["mimeType"].startswith("audio") and (
-                not stream_format
-                or quality_mapper.get(adaptive_format["audioQuality"], 0)
-                > quality_mapper.get(stream_format["audioQuality"], 0)
-            ):
-                stream_format = adaptive_format
-        if stream_format is None:
-            raise MediaNotFoundError("No stream found for this track")
-        return stream_format
-
-    async def _decipher_signature(self, ciphered_signature: str, item_id: str):
-        """Decipher the signature, required to build the Stream URL."""
-
-        def _decipher():
-            embed_url = f"https://www.youtube.com/embed/{item_id}"
-            embed_html = pytube.request.get(embed_url)
-            js_url = pytube.extract.js_url(embed_html)
-            ytm_js = pytube.request.get(js_url)
-            cipher = pytube.cipher.Cipher(js=ytm_js)
-            return cipher.get_signature(ciphered_signature)
-
-        return await self.mass.loop.run_in_executor(None, _decipher)
diff --git a/music_assistant/music_providers/ytmusic/__init__.py b/music_assistant/music_providers/ytmusic/__init__.py
new file mode 100644 (file)
index 0000000..67df505
--- /dev/null
@@ -0,0 +1 @@
+"""Package with Youtube Music provider."""
diff --git a/music_assistant/music_providers/ytmusic/helpers.py b/music_assistant/music_providers/ytmusic/helpers.py
new file mode 100644 (file)
index 0000000..1307f60
--- /dev/null
@@ -0,0 +1,155 @@
+"""
+Helper module for parsing the Youtube Music API.
+
+This helpers file is an async wrapper around the excellent ytmusicapi package.
+While the ytmusicapi package does an excellent job at parsing the Youtube Music results,
+it is unfortunately not async, which is required for Music Assistant to run smoothly.
+This also nicely separates the parsing logic from the Youtube Music provider logic.
+"""
+
+import asyncio
+import json
+from typing import Dict, List
+
+import ytmusicapi
+
+
+async def get_artist(prov_artist_id: str) -> Dict[str, str]:
+    """Async wrapper around the ytmusicapi get_artist function."""
+
+    def _get_artist():
+        ytm = ytmusicapi.YTMusic()
+        return ytm.get_artist(channelId=prov_artist_id)
+
+    loop = asyncio.get_running_loop()
+    return await loop.run_in_executor(None, _get_artist)
+
+
+async def get_album(prov_album_id: str) -> Dict[str, str]:
+    """Async wrapper around the ytmusicapi get_album function."""
+
+    def _get_album():
+        ytm = ytmusicapi.YTMusic()
+        return ytm.get_album(browseId=prov_album_id)
+
+    loop = asyncio.get_running_loop()
+    return await loop.run_in_executor(None, _get_album)
+
+
+async def get_playlist(prov_playlist_id: str) -> Dict[str, str]:
+    """Async wrapper around the ytmusicapi get_playlist function."""
+
+    def _get_playlist():
+        ytm = ytmusicapi.YTMusic()
+        return ytm.get_playlist(playlistId=prov_playlist_id)
+
+    loop = asyncio.get_running_loop()
+    return await loop.run_in_executor(None, _get_playlist)
+
+
+async def get_track(prov_track_id: str) -> Dict[str, str]:
+    """Async wrapper around the ytmusicapi get_playlist function."""
+
+    def _get_song():
+        ytm = ytmusicapi.YTMusic()
+        track_obj = ytm.get_song(videoId=prov_track_id)
+        track = {}
+        track["videoId"] = track_obj["videoDetails"]["videoId"]
+        track["title"] = track_obj["videoDetails"]["title"]
+        track["artists"] = [
+            {
+                "channelId": track_obj["videoDetails"]["channelId"],
+                "name": track_obj["videoDetails"]["author"],
+            }
+        ]
+        track["duration"] = track_obj["videoDetails"]["lengthSeconds"]
+        track["thumbnails"] = track_obj["microformat"]["microformatDataRenderer"][
+            "thumbnail"
+        ]["thumbnails"]
+        track["isAvailable"] = track_obj["playabilityStatus"]["status"] == "OK"
+
+    loop = asyncio.get_running_loop()
+    return await loop.run_in_executor(None, _get_song)
+
+
+async def get_library_artists(headers: Dict[str, str]) -> Dict[str, str]:
+    """Async wrapper around the ytmusicapi get_library_artists function."""
+
+    def _get_library_artists():
+        ytm = ytmusicapi.YTMusic(auth=json.dumps(headers))
+        artists = ytm.get_library_artists()
+        # Sync properties with uniformal artist object
+        for artist in artists:
+            artist["id"] = artist["browseId"]
+            artist["name"] = artist["artist"]
+            del artist["browseId"]
+            del artist["artist"]
+        return artists
+
+    loop = asyncio.get_running_loop()
+    return await loop.run_in_executor(None, _get_library_artists)
+
+
+async def get_library_albums(headers: Dict[str, str]) -> Dict[str, str]:
+    """Async wrapper around the ytmusicapi get_library_albums function."""
+
+    def _get_library_albums():
+        ytm = ytmusicapi.YTMusic(auth=json.dumps(headers))
+        return ytm.get_library_albums()
+
+    loop = asyncio.get_running_loop()
+    return await loop.run_in_executor(None, _get_library_albums)
+
+
+async def get_library_playlists(headers: Dict[str, str]) -> Dict[str, str]:
+    """Async wrapper around the ytmusicapi get_library_playlists function."""
+
+    def _get_library_playlists():
+        ytm = ytmusicapi.YTMusic(auth=json.dumps(headers))
+        playlists = ytm.get_library_playlists()
+        # Sync properties with uniformal playlist object
+        for playlist in playlists:
+            playlist["id"] = playlist["playlistId"]
+            del playlist["playlistId"]
+        return playlists
+
+    loop = asyncio.get_running_loop()
+    return await loop.run_in_executor(None, _get_library_playlists)
+
+
+async def get_library_tracks(headers: Dict[str, str]) -> Dict[str, str]:
+    """Async wrapper around the ytmusicapi get_library_tracks function."""
+
+    def _get_library_tracks():
+        ytm = ytmusicapi.YTMusic(auth=json.dumps(headers))
+        tracks = ytm.get_library_songs()
+        return tracks
+
+    loop = asyncio.get_running_loop()
+    return await loop.run_in_executor(None, _get_library_tracks)
+
+
+async def search(query: str, ytm_filter: str = None, limit: int = 20) -> List[Dict]:
+    """Async wrapper around the ytmusicapi search function."""
+
+    def _search():
+        ytm = ytmusicapi.YTMusic()
+        results = ytm.search(query=query, filter=ytm_filter, limit=limit)
+        # Sync result properties with uniformal objects
+        for result in results:
+            if result["resultType"] == "artist":
+                result["id"] = result["browseId"]
+                result["name"] = result["artist"]
+                del result["browseId"]
+                del result["artist"]
+            elif result["resultType"] == "playlist":
+                if "playlistId" in result:
+                    result["id"] = result["playlistId"]
+                    del result["playlistId"]
+                elif "browseId" in result:
+                    result["id"] = result["browseId"]
+                    del result["browseId"]
+        return results
+
+    loop = asyncio.get_running_loop()
+    return await loop.run_in_executor(None, _search)
diff --git a/music_assistant/music_providers/ytmusic/ytmusic.py b/music_assistant/music_providers/ytmusic/ytmusic.py
new file mode 100644 (file)
index 0000000..a2aae0b
--- /dev/null
@@ -0,0 +1,472 @@
+"""Youtube Music support for MusicAssistant."""
+import re
+from operator import itemgetter
+from typing import AsyncGenerator, Dict, List, Optional
+from urllib.parse import unquote
+
+import pytube
+import ytmusicapi
+
+from music_assistant.helpers.audio import get_http_stream
+from music_assistant.models.enums import ProviderType
+from music_assistant.models.errors import (
+    InvalidDataError,
+    LoginFailed,
+    MediaNotFoundError,
+)
+from music_assistant.models.media_items import (
+    Album,
+    AlbumType,
+    Artist,
+    ContentType,
+    ImageType,
+    MediaItemImage,
+    MediaItemProviderId,
+    MediaItemType,
+    MediaType,
+    Playlist,
+    StreamDetails,
+    Track,
+)
+from music_assistant.models.music_provider import MusicProvider
+from music_assistant.music_providers.ytmusic.helpers import (
+    get_album,
+    get_artist,
+    get_library_albums,
+    get_library_artists,
+    get_library_playlists,
+    get_library_tracks,
+    get_playlist,
+    get_track,
+    search,
+)
+
+YTM_DOMAIN = "https://music.youtube.com"
+YTM_BASE_URL = f"{YTM_DOMAIN}/youtubei/v1/"
+
+
+class YoutubeMusicProvider(MusicProvider):
+    """Provider for Youtube Music."""
+
+    _attr_type = ProviderType.YTMUSIC
+    _attr_name = "Youtube Music"
+    _attr_supported_mediatypes = [
+        MediaType.ARTIST,
+        MediaType.ALBUM,
+        MediaType.TRACK,
+        MediaType.PLAYLIST,
+    ]
+    _headers = None
+    _context = None
+    _cookies = None
+
+    async def setup(self) -> bool:
+        """Set up the YTMusic provider."""
+        if not self.config.enabled:
+            return False
+        if not self.config.username or not self.config.password:
+            raise LoginFailed("Invalid login credentials")
+        await self._initialize_headers(cookie=self.config.password)
+        await self._initialize_context()
+        self._cookies = {"CONSENT": "YES+1"}
+        return True
+
+    async def search(
+        self, search_query: str, media_types=Optional[List[MediaType]], limit: int = 5
+    ) -> List[MediaItemType]:
+        """
+        Perform search on musicprovider.
+
+            :param search_query: Search query.
+            :param media_types: A list of media_types to include. All types if None.
+            :param limit: Number of items to return in the search (per type).
+        """
+        ytm_filter = None
+        if len(media_types) == 1:
+            # YTM does not support multiple searchtypes, falls back to all if no type given
+            if media_types[0] == MediaType.ARTIST:
+                ytm_filter = "artists"
+            if media_types[0] == MediaType.ALBUM:
+                ytm_filter = "albums"
+            if media_types[0] == MediaType.TRACK:
+                ytm_filter = "songs"
+            if media_types[0] == MediaType.PLAYLIST:
+                ytm_filter = "playlists"
+        results = await search(query=search_query, ytm_filter=ytm_filter, limit=limit)
+        parsed_results = []
+        for result in results:
+            if result["resultType"] == "artist":
+                parsed_results.append(await self._parse_artist(result))
+            elif result["resultType"] == "album":
+                parsed_results.append(
+                    await self._parse_album(result, result["browseId"])
+                )
+            elif result["resultType"] == "playlist":
+                parsed_results.append(await self._parse_playlist(result))
+            elif result["resultType"] == "song":
+                parsed_results.append(await self._parse_track(result))
+        return parsed_results
+
+    async def get_library_artists(self) -> AsyncGenerator[Artist, None]:
+        """Retrieve all library artists from Youtube Music."""
+        artists_obj = await get_library_artists(headers=self._headers)
+        for artist in artists_obj:
+            yield await self._parse_artist(artist)
+
+    async def get_library_albums(self) -> AsyncGenerator[Album, None]:
+        """Retrieve all library albums from Youtube Music."""
+        albums_obj = await get_library_albums(headers=self._headers)
+        for album in albums_obj:
+            yield await self._parse_album(album, album["browseId"])
+
+    async def get_library_playlists(self) -> AsyncGenerator[Playlist, None]:
+        """Retrieve all library playlists from the provider."""
+        playlists_obj = await get_library_playlists(headers=self._headers)
+        for playlist in playlists_obj:
+            yield await self._parse_playlist(playlist)
+
+    async def get_library_tracks(self) -> AsyncGenerator[Track, None]:
+        """Retrieve library tracks from Youtube Music."""
+        tracks_obj = await get_library_tracks(headers=self._headers)
+        for track in tracks_obj:
+            # Library tracks sometimes do not have a valid artist id
+            # In that case, call the API for track details based on track id
+            try:
+                yield await self._parse_track(track)
+            except InvalidDataError:
+                yield await self.get_track(track["videoId"])
+
+    async def get_album(self, prov_album_id) -> Album:
+        """Get full album details by id."""
+        album_obj = await get_album(prov_album_id=prov_album_id)
+        return (
+            await self._parse_album(album_obj=album_obj, album_id=prov_album_id)
+            if album_obj
+            else None
+        )
+
+    async def get_album_tracks(self, prov_album_id: str) -> List[Track]:
+        """Get album tracks for given album id."""
+        album_obj = await get_album(prov_album_id=prov_album_id)
+        return [
+            await self._parse_track(track)
+            for track in album_obj["tracks"]
+            if "tracks" in album_obj
+        ]
+
+    async def get_artist(self, prov_artist_id) -> Artist:
+        """Get full artist details by id."""
+        artist_obj = await get_artist(prov_artist_id=prov_artist_id)
+        return await self._parse_artist(artist_obj=artist_obj) if artist_obj else None
+
+    async def get_track(self, prov_track_id) -> Track:
+        """Get full track details by id."""
+        track_obj = await get_track(prov_track_id=prov_track_id)
+        return await self._parse_track(track_obj) if track_obj else None
+
+    async def get_playlist(self, prov_playlist_id) -> Playlist:
+        """Get full playlist details by id."""
+        playlist_obj = await get_playlist(prov_playlist_id)
+        return await self._parse_playlist(playlist_obj)
+
+    async def get_playlist_tracks(self, prov_playlist_id) -> List[Track]:
+        """Get all playlist tracks for given playlist id."""
+        playlist_obj = await get_playlist(prov_playlist_id)
+        if "tracks" in playlist_obj:
+            tracks = []
+            for track in playlist_obj["tracks"]:
+                if track["isAvailable"]:
+                    tracks.append(await self._parse_track(track))
+            return tracks
+        return []
+
+    async def get_artist_albums(self, prov_artist_id) -> List[Album]:
+        """Get a list of albums for the given artist."""
+        artist_obj = await get_artist(prov_artist_id=prov_artist_id)
+        if "albums" in artist_obj and "results" in artist_obj["albums"]:
+            return [
+                await self._parse_album(album, album["browseId"])
+                for album in artist_obj["albums"]["results"]
+            ]
+        return []
+
+    async def get_artist_toptracks(self, prov_artist_id) -> List[Track]:
+        """Get a list of 5 most popular tracks for the given artist."""
+        artist_obj = await get_artist(prov_artist_id=prov_artist_id)
+        if "songs" in artist_obj and "results" in artist_obj["songs"]:
+            return [
+                await self._parse_track(track)
+                for track in artist_obj["songs"]["results"]
+            ]
+        return []
+
+    async def get_stream_details(self, item_id: str) -> StreamDetails:
+        """Return the content details for the given track when it will be streamed."""
+        signature_timestamp = await self._get_signature_timestamp()
+        data = {
+            "playbackContext": {
+                "contentPlaybackContext": {"signatureTimestamp": signature_timestamp}
+            },
+            "video_id": item_id,
+        }
+        track_obj = await self._post_data("player", data=data)
+        stream_format = await self._parse_stream_format(track_obj)
+        url = await self._parse_stream_url(stream_format=stream_format, item_id=item_id)
+        return StreamDetails(
+            provider=self.type,
+            item_id=item_id,
+            data=url,
+            content_type=ContentType.try_parse(stream_format["mimeType"]),
+        )
+
+    async def get_audio_stream(
+        self, streamdetails: StreamDetails, seek_position: int = 0
+    ) -> AsyncGenerator[bytes, None]:
+        """Return the audio stream for the provider item."""
+        async for chunk in get_http_stream(
+            self.mass, streamdetails.data, streamdetails, seek_position
+        ):
+            yield chunk
+
+    async def _post_data(self, endpoint: str, data: Dict[str, str], **kwargs):
+        url = f"{YTM_BASE_URL}{endpoint}"
+        data.update(self._context)
+        async with self.mass.http_session.post(
+            url,
+            headers=self._headers,
+            json=data,
+            verify_ssl=False,
+            cookies=self._cookies,
+        ) as response:
+            return await response.json()
+
+    async def _get_data(self, url: str, params: Dict = None):
+        async with self.mass.http_session.get(
+            url, headers=self._headers, params=params, cookies=self._cookies
+        ) as response:
+            return await response.text()
+
+    async def _initialize_headers(self, cookie: str) -> Dict[str, str]:
+        """Return headers to include in the requests."""
+        headers = {
+            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:72.0) Gecko/20100101 Firefox/72.0",
+            "Accept": "*/*",
+            "Accept-Language": "en-US,en;q=0.5",
+            "Content-Type": "application/json",
+            "X-Goog-AuthUser": "0",
+            "x-origin": "https://music.youtube.com",
+            "Cookie": cookie,
+        }
+        sapisid = ytmusicapi.helpers.sapisid_from_cookie(cookie)
+        origin = headers.get("origin", headers.get("x-origin"))
+        headers["Authorization"] = ytmusicapi.helpers.get_authorization(
+            sapisid + " " + origin
+        )
+        self._headers = headers
+
+    async def _initialize_context(self) -> Dict[str, str]:
+        """Return a dict to use as a context in requests."""
+        self._context = {
+            "context": {
+                "client": {"clientName": "WEB_REMIX", "clientVersion": "0.1"},
+                "user": {},
+            }
+        }
+
+    async def _parse_album(self, album_obj: dict, album_id: str) -> Album:
+        """Parse a YT Album response to an Album model object."""
+        if "title" in album_obj:
+            name = album_obj["title"]
+        elif "name" in album_obj:
+            name = album_obj["name"]
+        album = Album(
+            item_id=album_id,
+            name=name,
+            provider=self.type,
+        )
+        if "year" in album_obj and album_obj["year"].isdigit():
+            album.year = album_obj["year"]
+        if "thumbnails" in album_obj:
+            album.metadata.images = await self._parse_thumbnails(
+                album_obj["thumbnails"]
+            )
+        if "description" in album_obj:
+            album.metadata.description = unquote(album_obj["description"])
+        if "artists" in album_obj:
+            album.artists = [
+                await self._parse_artist(artist) for artist in album_obj["artists"]
+            ]
+        if "type" in album_obj:
+            if album_obj["type"] == "Single":
+                album_type = AlbumType.SINGLE
+            elif album_obj["type"] == "EP":
+                album_type = AlbumType.EP
+            elif album_obj["type"] == "Album":
+                album_type = AlbumType.ALBUM
+            else:
+                album_type = AlbumType.UNKNOWN
+            album.album_type = album_type
+        album.add_provider_id(
+            MediaItemProviderId(
+                item_id=str(album_id), prov_type=self.type, prov_id=self.id
+            )
+        )
+        return album
+
+    async def _parse_artist(self, artist_obj: dict) -> Artist:
+        """Parse a YT Artist response to Artist model object."""
+        artist_id = None
+        if "channelId" in artist_obj:
+            artist_id = artist_obj["channelId"]
+        elif "id" in artist_obj and artist_obj["id"]:
+            artist_id = artist_obj["id"]
+        elif artist_obj["name"] == "Various Artists":
+            artist_id = "UCUTXlgdcKU5vfzFqHOWIvkA"
+        if not artist_id:
+            raise InvalidDataError("Artist does not have a valid ID")
+        artist = Artist(item_id=artist_id, name=artist_obj["name"], provider=self.type)
+        if "description" in artist_obj:
+            artist.metadata.description = artist_obj["description"]
+        if "thumbnails" in artist_obj and artist_obj["thumbnails"]:
+            artist.metadata.images = await self._parse_thumbnails(
+                artist_obj["thumbnails"]
+            )
+        artist.add_provider_id(
+            MediaItemProviderId(
+                item_id=str(artist_id),
+                prov_type=self.type,
+                prov_id=self.id,
+                url=f"https://music.youtube.com/channel/{artist_id}",
+            )
+        )
+        return artist
+
+    async def _parse_playlist(self, playlist_obj: dict) -> Playlist:
+        """Parse a YT Playlist response to a Playlist object."""
+        playlist = Playlist(
+            item_id=playlist_obj["id"], provider=self.type, name=playlist_obj["title"]
+        )
+        if "description" in playlist_obj:
+            playlist.metadata.description = playlist_obj["description"]
+        if "thumbnails" in playlist_obj and playlist_obj["thumbnails"]:
+            playlist.metadata.images = await self._parse_thumbnails(
+                playlist_obj["thumbnails"]
+            )
+        if "author" in playlist_obj:
+            playlist.owner = playlist_obj["author"]
+        playlist.add_provider_id(
+            MediaItemProviderId(
+                item_id=playlist_obj["id"], prov_type=self.type, prov_id=self.id
+            )
+        )
+        return playlist
+
+    async def _parse_track(self, track_obj: dict) -> Track:
+        """Parse a YT Track response to a Track model object."""
+        track = Track(
+            item_id=track_obj["videoId"], provider=self.type, name=track_obj["title"]
+        )
+        if "artists" in track_obj:
+            track.artists = [
+                await self._parse_artist(artist) for artist in track_obj["artists"]
+            ]
+        if "thumbnails" in track_obj and track_obj["thumbnails"]:
+            track.metadata.images = await self._parse_thumbnails(
+                track_obj["thumbnails"]
+            )
+        if (
+            "album" in track_obj
+            and track_obj["album"]
+            and "id" in track_obj["album"]
+            and track_obj["album"]["id"]
+            and "artists" in track_obj
+        ):
+            album = track_obj["album"]
+            album["artists"] = track_obj["artists"]
+            track.album = await self._parse_album(album, album["id"])
+        if "isExplicit" in track_obj:
+            track.metadata.explicit = track_obj["isExplicit"]
+        if "duration" in track_obj and track_obj["duration"].isdigit():
+            track.duration = track_obj["duration"]
+        available = True
+        if "isAvailable" in track_obj:
+            available = track_obj["isAvailable"]
+        track.add_provider_id(
+            MediaItemProviderId(
+                item_id=str(track_obj["videoId"]),
+                prov_type=self.type,
+                prov_id=self.id,
+                available=available,
+            )
+        )
+        if not track.item_id:
+            print("what?")
+        return track
+
+    async def _get_signature_timestamp(self):
+        """Get a signature timestamp required to generate valid stream URLs."""
+        response = await self._get_data(url=YTM_DOMAIN)
+        match = re.search(r'jsUrl"\s*:\s*"([^"]+)"', response)
+        if match is None:
+            raise Exception("Could not identify the URL for base.js player.")
+        url = YTM_DOMAIN + match.group(1)
+        response = await self._get_data(url=url)
+        match = re.search(r"signatureTimestamp[:=](\d+)", response)
+        if match is None:
+            raise Exception("Unable to identify the signatureTimestamp.")
+        return int(match.group(1))
+
+    async def _parse_stream_url(self, stream_format: dict, item_id: str) -> str:
+        """Figure out the stream URL to use based on the YT track object."""
+        cipher_parts = {}
+        for part in stream_format["signatureCipher"].split("&"):
+            key, val = part.split("=", maxsplit=1)
+            cipher_parts[key] = unquote(val)
+        signature = await self._decipher_signature(
+            ciphered_signature=cipher_parts["s"], item_id=item_id
+        )
+        url = cipher_parts["url"] + "&sig=" + signature
+        return url
+
+    @classmethod
+    async def _parse_thumbnails(cls, thumbnails_obj: dict) -> List[MediaItemImage]:
+        """Parse and sort a list of thumbnails from highest quality to lowest quality."""
+        thumbnails_sorted = sorted(
+            thumbnails_obj, key=itemgetter("width"), reverse=True
+        )
+        return [
+            MediaItemImage(ImageType.THUMB, thumb["url"]) for thumb in thumbnails_sorted
+        ]
+
+    @classmethod
+    async def _parse_stream_format(cls, track_obj: dict) -> dict:
+        """Grab the highes available audio stream from available streams."""
+        stream_format = {}
+        quality_mapper = {
+            "AUDIO_QUALITY_LOW": 1,
+            "AUDIO_QUALITY_MEDIUM": 2,
+            "AUDIO_QUALITY_HIGH": 3,
+        }
+        for adaptive_format in track_obj["streamingData"]["adaptiveFormats"]:
+            if adaptive_format["mimeType"].startswith("audio") and (
+                not stream_format
+                or quality_mapper.get(adaptive_format["audioQuality"], 0)
+                > quality_mapper.get(stream_format["audioQuality"], 0)
+            ):
+                stream_format = adaptive_format
+        if stream_format is None:
+            raise MediaNotFoundError("No stream found for this track")
+        return stream_format
+
+    async def _decipher_signature(self, ciphered_signature: str, item_id: str):
+        """Decipher the signature, required to build the Stream URL."""
+
+        def _decipher():
+            embed_url = f"https://www.youtube.com/embed/{item_id}"
+            embed_html = pytube.request.get(embed_url)
+            js_url = pytube.extract.js_url(embed_html)
+            ytm_js = pytube.request.get(js_url)
+            cipher = pytube.cipher.Cipher(js=ytm_js)
+            return cipher.get_signature(ciphered_signature)
+
+        return await self.mass.loop.run_in_executor(None, _decipher)