# So we need to add a delimiter to make them unique
YT_PLAYLIST_ID_DELIMITER = "🎵"
PODCAST_EPISODE_SPLITTER = "|"
+YT_LIKED_SONGS_PLAYLIST_ID = "LM"
YT_PERSONAL_PLAYLISTS = (
- "LM", # Liked songs
- "SE" # Episodes for Later
+ YT_LIKED_SONGS_PLAYLIST_ID, # Liked songs
+ "SE", # Episodes for Later
"RDTMAK5uy_kset8DisdE7LSD4TNjEVvrKRTmG7a56sY", # SuperMix
"RDTMAK5uy_nGQKSMIkpr4o9VI_2i56pkGliD6FQRo50", # My Mix 1
"RDTMAK5uy_lz2owBgwWf1mjzyn_NbxzMViQzIg8IAIg", # My Mix 2
"RDTMAK5uy_nilrsVWxrKskY0ZUpVZ3zpB0u4LwWTVJ4", # Replay Mix
"RDTMAK5uy_mZtXeU08kxXJOUhL0ETdAuZTh1z7aAFAo", # Archive Mix
)
+DYNAMIC_PLAYLIST_TRACK_LIMIT = 300
YTM_PREMIUM_CHECK_TRACK_ID = "dQw4w9WgXcQ"
PACKAGES_TO_INSTALL = ("yt-dlp", "bgutil-ytdlp-pot-provider")
@use_cache(3600 * 24 * 7) # Cache for 7 days
async def get_playlist(self, prov_playlist_id) -> Playlist:
"""Get full playlist details by id."""
+ # Grab the full playlist by default
+ limit = None
# Grab the playlist id from the full url in case of personal playlists
if YT_PLAYLIST_ID_DELIMITER in prov_playlist_id:
prov_playlist_id = prov_playlist_id.split(YT_PLAYLIST_ID_DELIMITER)[0]
+ if (
+ prov_playlist_id in YT_PERSONAL_PLAYLISTS
+ and prov_playlist_id != YT_LIKED_SONGS_PLAYLIST_ID
+ ):
+ # Personal playlists are dynamic and can result in endless tracks
+ # limit to avoid memory issues
+ limit = DYNAMIC_PLAYLIST_TRACK_LIMIT
if playlist_obj := await get_playlist(
prov_playlist_id=prov_playlist_id,
headers=self._headers,
language=self.language,
user=self._yt_user,
+ limit=limit,
):
return self._parse_playlist(playlist_obj)
msg = f"Item {prov_playlist_id} not found"
if page > 0:
# paging not supported, we always return the whole list at once
return []
+ # Grab the full playlist by default
+ limit = None
# Grab the playlist id from the full url in case of personal playlists
if YT_PLAYLIST_ID_DELIMITER in prov_playlist_id:
prov_playlist_id = prov_playlist_id.split(YT_PLAYLIST_ID_DELIMITER)[0]
+ if (
+ prov_playlist_id in YT_PERSONAL_PLAYLISTS
+ and prov_playlist_id != YT_LIKED_SONGS_PLAYLIST_ID
+ ):
+ # Personal playlists are dynamic and can result in endless tracks
+ # limit to avoid memory issues
+ limit = DYNAMIC_PLAYLIST_TRACK_LIMIT
# Add a try to prevent MA from stopping syncing whenever we fail a single playlist
try:
playlist_obj = await get_playlist(
- prov_playlist_id=prov_playlist_id, headers=self._headers, user=self._yt_user
+ prov_playlist_id=prov_playlist_id,
+ headers=self._headers,
+ user=self._yt_user,
+ limit=limit,
)
except KeyError as ke:
self.logger.warning("Could not load playlist: %s: %s", prov_playlist_id, ke)
self, prov_playlist_id: str, positions_to_remove: tuple[int, ...]
) -> None:
"""Remove track(s) from playlist."""
+ # Grab the full playlist by default
+ limit = None
# Grab the playlist id from the full url in case of personal playlists
if YT_PLAYLIST_ID_DELIMITER in prov_playlist_id:
prov_playlist_id = prov_playlist_id.split(YT_PLAYLIST_ID_DELIMITER)[0]
- playlist_obj = await get_playlist(prov_playlist_id=prov_playlist_id, headers=self._headers)
+ if (
+ prov_playlist_id in YT_PERSONAL_PLAYLISTS
+ and prov_playlist_id != YT_LIKED_SONGS_PLAYLIST_ID
+ ):
+ # Personal playlists are dynamic and can result in endless tracks
+ # limit to avoid memory issues
+ limit = DYNAMIC_PLAYLIST_TRACK_LIMIT
+ playlist_obj = await get_playlist(
+ prov_playlist_id=prov_playlist_id, headers=self._headers, limit=limit
+ )
if "tracks" not in playlist_obj:
return None
tracks_to_delete = []
async def get_playlist(
- prov_playlist_id: str, headers: dict[str, str], language: str = "en", user: str | None = None
+ prov_playlist_id: str,
+ headers: dict[str, str],
+ language: str = "en",
+ user: str | None = None,
+ limit=None,
) -> dict[str, str]:
"""Async wrapper around the ytmusicapi get_playlist function."""
def _get_playlist():
ytm = ytmusicapi.YTMusic(auth=headers, language=language, user=user)
- playlist = ytm.get_playlist(playlistId=prov_playlist_id, limit=None)
+ playlist = ytm.get_playlist(playlistId=prov_playlist_id, limit=limit)
playlist["checksum"] = get_playlist_checksum(playlist)
# Fix missing playlist id in some edge cases
playlist["id"] = prov_playlist_id if not playlist.get("id") else playlist["id"]