import asyncio
import logging
-import re
from collections.abc import AsyncGenerator
from time import time
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Any
from urllib.parse import unquote
-import pytube
+import yt_dlp
from ytmusicapi.constants import SUPPORTED_LANGUAGES
from music_assistant.common.models.config_entries import ConfigEntry, ConfigValueType
CONF_TOKEN_TYPE = "token_type"
CONF_EXPIRY_TIME = "expiry_time"
-YT_DOMAIN = "https://www.youtube.com"
YTM_DOMAIN = "https://music.youtube.com"
YTM_BASE_URL = f"{YTM_DOMAIN}/youtubei/v1/"
VARIOUS_ARTISTS_YTM_ID = "UCUTXlgdcKU5vfzFqHOWIvkA"
"RDTMAK5uy_nilrsVWxrKskY0ZUpVZ3zpB0u4LwWTVJ4", # Replay Mix
"RDTMAK5uy_mZtXeU08kxXJOUhL0ETdAuZTh1z7aAFAo", # Archive Mix
)
+YTM_PREMIUM_CHECK_TRACK_ID = "dQw4w9WgXcQ"
SUPPORTED_FEATURES = (
ProviderFeature.LIBRARY_ARTISTS,
_headers = None
_context = None
_cookies = None
- _signature_timestamp = 0
_cipher = None
async def handle_async_init(self) -> None:
"""Set up the YTMusic provider."""
- logging.getLogger("pytube").setLevel(self.logger.level + 10)
+ logging.getLogger("yt_dlp").setLevel(self.logger.level + 10)
if not self.config.get_value(CONF_AUTH_TOKEN):
msg = "Invalid login credentials"
raise LoginFailed(msg)
await self._initialize_headers()
await self._initialize_context()
self._cookies = {"CONSENT": "YES+1"}
- self._signature_timestamp = await self._get_signature_timestamp()
# get default language (that is supported by YTM)
mass_locale = self.mass.metadata.locale
for lang_code in SUPPORTED_LANGUAGES:
break
else:
self.language = "en"
+ if not await self._user_has_ytm_premium():
+ raise LoginFailed("User does not have Youtube Music Premium")
@property
def supported_features(self) -> tuple[ProviderFeature, ...]:
if track_obj := await get_track(
prov_track_id=prov_track_id,
headers=self._headers,
- signature_timestamp=self._signature_timestamp,
language=self.language,
):
return await self._parse_track(track_obj)
return tracks
return []
- async def get_stream_details(self, item_id: str, retry=0) -> StreamDetails:
+ async def get_stream_details(self, item_id: str) -> StreamDetails:
"""Return the content details for the given track when it will be streamed."""
- await self._check_oauth_token()
- track_obj = await get_track(
- prov_track_id=item_id,
- headers=self._headers,
- signature_timestamp=self._signature_timestamp,
- )
- if not track_obj:
- msg = f"Item {item_id} not found"
- raise MediaNotFoundError(msg)
- stream_format = await self._parse_stream_format(track_obj)
- url = await self._parse_stream_url(stream_format=stream_format, item_id=item_id)
- if not await self._is_valid_deciphered_url(url=url):
- if retry > 4:
- self.logger.warning(
- f"Could not resolve a valid URL for item '{item_id}'. "
- "Are you playing music on another device using the same account?"
- )
- msg = f"Could not resolve a valid URL for item '{item_id}'."
- raise UnplayableMediaError(msg)
- self.logger.debug(
- "Invalid playback URL encountered. Retrying with new signature timestamp."
- )
- self._cipher = None
- self._signature_timestamp = await self._get_signature_timestamp()
- return await self.get_stream_details(item_id=item_id, retry=retry + 1)
+ stream_format = await self._get_stream_format(item_id=item_id)
+ self.logger.debug("Found stream_format: %s for song %s", stream_format["format"], item_id)
stream_details = StreamDetails(
provider=self.instance_id,
item_id=item_id,
audio_format=AudioFormat(
- content_type=ContentType.try_parse(stream_format["mimeType"]),
+ content_type=ContentType.try_parse(stream_format["audio_ext"]),
),
stream_type=StreamType.HTTP,
- path=url,
+ path=stream_format["url"],
)
- if stream_format.get("audioChannels") and str(stream_format.get("audioChannels")).isdigit():
- stream_details.audio_format.channels = int(stream_format.get("audioChannels"))
- if stream_format.get("audioSampleRate") and stream_format.get("audioSampleRate").isdigit():
- stream_details.audio_format.sample_rate = int(stream_format.get("audioSampleRate"))
+ if (
+ stream_format.get("audio_channels")
+ and str(stream_format.get("audio_channels")).isdigit()
+ ):
+ stream_details.audio_format.channels = int(stream_format.get("audio_channels"))
+ if stream_format.get("asr"):
+ stream_details.audio_format.sample_rate = int(stream_format.get("asr"))
return stream_details
async def _post_data(self, endpoint: str, data: dict[str, str], **kwargs):
self.config.update({CONF_EXPIRY_TIME: time() + token["expires_in"]})
self.config.update({CONF_TOKEN_TYPE: token["token_type"]})
await self._initialize_headers()
+ await self._update_ytdlp_oauth_token_cache()
async def _initialize_headers(self) -> dict[str, str]:
"""Return headers to include in the requests."""
"Accept-Language": "en-US,en;q=0.5",
"Content-Type": "application/json",
"X-Goog-AuthUser": "0",
- "x-origin": "https://music.youtube.com",
+ "x-origin": YTM_DOMAIN,
"X-Goog-Request-Time": str(int(time())),
"Authorization": auth,
}
item_id=str(artist_id),
provider_domain=self.domain,
provider_instance=self.instance_id,
- url=f"https://music.youtube.com/channel/{artist_id}",
+ url=f"{YTM_DOMAIN}/channel/{artist_id}",
)
},
)
track.duration = int(track_obj["duration_seconds"])
return track
- async def _get_signature_timestamp(self):
- """Get a signature timestamp required to generate valid stream URLs."""
- response = await self._get_data(url=YTM_DOMAIN)
- match = re.search(r'jsUrl"\s*:\s*"([^"]+)"', response)
- if match is None:
- # retry with youtube domain
- response = await self._get_data(url=YT_DOMAIN)
- match = re.search(r'jsUrl"\s*:\s*"([^"]+)"', response)
- if match is None:
- msg = "Could not identify the URL for base.js player."
- raise Exception(msg) # pylint: disable=broad-exception-raised
- url = YTM_DOMAIN + match.group(1)
- response = await self._get_data(url=url)
- match = re.search(r"signatureTimestamp[:=](\d+)", response)
- if match is None:
- msg = "Unable to identify the signatureTimestamp."
- raise Exception(msg) # pylint: disable=broad-exception-raised
- return int(match.group(1))
-
- async def _parse_stream_url(self, stream_format: dict, item_id: str) -> str:
- """Figure out the stream URL to use based on the YT track object."""
- url = None
- if stream_format.get("signatureCipher"):
- # Secured URL
- cipher_parts = {}
- for part in stream_format["signatureCipher"].split("&"):
- key, val = part.split("=", maxsplit=1)
- cipher_parts[key] = unquote(val)
- signature = await self._decipher_signature(
- ciphered_signature=cipher_parts["s"], item_id=item_id
- )
- url = cipher_parts["url"] + "&sig=" + signature
- elif stream_format.get("url"):
- # Non secured URL
- url = stream_format.get("url")
- else:
- self.logger.debug(
- f"Something went wrong. No URL found for stream format {stream_format}"
- )
- return url
-
- async def _decipher_signature(self, ciphered_signature: str, item_id: str):
- """Decipher the signature, required to build the Stream URL."""
-
- def _decipher():
- embed_url = f"https://www.youtube.com/embed/{item_id}"
- embed_html = pytube.request.get(embed_url)
- js_url = pytube.extract.js_url(embed_html)
- ytm_js = pytube.request.get(js_url)
- cipher = pytube.cipher.Cipher(js=ytm_js)
- return cipher
-
- if not self._cipher:
- self.logger.debug("Creating a new cipher")
- self._cipher = await asyncio.to_thread(_decipher)
- return self._cipher.get_signature(ciphered_signature)
-
- async def _is_valid_deciphered_url(self, url: str) -> bool:
- """Verify whether the URL has been deciphered using a valid cipher."""
- async with self.mass.http_session.head(url) as response:
- # TODO: Remove after 403 issue has been verified as fixed
- if response.status != 200:
- self.logger.debug(f"Deciphered URL HTTP status: {response.status}")
- return response.status != 403
+ async def _get_stream_format(self, item_id: str) -> dict[str, Any]:
+ """Figure out the stream URL to use and return the highest quality."""
+ await self._check_oauth_token()
+
+ def _extract_best_stream_url_format() -> dict[str, Any]:
+ url = f"{YTM_DOMAIN}/watch?v={item_id}"
+ ydl_opts = {
+ "quiet": self.logger.level > logging.DEBUG,
+ "username": "oauth2",
+ "password": "",
+ }
+ with yt_dlp.YoutubeDL(ydl_opts) as ydl:
+ try:
+ info = ydl.extract_info(url, download=False)
+ except yt_dlp.utils.DownloadError as err:
+ raise UnplayableMediaError(err) from err
+ format_selector = ydl.build_format_selector("m4a/bestaudio")
+ stream_format = next(format_selector({"formats": info["formats"]}))
+ return stream_format
+
+ return await asyncio.to_thread(_extract_best_stream_url_format)
+
+ async def _update_ytdlp_oauth_token_cache(self) -> None:
+ """Update the ytdlp token so we can grab the best available quality audio stream."""
+
+ def _update_oauth_cache() -> None:
+ ydl_opts = {
+ "quiet": self.logger.level > logging.DEBUG,
+ "username": "oauth2",
+ "password": "",
+ }
+ with yt_dlp.YoutubeDL(ydl_opts) as ydl:
+ token_data = {
+ "access_token": self.config.get_value(CONF_AUTH_TOKEN),
+ "expires": self.config.get_value(CONF_EXPIRY_TIME),
+ "token_type": self.config.get_value(CONF_TOKEN_TYPE),
+ "refresh_token": self.config.get_value(CONF_REFRESH_TOKEN),
+ }
+ ydl.cache.store("youtube-oauth2", "token_data", token_data)
+
+ await asyncio.to_thread(_update_oauth_cache)
def _get_item_mapping(self, media_type: MediaType, key: str, name: str) -> ItemMapping:
return ItemMapping(
artist_id = VARIOUS_ARTISTS_YTM_ID
return self._get_item_mapping(MediaType.ARTIST, artist_id, artist_obj.get("name"))
+ async def _user_has_ytm_premium(self) -> bool:
+ """Check if the user has Youtube Music Premium."""
+ stream_format = await self._get_stream_format(YTM_PREMIUM_CHECK_TRACK_ID)
+ # Only premium users can stream the HQ stream of this song
+ return stream_format["format_id"] == "141"
+
async def _parse_thumbnails(self, thumbnails_obj: dict) -> list[MediaItemImage]:
"""Parse and YTM thumbnails to MediaItemImage."""
result: list[MediaItemImage] = []
)
)
return result
-
- @classmethod
- async def _parse_stream_format(cls, track_obj: dict) -> dict:
- """Grab the highest available audio stream from available streams."""
- stream_format = {}
- quality_mapper = {
- "AUDIO_QUALITY_LOW": 1,
- "AUDIO_QUALITY_MEDIUM": 2,
- "AUDIO_QUALITY_HIGH": 3,
- }
- if "streamingData" not in track_obj:
- raise MediaNotFoundError("No stream found for this track")
- for adaptive_format in track_obj["streamingData"]["adaptiveFormats"]:
- if adaptive_format["mimeType"].startswith("audio") and (
- not stream_format
- or quality_mapper.get(adaptive_format["audioQuality"], 0)
- > quality_mapper.get(stream_format["audioQuality"], 0)
- ):
- stream_format = adaptive_format
- if stream_format is None:
- raise MediaNotFoundError("No stream found for this track")
- return stream_format