Improve Youtube Music (#413)
authorMarvin Schenkel <marvinschenkel@gmail.com>
Thu, 14 Jul 2022 10:00:43 +0000 (12:00 +0200)
committerGitHub <noreply@github.com>
Thu, 14 Jul 2022 10:00:43 +0000 (12:00 +0200)
* Cache signature timestamp and cipher

* Fix album sometimes not loading

* Add expires to streamdetails, log loudness

* Add audio channels and sample rate to streamdetails

music_assistant/music_providers/ytmusic/ytmusic.py

index 4a8085c582e7ace0c672fe30f30c281e318d6306..1544dec7171e67820d9706d79a2a3fc98e3a2f35 100644 (file)
@@ -1,6 +1,7 @@
 """Youtube Music support for MusicAssistant."""
 import re
 from operator import itemgetter
+from time import time
 from typing import AsyncGenerator, Dict, List, Optional
 from urllib.parse import unquote
 
@@ -58,6 +59,8 @@ class YoutubeMusicProvider(MusicProvider):
     _headers = None
     _context = None
     _cookies = None
+    _signature_timestamp = 0
+    _cipher = None
 
     async def setup(self) -> bool:
         """Set up the YTMusic provider."""
@@ -68,6 +71,7 @@ class YoutubeMusicProvider(MusicProvider):
         await self._initialize_headers(cookie=self.config.password)
         await self._initialize_context()
         self._cookies = {"CONSENT": "YES+1"}
+        self._signature_timestamp = await self._get_signature_timestamp()
         return True
 
     async def search(
@@ -231,22 +235,41 @@ class YoutubeMusicProvider(MusicProvider):
 
     async def get_stream_details(self, item_id: str) -> StreamDetails:
         """Return the content details for the given track when it will be streamed."""
-        signature_timestamp = await self._get_signature_timestamp()
         data = {
             "playbackContext": {
-                "contentPlaybackContext": {"signatureTimestamp": signature_timestamp}
+                "contentPlaybackContext": {
+                    "signatureTimestamp": self._signature_timestamp
+                }
             },
             "video_id": item_id,
         }
         track_obj = await self._post_data("player", data=data)
         stream_format = await self._parse_stream_format(track_obj)
         url = await self._parse_stream_url(stream_format=stream_format, item_id=item_id)
-        return StreamDetails(
+        stream_details = StreamDetails(
             provider=self.type,
             item_id=item_id,
             content_type=ContentType.try_parse(stream_format["mimeType"]),
             direct=url,
         )
+        if (
+            track_obj["streamingData"].get("expiresInSeconds")
+            and track_obj["streamingData"].get("expiresInSeconds").isdigit()
+        ):
+            stream_details.expires = time() + int(
+                track_obj["streamingData"].get("expiresInSeconds")
+            )
+        if (
+            stream_format.get("audioChannels")
+            and str(stream_format.get("audioChannels")).isdigit()
+        ):
+            stream_details.channels = int(stream_format.get("audioChannels"))
+        if (
+            stream_format.get("audioSampleRate")
+            and stream_format.get("audioSampleRate").isdigit()
+        ):
+            stream_details.sample_rate = int(stream_format.get("audioSampleRate"))
+        return stream_details
 
     async def _post_data(self, endpoint: str, data: Dict[str, str], **kwargs):
         url = f"{YTM_BASE_URL}{endpoint}"
@@ -398,11 +421,10 @@ class YoutubeMusicProvider(MusicProvider):
                 track_obj["thumbnails"]
             )
         if (
-            "album" in track_obj
-            and track_obj["album"]
-            and "id" in track_obj["album"]
-            and track_obj["album"]["id"]
-            and "artists" in track_obj
+            track_obj.get("album")
+            and track_obj.get("artists")
+            and isinstance(track_obj.get("album"), dict)
+            and track_obj["album"].get("id")
         ):
             album = track_obj["album"]
             album["artists"] = track_obj["artists"]
@@ -495,6 +517,8 @@ class YoutubeMusicProvider(MusicProvider):
             js_url = pytube.extract.js_url(embed_html)
             ytm_js = pytube.request.get(js_url)
             cipher = pytube.cipher.Cipher(js=ytm_js)
-            return cipher.get_signature(ciphered_signature)
+            return cipher
 
-        return await self.mass.loop.run_in_executor(None, _decipher)
+        if not self._cipher:
+            self._cipher = await self.mass.loop.run_in_executor(None, _decipher)
+        return self._cipher.get_signature(ciphered_signature)