-"""YT Music support for MusicAssistant"""
-from email.utils import parseaddr
+"""YT Music support for MusicAssistant."""
import json
import re
-from requests.structures import CaseInsensitiveDict
+from datetime import date
from typing import AsyncGenerator, Dict, List, Optional
from urllib.parse import unquote
-from yaml import parse
-import ytmusicapi
import pytube
+import ytmusicapi
+from requests.structures import CaseInsensitiveDict
+from music_assistant.helpers.audio import get_http_stream
from music_assistant.models.enums import ProviderType
from music_assistant.models.errors import MediaNotFoundError
from music_assistant.models.media_items import (
ContentType,
ImageType,
MediaItemImage,
+ MediaItemProviderId,
MediaItemType,
MediaType,
- Playlist,
StreamDetails,
Track,
)
from music_assistant.models.music_provider import MusicProvider
-from music_assistant.helpers.audio import (
- get_http_stream,
-)
YTM_DOMAIN = "https://music.youtube.com"
YTM_BASE_URL = f"{YTM_DOMAIN}/youtubei/v1/"
class YTMusic(MusicProvider):
- """Provider for Youtube Music"""
+ """Provider for Youtube Music."""
_attr_type = ProviderType.YTMUSIC
_attr_name = "YTMusic"
MediaType.PLAYLIST,
]
_headers = None
+ _context = None
+ _cookies = None
async def setup(self) -> bool:
- """Sets up the YTMusic provider"""
- self._headers = await self._initialize_headers()
- self._context = await self._initialize_context()
+ """Set up the YTMusic provider."""
+ await self._initialize_headers()
+ await self._initialize_context()
self._cookies = {"CONSENT": "YES+1"}
return True
) -> List[MediaItemType]:
"""
Perform search on musicprovider.
+
:param search_query: Search query.
:param media_types: A list of media_types to include. All types if None.
:param limit: Number of items to return in the search (per type).
"""
data = {"query": search_query}
- filter = None
+ ytm_filter = None
if len(media_types) == 1:
# YTM does not support multiple searchtypes, falls back to all if no type given
if media_types[0] == MediaType.ARTIST:
- filter = "artists"
+ ytm_filter = "artists"
if media_types[0] == MediaType.ALBUM:
- filter = "albums"
+ ytm_filter = "albums"
if media_types[0] == MediaType.TRACK:
- filter = "songs"
+ ytm_filter = "songs"
if media_types[0] == MediaType.PLAYLIST:
- filter = "playlists"
+ ytm_filter = "playlists"
params = ytmusicapi.parsers.search_params.get_search_params(
- filter=filter, scope=None, ignore_spelling=False
+ filter=ytm_filter, scope=None, ignore_spelling=False
)
data["params"] = params
search_results = await self._post_data(endpoint="search", data=data)
parsed_results.append(await self.get_album(album_id))
elif category == "Songs":
for song in result["musicShelfRenderer"]["contents"]:
- song_id = song["musicResponsiveListItemRenderer"]["playlistItemData"]["videoId"]
+ song_id = song["musicResponsiveListItemRenderer"][
+ "playlistItemData"
+ ]["videoId"]
parsed_results.append(await self.get_track(song_id))
else:
print(category)
"""Get full album details by id."""
data = {"browseId": prov_album_id}
album_obj = await self._post_data(endpoint="browse", data=data)
- return await self._parse_album(album_obj=album_obj, album_id=prov_album_id) if album_obj else None
+ return (
+ await self._parse_album(album_obj=album_obj, album_id=prov_album_id)
+ if album_obj
+ else None
+ )
async def get_album_tracks(self, prov_album_id: str) -> List[Track]:
"""Get album tracks for given album id."""
data = {"browseId": prov_album_id}
- album_obj = await self._post_data(endpoint="browse", data=data)
+ album_obj = await self._post_data(endpoint="browse", data=data)
parsed_album = ytmusicapi.parsers.albums.parse_album_header(album_obj)
album_playlist_id = parsed_album["audioPlaylistId"]
return await self.get_playlist_tracks(album_playlist_id)
async def get_artist(self, prov_artist_id) -> Artist:
- """Get full artist details by id"""
+ """Get full artist details by id."""
data = {"browseId": prov_artist_id}
artist_obj = await self._post_data(endpoint="browse", data=data)
return await self._parse_artist(artist_obj=artist_obj) if artist_obj else None
async def get_track(self, prov_track_id) -> Track:
"""Get full track details by id."""
- signature_timestamp = ytmusicapi.mixins._utils.get_datestamp() - 1
+ signature_timestamp = (date.today() - date.fromtimestamp(0)).days - 1
data = {
"playbackContext": {
"contentPlaybackContext": {"signatureTimestamp": signature_timestamp}
async def get_playlist_tracks(self, prov_playlist_id) -> List[Track]:
"""Get all playlist tracks for given playlist id."""
- browseId = "VL" + prov_playlist_id if not prov_playlist_id.startswith("VL") else prov_playlist_id
- data = {'browseId': browseId}
+ browse_id = (
+ "VL" + prov_playlist_id
+ if not prov_playlist_id.startswith("VL")
+ else prov_playlist_id
+ )
+ data = {"browseId": browse_id}
playlist_obj = await self._post_data("browse", data=data)
- tracks = playlist_obj["contents"]["singleColumnBrowseResultsRenderer"]["tabs"][0]["tabRenderer"]["content"]["sectionListRenderer"]["contents"][0]["musicPlaylistShelfRenderer"]["contents"]
+ tracks = playlist_obj["contents"]["singleColumnBrowseResultsRenderer"]["tabs"][
+ 0
+ ]["tabRenderer"]["content"]["sectionListRenderer"]["contents"][0][
+ "musicPlaylistShelfRenderer"
+ ][
+ "contents"
+ ]
return [
- await self.get_track(track["musicResponsiveListItemRenderer"]["playlistItemData"]["videoId"])
+ await self.get_track(
+ track["musicResponsiveListItemRenderer"]["playlistItemData"]["videoId"]
+ )
for track in tracks
]
return await response.text()
async def _initialize_headers(self) -> Dict[str, str]:
- """Returns headers to include in the requests"""
+ """Return headers to include in the requests."""
# TODO: Replace with Cookie string from Config
path = "../headers_auth.json"
headers = None
- with open(path) as json_file:
+ with open(path, mode="r", encoding="utf-8") as json_file:
headers = CaseInsensitiveDict(json.load(json_file))
cookie = headers.get("cookie")
sapisid = ytmusicapi.helpers.sapisid_from_cookie(cookie)
headers["Authorization"] = ytmusicapi.helpers.get_authorization(
sapisid + " " + origin
)
-
- return headers
+ self._headers = headers
async def _initialize_context(self) -> Dict[str, str]:
- """Returns a dict to use as a context in requests"""
- return {
+ """Return a dict to use as a context in requests."""
+ self._context = {
"context": {
"client": {"clientName": "WEB_REMIX", "clientVersion": "0.1"},
"user": {},
}
async def _parse_album(self, album_obj: dict, album_id: str) -> Album:
- """Parses a YT Album response to an Album model object"""
+ """Parse a YT Album response to an Album model object."""
parsed_album = ytmusicapi.parsers.albums.parse_album_header(album_obj)
album = Album(
item_id=album_id,
if "description" in parsed_album:
album.metadata.description = unquote(parsed_album["description"])
artists = []
- for artist in parsed_album["artists"]:
- if artist["id"]:
- artists.append(await self.get_artist(artist["id"]))
+ for parsed_artist in parsed_album["artists"]:
+ if parsed_artist["id"]:
+ artist = Artist(
+ item_id=parsed_artist["id"],
+ provider=self.type,
+ name=parsed_artist["name"],
+ )
+ artist.add_provider_id(
+ MediaItemProviderId(
+ item_id=str(parsed_artist["id"]),
+ prov_type=self.type,
+ prov_id=self.id,
+ )
+ )
+ artists.append(artist)
album.artists = artists
+ album.add_provider_id(
+ MediaItemProviderId(
+ item_id=str(album_id), prov_type=self.type, prov_id=self.id
+ )
+ )
return album
async def _parse_artist(self, artist_obj: dict) -> Artist:
- """Parse a YT Artist response to Artist model object"""
+ """Parse a YT Artist response to Artist model object."""
name = artist_obj["header"]["musicImmersiveHeaderRenderer"]["title"]["runs"][0][
"text"
]
- artist_id = artist_obj["header"]["musicImmersiveHeaderRenderer"]["subscriptionButton"][
- "subscribeButtonRenderer"
- ]["channelId"]
+ artist_id = artist_obj["header"]["musicImmersiveHeaderRenderer"][
+ "subscriptionButton"
+ ]["subscribeButtonRenderer"]["channelId"]
artist = Artist(item_id=str(artist_id), provider=self.type, name=name)
if "description" in artist_obj["header"]["musicImmersiveHeaderRenderer"]:
artist.metadata.description = unquote(
]["thumbnail"]["thumbnails"]:
images.append(MediaItemImage(ImageType.THUMB, thumb["url"]))
artist.metadata.images = images
+ artist.add_provider_id(
+ MediaItemProviderId(
+ item_id=str(artist_id), prov_type=self.type, prov_id=self.id
+ )
+ )
return artist
async def _parse_track(self, track_obj: dict) -> Track:
- """Parses a YT Track response to a Track model object"""
+ """Parse a YT Track response to a Track model object."""
track = Track(
item_id=track_obj["videoDetails"]["videoId"],
provider=self.type,
name=track_obj["videoDetails"]["title"],
duration=track_obj["videoDetails"]["lengthSeconds"],
)
- artist = await self.get_artist(
- track_obj["microformat"]["microformatDataRenderer"]["pageOwnerDetails"][
- "externalChannelId"
- ]
- )
- track.artists = [artist]
+ print(json.dumps(track_obj))
+ # artist = await self.get_artist(
+ # track_obj["microformat"]["microformatDataRenderer"]["pageOwnerDetails"][
+ # "externalChannelId"
+ # ]
+ # )
+ # track.artists = [artist]
images = []
for thumb in track_obj["microformat"]["microformatDataRenderer"]["thumbnail"][
"thumbnails"
]:
images.append(MediaItemImage(ImageType.THUMB, thumb["url"]))
+ track.add_provider_id(
+ MediaItemProviderId(
+ item_id=str(track_obj["videoDetails"]["videoId"]),
+ prov_type=self.type,
+ prov_id=self.id,
+ )
+ )
return track
- async def _parse_stream_format(self, track_obj: dict) -> dict:
- """Grabs the highes available audio stream from available streams"""
- stream_format = None
- for format in track_obj["streamingData"]["adaptiveFormats"]:
- if (
- format["mimeType"].startswith("audio")
- and format["audioQuality"] == "AUDIO_QUALITY_HIGH"
- ):
- stream_format = format
- if stream_format is None:
- raise MediaNotFoundError("No stream found for this track")
- return stream_format
+ async def _get_signature_timestamp(self):
+ """Get a signature timestamp required to generate valid stream URLs."""
+ response = await self._get_data(url=YTM_DOMAIN)
+ match = re.search(r'jsUrl"\s*:\s*"([^"]+)"', response)
+ if match is None:
+ raise Exception("Could not identify the URL for base.js player.")
+ url = YTM_DOMAIN + match.group(1)
+ response = await self._get_data(url=url)
+ match = re.search(r"signatureTimestamp[:=](\d+)", response)
+ if match is None:
+ raise Exception("Unable to identify the signatureTimestamp.")
+ return int(match.group(1))
async def _parse_stream_url(self, stream_format: dict, item_id: str) -> str:
- """Figures out the stream URL to use based on the YT track object"""
- cipher_parts = dict()
+ """Figure out the stream URL to use based on the YT track object."""
+ cipher_parts = {}
for part in stream_format["signatureCipher"].split("&"):
- k, v = part.split("=", maxsplit=1)
- cipher_parts[k] = unquote(v)
+ key, val = part.split("=", maxsplit=1)
+ cipher_parts[key] = unquote(val)
signature = await self._decipher_signature(
ciphered_signature=cipher_parts["s"], item_id=item_id
)
url = cipher_parts["url"] + "&sig=" + signature
return url
- async def _decipher_signature(self, ciphered_signature: str, item_id: str):
- """Decipher the signature, required to build the Stream URL"""
+ @classmethod
+ async def _parse_stream_format(cls, track_obj: dict) -> dict:
+ """Grab the highes available audio stream from available streams."""
+ stream_format = None
+ for adaptive_format in track_obj["streamingData"]["adaptiveFormats"]:
+ if (
+ adaptive_format["mimeType"].startswith("audio")
+ and adaptive_format["audioQuality"] == "AUDIO_QUALITY_HIGH"
+ ):
+ stream_format = adaptive_format
+ if stream_format is None:
+ raise MediaNotFoundError("No stream found for this track")
+ return stream_format
+
+ @classmethod
+ async def _decipher_signature(cls, ciphered_signature: str, item_id: str):
+ """Decipher the signature, required to build the Stream URL."""
embed_url = f"https://www.youtube.com/embed/{item_id}"
embed_html = pytube.request.get(embed_url)
js_url = pytube.extract.js_url(embed_html)
- js = pytube.request.get(js_url)
- cipher = pytube.cipher.Cipher(js=js)
+ ytm_js = pytube.request.get(js_url)
+ cipher = pytube.cipher.Cipher(js=ytm_js)
return cipher.get_signature(ciphered_signature)
-
- async def _get_signature_timestamp(self):
- """Gets a signature timestamp required to generate valid stream URLs"""
- response = await self._get_data(url=YTM_DOMAIN)
- match = re.search(r'jsUrl"\s*:\s*"([^"]+)"', response)
- if match is None:
- raise Exception("Could not identify the URL for base.js player.")
- url = YTM_DOMAIN + match.group(1)
- response = await self._get_data(url=url)
- match = re.search(r"signatureTimestamp[:=](\d+)", response)
- if match is None:
- raise Exception("Unable to identify the signatureTimestamp.")
- return int(match.group(1))