return mass_podcast
-def get_stream_url_and_guid_from_episode(
- *, episode: dict[str, Any]
-) -> tuple[str | None, str | None]:
+def get_stream_url_and_guid_from_episode(*, episode: dict[str, Any]) -> tuple[str, str | None]:
"""Give episode's stream url and guid, if it exists."""
episode_enclosures = episode.get("enclosures", [])
if len(episode_enclosures) < 1:
- raise RuntimeError
- stream_url = episode_enclosures[0].get("url", None)
- guid = episode.get("guid")
- return stream_url, guid
+ raise ValueError("Episode enclosure is missing")
+ if stream_url := episode_enclosures[0].get("url"):
+ guid = episode.get("guid")
+ return stream_url, guid
+ raise ValueError("Stream URL is missing.")
def parse_podcast_episode(
domain: str,
instance_id: str,
mass_item_id: str | None = None,
-) -> PodcastEpisode:
+) -> PodcastEpisode | None:
"""Podcast Episode -> Mass Podcast Episode.
The item_id is {prov_podcast_id} {guid_or_stream_url} by default, or the optional mass_item_id
instead. The podcast_cover is used, if the episode should not have its own cover.
+
+ The function returns None, if the episode enclosure is missing, i.e. there is no stream
+ information present.
"""
episode_duration = episode.get("total_time", 0.0)
episode_title = episode.get("title", "NO_EPISODE_TITLE")
if episode_published == 0:
episode_published = None
- stream_url, guid = get_stream_url_and_guid_from_episode(episode=episode)
+ try:
+ stream_url, guid = get_stream_url_and_guid_from_episode(episode=episode)
+ except ValueError:
+ # we are missing the episode enclosure or stream information
+ return None
guid_or_stream_url = guid if guid is not None else stream_url
- if stream_url is None:
- raise RuntimeError("Episode has no stream information!")
# Default episode id. A guid is preferred as identification.
episode_id = f"{prov_podcast_id} {guid_or_stream_url}" if mass_item_id is None else mass_item_id
lookup_key=self.lookup_key,
instance_id=self.instance_id,
)
- stream_url, guid = get_stream_url_and_guid_from_episode(episode=parsed_episode)
+ if mass_episode is None:
+ # faulty episode
+ continue
+ try:
+ stream_url, guid = get_stream_url_and_guid_from_episode(episode=parsed_episode)
+ except ValueError:
+ # episode enclosure or stream url missing
+ continue
for action in episode_actions:
# we have to test both, as we are comparing to external input.
podcast_cover = podcast.get("cover_url")
episodes = podcast.get("episodes", [])
for cnt, episode in enumerate(episodes):
- yield parse_podcast_episode(
+ if mass_episode := parse_podcast_episode(
episode=episode,
prov_podcast_id=prov_podcast_id,
episode_cnt=cnt,
domain=self.domain,
lookup_key=self.lookup_key,
instance_id=self.instance_id,
- )
+ ):
+ yield mass_episode
async def get_podcast_episode(self, prov_episode_id: str) -> PodcastEpisode:
"""Get single podcast episode."""
- prov_podcast_id, guid_or_stream_url = prov_episode_id.split(" ")
- podcast = await self._cache_get_podcast(prov_podcast_id)
- podcast_cover = podcast.get("cover_url")
- episodes = podcast.get("episodes", [])
- for cnt, episode in enumerate(episodes):
- episode_enclosures = episode.get("enclosures", [])
- if len(episode_enclosures) < 1:
- raise MediaNotFoundError
- stream_url = episode_enclosures[0].get("url", None)
- if guid_or_stream_url == episode.get("guid", stream_url):
- return parse_podcast_episode(
- episode=episode,
- prov_podcast_id=prov_podcast_id,
- episode_cnt=cnt,
- podcast_cover=podcast_cover,
- domain=self.domain,
- lookup_key=self.lookup_key,
- instance_id=self.instance_id,
- )
-
+ podcast_id, guid_or_stream_url = prov_episode_id.split(" ")
+ async for mass_episode in self.get_podcast_episodes(podcast_id):
+ _, _guid_or_stream_url = mass_episode.item_id.split(" ")
+ # this is enough, as internal
+ if guid_or_stream_url == _guid_or_stream_url:
+ return mass_episode
raise MediaNotFoundError("Episode not found")
async def recommendations(self) -> list[RecommendationFolder]:
"""Get (full) podcast episode details by id."""
for idx, episode in enumerate(self.parsed_podcast["episodes"]):
if prov_episode_id == episode["guid"]:
- return await self._parse_episode(episode, idx)
+ if mass_episode := self._parse_episode(episode, idx):
+ return mass_episode
raise MediaNotFoundError("Episode not found")
async def get_podcast_episodes(
if episodes and episodes[0].get("published", 0) != 0:
episodes.sort(key=lambda x: x.get("published", 0))
for idx, episode in enumerate(episodes):
- yield await self._parse_episode(episode, idx)
+ if mass_episode := self._parse_episode(episode, idx):
+ yield mass_episode
async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails:
"""Get streamdetails for a track/radio."""
mass_item_id=self.podcast_id,
)
- async def _parse_episode(
+ def _parse_episode(
self, episode_obj: dict[str, Any], fallback_position: int
- ) -> PodcastEpisode:
+ ) -> PodcastEpisode | None:
return parse_podcast_episode(
episode=episode_obj,
prov_podcast_id=self.podcast_id,