From: Fabian Munkes <105975993+fmunkes@users.noreply.github.com> Date: Thu, 13 Mar 2025 17:34:26 +0000 (-0700) Subject: [Chore] Move podcastparser parse functions from itunes search provider to helpers... X-Git-Url: https://git.kitaultman.com/?a=commitdiff_plain;h=7593d8bfc2ad1f5decaf2b509f5612c4a4540897;p=music-assistant-server.git [Chore] Move podcastparser parse functions from itunes search provider to helpers (#2034) --- diff --git a/music_assistant/helpers/podcast_parsers.py b/music_assistant/helpers/podcast_parsers.py new file mode 100644 index 00000000..c4ae1473 --- /dev/null +++ b/music_assistant/helpers/podcast_parsers.py @@ -0,0 +1,168 @@ +"""Podcastfeed -> Mass.""" + +from typing import Any + +from music_assistant_models.enums import ContentType, ImageType, MediaType +from music_assistant_models.media_items import ( + AudioFormat, + ItemMapping, + MediaItemChapter, + MediaItemImage, + Podcast, + PodcastEpisode, + ProviderMapping, + UniqueList, +) + + +def parse_podcast( + *, + feed_url: str, + parsed_feed: dict[str, Any], + lookup_key: str, + domain: str, + instance_id: str, + mass_item_id: str | None = None, +) -> Podcast: + """Podcast -> Mass Podcast. + + The item_id is the feed url by default, or the optional mass_item_id instead. + """ + publisher = parsed_feed.get("author") or parsed_feed.get("itunes_author", "NO_AUTHOR") + item_id = feed_url if mass_item_id is None else mass_item_id + mass_podcast = Podcast( + item_id=item_id, + name=parsed_feed.get("title", "NO_TITLE"), + publisher=publisher, + provider=lookup_key, + uri=parsed_feed.get("link"), + provider_mappings={ + ProviderMapping( + item_id=item_id, + provider_domain=domain, + provider_instance=instance_id, + ) + }, + ) + genres: list[str] = [] + if _genres := parsed_feed.get("itunes_categories"): + for _sub_genre in _genres: + if isinstance(_sub_genre, list): + genres.extend(x for x in _sub_genre if isinstance(x, str)) + elif isinstance(_sub_genre, str): + genres.append(_sub_genre) + + mass_podcast.metadata.genres = set(genres) + mass_podcast.metadata.description = parsed_feed.get("description", "") + mass_podcast.metadata.explicit = parsed_feed.get("explicit", False) + language = parsed_feed.get("language") + if language is not None: + mass_podcast.metadata.languages = UniqueList([language]) + episodes = parsed_feed.get("episodes", []) + mass_podcast.total_episodes = len(episodes) + podcast_cover = parsed_feed.get("cover_url") + if podcast_cover is not None: + mass_podcast.metadata.images = UniqueList( + [ + MediaItemImage( + type=ImageType.THUMB, + path=podcast_cover, + provider=lookup_key, + remotely_accessible=True, + ) + ] + ) + return mass_podcast + + +def get_stream_url_and_guid_from_episode( + *, episode: dict[str, Any] +) -> tuple[str | None, str | None]: + """Give episode's stream url and guid, if it exists.""" + episode_enclosures = episode.get("enclosures", []) + if len(episode_enclosures) < 1: + raise RuntimeError + stream_url = episode_enclosures[0].get("url", None) + guid = episode.get("guid") + return stream_url, guid + + +def parse_podcast_episode( + *, + episode: dict[str, Any], + prov_podcast_id: str, + episode_cnt: int, + podcast_cover: str | None = None, + lookup_key: str, + domain: str, + instance_id: str, + mass_item_id: str | None = None, +) -> PodcastEpisode: + """Podcast Episode -> Mass Podcast Episode. + + The item_id is {prov_podcast_id} {guid_or_stream_url} by default, or the optional mass_item_id + instead. The podcast_cover is used, if the episode should not have its own cover. + """ + episode_duration = episode.get("total_time", 0.0) + episode_title = episode.get("title", "NO_EPISODE_TITLE") + episode_cover = episode.get("episode_art_url", podcast_cover) + episode_published = episode.get("published") + + stream_url, guid = get_stream_url_and_guid_from_episode(episode=episode) + guid_or_stream_url = guid if guid is not None else stream_url + if stream_url is None: + raise RuntimeError("Episode has no stream information!") + + # Default episode id. A guid is preferred as identification. + episode_id = f"{prov_podcast_id} {guid_or_stream_url}" if mass_item_id is None else mass_item_id + mass_episode = PodcastEpisode( + item_id=episode_id, + provider=lookup_key, + name=episode_title, + duration=int(episode_duration), + position=episode_cnt, + podcast=ItemMapping( + item_id=prov_podcast_id, + provider=lookup_key, + name=episode_title, + media_type=MediaType.PODCAST, + ), + provider_mappings={ + ProviderMapping( + item_id=episode_id, + provider_domain=domain, + provider_instance=instance_id, + audio_format=AudioFormat( + content_type=ContentType.try_parse(stream_url), + ), + url=stream_url, + ) + }, + ) + mass_episode.metadata.release_date = episode_published + + # chapter + if chapters := episode.get("chapters"): + _chapters = [] + for cnt, chapter in chapters: + if not isinstance(chapter, dict): + continue + title = chapter.get("title") + start = chapter.get("start") + if title and start: + _chapters.append(MediaItemChapter(position=cnt + 1, name=title, start=start)) + + # cover image + if episode_cover is not None: + mass_episode.metadata.images = UniqueList( + [ + MediaItemImage( + type=ImageType.THUMB, + path=episode_cover, + provider=lookup_key, + remotely_accessible=True, + ) + ] + ) + + return mass_episode diff --git a/music_assistant/providers/itunes_podcasts/__init__.py b/music_assistant/providers/itunes_podcasts/__init__.py index f54bf29c..0b124f7e 100644 --- a/music_assistant/providers/itunes_podcasts/__init__.py +++ b/music_assistant/providers/itunes_podcasts/__init__.py @@ -31,9 +31,9 @@ from music_assistant_models.media_items import ( ) from music_assistant_models.streamdetails import StreamDetails +from music_assistant.helpers.podcast_parsers import parse_podcast, parse_podcast_episode from music_assistant.helpers.throttle_retry import ThrottlerManager, throttle_with_retries from music_assistant.models.music_provider import MusicProvider -from music_assistant.providers.itunes_podcasts.parsers import parse_podcast, parse_podcast_episode from music_assistant.providers.itunes_podcasts.schema import ITunesSearchResults if TYPE_CHECKING: @@ -201,7 +201,7 @@ class ITunesPodcastsProvider(MusicProvider): async def get_podcast(self, prov_podcast_id: str) -> Podcast: """Get podcast.""" - parsed = await self._get_cached_podcast(prov_podcast_id) + parsed = await self._cache_get_podcast(prov_podcast_id) return parse_podcast( feed_url=prov_podcast_id, @@ -215,7 +215,7 @@ class ITunesPodcastsProvider(MusicProvider): self, prov_podcast_id: str ) -> AsyncGenerator[PodcastEpisode, None]: """Get podcast episodes.""" - podcast = await self._get_cached_podcast(prov_podcast_id) + podcast = await self._cache_get_podcast(prov_podcast_id) podcast_cover = podcast.get("cover_url") episodes = podcast.get("episodes", []) for cnt, episode in enumerate(episodes): @@ -232,7 +232,7 @@ class ITunesPodcastsProvider(MusicProvider): async def get_podcast_episode(self, prov_episode_id: str) -> PodcastEpisode: """Get single podcast episode.""" prov_podcast_id, guid_or_stream_url = prov_episode_id.split(" ") - podcast = await self._get_cached_podcast(prov_podcast_id) + podcast = await self._cache_get_podcast(prov_podcast_id) podcast_cover = podcast.get("cover_url") episodes = podcast.get("episodes", []) for cnt, episode in enumerate(episodes): @@ -254,7 +254,7 @@ class ITunesPodcastsProvider(MusicProvider): raise MediaNotFoundError("Episode not found") async def _get_episode_stream_url(self, podcast_id: str, guid_or_stream_url: str) -> str | None: - podcast = await self._get_cached_podcast(podcast_id) + podcast = await self._cache_get_podcast(podcast_id) episodes = podcast.get("episodes", []) for cnt, episode in enumerate(episodes): episode_enclosures = episode.get("enclosures", []) @@ -284,7 +284,7 @@ class ITunesPodcastsProvider(MusicProvider): allow_seek=True, ) - async def _get_cached_podcast(self, prov_podcast_id: str) -> dict[str, Any]: + async def _cache_get_podcast(self, prov_podcast_id: str) -> dict[str, Any]: parsed_podcast = await self.mass.cache.get( key=prov_podcast_id, base_key=self.lookup_key, diff --git a/music_assistant/providers/itunes_podcasts/manifest.json b/music_assistant/providers/itunes_podcasts/manifest.json index 264c9f54..4210726c 100644 --- a/music_assistant/providers/itunes_podcasts/manifest.json +++ b/music_assistant/providers/itunes_podcasts/manifest.json @@ -6,9 +6,6 @@ "codeowners": [ "@fmunkes" ], - "requirements": [ - "podcastparser==0.6.10" - ], "icon": "podcast", "documentation": "https://music-assistant.io/music-providers/itunes-podcasts/", "multi_instance": false diff --git a/music_assistant/providers/itunes_podcasts/parsers.py b/music_assistant/providers/itunes_podcasts/parsers.py deleted file mode 100644 index 5201868d..00000000 --- a/music_assistant/providers/itunes_podcasts/parsers.py +++ /dev/null @@ -1,141 +0,0 @@ -"""Podcastfeed -> Mass.""" - -from typing import Any - -from music_assistant_models.enums import ContentType, ImageType, MediaType -from music_assistant_models.media_items import ( - AudioFormat, - ItemMapping, - MediaItemChapter, - MediaItemImage, - Podcast, - PodcastEpisode, - ProviderMapping, - UniqueList, -) - - -def parse_podcast( - *, feed_url: str, parsed_feed: dict[str, Any], lookup_key: str, domain: str, instance_id: str -) -> Podcast: - """Podcast -> Mass Podcast.""" - publisher = parsed_feed.get("author") or parsed_feed.get("itunes_author", "NO_AUTHOR") - mass_podcast = Podcast( - item_id=feed_url, - name=parsed_feed.get("title", "NO_TITLE"), - publisher=publisher, - provider=lookup_key, - uri=parsed_feed.get("link"), - provider_mappings={ - ProviderMapping( - item_id=feed_url, - provider_domain=domain, - provider_instance=instance_id, - ) - }, - ) - genres: list[str] = [] - if _genres := parsed_feed.get("itunes_categories"): - for _sub_genre in _genres: - if isinstance(_sub_genre, list): - genres.extend(x for x in _sub_genre if isinstance(x, str)) - elif isinstance(_sub_genre, str): - genres.append(_sub_genre) - - mass_podcast.metadata.genres = set(genres) - mass_podcast.metadata.description = parsed_feed.get("description", "") - mass_podcast.metadata.explicit = parsed_feed.get("explicit", False) - language = parsed_feed.get("language") - if language is not None: - mass_podcast.metadata.languages = UniqueList([language]) - episodes = parsed_feed.get("episodes", []) - mass_podcast.total_episodes = len(episodes) - podcast_cover = parsed_feed.get("cover_url") - if podcast_cover is not None: - mass_podcast.metadata.images = UniqueList( - [ - MediaItemImage( - type=ImageType.THUMB, - path=podcast_cover, - provider=lookup_key, - remotely_accessible=True, - ) - ] - ) - return mass_podcast - - -def parse_podcast_episode( - *, - episode: dict[str, Any], - prov_podcast_id: str, - episode_cnt: int, - podcast_cover: str | None = None, - lookup_key: str, - domain: str, - instance_id: str, -) -> PodcastEpisode: - """Podcast Episode -> Mass Podcast Episode.""" - episode_duration = episode.get("total_time", 0.0) - episode_title = episode.get("title", "NO_EPISODE_TITLE") - episode_cover = episode.get("episode_art_url", podcast_cover) - episode_published = episode.get("published") - episode_enclosures = episode.get("enclosures", []) - if len(episode_enclosures) < 1: - raise RuntimeError - stream_url = episode_enclosures[0].get("url", None) - # not all feeds have a guid, but a guid is preferred as identification - guid_or_stream_url = episode.get("guid", stream_url) - - episode_id = f"{prov_podcast_id} {guid_or_stream_url}" - mass_episode = PodcastEpisode( - item_id=episode_id, - provider=lookup_key, - name=episode_title, - duration=int(episode_duration), - position=episode_cnt, - podcast=ItemMapping( - item_id=prov_podcast_id, - provider=lookup_key, - name=episode_title, - media_type=MediaType.PODCAST, - ), - provider_mappings={ - ProviderMapping( - item_id=episode_id, - provider_domain=domain, - provider_instance=instance_id, - audio_format=AudioFormat( - content_type=ContentType.try_parse(stream_url), - ), - url=stream_url, - ) - }, - ) - mass_episode.metadata.release_date = episode_published - - # chapter - if chapters := episode.get("chapters"): - _chapters = [] - for cnt, chapter in chapters: - if not isinstance(chapter, dict): - continue - title = chapter.get("title") - start = chapter.get("start") - if title and start: - _chapters.append(MediaItemChapter(position=cnt + 1, name=title, start=start)) - - # cover image - if episode_cover is not None: - mass_episode.metadata.images = UniqueList( - [ - MediaItemImage( - type=ImageType.THUMB, - path=episode_cover, - provider=lookup_key, - remotely_accessible=True, - ) - ] - ) - - return mass_episode