Generate valid stream urls
authorMarvin Schenkel <marvinschenkel@gmail.com>
Sat, 2 Jul 2022 10:39:07 +0000 (12:39 +0200)
committerMarvin Schenkel <marvinschenkel@gmail.com>
Wed, 6 Jul 2022 14:05:25 +0000 (16:05 +0200)
examples/ytmusic.py
music_assistant/music_providers/ytmusic.py

index 10e3bc2caecf8c0ca3e26024084f810f656dd1aa..86a90c1d94360b0b8e73f465e1437f3ea378d4d9 100644 (file)
@@ -108,14 +108,7 @@ async def main():
         yt = mass.music.get_provider(ProviderType.YTMUSIC)
         await yt.get_album("MPREb_9nqEki4ZDpp")
         track = await yt.get_track("pE3ju1qS848")
-        
-        test_player1 = TestPlayer("test1")
-        await mass.players.register_player(test_player1)
-        await test_player1.active_queue.play_media(track)
-
-        await asyncio.sleep(3600)
-        
-
+        await yt.get_stream_details("pE3ju1qS848")
 
 if __name__ == "__main__":
     try:
index 7ec13bef8cd629a167df9c0d780049d1dc5c37d1..65870de254785fe403c0e53cb3005e542dad52d6 100644 (file)
@@ -1,7 +1,10 @@
 """YT Music support for MusicAssistant"""
 import json
+import requests
+import re
 from requests.structures import CaseInsensitiveDict
 from typing import AsyncGenerator, Dict, List, Optional
+from urllib.parse import unquote
 
 import ytmusicapi
 import pytube
@@ -49,6 +52,7 @@ class YTMusic(MusicProvider):
         """Sets up the YTMusic provider"""
         self._headers = await self._initialize_headers()
         self._context = await self._initialize_context()
+        self._cookies = {'CONSENT': 'YES+1'}
         return True
 
     async def get_album(self, prov_album_id) -> Album:
@@ -80,7 +84,7 @@ class YTMusic(MusicProvider):
 
     async def get_stream_details(self, item_id: str) -> StreamDetails:
         """Return the content details for the given track when it will be streamed."""
-        signature_timestamp = ytmusicapi.mixins._utils.get_datestamp() - 1
+        signature_timestamp = await self._get_signature_timestamp()
         data = {
             "playbackContext": {
                 "contentPlaybackContext": {
@@ -114,10 +118,16 @@ class YTMusic(MusicProvider):
         data.update(self._context)
 
         async with self.mass.http_session.post(
-            url, headers=self._headers, json=data, verify_ssl=False
+            url, headers=self._headers, json=data, verify_ssl=False, cookies=self._cookies
         ) as response:
             return await response.json()
 
+    async def _get_data(self, url: str, params: Dict = None):
+        async with self.mass.http_session.get(
+            url, headers=self._headers, params=params, cookies=self._cookies
+        ) as response:
+            return await response.text()
+
     async def _initialize_headers(self) -> Dict[str, str]:
         """Returns headers to include in the requests"""
         # TODO: Replace with Cookie string from Config
@@ -192,10 +202,11 @@ class YTMusic(MusicProvider):
         cipherParts = dict()
         for part in stream_format["signatureCipher"].split("&"):
             k, v = part.split("=", maxsplit=1)
-            cipherParts[k] = v
+            cipherParts[k] = unquote(v)
 
-        signature = await self._decipher_signature(ciphered_signature=cipherParts["s"], item_id=item_id)
+        signature = await self._decipher_signature(ciphered_signature=cipherParts["s"], item_id=item_id)        
         url = cipherParts["url"] + "&sig=" + signature
+        
         return url        
 
     async def _decipher_signature(self, ciphered_signature: str, item_id: str):
@@ -206,3 +217,18 @@ class YTMusic(MusicProvider):
         js = pytube.request.get(js_url)
         cipher = pytube.cipher.Cipher(js=js)
         return cipher.get_signature(ciphered_signature)
+
+    async def _get_signature_timestamp(self):
+        """Gets a signature timestamp required to generate valid stream URLs"""
+        response = await self._get_data(url=YTM_DOMAIN)
+        match = re.search(r'jsUrl"\s*:\s*"([^"]+)"', response)
+        if match is None:
+            raise Exception("Could not identify the URL for base.js player.")
+
+        url = YTM_DOMAIN + match.group(1)
+        response = await self._get_data(url=url)
+        match = re.search(r"signatureTimestamp[:=](\d+)", response)
+        if match is None:
+            raise Exception("Unable to identify the signatureTimestamp.")
+
+        return int(match.group(1))