From: Marvin Schenkel Date: Tue, 5 Jul 2022 16:54:05 +0000 (+0200) Subject: precommit fixes X-Git-Url: https://git.kitaultman.com/?a=commitdiff_plain;h=7b2b8c28b119b35c37a9eaf71c4d97a3f065408f;p=music-assistant-server.git precommit fixes --- diff --git a/examples/ytmusic.py b/examples/ytmusic.py index 2195eff7..0884c4c2 100644 --- a/examples/ytmusic.py +++ b/examples/ytmusic.py @@ -1,6 +1,5 @@ -import argparse +"""Example file to play music from YTM. Might omit later""" import asyncio -from cgi import test import logging import os from os.path import abspath, dirname @@ -8,11 +7,11 @@ from sys import path path.insert(1, dirname(dirname(abspath(__file__)))) +# pylint: disable=wrong-import-position from music_assistant.mass import MusicAssistant from music_assistant.models.config import MassConfig, MusicProviderConfig -from music_assistant.models.enums import MediaType, ProviderType +from music_assistant.models.enums import ProviderType from music_assistant.models.player import Player, PlayerState -from music_assistant.models.player_queue import RepeatMode # setup logger logging.basicConfig( @@ -103,22 +102,18 @@ async def main(): async with MusicAssistant(mass_conf) as mass: # get some data - yt = mass.music.get_provider(ProviderType.YTMUSIC) - - results = await yt.search("dark side of the moon", [MediaType.ALBUM]) - tracks = await yt.get_album_tracks(results[0].item_id) - - stream_details = await yt.get_stream_details(tracks[0].item_id) - print(stream_details.data) - - #artist = await yt.get_artist("UC-T-wUsgssr7Vy-BiJD4-2g") - # print(artist) + ytm = mass.music.get_provider(ProviderType.YTMUSIC) + # track = await yt.get_track("pE3ju1qS848") + album = await ytm.get_album("MPREb_AYetWMZunqA") + print(album) + # sd = await yt.get_stream_details(track.item_id) + # print(sd.data) # test_player1 = TestPlayer("test1") # await mass.players.register_player(test_player1) - # await test_player1.active_queue.play_media(track) + # await test_player1.active_queue.play_media(track.uri) - # await asyncio.sleep(3600) + await asyncio.sleep(3600) if __name__ == "__main__": diff --git a/music_assistant/music_providers/ytmusic.py b/music_assistant/music_providers/ytmusic.py index 9d3802e6..10f4f63f 100644 --- a/music_assistant/music_providers/ytmusic.py +++ b/music_assistant/music_providers/ytmusic.py @@ -1,15 +1,15 @@ -"""YT Music support for MusicAssistant""" -from email.utils import parseaddr +"""YT Music support for MusicAssistant.""" import json import re -from requests.structures import CaseInsensitiveDict +from datetime import date from typing import AsyncGenerator, Dict, List, Optional from urllib.parse import unquote -from yaml import parse -import ytmusicapi import pytube +import ytmusicapi +from requests.structures import CaseInsensitiveDict +from music_assistant.helpers.audio import get_http_stream from music_assistant.models.enums import ProviderType from music_assistant.models.errors import MediaNotFoundError from music_assistant.models.media_items import ( @@ -19,23 +19,20 @@ from music_assistant.models.media_items import ( ContentType, ImageType, MediaItemImage, + MediaItemProviderId, MediaItemType, MediaType, - Playlist, StreamDetails, Track, ) from music_assistant.models.music_provider import MusicProvider -from music_assistant.helpers.audio import ( - get_http_stream, -) YTM_DOMAIN = "https://music.youtube.com" YTM_BASE_URL = f"{YTM_DOMAIN}/youtubei/v1/" class YTMusic(MusicProvider): - """Provider for Youtube Music""" + """Provider for Youtube Music.""" _attr_type = ProviderType.YTMUSIC _attr_name = "YTMusic" @@ -46,11 +43,13 @@ class YTMusic(MusicProvider): MediaType.PLAYLIST, ] _headers = None + _context = None + _cookies = None async def setup(self) -> bool: - """Sets up the YTMusic provider""" - self._headers = await self._initialize_headers() - self._context = await self._initialize_context() + """Set up the YTMusic provider.""" + await self._initialize_headers() + await self._initialize_context() self._cookies = {"CONSENT": "YES+1"} return True @@ -59,24 +58,25 @@ class YTMusic(MusicProvider): ) -> List[MediaItemType]: """ Perform search on musicprovider. + :param search_query: Search query. :param media_types: A list of media_types to include. All types if None. :param limit: Number of items to return in the search (per type). """ data = {"query": search_query} - filter = None + ytm_filter = None if len(media_types) == 1: # YTM does not support multiple searchtypes, falls back to all if no type given if media_types[0] == MediaType.ARTIST: - filter = "artists" + ytm_filter = "artists" if media_types[0] == MediaType.ALBUM: - filter = "albums" + ytm_filter = "albums" if media_types[0] == MediaType.TRACK: - filter = "songs" + ytm_filter = "songs" if media_types[0] == MediaType.PLAYLIST: - filter = "playlists" + ytm_filter = "playlists" params = ytmusicapi.parsers.search_params.get_search_params( - filter=filter, scope=None, ignore_spelling=False + filter=ytm_filter, scope=None, ignore_spelling=False ) data["params"] = params search_results = await self._post_data(endpoint="search", data=data) @@ -104,7 +104,9 @@ class YTMusic(MusicProvider): parsed_results.append(await self.get_album(album_id)) elif category == "Songs": for song in result["musicShelfRenderer"]["contents"]: - song_id = song["musicResponsiveListItemRenderer"]["playlistItemData"]["videoId"] + song_id = song["musicResponsiveListItemRenderer"][ + "playlistItemData" + ]["videoId"] parsed_results.append(await self.get_track(song_id)) else: print(category) @@ -114,25 +116,29 @@ class YTMusic(MusicProvider): """Get full album details by id.""" data = {"browseId": prov_album_id} album_obj = await self._post_data(endpoint="browse", data=data) - return await self._parse_album(album_obj=album_obj, album_id=prov_album_id) if album_obj else None + return ( + await self._parse_album(album_obj=album_obj, album_id=prov_album_id) + if album_obj + else None + ) async def get_album_tracks(self, prov_album_id: str) -> List[Track]: """Get album tracks for given album id.""" data = {"browseId": prov_album_id} - album_obj = await self._post_data(endpoint="browse", data=data) + album_obj = await self._post_data(endpoint="browse", data=data) parsed_album = ytmusicapi.parsers.albums.parse_album_header(album_obj) album_playlist_id = parsed_album["audioPlaylistId"] return await self.get_playlist_tracks(album_playlist_id) async def get_artist(self, prov_artist_id) -> Artist: - """Get full artist details by id""" + """Get full artist details by id.""" data = {"browseId": prov_artist_id} artist_obj = await self._post_data(endpoint="browse", data=data) return await self._parse_artist(artist_obj=artist_obj) if artist_obj else None async def get_track(self, prov_track_id) -> Track: """Get full track details by id.""" - signature_timestamp = ytmusicapi.mixins._utils.get_datestamp() - 1 + signature_timestamp = (date.today() - date.fromtimestamp(0)).days - 1 data = { "playbackContext": { "contentPlaybackContext": {"signatureTimestamp": signature_timestamp} @@ -144,12 +150,24 @@ class YTMusic(MusicProvider): async def get_playlist_tracks(self, prov_playlist_id) -> List[Track]: """Get all playlist tracks for given playlist id.""" - browseId = "VL" + prov_playlist_id if not prov_playlist_id.startswith("VL") else prov_playlist_id - data = {'browseId': browseId} + browse_id = ( + "VL" + prov_playlist_id + if not prov_playlist_id.startswith("VL") + else prov_playlist_id + ) + data = {"browseId": browse_id} playlist_obj = await self._post_data("browse", data=data) - tracks = playlist_obj["contents"]["singleColumnBrowseResultsRenderer"]["tabs"][0]["tabRenderer"]["content"]["sectionListRenderer"]["contents"][0]["musicPlaylistShelfRenderer"]["contents"] + tracks = playlist_obj["contents"]["singleColumnBrowseResultsRenderer"]["tabs"][ + 0 + ]["tabRenderer"]["content"]["sectionListRenderer"]["contents"][0][ + "musicPlaylistShelfRenderer" + ][ + "contents" + ] return [ - await self.get_track(track["musicResponsiveListItemRenderer"]["playlistItemData"]["videoId"]) + await self.get_track( + track["musicResponsiveListItemRenderer"]["playlistItemData"]["videoId"] + ) for track in tracks ] @@ -197,11 +215,11 @@ class YTMusic(MusicProvider): return await response.text() async def _initialize_headers(self) -> Dict[str, str]: - """Returns headers to include in the requests""" + """Return headers to include in the requests.""" # TODO: Replace with Cookie string from Config path = "../headers_auth.json" headers = None - with open(path) as json_file: + with open(path, mode="r", encoding="utf-8") as json_file: headers = CaseInsensitiveDict(json.load(json_file)) cookie = headers.get("cookie") sapisid = ytmusicapi.helpers.sapisid_from_cookie(cookie) @@ -209,12 +227,11 @@ class YTMusic(MusicProvider): headers["Authorization"] = ytmusicapi.helpers.get_authorization( sapisid + " " + origin ) - - return headers + self._headers = headers async def _initialize_context(self) -> Dict[str, str]: - """Returns a dict to use as a context in requests""" - return { + """Return a dict to use as a context in requests.""" + self._context = { "context": { "client": {"clientName": "WEB_REMIX", "clientVersion": "0.1"}, "user": {}, @@ -222,7 +239,7 @@ class YTMusic(MusicProvider): } async def _parse_album(self, album_obj: dict, album_id: str) -> Album: - """Parses a YT Album response to an Album model object""" + """Parse a YT Album response to an Album model object.""" parsed_album = ytmusicapi.parsers.albums.parse_album_header(album_obj) album = Album( item_id=album_id, @@ -238,20 +255,37 @@ class YTMusic(MusicProvider): if "description" in parsed_album: album.metadata.description = unquote(parsed_album["description"]) artists = [] - for artist in parsed_album["artists"]: - if artist["id"]: - artists.append(await self.get_artist(artist["id"])) + for parsed_artist in parsed_album["artists"]: + if parsed_artist["id"]: + artist = Artist( + item_id=parsed_artist["id"], + provider=self.type, + name=parsed_artist["name"], + ) + artist.add_provider_id( + MediaItemProviderId( + item_id=str(parsed_artist["id"]), + prov_type=self.type, + prov_id=self.id, + ) + ) + artists.append(artist) album.artists = artists + album.add_provider_id( + MediaItemProviderId( + item_id=str(album_id), prov_type=self.type, prov_id=self.id + ) + ) return album async def _parse_artist(self, artist_obj: dict) -> Artist: - """Parse a YT Artist response to Artist model object""" + """Parse a YT Artist response to Artist model object.""" name = artist_obj["header"]["musicImmersiveHeaderRenderer"]["title"]["runs"][0][ "text" ] - artist_id = artist_obj["header"]["musicImmersiveHeaderRenderer"]["subscriptionButton"][ - "subscribeButtonRenderer" - ]["channelId"] + artist_id = artist_obj["header"]["musicImmersiveHeaderRenderer"][ + "subscriptionButton" + ]["subscribeButtonRenderer"]["channelId"] artist = Artist(item_id=str(artist_id), provider=self.type, name=name) if "description" in artist_obj["header"]["musicImmersiveHeaderRenderer"]: artist.metadata.description = unquote( @@ -265,72 +299,87 @@ class YTMusic(MusicProvider): ]["thumbnail"]["thumbnails"]: images.append(MediaItemImage(ImageType.THUMB, thumb["url"])) artist.metadata.images = images + artist.add_provider_id( + MediaItemProviderId( + item_id=str(artist_id), prov_type=self.type, prov_id=self.id + ) + ) return artist async def _parse_track(self, track_obj: dict) -> Track: - """Parses a YT Track response to a Track model object""" + """Parse a YT Track response to a Track model object.""" track = Track( item_id=track_obj["videoDetails"]["videoId"], provider=self.type, name=track_obj["videoDetails"]["title"], duration=track_obj["videoDetails"]["lengthSeconds"], ) - artist = await self.get_artist( - track_obj["microformat"]["microformatDataRenderer"]["pageOwnerDetails"][ - "externalChannelId" - ] - ) - track.artists = [artist] + print(json.dumps(track_obj)) + # artist = await self.get_artist( + # track_obj["microformat"]["microformatDataRenderer"]["pageOwnerDetails"][ + # "externalChannelId" + # ] + # ) + # track.artists = [artist] images = [] for thumb in track_obj["microformat"]["microformatDataRenderer"]["thumbnail"][ "thumbnails" ]: images.append(MediaItemImage(ImageType.THUMB, thumb["url"])) + track.add_provider_id( + MediaItemProviderId( + item_id=str(track_obj["videoDetails"]["videoId"]), + prov_type=self.type, + prov_id=self.id, + ) + ) return track - async def _parse_stream_format(self, track_obj: dict) -> dict: - """Grabs the highes available audio stream from available streams""" - stream_format = None - for format in track_obj["streamingData"]["adaptiveFormats"]: - if ( - format["mimeType"].startswith("audio") - and format["audioQuality"] == "AUDIO_QUALITY_HIGH" - ): - stream_format = format - if stream_format is None: - raise MediaNotFoundError("No stream found for this track") - return stream_format + async def _get_signature_timestamp(self): + """Get a signature timestamp required to generate valid stream URLs.""" + response = await self._get_data(url=YTM_DOMAIN) + match = re.search(r'jsUrl"\s*:\s*"([^"]+)"', response) + if match is None: + raise Exception("Could not identify the URL for base.js player.") + url = YTM_DOMAIN + match.group(1) + response = await self._get_data(url=url) + match = re.search(r"signatureTimestamp[:=](\d+)", response) + if match is None: + raise Exception("Unable to identify the signatureTimestamp.") + return int(match.group(1)) async def _parse_stream_url(self, stream_format: dict, item_id: str) -> str: - """Figures out the stream URL to use based on the YT track object""" - cipher_parts = dict() + """Figure out the stream URL to use based on the YT track object.""" + cipher_parts = {} for part in stream_format["signatureCipher"].split("&"): - k, v = part.split("=", maxsplit=1) - cipher_parts[k] = unquote(v) + key, val = part.split("=", maxsplit=1) + cipher_parts[key] = unquote(val) signature = await self._decipher_signature( ciphered_signature=cipher_parts["s"], item_id=item_id ) url = cipher_parts["url"] + "&sig=" + signature return url - async def _decipher_signature(self, ciphered_signature: str, item_id: str): - """Decipher the signature, required to build the Stream URL""" + @classmethod + async def _parse_stream_format(cls, track_obj: dict) -> dict: + """Grab the highes available audio stream from available streams.""" + stream_format = None + for adaptive_format in track_obj["streamingData"]["adaptiveFormats"]: + if ( + adaptive_format["mimeType"].startswith("audio") + and adaptive_format["audioQuality"] == "AUDIO_QUALITY_HIGH" + ): + stream_format = adaptive_format + if stream_format is None: + raise MediaNotFoundError("No stream found for this track") + return stream_format + + @classmethod + async def _decipher_signature(cls, ciphered_signature: str, item_id: str): + """Decipher the signature, required to build the Stream URL.""" embed_url = f"https://www.youtube.com/embed/{item_id}" embed_html = pytube.request.get(embed_url) js_url = pytube.extract.js_url(embed_html) - js = pytube.request.get(js_url) - cipher = pytube.cipher.Cipher(js=js) + ytm_js = pytube.request.get(js_url) + cipher = pytube.cipher.Cipher(js=ytm_js) return cipher.get_signature(ciphered_signature) - - async def _get_signature_timestamp(self): - """Gets a signature timestamp required to generate valid stream URLs""" - response = await self._get_data(url=YTM_DOMAIN) - match = re.search(r'jsUrl"\s*:\s*"([^"]+)"', response) - if match is None: - raise Exception("Could not identify the URL for base.js player.") - url = YTM_DOMAIN + match.group(1) - response = await self._get_data(url=url) - match = re.search(r"signatureTimestamp[:=](\d+)", response) - if match is None: - raise Exception("Unable to identify the signatureTimestamp.") - return int(match.group(1))