from urllib.parse import unquote
import yt_dlp
+from duration_parser import parse as parse_str_duration
from music_assistant_models.config_entries import ConfigEntry, ConfigValueType
from music_assistant_models.enums import (
AlbumType,
MediaItemType,
MediaType,
Playlist,
+ Podcast,
+ PodcastEpisode,
ProviderMapping,
SearchResults,
Track,
get_library_albums,
get_library_artists,
get_library_playlists,
+ get_library_podcasts,
get_library_tracks,
get_playlist,
+ get_podcast,
+ get_podcast_episode,
get_song_radio_tracks,
get_track,
is_brand_account,
# Playlist ID's are not unique across instances for lists like 'Liked videos', 'SuperMix' etc.
# So we need to add a delimiter to make them unique
YT_PLAYLIST_ID_DELIMITER = "🎵"
+PODCAST_EPISODE_SPLITTER = "|"
YT_PERSONAL_PLAYLISTS = (
"LM", # Liked songs
"SE" # Episodes for Later
ProviderFeature.ARTIST_ALBUMS,
ProviderFeature.ARTIST_TOPTRACKS,
ProviderFeature.SIMILAR_TRACKS,
+ ProviderFeature.LIBRARY_PODCASTS,
}
track = await self.get_track(track["videoId"])
yield track
+ async def get_library_podcasts(self) -> AsyncGenerator[Podcast, None]:
+ """Retrieve the library podcasts from Youtube Music."""
+ podcasts_obj = await get_library_podcasts(
+ headers=self._headers, language=self.language, user=self._yt_user
+ )
+ for podcast in podcasts_obj:
+ yield self._parse_podcast(podcast)
+
async def get_album(self, prov_album_id) -> Album:
"""Get full album details by id."""
if album_obj := await get_album(prov_album_id=prov_album_id, language=self.language):
return playlist_tracks[:25]
return []
+ async def get_podcast(self, prov_podcast_id: str) -> Podcast:
+ """Get the full details of a Podcast."""
+ podcast_obj = await get_podcast(prov_podcast_id, headers=self._headers)
+ return self._parse_podcast(podcast_obj)
+
+ async def get_podcast_episodes(self, prov_podcast_id: str) -> list[PodcastEpisode]:
+ """Get all episodes from a podcast."""
+ podcast_obj = await get_podcast(prov_podcast_id, headers=self._headers)
+ podcast_obj["podcastId"] = prov_podcast_id
+ podcast = self._parse_podcast(podcast_obj)
+ episodes = []
+ for index, episode_obj in enumerate(podcast_obj.get("episodes", []), start=1):
+ episode = self._parse_podcast_episode(episode_obj, podcast)
+ ep_index = episode_obj.get("index") or index
+ episode.position = ep_index
+ episodes.append(episode)
+ return episodes
+
+ async def get_podcast_episode(self, prov_episode_id: str) -> PodcastEpisode:
+ """Get a single Podcast Episode."""
+ podcast_id, episode_id = prov_episode_id.split(PODCAST_EPISODE_SPLITTER)
+ podcast = await self.get_podcast(podcast_id)
+ episode_obj = await get_podcast_episode(episode_id, headers=self._headers)
+ episode = self._parse_podcast_episode(episode_obj, podcast)
+ episode.position = 0
+ return episode
+
async def library_add(self, item: MediaItemType) -> bool:
"""Add an item to the library."""
result = False
self, item_id: str, media_type: MediaType = MediaType.TRACK
) -> StreamDetails:
"""Return the content details for the given track when it will be streamed."""
+ if media_type == MediaType.PODCAST_EPISODE:
+ item_id = item_id.split(PODCAST_EPISODE_SPLITTER)[1]
stream_format = await self._get_stream_format(item_id=item_id)
self.logger.debug("Found stream_format: %s for song %s", stream_format["format"], item_id)
stream_details = StreamDetails(
"""Parse a YT Playlist response to a Playlist object."""
playlist_id = playlist_obj["id"]
playlist_name = playlist_obj["title"]
- is_editable = playlist_obj.get("privacy") and playlist_obj.get("privacy") == "PRIVATE"
+ is_editable = playlist_obj.get("privacy", "") == "PRIVATE"
# Playlist ID's are not unique across instances for lists like 'Likes', 'Supermix', etc.
# So suffix with the instance id to make them unique
if playlist_id in YT_PERSONAL_PLAYLISTS:
track.duration = int(track_obj["duration_seconds"])
return track
+ def _parse_podcast(self, podcast_obj: dict) -> Podcast:
+ """Parse a YTM Podcast into a MA Podcast."""
+ podcast = Podcast(
+ item_id=podcast_obj["podcastId"],
+ name=podcast_obj["title"],
+ provider=self.lookup_key,
+ provider_mappings={
+ ProviderMapping(
+ item_id=podcast_obj["podcastId"],
+ provider_domain=self.domain,
+ provider_instance=self.instance_id,
+ )
+ },
+ )
+ if description := podcast_obj.get("description"):
+ podcast.metadata.description = description
+ if author := podcast_obj.get("author"):
+ podcast.publisher = author["name"]
+ if thumbnails := podcast_obj.get("thumbnails"):
+ podcast.metadata.images = self._parse_thumbnails(thumbnails)
+ return podcast
+
+ def _parse_podcast_episode(self, episode_obj: dict, podcast: Podcast | None) -> PodcastEpisode:
+ """Parse a raw episode into a PodcastEpisode."""
+ episode_id = episode_obj.get("videoId")
+ if not episode_id:
+ msg = "Podcast episode is missing videoId"
+ raise InvalidDataError(msg)
+ item_id = f"{podcast.item_id}{PODCAST_EPISODE_SPLITTER}{episode_id}"
+ episode = PodcastEpisode(
+ item_id=item_id,
+ provider=self.lookup_key,
+ name=episode_obj.get("title"),
+ podcast=podcast,
+ provider_mappings={
+ ProviderMapping(
+ item_id=item_id,
+ provider_domain=self.domain,
+ provider_instance=self.instance_id,
+ audio_format=AudioFormat(
+ content_type=ContentType.M4A,
+ ),
+ url=f"{YTM_DOMAIN}/watch?v={episode_id}",
+ )
+ },
+ )
+ if duration := episode_obj.get("duration"):
+ duration_sec = parse_str_duration(duration)
+ episode.duration = duration_sec
+ if description := episode_obj.get("description"):
+ episode.metadata.description = description
+ if thumbnails := episode_obj.get("thumbnails"):
+ episode.metadata.images = self._parse_thumbnails(thumbnails)
+ if release_date := episode_obj.get("date"):
+ episode.metadata.release_date = release_date
+ return episode
+
async def _get_stream_format(self, item_id: str) -> dict[str, Any]:
"""Figure out the stream URL to use and return the highest quality."""
return await asyncio.to_thread(_get_song)
+async def get_podcast(
+ prov_podcast_id: str, headers: dict[str, str], language: str = "en"
+) -> dict[str, str] | None:
+ """Async wrapper around the get_podcast function."""
+
+ def _get_podcast():
+ ytm = ytmusicapi.YTMusic(auth=headers, language=language)
+ podcast_obj = ytm.get_podcast(playlistId=prov_podcast_id)
+ if "podcastId" not in podcast_obj:
+ podcast_obj["podcastId"] = prov_podcast_id
+ return podcast_obj
+
+ return await asyncio.to_thread(_get_podcast)
+
+
+async def get_podcast_episode(
+ prov_episode_id: str, headers: dict[str, str], language: str = "en"
+) -> dict[str, str] | None:
+ """Async wrapper around the podcast episode function."""
+
+ def _get_podcast_episode():
+ ytm = ytmusicapi.YTMusic(auth=headers, language=language)
+ episode = ytm.get_episode(videoId=prov_episode_id)
+ if "videoId" not in episode:
+ episode["videoId"] = prov_episode_id
+ return episode
+
+ return await asyncio.to_thread(_get_podcast_episode)
+
+
async def get_library_artists(
headers: dict[str, str], language: str = "en", user: str | None = None
) -> dict[str, str]:
return await asyncio.to_thread(_get_library_tracks)
+async def get_library_podcasts(
+ headers: dict[str, str], language: str = "en", user: str | None = None
+) -> dict[str, str]:
+ """Async wrapper around the ytmusic api get_library_podcasts function."""
+
+ def _get_library_podcasts():
+ ytm = ytmusicapi.YTMusic(auth=headers, language=language, user=user)
+ return ytm.get_library_podcasts(limit=None)
+
+ return await asyncio.to_thread(_get_library_podcasts)
+
+
async def library_add_remove_artist(
headers: dict[str, str], prov_artist_id: str, add: bool = True, user: str | None = None
) -> bool: