Auto renew cipher for Youtube Music provider (#539)
authorMarvin Schenkel <marvinschenkel@gmail.com>
Thu, 16 Mar 2023 21:15:38 +0000 (22:15 +0100)
committerGitHub <noreply@github.com>
Thu, 16 Mar 2023 21:15:38 +0000 (22:15 +0100)
This PR adds logic to auto-renew a cipher when it has expires.

music_assistant/common/models/errors.py
music_assistant/server/providers/ytmusic/__init__.py

index e07881496b685ccdeb7885cf4f38f33ea8a9490b..e26456ccc25a66df6e5e76195926006d3e2ced5e 100644 (file)
@@ -79,6 +79,12 @@ class InvalidCommand(MusicAssistantError):
     error_code = 12
 
 
+class UnplayableMediaError(MusicAssistantError):
+    """Error thrown when a MediaItem cannot be played properly."""
+
+    error_code = 13
+
+
 def error_code_to_exception(error_code: int) -> MusicAssistantError:
     """Return MusicAssistant Error (exception) from error_code."""
     match error_code:
@@ -106,5 +112,7 @@ def error_code_to_exception(error_code: int) -> MusicAssistantError:
             return PlayerCommandFailed
         case 12:
             return InvalidCommand
+        case 13:
+            return UnplayableMediaError
         case _:
             return MusicAssistantError
index 4765cde58ad7ef9f258a981476a8650cb4ba97da..1dc9a408b7524a9282aa7c4672c2da9600cbad85 100644 (file)
@@ -10,7 +10,12 @@ import pytube
 import ytmusicapi
 
 from music_assistant.common.models.enums import ProviderFeature
-from music_assistant.common.models.errors import InvalidDataError, LoginFailed, MediaNotFoundError
+from music_assistant.common.models.errors import (
+    InvalidDataError,
+    LoginFailed,
+    MediaNotFoundError,
+    UnplayableMediaError,
+)
 from music_assistant.common.models.media_items import (
     Album,
     AlbumType,
@@ -604,7 +609,7 @@ class YoutubeMusicProvider(MusicProvider):
             raise Exception("Unable to identify the signatureTimestamp.")
         return int(match.group(1))
 
-    async def _parse_stream_url(self, stream_format: dict, item_id: str) -> str:
+    async def _parse_stream_url(self, stream_format: dict, item_id: str, retry: bool = True) -> str:
         """Figure out the stream URL to use based on the YT track object."""
         url = None
         if stream_format.get("signatureCipher"):
@@ -617,11 +622,42 @@ class YoutubeMusicProvider(MusicProvider):
                 ciphered_signature=cipher_parts["s"], item_id=item_id
             )
             url = cipher_parts["url"] + "&sig=" + signature
+            # Verify if URL is playable. If not, obtain a new cipher and try again.
+            if not await self._is_valid_deciphered_url(url=url):
+                if not retry:
+                    raise UnplayableMediaError(
+                        f"Cannot obtain a valid URL for item '{item_id}' after renewing cipher."
+                    )
+                self.logger.debug("Cipher expired. Obtaining new Cipher.")
+                self._cipher = None
+                return self._parse_stream_url(
+                    stream_format=stream_format, item_id=item_id, retry=False
+                )
         elif stream_format.get("url"):
             # Non secured URL
             url = stream_format.get("url")
         return url
 
+    async def _decipher_signature(self, ciphered_signature: str, item_id: str):
+        """Decipher the signature, required to build the Stream URL."""
+
+        def _decipher():
+            embed_url = f"https://www.youtube.com/embed/{item_id}"
+            embed_html = pytube.request.get(embed_url)
+            js_url = pytube.extract.js_url(embed_html)
+            ytm_js = pytube.request.get(js_url)
+            cipher = pytube.cipher.Cipher(js=ytm_js)
+            return cipher
+
+        if not self._cipher:
+            self._cipher = await asyncio.to_thread(_decipher)
+        return self._cipher.get_signature(ciphered_signature)
+
+    async def _is_valid_deciphered_url(self, url: str) -> bool:
+        """Verify whether the URL has been deciphered using a valid cipher."""
+        async with self.mass.http_session.head(url) as response:
+            return response.status == 200
+
     @classmethod
     async def _parse_thumbnails(cls, thumbnails_obj: dict) -> list[MediaItemImage]:
         """Parse and sort a list of thumbnails and return the highest quality."""
@@ -647,18 +683,3 @@ class YoutubeMusicProvider(MusicProvider):
         if stream_format is None:
             raise MediaNotFoundError("No stream found for this track")
         return stream_format
-
-    async def _decipher_signature(self, ciphered_signature: str, item_id: str):
-        """Decipher the signature, required to build the Stream URL."""
-
-        def _decipher():
-            embed_url = f"https://www.youtube.com/embed/{item_id}"
-            embed_html = pytube.request.get(embed_url)
-            js_url = pytube.extract.js_url(embed_html)
-            ytm_js = pytube.request.get(js_url)
-            cipher = pytube.cipher.Cipher(js=ytm_js)
-            return cipher
-
-        if not self._cipher:
-            self._cipher = await asyncio.to_thread(_decipher)
-        return self._cipher.get_signature(ciphered_signature)