YTMusic: Add podcast support (#1924)
authorMarvin Schenkel <marvinschenkel@gmail.com>
Thu, 30 Jan 2025 14:18:50 +0000 (15:18 +0100)
committerGitHub <noreply@github.com>
Thu, 30 Jan 2025 14:18:50 +0000 (15:18 +0100)
* Add library and parse functions.

* Add podcast playback.

* Add podcast episode splitter.

music_assistant/providers/ytmusic/__init__.py
music_assistant/providers/ytmusic/helpers.py
music_assistant/providers/ytmusic/manifest.json
requirements_all.txt

index 374cad8607209e069b86311bcebda011228ac3a1..97e9f99984448138e75f7b544c6bae0e0d1e6a08 100644 (file)
@@ -10,6 +10,7 @@ from typing import TYPE_CHECKING, Any
 from urllib.parse import unquote
 
 import yt_dlp
+from duration_parser import parse as parse_str_duration
 from music_assistant_models.config_entries import ConfigEntry, ConfigValueType
 from music_assistant_models.enums import (
     AlbumType,
@@ -34,6 +35,8 @@ from music_assistant_models.media_items import (
     MediaItemType,
     MediaType,
     Playlist,
+    Podcast,
+    PodcastEpisode,
     ProviderMapping,
     SearchResults,
     Track,
@@ -54,8 +57,11 @@ from .helpers import (
     get_library_albums,
     get_library_artists,
     get_library_playlists,
+    get_library_podcasts,
     get_library_tracks,
     get_playlist,
+    get_podcast,
+    get_podcast_episode,
     get_song_radio_tracks,
     get_track,
     is_brand_account,
@@ -82,6 +88,7 @@ VARIOUS_ARTISTS_YTM_ID = "UCUTXlgdcKU5vfzFqHOWIvkA"
 # Playlist ID's are not unique across instances for lists like 'Liked videos', 'SuperMix' etc.
 # So we need to add a delimiter to make them unique
 YT_PLAYLIST_ID_DELIMITER = "🎵"
+PODCAST_EPISODE_SPLITTER = "|"
 YT_PERSONAL_PLAYLISTS = (
     "LM",  # Liked songs
     "SE"  # Episodes for Later
@@ -110,6 +117,7 @@ SUPPORTED_FEATURES = {
     ProviderFeature.ARTIST_ALBUMS,
     ProviderFeature.ARTIST_TOPTRACKS,
     ProviderFeature.SIMILAR_TRACKS,
+    ProviderFeature.LIBRARY_PODCASTS,
 }
 
 
@@ -273,6 +281,14 @@ class YoutubeMusicProvider(MusicProvider):
                 track = await self.get_track(track["videoId"])
                 yield track
 
+    async def get_library_podcasts(self) -> AsyncGenerator[Podcast, None]:
+        """Retrieve the library podcasts from Youtube Music."""
+        podcasts_obj = await get_library_podcasts(
+            headers=self._headers, language=self.language, user=self._yt_user
+        )
+        for podcast in podcasts_obj:
+            yield self._parse_podcast(podcast)
+
     async def get_album(self, prov_album_id) -> Album:
         """Get full album details by id."""
         if album_obj := await get_album(prov_album_id=prov_album_id, language=self.language):
@@ -387,6 +403,33 @@ class YoutubeMusicProvider(MusicProvider):
             return playlist_tracks[:25]
         return []
 
+    async def get_podcast(self, prov_podcast_id: str) -> Podcast:
+        """Get the full details of a Podcast."""
+        podcast_obj = await get_podcast(prov_podcast_id, headers=self._headers)
+        return self._parse_podcast(podcast_obj)
+
+    async def get_podcast_episodes(self, prov_podcast_id: str) -> list[PodcastEpisode]:
+        """Get all episodes from a podcast."""
+        podcast_obj = await get_podcast(prov_podcast_id, headers=self._headers)
+        podcast_obj["podcastId"] = prov_podcast_id
+        podcast = self._parse_podcast(podcast_obj)
+        episodes = []
+        for index, episode_obj in enumerate(podcast_obj.get("episodes", []), start=1):
+            episode = self._parse_podcast_episode(episode_obj, podcast)
+            ep_index = episode_obj.get("index") or index
+            episode.position = ep_index
+            episodes.append(episode)
+        return episodes
+
+    async def get_podcast_episode(self, prov_episode_id: str) -> PodcastEpisode:
+        """Get a single Podcast Episode."""
+        podcast_id, episode_id = prov_episode_id.split(PODCAST_EPISODE_SPLITTER)
+        podcast = await self.get_podcast(podcast_id)
+        episode_obj = await get_podcast_episode(episode_id, headers=self._headers)
+        episode = self._parse_podcast_episode(episode_obj, podcast)
+        episode.position = 0
+        return episode
+
     async def library_add(self, item: MediaItemType) -> bool:
         """Add an item to the library."""
         result = False
@@ -498,6 +541,8 @@ class YoutubeMusicProvider(MusicProvider):
         self, item_id: str, media_type: MediaType = MediaType.TRACK
     ) -> StreamDetails:
         """Return the content details for the given track when it will be streamed."""
+        if media_type == MediaType.PODCAST_EPISODE:
+            item_id = item_id.split(PODCAST_EPISODE_SPLITTER)[1]
         stream_format = await self._get_stream_format(item_id=item_id)
         self.logger.debug("Found stream_format: %s for song %s", stream_format["format"], item_id)
         stream_details = StreamDetails(
@@ -645,7 +690,7 @@ class YoutubeMusicProvider(MusicProvider):
         """Parse a YT Playlist response to a Playlist object."""
         playlist_id = playlist_obj["id"]
         playlist_name = playlist_obj["title"]
-        is_editable = playlist_obj.get("privacy") and playlist_obj.get("privacy") == "PRIVATE"
+        is_editable = playlist_obj.get("privacy", "") == "PRIVATE"
         # Playlist ID's are not unique across instances for lists like 'Likes', 'Supermix', etc.
         # So suffix with the instance id to make them unique
         if playlist_id in YT_PERSONAL_PLAYLISTS:
@@ -737,6 +782,63 @@ class YoutubeMusicProvider(MusicProvider):
             track.duration = int(track_obj["duration_seconds"])
         return track
 
+    def _parse_podcast(self, podcast_obj: dict) -> Podcast:
+        """Parse a YTM Podcast into a MA Podcast."""
+        podcast = Podcast(
+            item_id=podcast_obj["podcastId"],
+            name=podcast_obj["title"],
+            provider=self.lookup_key,
+            provider_mappings={
+                ProviderMapping(
+                    item_id=podcast_obj["podcastId"],
+                    provider_domain=self.domain,
+                    provider_instance=self.instance_id,
+                )
+            },
+        )
+        if description := podcast_obj.get("description"):
+            podcast.metadata.description = description
+        if author := podcast_obj.get("author"):
+            podcast.publisher = author["name"]
+        if thumbnails := podcast_obj.get("thumbnails"):
+            podcast.metadata.images = self._parse_thumbnails(thumbnails)
+        return podcast
+
+    def _parse_podcast_episode(self, episode_obj: dict, podcast: Podcast | None) -> PodcastEpisode:
+        """Parse a raw episode into a PodcastEpisode."""
+        episode_id = episode_obj.get("videoId")
+        if not episode_id:
+            msg = "Podcast episode is missing videoId"
+            raise InvalidDataError(msg)
+        item_id = f"{podcast.item_id}{PODCAST_EPISODE_SPLITTER}{episode_id}"
+        episode = PodcastEpisode(
+            item_id=item_id,
+            provider=self.lookup_key,
+            name=episode_obj.get("title"),
+            podcast=podcast,
+            provider_mappings={
+                ProviderMapping(
+                    item_id=item_id,
+                    provider_domain=self.domain,
+                    provider_instance=self.instance_id,
+                    audio_format=AudioFormat(
+                        content_type=ContentType.M4A,
+                    ),
+                    url=f"{YTM_DOMAIN}/watch?v={episode_id}",
+                )
+            },
+        )
+        if duration := episode_obj.get("duration"):
+            duration_sec = parse_str_duration(duration)
+            episode.duration = duration_sec
+        if description := episode_obj.get("description"):
+            episode.metadata.description = description
+        if thumbnails := episode_obj.get("thumbnails"):
+            episode.metadata.images = self._parse_thumbnails(thumbnails)
+        if release_date := episode_obj.get("date"):
+            episode.metadata.release_date = release_date
+        return episode
+
     async def _get_stream_format(self, item_id: str) -> dict[str, Any]:
         """Figure out the stream URL to use and return the highest quality."""
 
index e6c77ca2b9b23ee16220759e6d0a2ca895c70a6b..9ac336def0efb1b434c3ae75aa1f4e8eae197d01 100644 (file)
@@ -91,6 +91,36 @@ async def get_track(
     return await asyncio.to_thread(_get_song)
 
 
+async def get_podcast(
+    prov_podcast_id: str, headers: dict[str, str], language: str = "en"
+) -> dict[str, str] | None:
+    """Async wrapper around the get_podcast function."""
+
+    def _get_podcast():
+        ytm = ytmusicapi.YTMusic(auth=headers, language=language)
+        podcast_obj = ytm.get_podcast(playlistId=prov_podcast_id)
+        if "podcastId" not in podcast_obj:
+            podcast_obj["podcastId"] = prov_podcast_id
+        return podcast_obj
+
+    return await asyncio.to_thread(_get_podcast)
+
+
+async def get_podcast_episode(
+    prov_episode_id: str, headers: dict[str, str], language: str = "en"
+) -> dict[str, str] | None:
+    """Async wrapper around the podcast episode function."""
+
+    def _get_podcast_episode():
+        ytm = ytmusicapi.YTMusic(auth=headers, language=language)
+        episode = ytm.get_episode(videoId=prov_episode_id)
+        if "videoId" not in episode:
+            episode["videoId"] = prov_episode_id
+        return episode
+
+    return await asyncio.to_thread(_get_podcast_episode)
+
+
 async def get_library_artists(
     headers: dict[str, str], language: str = "en", user: str | None = None
 ) -> dict[str, str]:
@@ -152,6 +182,18 @@ async def get_library_tracks(
     return await asyncio.to_thread(_get_library_tracks)
 
 
+async def get_library_podcasts(
+    headers: dict[str, str], language: str = "en", user: str | None = None
+) -> dict[str, str]:
+    """Async wrapper around the ytmusic api get_library_podcasts function."""
+
+    def _get_library_podcasts():
+        ytm = ytmusicapi.YTMusic(auth=headers, language=language, user=user)
+        return ytm.get_library_podcasts(limit=None)
+
+    return await asyncio.to_thread(_get_library_podcasts)
+
+
 async def library_add_remove_artist(
     headers: dict[str, str], prov_artist_id: str, add: bool = True, user: str | None = None
 ) -> bool:
index 85975030d54bc76e2501807a74a21086b3fc6dce..ba8b48737eba19787206692f035d04f78101f987 100644 (file)
@@ -4,7 +4,7 @@
   "name": "YouTube Music",
   "description": "Support for the YouTube Music streaming provider in Music Assistant.",
   "codeowners": ["@MarvinSchenkel"],
-  "requirements": ["ytmusicapi==1.9.1", "yt-dlp==2024.12.23"],
+  "requirements": ["ytmusicapi==1.9.1", "yt-dlp==2024.12.23", "duration-parser==1.0.1"],
   "documentation": "https://music-assistant.io/music-providers/youtube-music/",
   "multi_instance": true
 }
index c9a03d5c6fb1eb7c07fc65e5e8290206aaf639a2..a9289c70e10851ba369842177390a048b91c455e 100644 (file)
@@ -17,6 +17,7 @@ colorlog==6.9.0
 cryptography==44.0.0
 deezer-python-async==0.3.0
 defusedxml==0.7.1
+duration-parser==1.0.1
 eyeD3==0.9.7
 faust-cchardet>=2.1.18
 hass-client==1.2.0