From d96d17712ec894063a59b835806e3f792e200787 Mon Sep 17 00:00:00 2001 From: Marvin Schenkel Date: Thu, 16 Mar 2023 22:15:38 +0100 Subject: [PATCH] Auto renew cipher for Youtube Music provider (#539) This PR adds logic to auto-renew a cipher when it has expires. --- music_assistant/common/models/errors.py | 8 +++ .../server/providers/ytmusic/__init__.py | 55 +++++++++++++------ 2 files changed, 46 insertions(+), 17 deletions(-) diff --git a/music_assistant/common/models/errors.py b/music_assistant/common/models/errors.py index e0788149..e26456cc 100644 --- a/music_assistant/common/models/errors.py +++ b/music_assistant/common/models/errors.py @@ -79,6 +79,12 @@ class InvalidCommand(MusicAssistantError): error_code = 12 +class UnplayableMediaError(MusicAssistantError): + """Error thrown when a MediaItem cannot be played properly.""" + + error_code = 13 + + def error_code_to_exception(error_code: int) -> MusicAssistantError: """Return MusicAssistant Error (exception) from error_code.""" match error_code: @@ -106,5 +112,7 @@ def error_code_to_exception(error_code: int) -> MusicAssistantError: return PlayerCommandFailed case 12: return InvalidCommand + case 13: + return UnplayableMediaError case _: return MusicAssistantError diff --git a/music_assistant/server/providers/ytmusic/__init__.py b/music_assistant/server/providers/ytmusic/__init__.py index 4765cde5..1dc9a408 100644 --- a/music_assistant/server/providers/ytmusic/__init__.py +++ b/music_assistant/server/providers/ytmusic/__init__.py @@ -10,7 +10,12 @@ import pytube import ytmusicapi from music_assistant.common.models.enums import ProviderFeature -from music_assistant.common.models.errors import InvalidDataError, LoginFailed, MediaNotFoundError +from music_assistant.common.models.errors import ( + InvalidDataError, + LoginFailed, + MediaNotFoundError, + UnplayableMediaError, +) from music_assistant.common.models.media_items import ( Album, AlbumType, @@ -604,7 +609,7 @@ class YoutubeMusicProvider(MusicProvider): raise Exception("Unable to identify the signatureTimestamp.") return int(match.group(1)) - async def _parse_stream_url(self, stream_format: dict, item_id: str) -> str: + async def _parse_stream_url(self, stream_format: dict, item_id: str, retry: bool = True) -> str: """Figure out the stream URL to use based on the YT track object.""" url = None if stream_format.get("signatureCipher"): @@ -617,11 +622,42 @@ class YoutubeMusicProvider(MusicProvider): ciphered_signature=cipher_parts["s"], item_id=item_id ) url = cipher_parts["url"] + "&sig=" + signature + # Verify if URL is playable. If not, obtain a new cipher and try again. + if not await self._is_valid_deciphered_url(url=url): + if not retry: + raise UnplayableMediaError( + f"Cannot obtain a valid URL for item '{item_id}' after renewing cipher." + ) + self.logger.debug("Cipher expired. Obtaining new Cipher.") + self._cipher = None + return self._parse_stream_url( + stream_format=stream_format, item_id=item_id, retry=False + ) elif stream_format.get("url"): # Non secured URL url = stream_format.get("url") return url + async def _decipher_signature(self, ciphered_signature: str, item_id: str): + """Decipher the signature, required to build the Stream URL.""" + + def _decipher(): + embed_url = f"https://www.youtube.com/embed/{item_id}" + embed_html = pytube.request.get(embed_url) + js_url = pytube.extract.js_url(embed_html) + ytm_js = pytube.request.get(js_url) + cipher = pytube.cipher.Cipher(js=ytm_js) + return cipher + + if not self._cipher: + self._cipher = await asyncio.to_thread(_decipher) + return self._cipher.get_signature(ciphered_signature) + + async def _is_valid_deciphered_url(self, url: str) -> bool: + """Verify whether the URL has been deciphered using a valid cipher.""" + async with self.mass.http_session.head(url) as response: + return response.status == 200 + @classmethod async def _parse_thumbnails(cls, thumbnails_obj: dict) -> list[MediaItemImage]: """Parse and sort a list of thumbnails and return the highest quality.""" @@ -647,18 +683,3 @@ class YoutubeMusicProvider(MusicProvider): if stream_format is None: raise MediaNotFoundError("No stream found for this track") return stream_format - - async def _decipher_signature(self, ciphered_signature: str, item_id: str): - """Decipher the signature, required to build the Stream URL.""" - - def _decipher(): - embed_url = f"https://www.youtube.com/embed/{item_id}" - embed_html = pytube.request.get(embed_url) - js_url = pytube.extract.js_url(embed_html) - ytm_js = pytube.request.get(js_url) - cipher = pytube.cipher.Cipher(js=ytm_js) - return cipher - - if not self._cipher: - self._cipher = await asyncio.to_thread(_decipher) - return self._cipher.get_signature(ciphered_signature) -- 2.34.1