"""Youtube Music support for MusicAssistant."""
import re
from operator import itemgetter
+from time import time
from typing import AsyncGenerator, Dict, List, Optional
from urllib.parse import unquote
_headers = None
_context = None
_cookies = None
+ _signature_timestamp = 0
+ _cipher = None
async def setup(self) -> bool:
"""Set up the YTMusic provider."""
await self._initialize_headers(cookie=self.config.password)
await self._initialize_context()
self._cookies = {"CONSENT": "YES+1"}
+ self._signature_timestamp = await self._get_signature_timestamp()
return True
async def search(
async def get_stream_details(self, item_id: str) -> StreamDetails:
"""Return the content details for the given track when it will be streamed."""
- signature_timestamp = await self._get_signature_timestamp()
data = {
"playbackContext": {
- "contentPlaybackContext": {"signatureTimestamp": signature_timestamp}
+ "contentPlaybackContext": {
+ "signatureTimestamp": self._signature_timestamp
+ }
},
"video_id": item_id,
}
track_obj = await self._post_data("player", data=data)
stream_format = await self._parse_stream_format(track_obj)
url = await self._parse_stream_url(stream_format=stream_format, item_id=item_id)
- return StreamDetails(
+ stream_details = StreamDetails(
provider=self.type,
item_id=item_id,
content_type=ContentType.try_parse(stream_format["mimeType"]),
direct=url,
)
+ if (
+ track_obj["streamingData"].get("expiresInSeconds")
+ and track_obj["streamingData"].get("expiresInSeconds").isdigit()
+ ):
+ stream_details.expires = time() + int(
+ track_obj["streamingData"].get("expiresInSeconds")
+ )
+ if (
+ stream_format.get("audioChannels")
+ and str(stream_format.get("audioChannels")).isdigit()
+ ):
+ stream_details.channels = int(stream_format.get("audioChannels"))
+ if (
+ stream_format.get("audioSampleRate")
+ and stream_format.get("audioSampleRate").isdigit()
+ ):
+ stream_details.sample_rate = int(stream_format.get("audioSampleRate"))
+ return stream_details
async def _post_data(self, endpoint: str, data: Dict[str, str], **kwargs):
url = f"{YTM_BASE_URL}{endpoint}"
track_obj["thumbnails"]
)
if (
- "album" in track_obj
- and track_obj["album"]
- and "id" in track_obj["album"]
- and track_obj["album"]["id"]
- and "artists" in track_obj
+ track_obj.get("album")
+ and track_obj.get("artists")
+ and isinstance(track_obj.get("album"), dict)
+ and track_obj["album"].get("id")
):
album = track_obj["album"]
album["artists"] = track_obj["artists"]
js_url = pytube.extract.js_url(embed_html)
ytm_js = pytube.request.get(js_url)
cipher = pytube.cipher.Cipher(js=ytm_js)
- return cipher.get_signature(ciphered_signature)
+ return cipher
- return await self.mass.loop.run_in_executor(None, _decipher)
+ if not self._cipher:
+ self._cipher = await self.mass.loop.run_in_executor(None, _decipher)
+ return self._cipher.get_signature(ciphered_signature)