async def get_playlist(
- prov_playlist_id: str, headers: Dict[str, str]
+ prov_playlist_id: str, headers: Dict[str, str], username: str
) -> Dict[str, str]:
"""Async wrapper around the ytmusicapi get_playlist function."""
def _get_playlist():
- ytm = ytmusicapi.YTMusic(auth=json.dumps(headers))
+ user = username if is_brand_account(username) else None
+ ytm = ytmusicapi.YTMusic(auth=json.dumps(headers), user=user)
playlist = ytm.get_playlist(playlistId=prov_playlist_id)
playlist["checksum"] = get_playlist_checksum(playlist)
return playlist
def _get_library_artists():
user = username if is_brand_account(username) else None
ytm = ytmusicapi.YTMusic(auth=json.dumps(headers), user=user)
- artists = ytm.get_library_artists(limit=9999)
+ artists = ytm.get_library_subscriptions(limit=9999)
# Sync properties with uniformal artist object
for artist in artists:
artist["id"] = artist["browseId"]
return await loop.run_in_executor(None, _get_library_tracks)
+async def library_add_remove_artist(
+ headers: Dict[str, str], prov_artist_id: str, add: bool = True, username: str = None
+) -> bool:
+ """Add or remove an artist to the user's library."""
+
+ def _library_add_remove_artist():
+ user = username if is_brand_account(username) else None
+ ytm = ytmusicapi.YTMusic(auth=json.dumps(headers), user=user)
+ if add:
+ return "actions" in ytm.subscribe_artists(channelIds=[prov_artist_id])
+ if not add:
+ return "actions" in ytm.unsubscribe_artists(channelIds=[prov_artist_id])
+
+ loop = asyncio.get_running_loop()
+ return await loop.run_in_executor(None, _library_add_remove_artist)
+
+
+async def library_add_remove_album(
+ headers: Dict[str, str], prov_item_id: str, add: bool = True, username: str = None
+) -> bool:
+ """Add or remove an album or playlist to the user's library."""
+ album = await get_album(prov_album_id=prov_item_id)
+
+ def _library_add_remove_album():
+ user = username if is_brand_account(username) else None
+ ytm = ytmusicapi.YTMusic(auth=json.dumps(headers), user=user)
+ playlist_id = album["audioPlaylistId"]
+ if add:
+ return ytm.rate_playlist(playlist_id, "LIKE")
+ if not add:
+ return ytm.rate_playlist(playlist_id, "INDIFFERENT")
+
+ loop = asyncio.get_running_loop()
+ return await loop.run_in_executor(None, _library_add_remove_album)
+
+
+async def library_add_remove_playlist(
+ headers: Dict[str, str], prov_item_id: str, add: bool = True, username: str = None
+) -> bool:
+ """Add or remove an album or playlist to the user's library."""
+
+ def _library_add_remove_playlist():
+ user = username if is_brand_account(username) else None
+ ytm = ytmusicapi.YTMusic(auth=json.dumps(headers), user=user)
+ if add:
+ return "actions" in ytm.rate_playlist(prov_item_id, "LIKE")
+ if not add:
+ return "actions" in ytm.rate_playlist(prov_item_id, "INDIFFERENT")
+
+ loop = asyncio.get_running_loop()
+ return await loop.run_in_executor(None, _library_add_remove_playlist)
+
+
+async def add_remove_playlist_tracks(
+ headers: Dict[str, str],
+ prov_playlist_id: str,
+ prov_track_ids: List[str],
+ add: bool,
+ username: str = None,
+) -> bool:
+ """Async wrapper around adding/removing tracks to a playlist."""
+
+ def _add_playlist_tracks():
+ user = username if is_brand_account(username) else None
+ ytm = ytmusicapi.YTMusic(auth=json.dumps(headers), user=user)
+ if add:
+ return ytm.add_playlist_items(
+ playlistId=prov_playlist_id, videoIds=prov_track_ids
+ )
+ if not add:
+ return ytm.remove_playlist_items(
+ playlistId=prov_playlist_id, videos=prov_track_ids
+ )
+
+ loop = asyncio.get_running_loop()
+ return await loop.run_in_executor(None, _add_playlist_tracks)
+
+
async def search(query: str, ytm_filter: str = None, limit: int = 20) -> List[Dict]:
"""Async wrapper around the ytmusicapi search function."""
)
from music_assistant.models.music_provider import MusicProvider
from music_assistant.music_providers.ytmusic.helpers import (
+ add_remove_playlist_tracks,
get_album,
get_artist,
get_library_albums,
get_library_tracks,
get_playlist,
get_track,
+ library_add_remove_album,
+ library_add_remove_artist,
+ library_add_remove_playlist,
search,
)
async def get_playlist(self, prov_playlist_id) -> Playlist:
"""Get full playlist details by id."""
playlist_obj = await get_playlist(
- prov_playlist_id=prov_playlist_id, headers=self._headers
+ prov_playlist_id=prov_playlist_id,
+ headers=self._headers,
+ username=self.config.username,
)
return await self._parse_playlist(playlist_obj)
async def get_playlist_tracks(self, prov_playlist_id) -> List[Track]:
"""Get all playlist tracks for given playlist id."""
playlist_obj = await get_playlist(
- prov_playlist_id=prov_playlist_id, headers=self._headers
+ prov_playlist_id=prov_playlist_id,
+ headers=self._headers,
+ username=self.config.username,
)
if "tracks" in playlist_obj:
tracks = []
]
return []
+ async def library_add(self, prov_item_id, media_type: MediaType) -> None:
+ """Add an item to the library."""
+ result = False
+ if media_type == MediaType.ARTIST:
+ result = await library_add_remove_artist(
+ headers=self._headers,
+ prov_artist_id=prov_item_id,
+ add=True,
+ username=self.config.username,
+ )
+ elif media_type == MediaType.ALBUM:
+ result = await library_add_remove_album(
+ headers=self._headers,
+ prov_item_id=prov_item_id,
+ add=True,
+ username=self.config.username,
+ )
+ elif media_type == MediaType.PLAYLIST:
+ result = await library_add_remove_playlist(
+ headers=self._headers,
+ prov_item_id=prov_item_id,
+ add=True,
+ username=self.config.username,
+ )
+ elif media_type == MediaType.TRACK:
+ raise NotImplementedError
+ return result
+
+ async def library_remove(self, prov_item_id, media_type: MediaType):
+ """Remove an item from the library."""
+ result = False
+ if media_type == MediaType.ARTIST:
+ result = await library_add_remove_artist(
+ headers=self._headers,
+ prov_artist_id=prov_item_id,
+ add=False,
+ username=self.config.username,
+ )
+ elif media_type == MediaType.ALBUM:
+ result = await library_add_remove_album(
+ headers=self._headers,
+ prov_item_id=prov_item_id,
+ add=False,
+ username=self.config.username,
+ )
+ elif media_type == MediaType.PLAYLIST:
+ result = await library_add_remove_playlist(
+ headers=self._headers,
+ prov_item_id=prov_item_id,
+ add=False,
+ username=self.config.username,
+ )
+ elif media_type == MediaType.TRACK:
+ raise NotImplementedError
+ return result
+
+ async def add_playlist_tracks(
+ self, prov_playlist_id: str, prov_track_ids: List[str]
+ ) -> None:
+ """Add track(s) to playlist."""
+ return await add_remove_playlist_tracks(
+ headers=self._headers,
+ prov_playlist_id=prov_playlist_id,
+ prov_track_ids=prov_track_ids,
+ add=True,
+ username=self.config.username,
+ )
+
+ async def remove_playlist_tracks(
+ self, prov_playlist_id: str, prov_track_ids: List[str]
+ ) -> None:
+ """Remove track(s) from playlist."""
+ # YT needs both the videoId and de setVideoId in order to remove
+ # the track. Thus, we need to obtain the playlist details and
+ # grab the info from there.
+ playlist_obj = await get_playlist(
+ prov_playlist_id=prov_playlist_id,
+ headers=self._headers,
+ username=self.config.username,
+ )
+ if playlist_obj.get("tracks"):
+ tracks_to_delete = [
+ {"videoId": track["videoId"], "setVideoId": track["setVideoId"]}
+ for track in playlist_obj.get("tracks")
+ if track.get("videoId") in prov_track_ids
+ ]
+ return await add_remove_playlist_tracks(
+ headers=self._headers,
+ prov_playlist_id=prov_playlist_id,
+ prov_track_ids=tracks_to_delete,
+ add=False,
+ username=self.config.username,
+ )
+
async def get_stream_details(self, item_id: str) -> StreamDetails:
"""Return the content details for the given track when it will be streamed."""
data = {
playlist.metadata.images = await self._parse_thumbnails(
playlist_obj["thumbnails"]
)
+ is_editable = False
+ if playlist_obj.get("privacy") and playlist_obj.get("privacy") == "PRIVATE":
+ is_editable = True
+ playlist.is_editable = is_editable
playlist.add_provider_id(
MediaItemProviderId(
item_id=playlist_obj["id"], prov_type=self.type, prov_id=self.id