import asyncio
import logging
from collections.abc import AsyncGenerator
-from time import time
+from io import StringIO
from typing import TYPE_CHECKING, Any
from urllib.parse import unquote
from music_assistant_models.streamdetails import StreamDetails
from ytmusicapi.constants import SUPPORTED_LANGUAGES
from ytmusicapi.exceptions import YTMusicServerError
+from ytmusicapi.helpers import get_authorization, sapisid_from_cookie
-from music_assistant.helpers.auth import AuthenticationHelper
+from music_assistant.constants import CONF_USERNAME
from music_assistant.models.music_provider import MusicProvider
from .helpers import (
add_remove_playlist_tracks,
+ convert_to_netscape,
get_album,
get_artist,
get_library_albums,
get_playlist,
get_song_radio_tracks,
get_track,
+ is_brand_account,
library_add_remove_album,
library_add_remove_artist,
library_add_remove_playlist,
- login_oauth,
- refresh_oauth_token,
search,
)
CONF_COOKIE = "cookie"
-CONF_ACTION_AUTH = "auth"
-CONF_AUTH_TOKEN = "auth_token"
-CONF_REFRESH_TOKEN = "refresh_token"
-CONF_TOKEN_TYPE = "token_type"
-CONF_EXPIRY_TIME = "expiry_time"
YTM_DOMAIN = "https://music.youtube.com"
+YTM_COOKIE_DOMAIN = ".youtube.com"
YTM_BASE_URL = f"{YTM_DOMAIN}/youtubei/v1/"
VARIOUS_ARTISTS_YTM_ID = "UCUTXlgdcKU5vfzFqHOWIvkA"
# Playlist ID's are not unique across instances for lists like 'Liked videos', 'SuperMix' etc.
async def get_config_entries(
- mass: MusicAssistant,
+ mass: MusicAssistant, # noqa: ARG001
instance_id: str | None = None, # noqa: ARG001
- action: str | None = None,
- values: dict[str, ConfigValueType] | None = None,
+ action: str | None = None, # noqa: ARG001
+ values: dict[str, ConfigValueType] | None = None, # noqa: ARG001
) -> tuple[ConfigEntry, ...]:
"""
Return Config entries to setup this provider.
action: [optional] action key called from config entries UI.
values: the (intermediate) raw values for config entries sent with the action.
"""
- if action == CONF_ACTION_AUTH:
- async with AuthenticationHelper(mass, values["session_id"]) as auth_helper:
- token = await login_oauth(auth_helper)
- values[CONF_AUTH_TOKEN] = token["access_token"]
- values[CONF_REFRESH_TOKEN] = token["refresh_token"]
- values[CONF_EXPIRY_TIME] = token["expires_in"]
- values[CONF_TOKEN_TYPE] = token["token_type"]
- # return the collected config entries
return (
ConfigEntry(
- key=CONF_AUTH_TOKEN,
- type=ConfigEntryType.SECURE_STRING,
- label="Authentication token for Youtube Music",
- description="You need to link Music Assistant to your Youtube Music account. "
- "Please ignore the code on the page the next page and click 'Next'.",
- action=CONF_ACTION_AUTH,
- action_label="Authenticate on Youtube Music",
- value=values.get(CONF_AUTH_TOKEN) if values else None,
+ key=CONF_USERNAME, type=ConfigEntryType.STRING, label="Username", required=True
),
ConfigEntry(
- key=CONF_REFRESH_TOKEN,
+ key=CONF_COOKIE,
type=ConfigEntryType.SECURE_STRING,
- label=CONF_REFRESH_TOKEN,
- hidden=True,
- value=values.get(CONF_REFRESH_TOKEN) if values else None,
- ),
- ConfigEntry(
- key=CONF_EXPIRY_TIME,
- type=ConfigEntryType.INTEGER,
- label="Expiry time of auth token for Youtube Music",
- hidden=True,
- value=values.get(CONF_EXPIRY_TIME) if values else None,
- ),
- ConfigEntry(
- key=CONF_TOKEN_TYPE,
- type=ConfigEntryType.STRING,
- label="The token type required to create headers",
- hidden=True,
- value=values.get(CONF_TOKEN_TYPE) if values else None,
+ label="Login Cookie",
+ required=True,
+ description="The Login cookie you grabbed from an existing session, "
+ "see the documentation.",
),
)
_context = None
_cookies = None
_cipher = None
+ _yt_user = None
+ _cookie = None
async def handle_async_init(self) -> None:
"""Set up the YTMusic provider."""
logging.getLogger("yt_dlp").setLevel(self.logger.level + 10)
- if not self.config.get_value(CONF_AUTH_TOKEN):
- msg = "Invalid login credentials"
- raise LoginFailed(msg)
+ self._cookie = self.config.get_value(CONF_COOKIE)
+ yt_username = self.config.get_value(CONF_USERNAME)
+ self._yt_user = yt_username if is_brand_account(yt_username) else None
+ # yt-dlp needs a netscape formatted cookie
+ self._netscape_cookie = convert_to_netscape(self._cookie, YTM_COOKIE_DOMAIN)
self._initialize_headers()
self._initialize_context()
self._cookies = {"CONSENT": "YES+1"}
async def get_library_artists(self) -> AsyncGenerator[Artist, None]:
"""Retrieve all library artists from Youtube Music."""
- await self._check_oauth_token()
- artists_obj = await get_library_artists(headers=self._headers, language=self.language)
+ artists_obj = await get_library_artists(
+ headers=self._headers, language=self.language, user=self._yt_user
+ )
for artist in artists_obj:
yield self._parse_artist(artist)
async def get_library_albums(self) -> AsyncGenerator[Album, None]:
"""Retrieve all library albums from Youtube Music."""
- await self._check_oauth_token()
- albums_obj = await get_library_albums(headers=self._headers, language=self.language)
+ albums_obj = await get_library_albums(
+ headers=self._headers, language=self.language, user=self._yt_user
+ )
for album in albums_obj:
yield self._parse_album(album, album["browseId"])
async def get_library_playlists(self) -> AsyncGenerator[Playlist, None]:
"""Retrieve all library playlists from the provider."""
- await self._check_oauth_token()
- playlists_obj = await get_library_playlists(headers=self._headers, language=self.language)
+ playlists_obj = await get_library_playlists(
+ headers=self._headers, language=self.language, user=self._yt_user
+ )
for playlist in playlists_obj:
yield self._parse_playlist(playlist)
async def get_library_tracks(self) -> AsyncGenerator[Track, None]:
"""Retrieve library tracks from Youtube Music."""
- await self._check_oauth_token()
- tracks_obj = await get_library_tracks(headers=self._headers, language=self.language)
+ tracks_obj = await get_library_tracks(
+ headers=self._headers, language=self.language, user=self._yt_user
+ )
for track in tracks_obj:
# Library tracks sometimes do not have a valid artist id
# In that case, call the API for track details based on track id
async def get_album(self, prov_album_id) -> Album:
"""Get full album details by id."""
- await self._check_oauth_token()
if album_obj := await get_album(prov_album_id=prov_album_id, language=self.language):
return self._parse_album(album_obj=album_obj, album_id=prov_album_id)
msg = f"Item {prov_album_id} not found"
async def get_album_tracks(self, prov_album_id: str) -> list[Track]:
"""Get album tracks for given album id."""
- await self._check_oauth_token()
album_obj = await get_album(prov_album_id=prov_album_id, language=self.language)
if not album_obj.get("tracks"):
return []
async def get_artist(self, prov_artist_id) -> Artist:
"""Get full artist details by id."""
- await self._check_oauth_token()
if artist_obj := await get_artist(
prov_artist_id=prov_artist_id, headers=self._headers, language=self.language
):
async def get_track(self, prov_track_id) -> Track:
"""Get full track details by id."""
- await self._check_oauth_token()
if track_obj := await get_track(
prov_track_id=prov_track_id,
headers=self._headers,
async def get_playlist(self, prov_playlist_id) -> Playlist:
"""Get full playlist details by id."""
- await self._check_oauth_token()
# Grab the playlist id from the full url in case of personal playlists
if YT_PLAYLIST_ID_DELIMITER in prov_playlist_id:
prov_playlist_id = prov_playlist_id.split(YT_PLAYLIST_ID_DELIMITER)[0]
if playlist_obj := await get_playlist(
- prov_playlist_id=prov_playlist_id, headers=self._headers, language=self.language
+ prov_playlist_id=prov_playlist_id,
+ headers=self._headers,
+ language=self.language,
+ user=self._yt_user,
):
return self._parse_playlist(playlist_obj)
msg = f"Item {prov_playlist_id} not found"
if page > 0:
# paging not supported, we always return the whole list at once
return []
- await self._check_oauth_token()
# Grab the playlist id from the full url in case of personal playlists
if YT_PLAYLIST_ID_DELIMITER in prov_playlist_id:
prov_playlist_id = prov_playlist_id.split(YT_PLAYLIST_ID_DELIMITER)[0]
# Add a try to prevent MA from stopping syncing whenever we fail a single playlist
try:
playlist_obj = await get_playlist(
- prov_playlist_id=prov_playlist_id, headers=self._headers
+ prov_playlist_id=prov_playlist_id, headers=self._headers, user=self._yt_user
)
except KeyError as ke:
self.logger.warning("Could not load playlist: %s: %s", prov_playlist_id, ke)
async def get_artist_albums(self, prov_artist_id) -> list[Album]:
"""Get a list of albums for the given artist."""
- await self._check_oauth_token()
artist_obj = await get_artist(prov_artist_id=prov_artist_id, headers=self._headers)
if "albums" in artist_obj and "results" in artist_obj["albums"]:
albums = []
async def get_artist_toptracks(self, prov_artist_id) -> list[Track]:
"""Get a list of 25 most popular tracks for the given artist."""
- await self._check_oauth_token()
artist_obj = await get_artist(prov_artist_id=prov_artist_id, headers=self._headers)
if artist_obj.get("songs") and artist_obj["songs"].get("browseId"):
prov_playlist_id = artist_obj["songs"]["browseId"]
async def library_add(self, item: MediaItemType) -> bool:
"""Add an item to the library."""
- await self._check_oauth_token()
result = False
if item.media_type == MediaType.ARTIST:
result = await library_add_remove_artist(
- headers=self._headers, prov_artist_id=item.item_id, add=True
+ headers=self._headers, prov_artist_id=item.item_id, add=True, user=self._yt_user
)
elif item.media_type == MediaType.ALBUM:
result = await library_add_remove_album(
- headers=self._headers, prov_item_id=item.item_id, add=True
+ headers=self._headers, prov_item_id=item.item_id, add=True, user=self._yt_user
)
elif item.media_type == MediaType.PLAYLIST:
result = await library_add_remove_playlist(
- headers=self._headers, prov_item_id=item.item_id, add=True
+ headers=self._headers, prov_item_id=item.item_id, add=True, user=self._yt_user
)
elif item.media_type == MediaType.TRACK:
raise NotImplementedError
async def library_remove(self, prov_item_id, media_type: MediaType):
"""Remove an item from the library."""
- await self._check_oauth_token()
result = False
try:
if media_type == MediaType.ARTIST:
result = await library_add_remove_artist(
- headers=self._headers, prov_artist_id=prov_item_id, add=False
+ headers=self._headers,
+ prov_artist_id=prov_item_id,
+ add=False,
+ user=self._yt_user,
)
elif media_type == MediaType.ALBUM:
result = await library_add_remove_album(
- headers=self._headers, prov_item_id=prov_item_id, add=False
+ headers=self._headers, prov_item_id=prov_item_id, add=False, user=self._yt_user
)
elif media_type == MediaType.PLAYLIST:
result = await library_add_remove_playlist(
- headers=self._headers, prov_item_id=prov_item_id, add=False
+ headers=self._headers, prov_item_id=prov_item_id, add=False, user=self._yt_user
)
elif media_type == MediaType.TRACK:
raise NotImplementedError
async def add_playlist_tracks(self, prov_playlist_id: str, prov_track_ids: list[str]) -> None:
"""Add track(s) to playlist."""
- await self._check_oauth_token()
# Grab the playlist id from the full url in case of personal playlists
if YT_PLAYLIST_ID_DELIMITER in prov_playlist_id:
prov_playlist_id = prov_playlist_id.split(YT_PLAYLIST_ID_DELIMITER)[0]
prov_playlist_id=prov_playlist_id,
prov_track_ids=prov_track_ids,
add=True,
+ user=self._yt_user,
)
async def remove_playlist_tracks(
self, prov_playlist_id: str, positions_to_remove: tuple[int, ...]
) -> None:
"""Remove track(s) from playlist."""
- await self._check_oauth_token()
# Grab the playlist id from the full url in case of personal playlists
if YT_PLAYLIST_ID_DELIMITER in prov_playlist_id:
prov_playlist_id = prov_playlist_id.split(YT_PLAYLIST_ID_DELIMITER)[0]
prov_playlist_id=prov_playlist_id,
prov_track_ids=tracks_to_delete,
add=False,
+ user=self._yt_user,
)
async def get_similar_tracks(self, prov_track_id, limit=25) -> list[Track]:
"""Retrieve a dynamic list of tracks based on the provided item."""
- await self._check_oauth_token()
result = []
result = await get_song_radio_tracks(
- headers=self._headers,
- prov_item_id=prov_track_id,
- limit=limit,
+ headers=self._headers, prov_item_id=prov_track_id, limit=limit, user=self._yt_user
)
if "tracks" in result:
tracks = []
async def _post_data(self, endpoint: str, data: dict[str, str], **kwargs):
"""Post data to the given endpoint."""
- await self._check_oauth_token()
url = f"{YTM_BASE_URL}{endpoint}"
data.update(self._context)
async with self.mass.http_session.post(
async def _get_data(self, url: str, params: dict | None = None):
"""Get data from the given URL."""
- await self._check_oauth_token()
async with self.mass.http_session.get(
url, headers=self._headers, params=params, cookies=self._cookies
) as response:
return await response.text()
- async def _check_oauth_token(self) -> None:
- """Verify the OAuth token is valid and refresh if needed."""
- if self.config.get_value(CONF_EXPIRY_TIME) < time():
- token = await refresh_oauth_token(
- self.mass.http_session, self.config.get_value(CONF_REFRESH_TOKEN)
- )
- self.config.update({CONF_AUTH_TOKEN: token["access_token"]})
- self.config.update({CONF_EXPIRY_TIME: time() + token["expires_in"]})
- self.config.update({CONF_TOKEN_TYPE: token["token_type"]})
- self._initialize_headers()
-
def _initialize_headers(self) -> dict[str, str]:
"""Return headers to include in the requests."""
- auth = f"{self.config.get_value(CONF_TOKEN_TYPE)} {self.config.get_value(CONF_AUTH_TOKEN)}"
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:72.0) Gecko/20100101 Firefox/72.0", # noqa: E501
"Accept": "*/*",
"Content-Type": "application/json",
"X-Goog-AuthUser": "0",
"x-origin": YTM_DOMAIN,
- "X-Goog-Request-Time": str(int(time())),
- "Authorization": auth,
+ "Cookie": self._cookie,
}
+ sapisid = sapisid_from_cookie(self._cookie)
+ headers["Authorization"] = get_authorization(sapisid + " " + YTM_DOMAIN)
self._headers = headers
def _initialize_context(self) -> dict[str, str]:
async def _get_stream_format(self, item_id: str) -> dict[str, Any]:
"""Figure out the stream URL to use and return the highest quality."""
- await self._check_oauth_token()
def _extract_best_stream_url_format() -> dict[str, Any]:
url = f"{YTM_DOMAIN}/watch?v={item_id}"
- auth = (
- f"{self.config.get_value(CONF_TOKEN_TYPE)} {self.config.get_value(CONF_AUTH_TOKEN)}"
- )
ydl_opts = {
"quiet": self.logger.level > logging.DEBUG,
- # This enables the access token plugin so we can grab the best
- # available quality audio stream
- "username": auth,
+ "cookiefile": StringIO(self._netscape_cookie),
# This enforces a player client and skips unnecessary scraping to increase speed
"extractor_args": {
"youtube": {"skip": ["translated_subs", "dash"], "player_client": ["ios"]}
"""
import asyncio
+from http.cookies import SimpleCookie
from time import time
import ytmusicapi
-from aiohttp import ClientSession
-from ytmusicapi.constants import (
- OAUTH_CLIENT_ID,
- OAUTH_CLIENT_SECRET,
- OAUTH_CODE_URL,
- OAUTH_SCOPE,
- OAUTH_TOKEN_URL,
- OAUTH_USER_AGENT,
-)
-
-from music_assistant.helpers.auth import AuthenticationHelper
async def get_artist(
async def get_playlist(
- prov_playlist_id: str, headers: dict[str, str], language: str = "en"
+ prov_playlist_id: str, headers: dict[str, str], language: str = "en", user: str | None = None
) -> dict[str, str]:
"""Async wrapper around the ytmusicapi get_playlist function."""
def _get_playlist():
- ytm = ytmusicapi.YTMusic(auth=headers, language=language)
+ ytm = ytmusicapi.YTMusic(auth=headers, language=language, user=user)
playlist = ytm.get_playlist(playlistId=prov_playlist_id, limit=None)
playlist["checksum"] = get_playlist_checksum(playlist)
# Fix missing playlist id in some edge cases
return await asyncio.to_thread(_get_song)
-async def get_library_artists(headers: dict[str, str], language: str = "en") -> dict[str, str]:
+async def get_library_artists(
+ headers: dict[str, str], language: str = "en", user: str | None = None
+) -> dict[str, str]:
"""Async wrapper around the ytmusicapi get_library_artists function."""
def _get_library_artists():
- ytm = ytmusicapi.YTMusic(auth=headers, language=language)
+ ytm = ytmusicapi.YTMusic(auth=headers, language=language, user=user)
artists = ytm.get_library_subscriptions(limit=9999)
# Sync properties with uniformal artist object
for artist in artists:
return await asyncio.to_thread(_get_library_artists)
-async def get_library_albums(headers: dict[str, str], language: str = "en") -> dict[str, str]:
+async def get_library_albums(
+ headers: dict[str, str], language: str = "en", user: str | None = None
+) -> dict[str, str]:
"""Async wrapper around the ytmusicapi get_library_albums function."""
def _get_library_albums():
- ytm = ytmusicapi.YTMusic(auth=headers, language=language)
+ ytm = ytmusicapi.YTMusic(auth=headers, language=language, user=user)
return ytm.get_library_albums(limit=9999)
return await asyncio.to_thread(_get_library_albums)
-async def get_library_playlists(headers: dict[str, str], language: str = "en") -> dict[str, str]:
+async def get_library_playlists(
+ headers: dict[str, str], language: str = "en", user: str | None = None
+) -> dict[str, str]:
"""Async wrapper around the ytmusicapi get_library_playlists function."""
def _get_library_playlists():
- ytm = ytmusicapi.YTMusic(auth=headers, language=language)
+ ytm = ytmusicapi.YTMusic(auth=headers, language=language, user=user)
playlists = ytm.get_library_playlists(limit=9999)
# Sync properties with uniformal playlist object
for playlist in playlists:
return await asyncio.to_thread(_get_library_playlists)
-async def get_library_tracks(headers: dict[str, str], language: str = "en") -> dict[str, str]:
+async def get_library_tracks(
+ headers: dict[str, str], language: str = "en", user: str | None = None
+) -> dict[str, str]:
"""Async wrapper around the ytmusicapi get_library_tracks function."""
def _get_library_tracks():
- ytm = ytmusicapi.YTMusic(auth=headers, language=language)
+ ytm = ytmusicapi.YTMusic(auth=headers, language=language, user=user)
return ytm.get_library_songs(limit=9999)
return await asyncio.to_thread(_get_library_tracks)
async def library_add_remove_artist(
- headers: dict[str, str], prov_artist_id: str, add: bool = True
+ headers: dict[str, str], prov_artist_id: str, add: bool = True, user: str | None = None
) -> bool:
"""Add or remove an artist to the user's library."""
def _library_add_remove_artist():
- ytm = ytmusicapi.YTMusic(auth=headers)
+ ytm = ytmusicapi.YTMusic(auth=headers, user=user)
if add:
return "actions" in ytm.subscribe_artists(channelIds=[prov_artist_id])
if not add:
async def library_add_remove_album(
- headers: dict[str, str], prov_item_id: str, add: bool = True
+ headers: dict[str, str], prov_item_id: str, add: bool = True, user: str | None = None
) -> bool:
"""Add or remove an album or playlist to the user's library."""
album = await get_album(prov_album_id=prov_item_id)
def _library_add_remove_album():
- ytm = ytmusicapi.YTMusic(auth=headers)
+ ytm = ytmusicapi.YTMusic(auth=headers, user=user)
playlist_id = album["audioPlaylistId"]
if add:
return ytm.rate_playlist(playlist_id, "LIKE")
async def library_add_remove_playlist(
- headers: dict[str, str], prov_item_id: str, add: bool = True
+ headers: dict[str, str], prov_item_id: str, add: bool = True, user: str | None = None
) -> bool:
"""Add or remove an album or playlist to the user's library."""
def _library_add_remove_playlist():
- ytm = ytmusicapi.YTMusic(auth=headers)
+ ytm = ytmusicapi.YTMusic(auth=headers, user=user)
if add:
return "actions" in ytm.rate_playlist(prov_item_id, "LIKE")
if not add:
async def add_remove_playlist_tracks(
- headers: dict[str, str], prov_playlist_id: str, prov_track_ids: list[str], add: bool
+ headers: dict[str, str],
+ prov_playlist_id: str,
+ prov_track_ids: list[str],
+ add: bool,
+ user: str | None = None,
) -> bool:
"""Async wrapper around adding/removing tracks to a playlist."""
def _add_playlist_tracks():
- ytm = ytmusicapi.YTMusic(auth=headers)
+ ytm = ytmusicapi.YTMusic(auth=headers, user=user)
if add:
return ytm.add_playlist_items(playlistId=prov_playlist_id, videoIds=prov_track_ids)
if not add:
async def get_song_radio_tracks(
- headers: dict[str, str], prov_item_id: str, limit=25
+ headers: dict[str, str], prov_item_id: str, limit=25, user: str | None = None
) -> dict[str, str]:
"""Async wrapper around the ytmusicapi radio function."""
def _get_song_radio_tracks():
- ytm = ytmusicapi.YTMusic(auth=headers)
+ ytm = ytmusicapi.YTMusic(auth=headers, user=user)
playlist_id = f"RDAMVM{prov_item_id}"
result = ytm.get_watch_playlist(
videoId=prov_item_id, playlistId=playlist_id, limit=limit, radio=True
return 0
-async def login_oauth(auth_helper: AuthenticationHelper):
- """Use device login to get a token."""
- http_session = auth_helper.mass.http_session
- code = await get_oauth_code(http_session)
- return await visit_oauth_auth_url(auth_helper, code)
-
-
-def _get_data_and_headers(data: dict):
- """Prepare headers for OAuth requests."""
- data.update({"client_id": OAUTH_CLIENT_ID})
- headers = {"User-Agent": OAUTH_USER_AGENT}
- return data, headers
-
-
-async def get_oauth_code(session: ClientSession):
- """Get the OAuth code from the server."""
- data, headers = _get_data_and_headers({"scope": OAUTH_SCOPE})
- async with session.post(OAUTH_CODE_URL, json=data, headers=headers) as code_response:
- return await code_response.json()
-
-
-async def visit_oauth_auth_url(auth_helper: AuthenticationHelper, code: dict[str, str]):
- """Redirect the user to the OAuth login page and wait for the token."""
- auth_url = f"{code['verification_url']}?user_code={code['user_code']}"
- auth_helper.send_url(auth_url=auth_url)
- device_code = code["device_code"]
- expiry = code["expires_in"]
- interval = code["interval"]
- while expiry > 0:
- token = await get_oauth_token_from_code(auth_helper.mass.http_session, device_code)
- if token.get("access_token"):
- return token
- await asyncio.sleep(interval)
- expiry -= interval
- msg = "You took too long to log in"
- raise TimeoutError(msg)
-
-
-async def get_oauth_token_from_code(session: ClientSession, device_code: str):
- """Check if the OAuth token is ready yet."""
- data, headers = _get_data_and_headers(
- data={
- "client_secret": OAUTH_CLIENT_SECRET,
- "grant_type": "http://oauth.net/grant_type/device/1.0",
- "code": device_code,
- }
- )
- async with session.post(
- OAUTH_TOKEN_URL,
- json=data,
- headers=headers,
- ) as token_response:
- return await token_response.json()
-
-
-async def refresh_oauth_token(session: ClientSession, refresh_token: str):
- """Refresh an expired OAuth token."""
- data, headers = _get_data_and_headers(
- {
- "client_secret": OAUTH_CLIENT_SECRET,
- "grant_type": "refresh_token",
- "refresh_token": refresh_token,
- }
- )
- async with session.post(
- OAUTH_TOKEN_URL,
- json=data,
- headers=headers,
- ) as response:
- return await response.json()
+def convert_to_netscape(raw_cookie_str: str, domain: str) -> str:
+ """Convert a raw cookie into Netscape format, so yt-dl can use it."""
+ domain = domain.replace("https://", "")
+ cookie = SimpleCookie()
+ cookie.load(rawdata=raw_cookie_str)
+ netscape_cookie = "# Netscape HTTP Cookie File\n"
+ for morsel in cookie.values():
+ netscape_cookie += f"{domain}\tTRUE\t/\tTRUE\t0\t{morsel.key}\t{morsel.value}\n"
+ return netscape_cookie