[Chore] Move podcastparser parse functions from itunes search provider to helpers...
authorFabian Munkes <105975993+fmunkes@users.noreply.github.com>
Thu, 13 Mar 2025 17:34:26 +0000 (10:34 -0700)
committerGitHub <noreply@github.com>
Thu, 13 Mar 2025 17:34:26 +0000 (18:34 +0100)
music_assistant/helpers/podcast_parsers.py [new file with mode: 0644]
music_assistant/providers/itunes_podcasts/__init__.py
music_assistant/providers/itunes_podcasts/manifest.json
music_assistant/providers/itunes_podcasts/parsers.py [deleted file]

diff --git a/music_assistant/helpers/podcast_parsers.py b/music_assistant/helpers/podcast_parsers.py
new file mode 100644 (file)
index 0000000..c4ae147
--- /dev/null
@@ -0,0 +1,168 @@
+"""Podcastfeed -> Mass."""
+
+from typing import Any
+
+from music_assistant_models.enums import ContentType, ImageType, MediaType
+from music_assistant_models.media_items import (
+    AudioFormat,
+    ItemMapping,
+    MediaItemChapter,
+    MediaItemImage,
+    Podcast,
+    PodcastEpisode,
+    ProviderMapping,
+    UniqueList,
+)
+
+
+def parse_podcast(
+    *,
+    feed_url: str,
+    parsed_feed: dict[str, Any],
+    lookup_key: str,
+    domain: str,
+    instance_id: str,
+    mass_item_id: str | None = None,
+) -> Podcast:
+    """Podcast -> Mass Podcast.
+
+    The item_id is the feed url by default, or the optional mass_item_id instead.
+    """
+    publisher = parsed_feed.get("author") or parsed_feed.get("itunes_author", "NO_AUTHOR")
+    item_id = feed_url if mass_item_id is None else mass_item_id
+    mass_podcast = Podcast(
+        item_id=item_id,
+        name=parsed_feed.get("title", "NO_TITLE"),
+        publisher=publisher,
+        provider=lookup_key,
+        uri=parsed_feed.get("link"),
+        provider_mappings={
+            ProviderMapping(
+                item_id=item_id,
+                provider_domain=domain,
+                provider_instance=instance_id,
+            )
+        },
+    )
+    genres: list[str] = []
+    if _genres := parsed_feed.get("itunes_categories"):
+        for _sub_genre in _genres:
+            if isinstance(_sub_genre, list):
+                genres.extend(x for x in _sub_genre if isinstance(x, str))
+            elif isinstance(_sub_genre, str):
+                genres.append(_sub_genre)
+
+    mass_podcast.metadata.genres = set(genres)
+    mass_podcast.metadata.description = parsed_feed.get("description", "")
+    mass_podcast.metadata.explicit = parsed_feed.get("explicit", False)
+    language = parsed_feed.get("language")
+    if language is not None:
+        mass_podcast.metadata.languages = UniqueList([language])
+    episodes = parsed_feed.get("episodes", [])
+    mass_podcast.total_episodes = len(episodes)
+    podcast_cover = parsed_feed.get("cover_url")
+    if podcast_cover is not None:
+        mass_podcast.metadata.images = UniqueList(
+            [
+                MediaItemImage(
+                    type=ImageType.THUMB,
+                    path=podcast_cover,
+                    provider=lookup_key,
+                    remotely_accessible=True,
+                )
+            ]
+        )
+    return mass_podcast
+
+
+def get_stream_url_and_guid_from_episode(
+    *, episode: dict[str, Any]
+) -> tuple[str | None, str | None]:
+    """Give episode's stream url and guid, if it exists."""
+    episode_enclosures = episode.get("enclosures", [])
+    if len(episode_enclosures) < 1:
+        raise RuntimeError
+    stream_url = episode_enclosures[0].get("url", None)
+    guid = episode.get("guid")
+    return stream_url, guid
+
+
+def parse_podcast_episode(
+    *,
+    episode: dict[str, Any],
+    prov_podcast_id: str,
+    episode_cnt: int,
+    podcast_cover: str | None = None,
+    lookup_key: str,
+    domain: str,
+    instance_id: str,
+    mass_item_id: str | None = None,
+) -> PodcastEpisode:
+    """Podcast Episode -> Mass Podcast Episode.
+
+    The item_id is {prov_podcast_id} {guid_or_stream_url} by default, or the optional mass_item_id
+    instead. The podcast_cover is used, if the episode should not have its own cover.
+    """
+    episode_duration = episode.get("total_time", 0.0)
+    episode_title = episode.get("title", "NO_EPISODE_TITLE")
+    episode_cover = episode.get("episode_art_url", podcast_cover)
+    episode_published = episode.get("published")
+
+    stream_url, guid = get_stream_url_and_guid_from_episode(episode=episode)
+    guid_or_stream_url = guid if guid is not None else stream_url
+    if stream_url is None:
+        raise RuntimeError("Episode has no stream information!")
+
+    # Default episode id. A guid is preferred as identification.
+    episode_id = f"{prov_podcast_id} {guid_or_stream_url}" if mass_item_id is None else mass_item_id
+    mass_episode = PodcastEpisode(
+        item_id=episode_id,
+        provider=lookup_key,
+        name=episode_title,
+        duration=int(episode_duration),
+        position=episode_cnt,
+        podcast=ItemMapping(
+            item_id=prov_podcast_id,
+            provider=lookup_key,
+            name=episode_title,
+            media_type=MediaType.PODCAST,
+        ),
+        provider_mappings={
+            ProviderMapping(
+                item_id=episode_id,
+                provider_domain=domain,
+                provider_instance=instance_id,
+                audio_format=AudioFormat(
+                    content_type=ContentType.try_parse(stream_url),
+                ),
+                url=stream_url,
+            )
+        },
+    )
+    mass_episode.metadata.release_date = episode_published
+
+    # chapter
+    if chapters := episode.get("chapters"):
+        _chapters = []
+        for cnt, chapter in chapters:
+            if not isinstance(chapter, dict):
+                continue
+            title = chapter.get("title")
+            start = chapter.get("start")
+            if title and start:
+                _chapters.append(MediaItemChapter(position=cnt + 1, name=title, start=start))
+
+    # cover image
+    if episode_cover is not None:
+        mass_episode.metadata.images = UniqueList(
+            [
+                MediaItemImage(
+                    type=ImageType.THUMB,
+                    path=episode_cover,
+                    provider=lookup_key,
+                    remotely_accessible=True,
+                )
+            ]
+        )
+
+    return mass_episode
index f54bf29c680a04df12ea843f0e03034e25554926..0b124f7ed14a7d2a4e6f2ac47d05be901a03694e 100644 (file)
@@ -31,9 +31,9 @@ from music_assistant_models.media_items import (
 )
 from music_assistant_models.streamdetails import StreamDetails
 
+from music_assistant.helpers.podcast_parsers import parse_podcast, parse_podcast_episode
 from music_assistant.helpers.throttle_retry import ThrottlerManager, throttle_with_retries
 from music_assistant.models.music_provider import MusicProvider
-from music_assistant.providers.itunes_podcasts.parsers import parse_podcast, parse_podcast_episode
 from music_assistant.providers.itunes_podcasts.schema import ITunesSearchResults
 
 if TYPE_CHECKING:
@@ -201,7 +201,7 @@ class ITunesPodcastsProvider(MusicProvider):
 
     async def get_podcast(self, prov_podcast_id: str) -> Podcast:
         """Get podcast."""
-        parsed = await self._get_cached_podcast(prov_podcast_id)
+        parsed = await self._cache_get_podcast(prov_podcast_id)
 
         return parse_podcast(
             feed_url=prov_podcast_id,
@@ -215,7 +215,7 @@ class ITunesPodcastsProvider(MusicProvider):
         self, prov_podcast_id: str
     ) -> AsyncGenerator[PodcastEpisode, None]:
         """Get podcast episodes."""
-        podcast = await self._get_cached_podcast(prov_podcast_id)
+        podcast = await self._cache_get_podcast(prov_podcast_id)
         podcast_cover = podcast.get("cover_url")
         episodes = podcast.get("episodes", [])
         for cnt, episode in enumerate(episodes):
@@ -232,7 +232,7 @@ class ITunesPodcastsProvider(MusicProvider):
     async def get_podcast_episode(self, prov_episode_id: str) -> PodcastEpisode:
         """Get single podcast episode."""
         prov_podcast_id, guid_or_stream_url = prov_episode_id.split(" ")
-        podcast = await self._get_cached_podcast(prov_podcast_id)
+        podcast = await self._cache_get_podcast(prov_podcast_id)
         podcast_cover = podcast.get("cover_url")
         episodes = podcast.get("episodes", [])
         for cnt, episode in enumerate(episodes):
@@ -254,7 +254,7 @@ class ITunesPodcastsProvider(MusicProvider):
         raise MediaNotFoundError("Episode not found")
 
     async def _get_episode_stream_url(self, podcast_id: str, guid_or_stream_url: str) -> str | None:
-        podcast = await self._get_cached_podcast(podcast_id)
+        podcast = await self._cache_get_podcast(podcast_id)
         episodes = podcast.get("episodes", [])
         for cnt, episode in enumerate(episodes):
             episode_enclosures = episode.get("enclosures", [])
@@ -284,7 +284,7 @@ class ITunesPodcastsProvider(MusicProvider):
             allow_seek=True,
         )
 
-    async def _get_cached_podcast(self, prov_podcast_id: str) -> dict[str, Any]:
+    async def _cache_get_podcast(self, prov_podcast_id: str) -> dict[str, Any]:
         parsed_podcast = await self.mass.cache.get(
             key=prov_podcast_id,
             base_key=self.lookup_key,
index 264c9f543a52ead6deb3ff6bedcce8c44d3e15d4..4210726cec3e7b607fcdfebcea1d8c2a395e9e02 100644 (file)
@@ -6,9 +6,6 @@
   "codeowners": [
     "@fmunkes"
   ],
-  "requirements": [
-    "podcastparser==0.6.10"
-  ],
   "icon": "podcast",
   "documentation": "https://music-assistant.io/music-providers/itunes-podcasts/",
   "multi_instance": false
diff --git a/music_assistant/providers/itunes_podcasts/parsers.py b/music_assistant/providers/itunes_podcasts/parsers.py
deleted file mode 100644 (file)
index 5201868..0000000
+++ /dev/null
@@ -1,141 +0,0 @@
-"""Podcastfeed -> Mass."""
-
-from typing import Any
-
-from music_assistant_models.enums import ContentType, ImageType, MediaType
-from music_assistant_models.media_items import (
-    AudioFormat,
-    ItemMapping,
-    MediaItemChapter,
-    MediaItemImage,
-    Podcast,
-    PodcastEpisode,
-    ProviderMapping,
-    UniqueList,
-)
-
-
-def parse_podcast(
-    *, feed_url: str, parsed_feed: dict[str, Any], lookup_key: str, domain: str, instance_id: str
-) -> Podcast:
-    """Podcast -> Mass Podcast."""
-    publisher = parsed_feed.get("author") or parsed_feed.get("itunes_author", "NO_AUTHOR")
-    mass_podcast = Podcast(
-        item_id=feed_url,
-        name=parsed_feed.get("title", "NO_TITLE"),
-        publisher=publisher,
-        provider=lookup_key,
-        uri=parsed_feed.get("link"),
-        provider_mappings={
-            ProviderMapping(
-                item_id=feed_url,
-                provider_domain=domain,
-                provider_instance=instance_id,
-            )
-        },
-    )
-    genres: list[str] = []
-    if _genres := parsed_feed.get("itunes_categories"):
-        for _sub_genre in _genres:
-            if isinstance(_sub_genre, list):
-                genres.extend(x for x in _sub_genre if isinstance(x, str))
-            elif isinstance(_sub_genre, str):
-                genres.append(_sub_genre)
-
-    mass_podcast.metadata.genres = set(genres)
-    mass_podcast.metadata.description = parsed_feed.get("description", "")
-    mass_podcast.metadata.explicit = parsed_feed.get("explicit", False)
-    language = parsed_feed.get("language")
-    if language is not None:
-        mass_podcast.metadata.languages = UniqueList([language])
-    episodes = parsed_feed.get("episodes", [])
-    mass_podcast.total_episodes = len(episodes)
-    podcast_cover = parsed_feed.get("cover_url")
-    if podcast_cover is not None:
-        mass_podcast.metadata.images = UniqueList(
-            [
-                MediaItemImage(
-                    type=ImageType.THUMB,
-                    path=podcast_cover,
-                    provider=lookup_key,
-                    remotely_accessible=True,
-                )
-            ]
-        )
-    return mass_podcast
-
-
-def parse_podcast_episode(
-    *,
-    episode: dict[str, Any],
-    prov_podcast_id: str,
-    episode_cnt: int,
-    podcast_cover: str | None = None,
-    lookup_key: str,
-    domain: str,
-    instance_id: str,
-) -> PodcastEpisode:
-    """Podcast Episode -> Mass Podcast Episode."""
-    episode_duration = episode.get("total_time", 0.0)
-    episode_title = episode.get("title", "NO_EPISODE_TITLE")
-    episode_cover = episode.get("episode_art_url", podcast_cover)
-    episode_published = episode.get("published")
-    episode_enclosures = episode.get("enclosures", [])
-    if len(episode_enclosures) < 1:
-        raise RuntimeError
-    stream_url = episode_enclosures[0].get("url", None)
-    # not all feeds have a guid, but a guid is preferred as identification
-    guid_or_stream_url = episode.get("guid", stream_url)
-
-    episode_id = f"{prov_podcast_id} {guid_or_stream_url}"
-    mass_episode = PodcastEpisode(
-        item_id=episode_id,
-        provider=lookup_key,
-        name=episode_title,
-        duration=int(episode_duration),
-        position=episode_cnt,
-        podcast=ItemMapping(
-            item_id=prov_podcast_id,
-            provider=lookup_key,
-            name=episode_title,
-            media_type=MediaType.PODCAST,
-        ),
-        provider_mappings={
-            ProviderMapping(
-                item_id=episode_id,
-                provider_domain=domain,
-                provider_instance=instance_id,
-                audio_format=AudioFormat(
-                    content_type=ContentType.try_parse(stream_url),
-                ),
-                url=stream_url,
-            )
-        },
-    )
-    mass_episode.metadata.release_date = episode_published
-
-    # chapter
-    if chapters := episode.get("chapters"):
-        _chapters = []
-        for cnt, chapter in chapters:
-            if not isinstance(chapter, dict):
-                continue
-            title = chapter.get("title")
-            start = chapter.get("start")
-            if title and start:
-                _chapters.append(MediaItemChapter(position=cnt + 1, name=title, start=start))
-
-    # cover image
-    if episode_cover is not None:
-        mass_episode.metadata.images = UniqueList(
-            [
-                MediaItemImage(
-                    type=ImageType.THUMB,
-                    path=episode_cover,
-                    provider=lookup_key,
-                    remotely_accessible=True,
-                )
-            ]
-        )
-
-    return mass_episode