--- /dev/null
+"""gPodder provider for Music Assistant.
+
+Tested against opodsync, https://github.com/kd2org/opodsync
+and nextcloud-gpodder, https://github.com/thrillfall/nextcloud-gpodder
+gpodder.net is not supported due to responsiveness/ frequent downtimes of domain.
+
+Note:
+ - it can happen, that we have the guid and use that for identification, but the sync state
+ provider, eg. opodsync might use only the stream url. So always make sure, to compare both
+ when relying on an external service
+ - The service calls have a timestamp (int, unix epoch s), which give the changes since then.
+"""
+
+from __future__ import annotations
+
+import asyncio
+import time
+from collections.abc import AsyncGenerator
+from io import BytesIO
+from typing import TYPE_CHECKING, Any
+
+import podcastparser
+from music_assistant_models.config_entries import ConfigEntry, ConfigValueType, ProviderConfig
+from music_assistant_models.enums import (
+ ConfigEntryType,
+ ContentType,
+ EventType,
+ MediaType,
+ ProviderFeature,
+ StreamType,
+)
+from music_assistant_models.errors import (
+ LoginFailed,
+ MediaNotFoundError,
+ ResourceTemporarilyUnavailable,
+)
+from music_assistant_models.media_items import (
+ AudioFormat,
+ MediaItemType,
+ Podcast,
+ PodcastEpisode,
+)
+from music_assistant_models.streamdetails import StreamDetails
+
+from music_assistant.helpers.podcast_parsers import (
+ get_stream_url_and_guid_from_episode,
+ parse_podcast,
+ parse_podcast_episode,
+)
+from music_assistant.models.music_provider import MusicProvider
+from music_assistant.providers.gpodder.client import EpisodeActionNew, GPodderClient
+
+if TYPE_CHECKING:
+ from music_assistant_models.provider import ProviderManifest
+
+ from music_assistant.mass import MusicAssistant
+ from music_assistant.models import ProviderInstanceType
+
+# Config for "classic" gpodder api
+CONF_URL = "url"
+CONF_USERNAME = "username"
+CONF_PASSWORD = "password"
+CONF_DEVICE_ID = "device_id"
+CONF_USING_GPODDER = "using_gpodder" # hidden, bool, true if not nextcloud used
+
+# Config for nextcloud
+CONF_ACTION_AUTH_NC = "authenticate_nc"
+CONF_TOKEN_NC = "token"
+CONF_URL_NC = "url_nc"
+
+# General config
+CONF_VERIFY_SSL = "verify_ssl"
+CONF_MAX_NUM_EPISODES = "max_num_episodes"
+
+CACHE_CATEGORY_PODCAST_ITEMS = 0 # the individual parsed podcast (dict from podcastparser)
+CACHE_CATEGORY_OTHER = 1
+CACHE_KEY_TIMESTAMP = (
+ "timestamp" # tuple of two ints, timestamp_subscriptions and timestamp_actions
+)
+CACHE_KEY_FEEDS = "feeds" # list[str] : all available rss feed urls
+
+
+async def setup(
+ mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
+) -> ProviderInstanceType:
+ """Initialize provider(instance) with given configuration."""
+ return GPodder(mass, manifest, config)
+
+
+async def get_config_entries(
+ mass: MusicAssistant,
+ instance_id: str | None = None,
+ action: str | None = None,
+ values: dict[str, ConfigValueType] | None = None,
+) -> tuple[ConfigEntry, ...]:
+ """
+ Return Config entries to setup this provider.
+
+ instance_id: id of an existing provider instance (None if new instance setup).
+ action: [optional] action key called from config entries UI.
+ values: the (intermediate) raw values for config entries sent with the action.
+ """
+ # ruff: noqa: ARG001
+ if values is None:
+ values = {}
+
+ if action == CONF_ACTION_AUTH_NC:
+ session = mass.http_session
+ response = await session.post(
+ str(values[CONF_URL_NC]).rstrip("/") + "/index.php/login/v2",
+ headers={"User-Agent": "Music Assistant"},
+ )
+ data = await response.json()
+ poll_endpoint = data["poll"]["endpoint"]
+ poll_token = data["poll"]["token"]
+ login_url = data["login"]
+ session_id = str(values["session_id"])
+ mass.signal_event(EventType.AUTH_SESSION, session_id, login_url)
+ while True:
+ response = await session.post(poll_endpoint, data={"token": poll_token})
+ if response.status not in [200, 404]:
+ raise LoginFailed("The specified url seems not to belong to a nextcloud instance.")
+ if response.status == 200:
+ data = await response.json()
+ values[CONF_TOKEN_NC] = data["appPassword"]
+ break
+ await asyncio.sleep(1)
+
+ authenticated_nc = True
+ if values.get(CONF_TOKEN_NC, None) is None:
+ authenticated_nc = False
+
+ using_gpodder = bool(values.get(CONF_USING_GPODDER, False))
+
+ return (
+ ConfigEntry(
+ key="label_text",
+ type=ConfigEntryType.LABEL,
+ label="Authentication did succeed! Please press save to continue.",
+ hidden=not authenticated_nc,
+ ),
+ ConfigEntry(
+ key="label_gpodder",
+ type=ConfigEntryType.LABEL,
+ label="Authentication with gPodder compatible web service, e.g. opodsync:",
+ hidden=authenticated_nc,
+ ),
+ ConfigEntry(
+ key=CONF_URL,
+ type=ConfigEntryType.STRING,
+ label="gPodder Service URL",
+ required=False,
+ description="URL of gPodder instance.",
+ value=values.get(CONF_URL),
+ hidden=authenticated_nc,
+ ),
+ ConfigEntry(
+ key=CONF_USERNAME,
+ type=ConfigEntryType.STRING,
+ label="Username",
+ required=False,
+ description="Username of gPodder instance.",
+ hidden=authenticated_nc,
+ value=values.get(CONF_USERNAME),
+ ),
+ ConfigEntry(
+ key=CONF_PASSWORD,
+ type=ConfigEntryType.SECURE_STRING,
+ label="Password",
+ required=False,
+ description="Password for gPodder instance.",
+ hidden=authenticated_nc,
+ value=values.get(CONF_PASSWORD),
+ ),
+ ConfigEntry(
+ key=CONF_DEVICE_ID,
+ type=ConfigEntryType.STRING,
+ label="Device ID",
+ required=False,
+ description="Device ID of user.",
+ hidden=authenticated_nc,
+ value=values.get(CONF_DEVICE_ID),
+ ),
+ ConfigEntry(
+ key="label_nextcloud",
+ type=ConfigEntryType.LABEL,
+ label="Authentication with Nextcloud with gPodder Sync (nextcloud-gpodder) installed:",
+ hidden=authenticated_nc or using_gpodder,
+ ),
+ ConfigEntry(
+ key=CONF_URL_NC,
+ type=ConfigEntryType.STRING,
+ label="Nextcloud URL",
+ required=False,
+ description="URL of Nextcloud instance.",
+ value=values.get(CONF_URL_NC),
+ hidden=using_gpodder,
+ ),
+ ConfigEntry(
+ key=CONF_ACTION_AUTH_NC,
+ type=ConfigEntryType.ACTION,
+ label="(Re)Authenticate with Nextcloud",
+ description="This button will redirect you to your Nextcloud instance to authenticate.",
+ action=CONF_ACTION_AUTH_NC,
+ required=False,
+ hidden=using_gpodder,
+ ),
+ ConfigEntry(
+ key="label_general",
+ type=ConfigEntryType.LABEL,
+ label="General config:",
+ ),
+ ConfigEntry(
+ key=CONF_MAX_NUM_EPISODES,
+ type=ConfigEntryType.INTEGER,
+ label="Maximum amount of episodes (0 for unlimited)",
+ required=False,
+ description="Maximum amount of episodes to sync per feed. Use 0 for unlimited",
+ default_value=0,
+ value=values.get(CONF_MAX_NUM_EPISODES),
+ ),
+ ConfigEntry(
+ key=CONF_VERIFY_SSL,
+ type=ConfigEntryType.BOOLEAN,
+ label="Verify SSL",
+ required=False,
+ description="Whether or not to verify the certificate of SSL/TLS connections.",
+ category="advanced",
+ default_value=True,
+ value=values.get(CONF_VERIFY_SSL),
+ ),
+ ConfigEntry(
+ key=CONF_TOKEN_NC,
+ type=ConfigEntryType.SECURE_STRING,
+ label="token",
+ hidden=True,
+ required=False,
+ value=values.get(CONF_TOKEN_NC),
+ ),
+ ConfigEntry(
+ key=CONF_USING_GPODDER,
+ type=ConfigEntryType.BOOLEAN,
+ label="using_gpodder",
+ hidden=True,
+ required=False,
+ value=values.get(CONF_USING_GPODDER),
+ ),
+ )
+
+
+class GPodder(MusicProvider):
+ """gPodder MusicProvider."""
+
+ @property
+ def supported_features(self) -> set[ProviderFeature]:
+ """Features supported by this Provider."""
+ return {
+ ProviderFeature.LIBRARY_PODCASTS,
+ ProviderFeature.BROWSE,
+ }
+
+ async def handle_async_init(self) -> None:
+ """Pass config values to client and initialize."""
+ base_url = str(self.config.get_value(CONF_URL))
+ _username = self.config.get_value(CONF_USERNAME)
+ _password = self.config.get_value(CONF_PASSWORD)
+ _device_id = self.config.get_value(CONF_DEVICE_ID)
+ nc_url = str(self.config.get_value(CONF_URL_NC))
+ nc_token = self.config.get_value(CONF_TOKEN_NC)
+
+ self.max_episodes = int(float(str(self.config.get_value(CONF_MAX_NUM_EPISODES))))
+
+ self._client = GPodderClient(session=self.mass.http_session, logger=self.logger)
+
+ if nc_token is not None:
+ assert nc_url is not None
+ self._client.init_nc(base_url=nc_url, nc_token=str(nc_token))
+ else:
+ self.update_config_value(CONF_USING_GPODDER, True)
+ if _username is None or _password is None or _device_id is None:
+ raise LoginFailed("Must provide username, password and device_id.")
+ username = str(_username)
+ password = str(_password)
+ device_id = str(_device_id)
+
+ if base_url.rstrip("/") == "https://gpodder.net":
+ raise LoginFailed("Do not use gpodder.net. See docs for explanation.")
+ try:
+ await self._client.init_gpodder(
+ username=username, password=password, base_url=base_url, device=device_id
+ )
+ except RuntimeError as exc:
+ raise LoginFailed("Login failed.") from exc
+
+ timestamps = await self.mass.cache.get(
+ key=CACHE_KEY_TIMESTAMP,
+ base_key=self.lookup_key,
+ category=CACHE_CATEGORY_OTHER,
+ default=None,
+ )
+ if timestamps is None:
+ self.timestamp_subscriptions: int = 0
+ self.timestamp_actions: int = 0
+ else:
+ self.timestamp_subscriptions, self.timestamp_actions = timestamps
+
+ self.logger.debug(
+ "Our timestamps are (subscriptions, actions) (%s, %s)",
+ self.timestamp_subscriptions,
+ self.timestamp_actions,
+ )
+
+ feeds = await self.mass.cache.get(
+ key=CACHE_KEY_FEEDS,
+ base_key=self.lookup_key,
+ category=CACHE_CATEGORY_OTHER,
+ default=None,
+ )
+ if feeds is None:
+ self.feeds: set[str] = set()
+ else:
+ self.feeds = set(feeds) # feeds is a list here
+
+ # we are syncing the playlog, but not event based. A simple check in on_played,
+ # should be sufficient
+ self.progress_guard_timestamp = 0.0
+
+ @property
+ def is_streaming_provider(self) -> bool:
+ """Return True if the provider is a streaming provider."""
+ # For streaming providers return True here but for local file based providers return False.
+ # While the streams are remote, the user controls what is added.
+ return False
+
+ async def get_library_podcasts(self) -> AsyncGenerator[Podcast, None]:
+ """Retrieve library/subscribed podcasts from the provider."""
+ try:
+ subscriptions = await self._client.get_subscriptions()
+ except RuntimeError:
+ raise ResourceTemporarilyUnavailable(backoff_time=30)
+ if subscriptions is None:
+ return
+
+ for feed_url in subscriptions.add:
+ self.feeds.add(feed_url)
+ for feed_url in subscriptions.remove:
+ try:
+ self.feeds.remove(feed_url)
+ except KeyError:
+ # a podcast might have been added and removed in our absence...
+ continue
+
+ progresses, timestamp_action = await self._client.get_progresses()
+ for feed_url in self.feeds:
+ self.logger.debug("Adding podcast with feed %s to library", feed_url)
+ # parse podcast
+ try:
+ parsed_podcast = await self._get_podcast(feed_url)
+ except RuntimeError:
+ self.logger.warning(f"Was unable to obtain podcast with feed {feed_url}")
+ continue
+ await self._cache_set_podcast(feed_url, parsed_podcast)
+
+ # playlog
+ # be safe, if there should be multiple episodeactions. client already sorts
+ # progresses in descending order.
+ _already_processed = set()
+ _podcast_progresses = [x for x in progresses if x.podcast == feed_url]
+ for _progress in _podcast_progresses:
+ if _progress.episode not in _already_processed:
+ _already_processed.add(_progress.episode)
+ # we do not have to add the progress, these would make calls twice,
+ # and we only use the object to propagate to playlog
+ self.progress_guard_timestamp = time.time()
+ _episode_id = f"{feed_url} {_progress.episode}"
+ mass_episode = await self.get_podcast_episode(_episode_id, add_progress=False)
+ if isinstance(_progress, EpisodeActionNew):
+ await self.mass.music.mark_item_unplayed(mass_episode)
+ else:
+ await self.mass.music.mark_item_played(
+ mass_episode,
+ fully_played=_progress.position >= _progress.total,
+ seconds_played=_progress.position,
+ )
+
+ # cache
+ yield parse_podcast(
+ feed_url=feed_url,
+ parsed_feed=parsed_podcast,
+ lookup_key=self.lookup_key,
+ domain=self.domain,
+ instance_id=self.instance_id,
+ )
+
+ self.timestamp_subscriptions = subscriptions.timestamp
+ if timestamp_action is not None:
+ self.timestamp_actions = timestamp_action
+ await self._cache_set_timestamps()
+ await self._cache_set_feeds()
+
+ async def get_podcast(self, prov_podcast_id: str) -> Podcast:
+ """Get Podcast."""
+ parsed_podcast = await self._cache_get_podcast(prov_podcast_id)
+
+ return parse_podcast(
+ feed_url=prov_podcast_id,
+ parsed_feed=parsed_podcast,
+ lookup_key=self.lookup_key,
+ domain=self.domain,
+ instance_id=self.instance_id,
+ )
+
+ async def get_podcast_episodes(
+ self, prov_podcast_id: str, add_progress: bool = True
+ ) -> AsyncGenerator[PodcastEpisode, None]:
+ """Get Podcast episodes. Add progress information."""
+ if add_progress:
+ progresses, timestamp = await self._client.get_progresses()
+ else:
+ progresses, timestamp = [], None
+
+ podcast = await self._cache_get_podcast(prov_podcast_id)
+ podcast_cover = podcast.get("cover_url")
+ parsed_episodes = podcast.get("episodes", [])
+
+ if timestamp is not None:
+ self.timestamp_actions = timestamp
+ await self._cache_set_timestamps()
+
+ for cnt, parsed_episode in enumerate(parsed_episodes):
+ mass_episode = parse_podcast_episode(
+ episode=parsed_episode,
+ prov_podcast_id=prov_podcast_id,
+ episode_cnt=cnt,
+ podcast_cover=podcast_cover,
+ domain=self.domain,
+ lookup_key=self.lookup_key,
+ instance_id=self.instance_id,
+ )
+ stream_url, guid = get_stream_url_and_guid_from_episode(episode=parsed_episode)
+
+ for progress in progresses:
+ # we have to test both, as we are comparing to external input.
+ _test = [progress.guid, progress.episode]
+ if prov_podcast_id == progress.podcast and (guid in _test or stream_url in _test):
+ self.progress_guard_timestamp = time.time()
+ if isinstance(progress, EpisodeActionNew):
+ mass_episode.resume_position_ms = 0
+ mass_episode.fully_played = False
+
+ # propagate to playlog
+ await self.mass.music.mark_item_unplayed(
+ mass_episode,
+ )
+ else:
+ fully_played = progress.position >= progress.total
+ resume_position_s = progress.position
+ mass_episode.resume_position_ms = resume_position_s * 1000
+ mass_episode.fully_played = fully_played
+
+ # propagate progress to playlog
+ await self.mass.music.mark_item_played(
+ mass_episode,
+ fully_played=fully_played,
+ seconds_played=resume_position_s,
+ )
+ break
+
+ yield mass_episode
+
+ async def get_podcast_episode(
+ self, prov_episode_id: str, add_progress: bool = True
+ ) -> PodcastEpisode:
+ """Get Podcast Episode. Add progress information."""
+ podcast_id, guid_or_stream_url = prov_episode_id.split(" ")
+ async for mass_episode in self.get_podcast_episodes(podcast_id, add_progress=add_progress):
+ _, _guid_or_stream_url = mass_episode.item_id.split(" ")
+ # this is enough, as internal
+ if guid_or_stream_url == _guid_or_stream_url:
+ return mass_episode
+ raise MediaNotFoundError("Did not find episode.")
+
+ async def get_resume_position(self, item_id: str, media_type: MediaType) -> tuple[bool, int]:
+ """Return: finished, position_ms."""
+ assert media_type == MediaType.PODCAST_EPISODE
+ podcast_id, guid_or_stream_url = item_id.split(" ")
+ stream_url = await self._get_episode_stream_url(podcast_id, guid_or_stream_url)
+ try:
+ progresses, timestamp = await self._client.get_progresses(since=self.timestamp_actions)
+ except RuntimeError:
+ self.logger.warning("Was unable to obtain progresses.")
+ raise NotImplementedError # fallback to internal position.
+ for action in progresses:
+ _test = [action.guid, action.episode]
+ # progress is external, compare guid and stream_url
+ if action.podcast == podcast_id and (
+ guid_or_stream_url in _test or stream_url in _test
+ ):
+ if timestamp is not None:
+ self.timestamp_actions = timestamp
+ await self._cache_set_timestamps()
+ if isinstance(action, EpisodeActionNew):
+ # no progress, it might have been actively reset
+ return False, 0
+ _progress = (action.position >= action.total, max(action.position * 1000, 0))
+ self.logger.debug("Found an updated external resume position.")
+ return action.position >= action.total, max(action.position * 1000, 0)
+ self.logger.debug("Did not find an updated resume position, falling back to stored.")
+ # If we did not find a resume position, nothing changed since our last timestamp
+ # we raise NotImplementedError, such that MA falls back to the already stored
+ # resume_position in its playlog.
+ raise NotImplementedError
+
+ async def on_played(
+ self,
+ media_type: MediaType,
+ prov_item_id: str,
+ fully_played: bool,
+ position: int,
+ media_item: MediaItemType,
+ is_playing: bool = False,
+ ) -> None:
+ """Update progress."""
+ if media_item is None or not isinstance(media_item, PodcastEpisode):
+ return
+ if media_type != MediaType.PODCAST_EPISODE:
+ return
+ if time.time() - self.progress_guard_timestamp <= 5:
+ return
+ podcast_id, guid_or_stream_url = prov_item_id.split(" ")
+ stream_url = await self._get_episode_stream_url(podcast_id, guid_or_stream_url)
+ assert stream_url is not None
+ duration = media_item.duration
+ try:
+ await self._client.update_progress(
+ podcast_id=podcast_id,
+ episode_id=stream_url,
+ guid=guid_or_stream_url,
+ position_s=position,
+ duration_s=duration,
+ )
+ self.logger.debug(f"Updated progress to {position / duration * 100:.2f}%")
+ except RuntimeError as exc:
+ self.logger.debug(exc)
+ self.logger.debug("Failed to update progress.")
+
+ async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails:
+ """Get streamdetails for item."""
+ podcast_id, guid_or_stream_url = item_id.split(" ")
+ stream_url = await self._get_episode_stream_url(podcast_id, guid_or_stream_url)
+ if stream_url is None:
+ raise MediaNotFoundError
+ return StreamDetails(
+ provider=self.lookup_key,
+ item_id=item_id,
+ audio_format=AudioFormat(
+ content_type=ContentType.try_parse(stream_url),
+ ),
+ media_type=MediaType.PODCAST_EPISODE,
+ stream_type=StreamType.HTTP,
+ path=stream_url,
+ can_seek=True,
+ allow_seek=True,
+ )
+
+ async def _get_episode_stream_url(self, podcast_id: str, guid_or_stream_url: str) -> str | None:
+ podcast = await self._cache_get_podcast(podcast_id)
+ episodes = podcast.get("episodes", [])
+ for cnt, episode in enumerate(episodes):
+ episode_enclosures = episode.get("enclosures", [])
+ if len(episode_enclosures) < 1:
+ raise MediaNotFoundError
+ stream_url: str | None = episode_enclosures[0].get("url", None)
+ if guid_or_stream_url == episode.get("guid", stream_url):
+ return stream_url
+ return None
+
+ async def _get_podcast(self, feed_url: str) -> dict[str, Any]:
+ # see music-assistant/server@6aae82e
+ response = await self.mass.http_session.get(feed_url, headers={"User-Agent": "Mozilla/5.0"})
+ if response.status != 200:
+ raise RuntimeError
+ feed_data = await response.read()
+ feed_stream = BytesIO(feed_data)
+ return podcastparser.parse(feed_url, feed_stream, max_episodes=self.max_episodes) # type: ignore[no-any-return]
+
+ async def _cache_get_podcast(self, prov_podcast_id: str) -> dict[str, Any]:
+ parsed_podcast = await self.mass.cache.get(
+ key=prov_podcast_id,
+ base_key=self.lookup_key,
+ category=CACHE_CATEGORY_PODCAST_ITEMS,
+ default=None,
+ )
+ if parsed_podcast is None:
+ parsed_podcast = await self._get_podcast(feed_url=prov_podcast_id)
+ await self._cache_set_podcast(feed_url=prov_podcast_id, parsed_podcast=parsed_podcast)
+
+ # this is a dictionary from podcastparser
+ return parsed_podcast # type: ignore[no-any-return]
+
+ async def _cache_set_podcast(self, feed_url: str, parsed_podcast: dict[str, Any]) -> None:
+ await self.mass.cache.set(
+ key=feed_url,
+ base_key=self.lookup_key,
+ category=CACHE_CATEGORY_PODCAST_ITEMS,
+ data=parsed_podcast,
+ expiration=60 * 60 * 24, # 1 day
+ )
+
+ async def _cache_set_timestamps(self) -> None:
+ # seven days default
+ await self.mass.cache.set(
+ key=CACHE_KEY_TIMESTAMP,
+ base_key=self.lookup_key,
+ category=CACHE_CATEGORY_OTHER,
+ data=[self.timestamp_subscriptions, self.timestamp_actions],
+ )
+
+ async def _cache_set_feeds(self) -> None:
+ # seven days default
+ await self.mass.cache.set(
+ key=CACHE_KEY_FEEDS,
+ base_key=self.lookup_key,
+ category=CACHE_CATEGORY_OTHER,
+ data=self.feeds,
+ )
--- /dev/null
+"""Simplest client for gPodder.
+
+Should be compatible with Nextcloud App GPodder Sync, and the original api
+of gpodder.net (mygpo) or drop-in replacements like opodsync.
+Gpodder Sync uses guid optionally.
+"""
+
+import datetime
+import logging
+from contextlib import suppress
+from dataclasses import dataclass, field
+from typing import Any
+
+import aiohttp
+from aiohttp.client_exceptions import ClientResponseError
+from mashumaro.config import BaseConfig
+from mashumaro.mixins.json import DataClassJSONMixin
+from mashumaro.types import Discriminator
+
+
+# https://gpoddernet.readthedocs.io/en/latest/api/reference/subscriptions.html#upload-subscription-changes
+@dataclass(kw_only=True)
+class SubscriptionsChangeRequest(DataClassJSONMixin):
+ """SubscriptionChangeRequest."""
+
+ add: list[str] = field(default_factory=list)
+ remove: list[str] = field(default_factory=list)
+
+
+# https://gpoddernet.readthedocs.io/en/latest/api/reference/subscriptions.html#upload-subscription-changes
+@dataclass(kw_only=True)
+class SubscriptionsGet(SubscriptionsChangeRequest):
+ """SubscriptionsGet."""
+
+ timestamp: int
+
+
+def action_tagger(cls: "type[EpisodeAction]") -> list[str]:
+ """Use action field to distinguish classes.
+
+ NC Gpodder uses upper case values, opodsync lower case.
+ This however does not work with a StrEnum, so plain string as action.
+ """
+ action = cls.__name__.replace("EpisodeAction", "")
+ return [action.upper(), action.lower()]
+
+
+@dataclass(kw_only=True)
+class EpisodeAction(DataClassJSONMixin):
+ """General EpisodeAction.
+
+ See https://gpoddernet.readthedocs.io/en/latest/api/reference/events.html
+ """
+
+ class Config(BaseConfig):
+ """Config."""
+
+ discriminator = Discriminator(
+ field="action", include_subtypes=True, variant_tagger_fn=action_tagger
+ )
+ omit_none = True # only nextcloud supports guid
+
+ podcast: str
+ episode: str
+ timestamp: str = ""
+ guid: str | None = None
+
+
+@dataclass(kw_only=True)
+class EpisodeActionDownload(EpisodeAction):
+ """EpisodeActionDownload."""
+
+ action: str = "download"
+
+
+@dataclass(kw_only=True)
+class EpisodeActionDelete(EpisodeAction):
+ """EpisodeActionDelete."""
+
+ action: str = "delete"
+
+
+@dataclass(kw_only=True)
+class EpisodeActionNew(EpisodeAction):
+ """EpisodeActionNew."""
+
+ action: str = "new"
+
+
+@dataclass(kw_only=True)
+class EpisodeActionFlattr(EpisodeAction):
+ """EpisodeActionFlattr."""
+
+ action: str = "flattr"
+
+
+@dataclass(kw_only=True)
+class EpisodeActionPlay(EpisodeAction):
+ """EpisodeActionPlay."""
+
+ action: str = "play"
+
+ # all in seconds
+ started: int = 0
+ position: int = 0
+ total: int = 0
+
+
+@dataclass(kw_only=True)
+class EpisodeActionGet(DataClassJSONMixin):
+ """EpisodeActionGet."""
+
+ actions: list[EpisodeAction]
+ timestamp: int
+
+
+class GPodderClient:
+ """GPodderClient."""
+
+ def __init__(
+ self, session: aiohttp.ClientSession, logger: logging.Logger, verify_ssl: bool = True
+ ) -> None:
+ """Init for GPodderClient."""
+ self.session = session
+ self.verify_ssl = verify_ssl
+
+ self.is_nextcloud = False
+ self.base_url: str
+ self.token: str | None
+
+ self.username: str
+ self.device: str
+ self.auth: aiohttp.BasicAuth | None = None # only for gpodder
+
+ self.logger = logger
+
+ self._nextcloud_prefix = "index.php/apps/gpoddersync"
+
+ def init_nc(self, base_url: str, nc_token: str | None = None) -> None:
+ """Init values for a nextcloud client."""
+ self.is_nextcloud = True
+ self.token = nc_token
+ self.base_url = base_url.rstrip("/")
+
+ async def init_gpodder(self, username: str, password: str, device: str, base_url: str) -> None:
+ """Init via basic auth."""
+ self.username = username
+ self.device = device
+ self.base_url = base_url.rstrip("/")
+ self.auth = aiohttp.BasicAuth(username, password)
+ await self._post(endpoint=f"api/2/auth/{username}/login.json")
+
+ @property
+ def headers(self) -> dict[str, str]:
+ """Session headers."""
+ if self.token is None:
+ raise RuntimeError("Token not set.")
+ return {"Authorization": f"Bearer {self.token}"}
+
+ async def _post(
+ self,
+ endpoint: str,
+ data: dict[str, Any] | list[Any] | None = None,
+ ) -> bytes:
+ """POST request."""
+ try:
+ response = await self.session.post(
+ f"{self.base_url}/{endpoint}",
+ json=data,
+ ssl=self.verify_ssl,
+ headers=self.headers if self.is_nextcloud else None,
+ raise_for_status=True,
+ auth=self.auth,
+ )
+ except ClientResponseError as exc:
+ self.logger.debug(exc)
+ raise RuntimeError(f"API POST call to {endpoint} failed.") from exc
+ if response.status != 200:
+ self.logger.debug(f"Call failed with status {response.status}")
+ raise RuntimeError(f"Api post call failed to {endpoint} failed!")
+ return await response.read()
+
+ async def _get(self, endpoint: str, params: dict[str, str | int] | None = None) -> bytes:
+ """GET request."""
+ response = await self.session.get(
+ f"{self.base_url}/{endpoint}",
+ params=params,
+ ssl=self.verify_ssl,
+ headers=self.headers if self.is_nextcloud else None,
+ auth=self.auth,
+ )
+ status = response.status
+ if response.content_type == "application/json" and status == 200:
+ return await response.read()
+ if status == 404:
+ return b""
+ self.logger.debug(f"Call failed with status {response.status}")
+ raise RuntimeError(f"API GET call to {endpoint} failed.")
+
+ async def get_subscriptions(self, since: int = 0) -> SubscriptionsGet | None:
+ """Get subscriptions.
+
+ since is unix time epoch - this may return none if there are no
+ subscriptions.
+ """
+ if self.is_nextcloud:
+ endpoint = f"{self._nextcloud_prefix}/subscriptions"
+ else:
+ endpoint = f"api/2/subscriptions/{self.username}/{self.device}.json"
+
+ response = await self._get(endpoint, params={"since": since})
+ if not response:
+ return None
+ return SubscriptionsGet.from_json(response)
+
+ async def get_progresses(
+ self, since: int = 0
+ ) -> tuple[list[EpisodeActionPlay | EpisodeActionNew], int | None]:
+ """Get progresses. Timestamp is second return value.
+
+ gpodder net may filter by podcast
+ https://gpoddernet.readthedocs.io/en/latest/api/reference/events.html
+ -> we do not use this for now, since nextcloud implementation is not
+ capable of it. Also, implementation in drop-in replacements varies.
+ """
+ params: dict[str, str | int] = {"since": since}
+ if self.is_nextcloud:
+ endpoint = f"{self._nextcloud_prefix}/episode_action"
+ else:
+ endpoint = f"api/2/episodes/{self.username}.json"
+ params["device"] = self.device
+ response = await self._get(endpoint, params=params)
+ if not response:
+ return [], None
+ actions_response = EpisodeActionGet.from_json(response)
+
+ # play has progress information
+ # new means, there is no progress (i.e. mark unplayed)
+ actions = [
+ x
+ for x in actions_response.actions
+ if isinstance(x, EpisodeActionPlay | EpisodeActionNew)
+ ]
+
+ with suppress(ValueError):
+ actions = sorted(actions, key=lambda x: datetime.datetime.fromisoformat(x.timestamp))[
+ ::-1
+ ]
+
+ return actions, actions_response.timestamp
+
+ async def update_subscriptions(
+ self, add: list[str] | None = None, remove: list[str] | None = None
+ ) -> None:
+ """Update subscriptions."""
+ if add is None:
+ add = []
+ if remove is None:
+ remove = []
+ request = SubscriptionsChangeRequest(add=add, remove=remove)
+ if self.is_nextcloud:
+ endpoint = f"{self._nextcloud_prefix}/subscription_change/create"
+ else:
+ endpoint = f"api/2/subscriptions/{self.username}/{self.device}.json"
+
+ await self._post(endpoint=endpoint, data=request.to_dict())
+
+ async def update_progress(
+ self,
+ *,
+ podcast_id: str,
+ episode_id: str,
+ guid: str | None,
+ position_s: float,
+ duration_s: float,
+ ) -> None:
+ """Update progress."""
+ utc_timestamp = (
+ datetime.datetime.now(datetime.UTC).replace(microsecond=0, tzinfo=None).isoformat()
+ )
+
+ episode_action: EpisodeActionNew | EpisodeActionPlay
+ if position_s == 0:
+ # mark unplayed
+ episode_action = EpisodeActionNew(
+ podcast=podcast_id, episode=episode_id, timestamp=utc_timestamp
+ )
+ else:
+ episode_action = EpisodeActionPlay(
+ podcast=podcast_id,
+ episode=episode_id,
+ timestamp=utc_timestamp,
+ position=int(position_s),
+ started=0,
+ total=int(duration_s),
+ )
+
+ # It is a bit unclear here, if other gpodder alternatives then nextcloud support the guid
+ # for episodes. I didn't see that in the source for opodsync at least...
+ if self.is_nextcloud:
+ episode_action.guid = guid
+ endpoint = f"{self._nextcloud_prefix}/episode_action/create"
+ else:
+ endpoint = f"api/2/episodes/{self.username}.json"
+ await self._post(endpoint=endpoint, data=[episode_action.to_dict()])