from __future__ import annotations
-import asyncio
from collections.abc import AsyncGenerator, Sequence
from typing import Any, cast
from .constants import (
BROWSE_CATEGORIES,
- BROWSE_MY_SUBSCRIPTIONS,
BROWSE_RECENT,
BROWSE_TRENDING,
CONF_API_KEY,
if path == base:
# Return main browse categories
return [
- BrowseFolder(
- item_id=BROWSE_MY_SUBSCRIPTIONS,
- provider=self.domain,
- path=f"{base}{BROWSE_MY_SUBSCRIPTIONS}",
- name="My Subscriptions",
- ),
BrowseFolder(
item_id=BROWSE_TRENDING,
provider=self.domain,
subpath_parts = path[len(base) :].split("/")
subpath = subpath_parts[0] if subpath_parts else ""
- if subpath == BROWSE_MY_SUBSCRIPTIONS:
- return await self._browse_subscriptions()
- elif subpath == BROWSE_TRENDING:
+ if subpath == BROWSE_TRENDING:
return await self._browse_trending()
elif subpath == BROWSE_RECENT:
return await self._browse_recent_episodes()
return []
- async def get_library_podcasts(self) -> AsyncGenerator[Podcast, None]:
- """
- Retrieve subscribed podcasts from the provider.
-
- Uses MA's cache system and concurrent fetching to minimize API calls
- and improve performance for multiple subscriptions.
- """
- stored_podcasts = cast("list[str]", self.config.get_value(CONF_STORED_PODCASTS))
-
- if not stored_podcasts:
- return
-
- async def fetch_podcast(feed_url: str) -> Podcast | None:
- """Fetch a single podcast."""
- try:
- # Fetch from API
- response = await self._api_request("podcasts/byfeedurl", params={"url": feed_url})
- if response.get("feed"):
- return parse_podcast_from_feed(
- response["feed"], self.lookup_key, self.domain, self.instance_id
- )
-
- except (ProviderUnavailableError, InvalidDataError) as err:
- self.logger.warning("Failed to get podcast %s: %s", feed_url, err)
- except Exception as err:
- self.logger.warning("Unexpected error getting podcast %s: %s", feed_url, err)
- return None
-
- # Fetch podcasts concurrently with retry
- semaphore = asyncio.Semaphore(3) # Max 3 concurrent requests
-
- async def fetch_with_semaphore_and_retry(feed_url: str) -> Podcast | None:
- async with semaphore:
- # Try once
- result = await fetch_podcast(feed_url)
- if result:
- return result
-
- # Retry once after 10 seconds
- await asyncio.sleep(10)
- return await fetch_podcast(feed_url)
-
- # Gather all podcast fetches concurrently with retry
- tasks = [fetch_with_semaphore_and_retry(feed_url) for feed_url in stored_podcasts]
- podcasts = await asyncio.gather(*tasks, return_exceptions=True)
-
- # Yield successfully fetched podcasts
- for podcast in podcasts:
- if isinstance(podcast, Podcast):
- yield podcast
-
async def library_add(self, item: MediaItemType) -> bool:
"""
Add podcast to library.
)
return None
- async def _browse_subscriptions(self) -> list[Podcast]:
- """Browse user subscriptions."""
- try:
- return [podcast async for podcast in self.get_library_podcasts()]
- except (ProviderUnavailableError, InvalidDataError):
- # Re-raise these specific errors
- raise
- except Exception as err:
- self.logger.warning("Unexpected error browsing subscriptions: %s", err, exc_info=True)
- return []
-
@use_cache(7200) # Cache for 2 hours
async def _browse_trending(self) -> list[Podcast]:
"""Browse trending podcasts."""