from urllib.parse import unquote
import pytube
-import ytmusicapi
from music_assistant.common.helpers.uri import create_uri
from music_assistant.common.helpers.util import create_sort_name
StreamDetails,
Track,
)
-from music_assistant.constants import CONF_USERNAME
+from music_assistant.server.helpers.auth import AuthenticationHelper
from music_assistant.server.models.music_provider import MusicProvider
from .helpers import (
library_add_remove_album,
library_add_remove_artist,
library_add_remove_playlist,
+ login_oauth,
+ refresh_oauth_token,
search,
)
CONF_COOKIE = "cookie"
+CONF_ACTION_AUTH = "auth"
+CONF_AUTH_TOKEN = "auth_token"
+CONF_REFRESH_TOKEN = "refresh_token"
+CONF_TOKEN_TYPE = "token_type"
+CONF_EXPIRY_TIME = "expiry_time"
YT_DOMAIN = "https://www.youtube.com"
YTM_DOMAIN = "https://music.youtube.com"
async def get_config_entries(
mass: MusicAssistant,
- instance_id: str | None = None,
+ instance_id: str | None = None, # noqa: ARG001
action: str | None = None,
values: dict[str, ConfigValueType] | None = None,
) -> tuple[ConfigEntry, ...]:
action: [optional] action key called from config entries UI.
values: the (intermediate) raw values for config entries sent with the action.
"""
- # ruff: noqa: ARG001
+ if action == CONF_ACTION_AUTH:
+ async with AuthenticationHelper(mass, values["session_id"]) as auth_helper:
+ token = await login_oauth(auth_helper)
+ values[CONF_AUTH_TOKEN] = token["access_token"]
+ values[CONF_REFRESH_TOKEN] = token["refresh_token"]
+ values[CONF_EXPIRY_TIME] = token["expires_in"]
+ values[CONF_TOKEN_TYPE] = token["token_type"]
+ # return the collected config entries
return (
ConfigEntry(
- key=CONF_USERNAME, type=ConfigEntryType.STRING, label="Username", required=True
+ key=CONF_AUTH_TOKEN,
+ type=ConfigEntryType.SECURE_STRING,
+ label="Authentication token for Youtube Music",
+ description="You need to link Music Assistant to your Youtube Music account.",
+ action=CONF_ACTION_AUTH,
+ action_label="Authenticate on Youtube Music",
+ value=values.get(CONF_AUTH_TOKEN) if values else None,
),
ConfigEntry(
- key=CONF_COOKIE,
+ key=CONF_REFRESH_TOKEN,
type=ConfigEntryType.SECURE_STRING,
- label="Login Cookie",
- required=True,
- description="The Login cookie you grabbed from an existing session, "
- "see the documentation.",
+ label=CONF_REFRESH_TOKEN,
+ hidden=True,
+ value=values.get(CONF_REFRESH_TOKEN) if values else None,
+ ),
+ ConfigEntry(
+ key=CONF_EXPIRY_TIME,
+ type=ConfigEntryType.INTEGER,
+ label="Expiry time of auth token for Youtube Music",
+ hidden=True,
+ value=values.get(CONF_EXPIRY_TIME) if values else None,
+ ),
+ ConfigEntry(
+ key=CONF_TOKEN_TYPE,
+ type=ConfigEntryType.STRING,
+ label="The token type required to create headers",
+ hidden=True,
+ value=values.get(CONF_TOKEN_TYPE) if values else None,
),
)
async def handle_setup(self) -> None:
"""Set up the YTMusic provider."""
- if not self.config.get_value(CONF_USERNAME) or not self.config.get_value(CONF_COOKIE):
+ if not self.config.get_value(CONF_AUTH_TOKEN):
raise LoginFailed("Invalid login credentials")
- await self._initialize_headers(cookie=self.config.get_value(CONF_COOKIE))
+ await self._initialize_headers()
await self._initialize_context()
self._cookies = {"CONSENT": "YES+1"}
self._signature_timestamp = await self._get_signature_timestamp()
async def get_library_artists(self) -> AsyncGenerator[Artist, None]:
"""Retrieve all library artists from Youtube Music."""
+ await self._check_oauth_token()
artists_obj = await get_library_artists(
- headers=self._headers, username=self.config.get_value(CONF_USERNAME)
+ headers=self._headers,
)
for artist in artists_obj:
yield await self._parse_artist(artist)
async def get_library_albums(self) -> AsyncGenerator[Album, None]:
"""Retrieve all library albums from Youtube Music."""
+ await self._check_oauth_token()
albums_obj = await get_library_albums(
- headers=self._headers, username=self.config.get_value(CONF_USERNAME)
+ headers=self._headers,
)
for album in albums_obj:
yield await self._parse_album(album, album["browseId"])
async def get_library_playlists(self) -> AsyncGenerator[Playlist, None]:
"""Retrieve all library playlists from the provider."""
+ await self._check_oauth_token()
playlists_obj = await get_library_playlists(
- headers=self._headers, username=self.config.get_value(CONF_USERNAME)
+ headers=self._headers,
)
for playlist in playlists_obj:
yield await self._parse_playlist(playlist)
async def get_library_tracks(self) -> AsyncGenerator[Track, None]:
"""Retrieve library tracks from Youtube Music."""
+ await self._check_oauth_token()
tracks_obj = await get_library_tracks(
- headers=self._headers, username=self.config.get_value(CONF_USERNAME)
+ headers=self._headers,
)
for track in tracks_obj:
# Library tracks sometimes do not have a valid artist id
async def get_playlist(self, prov_playlist_id) -> Playlist:
"""Get full playlist details by id."""
+ await self._check_oauth_token()
if playlist_obj := await get_playlist(
- prov_playlist_id=prov_playlist_id,
- headers=self._headers,
- username=self.config.get_value(CONF_USERNAME),
+ prov_playlist_id=prov_playlist_id, headers=self._headers
):
return await self._parse_playlist(playlist_obj)
raise MediaNotFoundError(f"Item {prov_playlist_id} not found")
async def get_playlist_tracks(self, prov_playlist_id) -> AsyncGenerator[Track, None]:
"""Get all playlist tracks for given playlist id."""
- playlist_obj = await get_playlist(
- prov_playlist_id=prov_playlist_id,
- headers=self._headers,
- username=self.config.get_value(CONF_USERNAME),
- )
+ await self._check_oauth_token()
+ playlist_obj = await get_playlist(prov_playlist_id=prov_playlist_id, headers=self._headers)
if "tracks" not in playlist_obj:
return
for index, track in enumerate(playlist_obj["tracks"]):
async def library_add(self, prov_item_id, media_type: MediaType) -> None:
"""Add an item to the library."""
+ await self._check_oauth_token()
result = False
if media_type == MediaType.ARTIST:
result = await library_add_remove_artist(
- headers=self._headers,
- prov_artist_id=prov_item_id,
- add=True,
- username=self.config.get_value(CONF_USERNAME),
+ headers=self._headers, prov_artist_id=prov_item_id, add=True
)
elif media_type == MediaType.ALBUM:
result = await library_add_remove_album(
- headers=self._headers,
- prov_item_id=prov_item_id,
- add=True,
- username=self.config.get_value(CONF_USERNAME),
+ headers=self._headers, prov_item_id=prov_item_id, add=True
)
elif media_type == MediaType.PLAYLIST:
result = await library_add_remove_playlist(
- headers=self._headers,
- prov_item_id=prov_item_id,
- add=True,
- username=self.config.get_value(CONF_USERNAME),
+ headers=self._headers, prov_item_id=prov_item_id, add=True
)
elif media_type == MediaType.TRACK:
raise NotImplementedError
async def library_remove(self, prov_item_id, media_type: MediaType):
"""Remove an item from the library."""
+ await self._check_oauth_token()
result = False
if media_type == MediaType.ARTIST:
result = await library_add_remove_artist(
- headers=self._headers,
- prov_artist_id=prov_item_id,
- add=False,
- username=self.config.get_value(CONF_USERNAME),
+ headers=self._headers, prov_artist_id=prov_item_id, add=False
)
elif media_type == MediaType.ALBUM:
result = await library_add_remove_album(
- headers=self._headers,
- prov_item_id=prov_item_id,
- add=False,
- username=self.config.get_value(CONF_USERNAME),
+ headers=self._headers, prov_item_id=prov_item_id, add=False
)
elif media_type == MediaType.PLAYLIST:
result = await library_add_remove_playlist(
- headers=self._headers,
- prov_item_id=prov_item_id,
- add=False,
- username=self.config.get_value(CONF_USERNAME),
+ headers=self._headers, prov_item_id=prov_item_id, add=False
)
elif media_type == MediaType.TRACK:
raise NotImplementedError
async def add_playlist_tracks(self, prov_playlist_id: str, prov_track_ids: list[str]) -> None:
"""Add track(s) to playlist."""
+ await self._check_oauth_token()
return await add_remove_playlist_tracks(
headers=self._headers,
prov_playlist_id=prov_playlist_id,
prov_track_ids=prov_track_ids,
add=True,
- username=self.config.get_value(CONF_USERNAME),
)
async def remove_playlist_tracks(
self, prov_playlist_id: str, positions_to_remove: tuple[int, ...]
) -> None:
"""Remove track(s) from playlist."""
- playlist_obj = await get_playlist(
- prov_playlist_id=prov_playlist_id,
- headers=self._headers,
- username=self.config.get_value(CONF_USERNAME),
- )
+ await self._check_oauth_token()
+ playlist_obj = await get_playlist(prov_playlist_id=prov_playlist_id, headers=self._headers)
if "tracks" not in playlist_obj:
return None
tracks_to_delete = []
prov_playlist_id=prov_playlist_id,
prov_track_ids=tracks_to_delete,
add=False,
- username=self.config.get_value(CONF_USERNAME),
)
async def get_similar_tracks(self, prov_track_id, limit=25) -> list[Track]:
"""Retrieve a dynamic list of tracks based on the provided item."""
+ await self._check_oauth_token()
result = []
result = await get_song_radio_tracks(
headers=self._headers,
- username=self.config.get_value(CONF_USERNAME),
prov_item_id=prov_track_id,
limit=limit,
)
return stream_details
async def _post_data(self, endpoint: str, data: dict[str, str], **kwargs): # noqa: ARG002
+ """Post data to the given endpoint."""
+ await self._check_oauth_token()
url = f"{YTM_BASE_URL}{endpoint}"
data.update(self._context)
async with self.mass.http_session.post(
return await response.json()
async def _get_data(self, url: str, params: dict = None):
+ """Get data from the given URL."""
+ await self._check_oauth_token()
async with self.mass.http_session.get(
url, headers=self._headers, params=params, cookies=self._cookies
) as response:
return await response.text()
- async def _initialize_headers(self, cookie: str) -> dict[str, str]:
+ async def _check_oauth_token(self) -> None:
+ """Verify the OAuth token is valid and refresh if needed."""
+ if self.config.get_value(CONF_EXPIRY_TIME) < time():
+ token = await refresh_oauth_token(
+ self.mass.http_session, self.config.get_value(CONF_REFRESH_TOKEN)
+ )
+ self.config.update({CONF_AUTH_TOKEN: token["access_token"]})
+ self.config.update({CONF_EXPIRY_TIME: time() + token["expires_in"]})
+ self.config.update({CONF_TOKEN_TYPE: token["token_type"]})
+ await self._initialize_headers()
+
+ async def _initialize_headers(self) -> dict[str, str]:
"""Return headers to include in the requests."""
+ auth = f"{self.config.get_value(CONF_TOKEN_TYPE)} {self.config.get_value(CONF_AUTH_TOKEN)}"
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:72.0) Gecko/20100101 Firefox/72.0", # noqa: E501
"Accept": "*/*",
"Content-Type": "application/json",
"X-Goog-AuthUser": "0",
"x-origin": "https://music.youtube.com",
- "Cookie": cookie,
+ "X-Goog-Request-Time": str(int(time())),
+ "Authorization": auth,
}
- sapisid = ytmusicapi.helpers.sapisid_from_cookie(cookie)
- origin = headers.get("origin", headers.get("x-origin"))
- headers["Authorization"] = ytmusicapi.helpers.get_authorization(sapisid + " " + origin)
self._headers = headers
async def _initialize_context(self) -> dict[str, str]:
from time import time
import ytmusicapi
+from aiohttp import ClientSession
+from ytmusicapi.constants import (
+ OAUTH_CLIENT_ID,
+ OAUTH_CLIENT_SECRET,
+ OAUTH_CODE_URL,
+ OAUTH_SCOPE,
+ OAUTH_TOKEN_URL,
+ OAUTH_USER_AGENT,
+)
+
+from music_assistant.server.helpers.auth import AuthenticationHelper
async def get_artist(prov_artist_id: str) -> dict[str, str]:
return await asyncio.to_thread(_get_album)
-async def get_playlist(
- prov_playlist_id: str, headers: dict[str, str], username: str
-) -> dict[str, str]:
+async def get_playlist(prov_playlist_id: str, headers: dict[str, str]) -> dict[str, str]:
"""Async wrapper around the ytmusicapi get_playlist function."""
def _get_playlist():
- user = username if is_brand_account(username) else None
- ytm = ytmusicapi.YTMusic(auth=json.dumps(headers), user=user)
+ ytm = ytmusicapi.YTMusic(auth=json.dumps(headers))
playlist = ytm.get_playlist(playlistId=prov_playlist_id, limit=None)
playlist["checksum"] = get_playlist_checksum(playlist)
return playlist
return await asyncio.to_thread(_get_song)
-async def get_library_artists(headers: dict[str, str], username: str) -> dict[str, str]:
+async def get_library_artists(headers: dict[str, str]) -> dict[str, str]:
"""Async wrapper around the ytmusicapi get_library_artists function."""
def _get_library_artists():
- user = username if is_brand_account(username) else None
- ytm = ytmusicapi.YTMusic(auth=json.dumps(headers), user=user)
+ ytm = ytmusicapi.YTMusic(auth=json.dumps(headers))
artists = ytm.get_library_subscriptions(limit=9999)
# Sync properties with uniformal artist object
for artist in artists:
return await asyncio.to_thread(_get_library_artists)
-async def get_library_albums(headers: dict[str, str], username: str) -> dict[str, str]:
+async def get_library_albums(headers: dict[str, str]) -> dict[str, str]:
"""Async wrapper around the ytmusicapi get_library_albums function."""
def _get_library_albums():
- user = username if is_brand_account(username) else None
- ytm = ytmusicapi.YTMusic(auth=json.dumps(headers), user=user)
+ ytm = ytmusicapi.YTMusic(auth=json.dumps(headers))
return ytm.get_library_albums(limit=9999)
return await asyncio.to_thread(_get_library_albums)
-async def get_library_playlists(headers: dict[str, str], username: str) -> dict[str, str]:
+async def get_library_playlists(headers: dict[str, str]) -> dict[str, str]:
"""Async wrapper around the ytmusicapi get_library_playlists function."""
def _get_library_playlists():
- user = username if is_brand_account(username) else None
- ytm = ytmusicapi.YTMusic(auth=json.dumps(headers), user=user)
+ ytm = ytmusicapi.YTMusic(auth=json.dumps(headers))
playlists = ytm.get_library_playlists(limit=9999)
# Sync properties with uniformal playlist object
for playlist in playlists:
return await asyncio.to_thread(_get_library_playlists)
-async def get_library_tracks(headers: dict[str, str], username: str) -> dict[str, str]:
+async def get_library_tracks(headers: dict[str, str]) -> dict[str, str]:
"""Async wrapper around the ytmusicapi get_library_tracks function."""
def _get_library_tracks():
- user = username if is_brand_account(username) else None
- ytm = ytmusicapi.YTMusic(auth=json.dumps(headers), user=user)
+ ytm = ytmusicapi.YTMusic(auth=json.dumps(headers))
tracks = ytm.get_library_songs(limit=9999)
return tracks
async def library_add_remove_artist(
- headers: dict[str, str], prov_artist_id: str, add: bool = True, username: str = None
+ headers: dict[str, str], prov_artist_id: str, add: bool = True
) -> bool:
"""Add or remove an artist to the user's library."""
def _library_add_remove_artist():
- user = username if is_brand_account(username) else None
- ytm = ytmusicapi.YTMusic(auth=json.dumps(headers), user=user)
+ ytm = ytmusicapi.YTMusic(auth=json.dumps(headers))
if add:
return "actions" in ytm.subscribe_artists(channelIds=[prov_artist_id])
if not add:
async def library_add_remove_album(
- headers: dict[str, str], prov_item_id: str, add: bool = True, username: str = None
+ headers: dict[str, str], prov_item_id: str, add: bool = True
) -> bool:
"""Add or remove an album or playlist to the user's library."""
album = await get_album(prov_album_id=prov_item_id)
def _library_add_remove_album():
- user = username if is_brand_account(username) else None
- ytm = ytmusicapi.YTMusic(auth=json.dumps(headers), user=user)
+ ytm = ytmusicapi.YTMusic(auth=json.dumps(headers))
playlist_id = album["audioPlaylistId"]
if add:
return ytm.rate_playlist(playlist_id, "LIKE")
async def library_add_remove_playlist(
- headers: dict[str, str], prov_item_id: str, add: bool = True, username: str = None
+ headers: dict[str, str], prov_item_id: str, add: bool = True
) -> bool:
"""Add or remove an album or playlist to the user's library."""
def _library_add_remove_playlist():
- user = username if is_brand_account(username) else None
- ytm = ytmusicapi.YTMusic(auth=json.dumps(headers), user=user)
+ ytm = ytmusicapi.YTMusic(auth=json.dumps(headers))
if add:
return "actions" in ytm.rate_playlist(prov_item_id, "LIKE")
if not add:
async def add_remove_playlist_tracks(
- headers: dict[str, str],
- prov_playlist_id: str,
- prov_track_ids: list[str],
- add: bool,
- username: str = None,
+ headers: dict[str, str], prov_playlist_id: str, prov_track_ids: list[str], add: bool
) -> bool:
"""Async wrapper around adding/removing tracks to a playlist."""
def _add_playlist_tracks():
- user = username if is_brand_account(username) else None
- ytm = ytmusicapi.YTMusic(auth=json.dumps(headers), user=user)
+ ytm = ytmusicapi.YTMusic(auth=json.dumps(headers))
if add:
return ytm.add_playlist_items(playlistId=prov_playlist_id, videoIds=prov_track_ids)
if not add:
async def get_song_radio_tracks(
- headers: dict[str, str], username: str, prov_item_id: str, limit=25
+ headers: dict[str, str], prov_item_id: str, limit=25
) -> dict[str, str]:
"""Async wrapper around the ytmusicapi radio function."""
- user = username if is_brand_account(username) else None
def _get_song_radio_tracks():
- ytm = ytmusicapi.YTMusic(auth=json.dumps(headers), user=user)
+ ytm = ytmusicapi.YTMusic(auth=json.dumps(headers))
playlist_id = f"RDAMVM{prov_item_id}"
result = ytm.get_watch_playlist(videoId=prov_item_id, playlistId=playlist_id, limit=limit)
# Replace inconsistensies for easier parsing
if len(parts) == 2:
return int(parts[0]) * 60 + int(parts[1])
return 0
+
+
+async def login_oauth(auth_helper: AuthenticationHelper):
+ """Use device login to get a token."""
+ http_session = auth_helper.mass.http_session
+ code = await get_oauth_code(http_session)
+ token = await visit_oauth_auth_url(auth_helper, code)
+ return token
+
+
+def _get_data_and_headers(data: dict):
+ """Prepare headers for OAuth requests."""
+ data.update({"client_id": OAUTH_CLIENT_ID})
+ headers = {"User-Agent": OAUTH_USER_AGENT}
+ return data, headers
+
+
+async def get_oauth_code(session: ClientSession):
+ """Get the OAuth code from the server."""
+ data, headers = _get_data_and_headers({"scope": OAUTH_SCOPE})
+ async with session.post(OAUTH_CODE_URL, json=data, headers=headers) as code_response:
+ return await code_response.json()
+
+
+async def visit_oauth_auth_url(auth_helper: AuthenticationHelper, code: dict[str, str]):
+ """Redirect the user to the OAuth login page and wait for the token."""
+ auth_url = f"{code['verification_url']}?user_code={code['user_code']}"
+ auth_helper.send_url(auth_url=auth_url)
+ device_code = code["device_code"]
+ expiry = code["expires_in"]
+ interval = code["interval"]
+ while expiry > 0:
+ token = await get_oauth_token_from_code(auth_helper.mass.http_session, device_code)
+ if token.get("access_token"):
+ return token
+ await asyncio.sleep(interval)
+ expiry -= interval
+ raise TimeoutError("You took too long to log in")
+
+
+async def get_oauth_token_from_code(session: ClientSession, device_code: str):
+ """Check if the OAuth token is ready yet."""
+ data, headers = _get_data_and_headers(
+ data={
+ "client_secret": OAUTH_CLIENT_SECRET,
+ "grant_type": "http://oauth.net/grant_type/device/1.0",
+ "code": device_code,
+ }
+ )
+ async with session.post(
+ OAUTH_TOKEN_URL,
+ json=data,
+ headers=headers,
+ ) as token_response:
+ return await token_response.json()
+
+
+async def refresh_oauth_token(session: ClientSession, refresh_token: str):
+ """Refresh an expired OAuth token."""
+ data, headers = _get_data_and_headers(
+ {
+ "client_secret": OAUTH_CLIENT_SECRET,
+ "grant_type": "refresh_token",
+ "refresh_token": refresh_token,
+ }
+ )
+ async with session.post(
+ OAUTH_TOKEN_URL,
+ json=data,
+ headers=headers,
+ ) as response:
+ return await response.json()