precommit fixes
authorMarvin Schenkel <marvinschenkel@gmail.com>
Tue, 5 Jul 2022 16:54:05 +0000 (18:54 +0200)
committerMarvin Schenkel <marvinschenkel@gmail.com>
Wed, 6 Jul 2022 14:05:25 +0000 (16:05 +0200)
examples/ytmusic.py
music_assistant/music_providers/ytmusic.py

index 2195eff7d582c21ba60b3de75ef0817a3ca22bf3..0884c4c273c80ea73242cd39d1c62013a40aadc8 100644 (file)
@@ -1,6 +1,5 @@
-import argparse
+"""Example file to play music from YTM. Might omit later"""
 import asyncio
-from cgi import test
 import logging
 import os
 from os.path import abspath, dirname
@@ -8,11 +7,11 @@ from sys import path
 
 path.insert(1, dirname(dirname(abspath(__file__))))
 
+# pylint: disable=wrong-import-position
 from music_assistant.mass import MusicAssistant
 from music_assistant.models.config import MassConfig, MusicProviderConfig
-from music_assistant.models.enums import MediaType, ProviderType
+from music_assistant.models.enums import ProviderType
 from music_assistant.models.player import Player, PlayerState
-from music_assistant.models.player_queue import RepeatMode
 
 # setup logger
 logging.basicConfig(
@@ -103,22 +102,18 @@ async def main():
 
     async with MusicAssistant(mass_conf) as mass:
         # get some data
-        yt = mass.music.get_provider(ProviderType.YTMUSIC)
-        
-        results = await yt.search("dark side of the moon", [MediaType.ALBUM])
-        tracks = await yt.get_album_tracks(results[0].item_id)
-
-        stream_details = await yt.get_stream_details(tracks[0].item_id)
-        print(stream_details.data)
-
-        #artist = await yt.get_artist("UC-T-wUsgssr7Vy-BiJD4-2g")
-        # print(artist)
+        ytm = mass.music.get_provider(ProviderType.YTMUSIC)
+        # track = await yt.get_track("pE3ju1qS848")
+        album = await ytm.get_album("MPREb_AYetWMZunqA")
+        print(album)
+        # sd = await yt.get_stream_details(track.item_id)
+        # print(sd.data)
 
         # test_player1 = TestPlayer("test1")
         # await mass.players.register_player(test_player1)
-        # await test_player1.active_queue.play_media(track)
+        # await test_player1.active_queue.play_media(track.uri)
 
-        await asyncio.sleep(3600)
+        await asyncio.sleep(3600)
 
 
 if __name__ == "__main__":
index 9d3802e6143607672dff9057a1ea3faa37189d6a..10f4f63f6e1e3f5256d63547fb5567a2dc80f4c5 100644 (file)
@@ -1,15 +1,15 @@
-"""YT Music support for MusicAssistant"""
-from email.utils import parseaddr
+"""YT Music support for MusicAssistant."""
 import json
 import re
-from requests.structures import CaseInsensitiveDict
+from datetime import date
 from typing import AsyncGenerator, Dict, List, Optional
 from urllib.parse import unquote
-from yaml import parse
 
-import ytmusicapi
 import pytube
+import ytmusicapi
+from requests.structures import CaseInsensitiveDict
 
+from music_assistant.helpers.audio import get_http_stream
 from music_assistant.models.enums import ProviderType
 from music_assistant.models.errors import MediaNotFoundError
 from music_assistant.models.media_items import (
@@ -19,23 +19,20 @@ from music_assistant.models.media_items import (
     ContentType,
     ImageType,
     MediaItemImage,
+    MediaItemProviderId,
     MediaItemType,
     MediaType,
-    Playlist,
     StreamDetails,
     Track,
 )
 from music_assistant.models.music_provider import MusicProvider
-from music_assistant.helpers.audio import (
-    get_http_stream,
-)
 
 YTM_DOMAIN = "https://music.youtube.com"
 YTM_BASE_URL = f"{YTM_DOMAIN}/youtubei/v1/"
 
 
 class YTMusic(MusicProvider):
-    """Provider for Youtube Music"""
+    """Provider for Youtube Music."""
 
     _attr_type = ProviderType.YTMUSIC
     _attr_name = "YTMusic"
@@ -46,11 +43,13 @@ class YTMusic(MusicProvider):
         MediaType.PLAYLIST,
     ]
     _headers = None
+    _context = None
+    _cookies = None
 
     async def setup(self) -> bool:
-        """Sets up the YTMusic provider"""
-        self._headers = await self._initialize_headers()
-        self._context = await self._initialize_context()
+        """Set up the YTMusic provider."""
+        await self._initialize_headers()
+        await self._initialize_context()
         self._cookies = {"CONSENT": "YES+1"}
         return True
 
@@ -59,24 +58,25 @@ class YTMusic(MusicProvider):
     ) -> List[MediaItemType]:
         """
         Perform search on musicprovider.
+
             :param search_query: Search query.
             :param media_types: A list of media_types to include. All types if None.
             :param limit: Number of items to return in the search (per type).
         """
         data = {"query": search_query}
-        filter = None
+        ytm_filter = None
         if len(media_types) == 1:
             # YTM does not support multiple searchtypes, falls back to all if no type given
             if media_types[0] == MediaType.ARTIST:
-                filter = "artists"
+                ytm_filter = "artists"
             if media_types[0] == MediaType.ALBUM:
-                filter = "albums"
+                ytm_filter = "albums"
             if media_types[0] == MediaType.TRACK:
-                filter = "songs"
+                ytm_filter = "songs"
             if media_types[0] == MediaType.PLAYLIST:
-                filter = "playlists"
+                ytm_filter = "playlists"
         params = ytmusicapi.parsers.search_params.get_search_params(
-            filter=filter, scope=None, ignore_spelling=False
+            filter=ytm_filter, scope=None, ignore_spelling=False
         )
         data["params"] = params
         search_results = await self._post_data(endpoint="search", data=data)
@@ -104,7 +104,9 @@ class YTMusic(MusicProvider):
                         parsed_results.append(await self.get_album(album_id))
                 elif category == "Songs":
                     for song in result["musicShelfRenderer"]["contents"]:
-                        song_id = song["musicResponsiveListItemRenderer"]["playlistItemData"]["videoId"]
+                        song_id = song["musicResponsiveListItemRenderer"][
+                            "playlistItemData"
+                        ]["videoId"]
                         parsed_results.append(await self.get_track(song_id))
                 else:
                     print(category)
@@ -114,25 +116,29 @@ class YTMusic(MusicProvider):
         """Get full album details by id."""
         data = {"browseId": prov_album_id}
         album_obj = await self._post_data(endpoint="browse", data=data)
-        return await self._parse_album(album_obj=album_obj, album_id=prov_album_id) if album_obj else None
+        return (
+            await self._parse_album(album_obj=album_obj, album_id=prov_album_id)
+            if album_obj
+            else None
+        )
 
     async def get_album_tracks(self, prov_album_id: str) -> List[Track]:
         """Get album tracks for given album id."""
         data = {"browseId": prov_album_id}
-        album_obj = await self._post_data(endpoint="browse", data=data)        
+        album_obj = await self._post_data(endpoint="browse", data=data)
         parsed_album = ytmusicapi.parsers.albums.parse_album_header(album_obj)
         album_playlist_id = parsed_album["audioPlaylistId"]
         return await self.get_playlist_tracks(album_playlist_id)
 
     async def get_artist(self, prov_artist_id) -> Artist:
-        """Get full artist details by id"""
+        """Get full artist details by id."""
         data = {"browseId": prov_artist_id}
         artist_obj = await self._post_data(endpoint="browse", data=data)
         return await self._parse_artist(artist_obj=artist_obj) if artist_obj else None
 
     async def get_track(self, prov_track_id) -> Track:
         """Get full track details by id."""
-        signature_timestamp = ytmusicapi.mixins._utils.get_datestamp() - 1
+        signature_timestamp = (date.today() - date.fromtimestamp(0)).days - 1
         data = {
             "playbackContext": {
                 "contentPlaybackContext": {"signatureTimestamp": signature_timestamp}
@@ -144,12 +150,24 @@ class YTMusic(MusicProvider):
 
     async def get_playlist_tracks(self, prov_playlist_id) -> List[Track]:
         """Get all playlist tracks for given playlist id."""
-        browseId = "VL" + prov_playlist_id if not prov_playlist_id.startswith("VL") else prov_playlist_id
-        data = {'browseId': browseId}
+        browse_id = (
+            "VL" + prov_playlist_id
+            if not prov_playlist_id.startswith("VL")
+            else prov_playlist_id
+        )
+        data = {"browseId": browse_id}
         playlist_obj = await self._post_data("browse", data=data)
-        tracks = playlist_obj["contents"]["singleColumnBrowseResultsRenderer"]["tabs"][0]["tabRenderer"]["content"]["sectionListRenderer"]["contents"][0]["musicPlaylistShelfRenderer"]["contents"]
+        tracks = playlist_obj["contents"]["singleColumnBrowseResultsRenderer"]["tabs"][
+            0
+        ]["tabRenderer"]["content"]["sectionListRenderer"]["contents"][0][
+            "musicPlaylistShelfRenderer"
+        ][
+            "contents"
+        ]
         return [
-            await self.get_track(track["musicResponsiveListItemRenderer"]["playlistItemData"]["videoId"])
+            await self.get_track(
+                track["musicResponsiveListItemRenderer"]["playlistItemData"]["videoId"]
+            )
             for track in tracks
         ]
 
@@ -197,11 +215,11 @@ class YTMusic(MusicProvider):
             return await response.text()
 
     async def _initialize_headers(self) -> Dict[str, str]:
-        """Returns headers to include in the requests"""
+        """Return headers to include in the requests."""
         # TODO: Replace with Cookie string from Config
         path = "../headers_auth.json"
         headers = None
-        with open(path) as json_file:
+        with open(path, mode="r", encoding="utf-8") as json_file:
             headers = CaseInsensitiveDict(json.load(json_file))
         cookie = headers.get("cookie")
         sapisid = ytmusicapi.helpers.sapisid_from_cookie(cookie)
@@ -209,12 +227,11 @@ class YTMusic(MusicProvider):
         headers["Authorization"] = ytmusicapi.helpers.get_authorization(
             sapisid + " " + origin
         )
-
-        return headers
+        self._headers = headers
 
     async def _initialize_context(self) -> Dict[str, str]:
-        """Returns a dict to use as a context in requests"""
-        return {
+        """Return a dict to use as a context in requests."""
+        self._context = {
             "context": {
                 "client": {"clientName": "WEB_REMIX", "clientVersion": "0.1"},
                 "user": {},
@@ -222,7 +239,7 @@ class YTMusic(MusicProvider):
         }
 
     async def _parse_album(self, album_obj: dict, album_id: str) -> Album:
-        """Parses a YT Album response to an Album model object"""
+        """Parse a YT Album response to an Album model object."""
         parsed_album = ytmusicapi.parsers.albums.parse_album_header(album_obj)
         album = Album(
             item_id=album_id,
@@ -238,20 +255,37 @@ class YTMusic(MusicProvider):
         if "description" in parsed_album:
             album.metadata.description = unquote(parsed_album["description"])
         artists = []
-        for artist in parsed_album["artists"]:
-            if artist["id"]:
-                artists.append(await self.get_artist(artist["id"]))
+        for parsed_artist in parsed_album["artists"]:
+            if parsed_artist["id"]:
+                artist = Artist(
+                    item_id=parsed_artist["id"],
+                    provider=self.type,
+                    name=parsed_artist["name"],
+                )
+                artist.add_provider_id(
+                    MediaItemProviderId(
+                        item_id=str(parsed_artist["id"]),
+                        prov_type=self.type,
+                        prov_id=self.id,
+                    )
+                )
+                artists.append(artist)
         album.artists = artists
+        album.add_provider_id(
+            MediaItemProviderId(
+                item_id=str(album_id), prov_type=self.type, prov_id=self.id
+            )
+        )
         return album
 
     async def _parse_artist(self, artist_obj: dict) -> Artist:
-        """Parse a YT Artist response to Artist model object"""
+        """Parse a YT Artist response to Artist model object."""
         name = artist_obj["header"]["musicImmersiveHeaderRenderer"]["title"]["runs"][0][
             "text"
         ]
-        artist_id = artist_obj["header"]["musicImmersiveHeaderRenderer"]["subscriptionButton"][
-            "subscribeButtonRenderer"
-        ]["channelId"]
+        artist_id = artist_obj["header"]["musicImmersiveHeaderRenderer"][
+            "subscriptionButton"
+        ]["subscribeButtonRenderer"]["channelId"]
         artist = Artist(item_id=str(artist_id), provider=self.type, name=name)
         if "description" in artist_obj["header"]["musicImmersiveHeaderRenderer"]:
             artist.metadata.description = unquote(
@@ -265,72 +299,87 @@ class YTMusic(MusicProvider):
         ]["thumbnail"]["thumbnails"]:
             images.append(MediaItemImage(ImageType.THUMB, thumb["url"]))
         artist.metadata.images = images
+        artist.add_provider_id(
+            MediaItemProviderId(
+                item_id=str(artist_id), prov_type=self.type, prov_id=self.id
+            )
+        )
         return artist
 
     async def _parse_track(self, track_obj: dict) -> Track:
-        """Parses a YT Track response to a Track model object"""
+        """Parse a YT Track response to a Track model object."""
         track = Track(
             item_id=track_obj["videoDetails"]["videoId"],
             provider=self.type,
             name=track_obj["videoDetails"]["title"],
             duration=track_obj["videoDetails"]["lengthSeconds"],
         )
-        artist = await self.get_artist(
-            track_obj["microformat"]["microformatDataRenderer"]["pageOwnerDetails"][
-                "externalChannelId"
-            ]
-        )
-        track.artists = [artist]
+        print(json.dumps(track_obj))
+        # artist = await self.get_artist(
+        #     track_obj["microformat"]["microformatDataRenderer"]["pageOwnerDetails"][
+        #         "externalChannelId"
+        #     ]
+        # )
+        # track.artists = [artist]
         images = []
         for thumb in track_obj["microformat"]["microformatDataRenderer"]["thumbnail"][
             "thumbnails"
         ]:
             images.append(MediaItemImage(ImageType.THUMB, thumb["url"]))
+        track.add_provider_id(
+            MediaItemProviderId(
+                item_id=str(track_obj["videoDetails"]["videoId"]),
+                prov_type=self.type,
+                prov_id=self.id,
+            )
+        )
         return track
 
-    async def _parse_stream_format(self, track_obj: dict) -> dict:
-        """Grabs the highes available audio stream from available streams"""
-        stream_format = None
-        for format in track_obj["streamingData"]["adaptiveFormats"]:
-            if (
-                format["mimeType"].startswith("audio")
-                and format["audioQuality"] == "AUDIO_QUALITY_HIGH"
-            ):
-                stream_format = format
-        if stream_format is None:
-            raise MediaNotFoundError("No stream found for this track")
-        return stream_format
+    async def _get_signature_timestamp(self):
+        """Get a signature timestamp required to generate valid stream URLs."""
+        response = await self._get_data(url=YTM_DOMAIN)
+        match = re.search(r'jsUrl"\s*:\s*"([^"]+)"', response)
+        if match is None:
+            raise Exception("Could not identify the URL for base.js player.")
+        url = YTM_DOMAIN + match.group(1)
+        response = await self._get_data(url=url)
+        match = re.search(r"signatureTimestamp[:=](\d+)", response)
+        if match is None:
+            raise Exception("Unable to identify the signatureTimestamp.")
+        return int(match.group(1))
 
     async def _parse_stream_url(self, stream_format: dict, item_id: str) -> str:
-        """Figures out the stream URL to use based on the YT track object"""
-        cipher_parts = dict()
+        """Figure out the stream URL to use based on the YT track object."""
+        cipher_parts = {}
         for part in stream_format["signatureCipher"].split("&"):
-            k, v = part.split("=", maxsplit=1)
-            cipher_parts[k] = unquote(v)
+            key, val = part.split("=", maxsplit=1)
+            cipher_parts[key] = unquote(val)
         signature = await self._decipher_signature(
             ciphered_signature=cipher_parts["s"], item_id=item_id
         )
         url = cipher_parts["url"] + "&sig=" + signature
         return url
 
-    async def _decipher_signature(self, ciphered_signature: str, item_id: str):
-        """Decipher the signature, required to build the Stream URL"""
+    @classmethod
+    async def _parse_stream_format(cls, track_obj: dict) -> dict:
+        """Grab the highes available audio stream from available streams."""
+        stream_format = None
+        for adaptive_format in track_obj["streamingData"]["adaptiveFormats"]:
+            if (
+                adaptive_format["mimeType"].startswith("audio")
+                and adaptive_format["audioQuality"] == "AUDIO_QUALITY_HIGH"
+            ):
+                stream_format = adaptive_format
+        if stream_format is None:
+            raise MediaNotFoundError("No stream found for this track")
+        return stream_format
+
+    @classmethod
+    async def _decipher_signature(cls, ciphered_signature: str, item_id: str):
+        """Decipher the signature, required to build the Stream URL."""
         embed_url = f"https://www.youtube.com/embed/{item_id}"
         embed_html = pytube.request.get(embed_url)
         js_url = pytube.extract.js_url(embed_html)
-        js = pytube.request.get(js_url)
-        cipher = pytube.cipher.Cipher(js=js)
+        ytm_js = pytube.request.get(js_url)
+        cipher = pytube.cipher.Cipher(js=ytm_js)
         return cipher.get_signature(ciphered_signature)
-
-    async def _get_signature_timestamp(self):
-        """Gets a signature timestamp required to generate valid stream URLs"""
-        response = await self._get_data(url=YTM_DOMAIN)
-        match = re.search(r'jsUrl"\s*:\s*"([^"]+)"', response)
-        if match is None:
-            raise Exception("Could not identify the URL for base.js player.")
-        url = YTM_DOMAIN + match.group(1)
-        response = await self._get_data(url=url)
-        match = re.search(r"signatureTimestamp[:=](\d+)", response)
-        if match is None:
-            raise Exception("Unable to identify the signatureTimestamp.")
-        return int(match.group(1))