YT_DOMAIN = "https://www.youtube.com"
YTM_DOMAIN = "https://music.youtube.com"
YTM_BASE_URL = f"{YTM_DOMAIN}/youtubei/v1/"
+# Youtube Music has the very unique id of "LM" for the likes playlist
+# when this playlist ID is detected, we make the id unique to the user
+# by adding the user's instance id to it
+YT_YOUR_LIKES_PLAYLIST_ID = "LM"
VARIOUS_ARTISTS_YTM_ID = "UCUTXlgdcKU5vfzFqHOWIvkA"
SUPPORTED_FEATURES = (
key=CONF_AUTH_TOKEN,
type=ConfigEntryType.SECURE_STRING,
label="Authentication token for Youtube Music",
- description="You need to link Music Assistant to your Youtube Music account.",
+ description="You need to link Music Assistant to your Youtube Music account. "
+ "Please ignore the code on the page the next page and click 'Next'.",
action=CONF_ACTION_AUTH,
action_label="Authenticate on Youtube Music",
value=values.get(CONF_AUTH_TOKEN) if values else None,
async def get_artist(self, prov_artist_id) -> Artist:
"""Get full artist details by id."""
- if artist_obj := await get_artist(prov_artist_id=prov_artist_id):
+ if artist_obj := await get_artist(prov_artist_id=prov_artist_id, headers=self._headers):
return await self._parse_artist(artist_obj=artist_obj)
raise MediaNotFoundError(f"Item {prov_artist_id} not found")
async def get_playlist(self, prov_playlist_id) -> Playlist:
"""Get full playlist details by id."""
await self._check_oauth_token()
- if playlist_obj := await get_playlist(
- prov_playlist_id=prov_playlist_id, headers=self._headers
- ):
+ playlist_id = (
+ YT_YOUR_LIKES_PLAYLIST_ID
+ if prov_playlist_id == f"{YT_YOUR_LIKES_PLAYLIST_ID}-{self.instance_id}"
+ else prov_playlist_id
+ )
+ if playlist_obj := await get_playlist(prov_playlist_id=playlist_id, headers=self._headers):
return await self._parse_playlist(playlist_obj)
- raise MediaNotFoundError(f"Item {prov_playlist_id} not found")
+ raise MediaNotFoundError(f"Item {playlist_id} not found")
async def get_playlist_tracks(self, prov_playlist_id) -> AsyncGenerator[Track, None]:
"""Get all playlist tracks for given playlist id."""
await self._check_oauth_token()
- playlist_obj = await get_playlist(prov_playlist_id=prov_playlist_id, headers=self._headers)
+ playlist_id = (
+ YT_YOUR_LIKES_PLAYLIST_ID
+ if prov_playlist_id == f"{YT_YOUR_LIKES_PLAYLIST_ID}-{self.instance_id}"
+ else prov_playlist_id
+ )
+ playlist_obj = await get_playlist(prov_playlist_id=playlist_id, headers=self._headers)
if "tracks" not in playlist_obj:
return
for index, track in enumerate(playlist_obj["tracks"]):
async def get_artist_albums(self, prov_artist_id) -> list[Album]:
"""Get a list of albums for the given artist."""
- artist_obj = await get_artist(prov_artist_id=prov_artist_id)
+ artist_obj = await get_artist(prov_artist_id=prov_artist_id, headers=self._headers)
if "albums" in artist_obj and "results" in artist_obj["albums"]:
albums = []
for album_obj in artist_obj["albums"]["results"]:
async def get_artist_toptracks(self, prov_artist_id) -> list[Track]:
"""Get a list of 25 most popular tracks for the given artist."""
- artist_obj = await get_artist(prov_artist_id=prov_artist_id)
+ artist_obj = await get_artist(prov_artist_id=prov_artist_id, headers=self._headers)
if artist_obj.get("songs") and artist_obj["songs"].get("browseId"):
prov_playlist_id = artist_obj["songs"]["browseId"]
playlist_tracks = [
async def add_playlist_tracks(self, prov_playlist_id: str, prov_track_ids: list[str]) -> None:
"""Add track(s) to playlist."""
await self._check_oauth_token()
+ playlist_id = (
+ YT_YOUR_LIKES_PLAYLIST_ID
+ if prov_playlist_id == f"{YT_YOUR_LIKES_PLAYLIST_ID}-{self.instance_id}"
+ else prov_playlist_id
+ )
return await add_remove_playlist_tracks(
headers=self._headers,
- prov_playlist_id=prov_playlist_id,
+ prov_playlist_id=playlist_id,
prov_track_ids=prov_track_ids,
add=True,
)
) -> None:
"""Remove track(s) from playlist."""
await self._check_oauth_token()
- playlist_obj = await get_playlist(prov_playlist_id=prov_playlist_id, headers=self._headers)
+ playlist_id = (
+ YT_YOUR_LIKES_PLAYLIST_ID
+ if prov_playlist_id == f"{YT_YOUR_LIKES_PLAYLIST_ID}-{self.instance_id}"
+ else prov_playlist_id
+ )
+ playlist_obj = await get_playlist(prov_playlist_id=playlist_id, headers=self._headers)
if "tracks" not in playlist_obj:
return None
tracks_to_delete = []
async def _parse_playlist(self, playlist_obj: dict) -> Playlist:
"""Parse a YT Playlist response to a Playlist object."""
- playlist = Playlist(
- item_id=playlist_obj["id"], provider=self.domain, name=playlist_obj["title"]
+ playlist_id = (
+ f"{YT_YOUR_LIKES_PLAYLIST_ID}-{self.instance_id}"
+ if playlist_obj["id"] == YT_YOUR_LIKES_PLAYLIST_ID
+ else playlist_obj["id"]
)
+ playlist = Playlist(item_id=playlist_id, provider=self.domain, name=playlist_obj["title"])
if "description" in playlist_obj:
playlist.metadata.description = playlist_obj["description"]
if "thumbnails" in playlist_obj and playlist_obj["thumbnails"]:
playlist.is_editable = is_editable
playlist.add_provider_mapping(
ProviderMapping(
- item_id=playlist_obj["id"],
+ item_id=playlist_id,
provider_domain=self.domain,
provider_instance=self.instance_id,
)
async def _is_valid_deciphered_url(self, url: str) -> bool:
"""Verify whether the URL has been deciphered using a valid cipher."""
async with self.mass.http_session.head(url) as response:
- return response.status == 200
+ return response.status != 403
def _get_item_mapping(self, media_type: MediaType, key: str, name: str) -> ItemMapping:
return ItemMapping(