from __future__ import annotations
-import asyncio
from typing import TYPE_CHECKING, Any
from music_assistant_models.enums import MediaType, ProviderFeature
from music_assistant_models.errors import InvalidDataError
-from music_assistant_models.media_items import Artist, Audiobook, Chapter, UniqueList
+from music_assistant_models.media_items import Artist, Audiobook, UniqueList
-from music_assistant.constants import DB_TABLE_AUDIOBOOKS, DB_TABLE_PLAYLOG
+from music_assistant.constants import DB_TABLE_AUDIOBOOKS
from music_assistant.controllers.media.base import MediaControllerBase
from music_assistant.helpers.compare import (
compare_audiobook,
FROM audiobooks""" # noqa: E501
# register (extra) api handlers
api_base = self.api_base
- self.mass.register_api_command(f"music/{api_base}/audiobook_chapters", self.chapters)
self.mass.register_api_command(f"music/{api_base}/audiobook_versions", self.versions)
async def library_items(
)
return result
- async def chapters(
- self,
- item_id: str,
- provider_instance_id_or_domain: str,
- ) -> UniqueList[Chapter]:
- """Return audiobook chapters for the given provider audiobook id."""
- if library_audiobook := await self.get_library_item_by_prov_id(
- item_id, provider_instance_id_or_domain
- ):
- # return items from first/only provider
- for provider_mapping in library_audiobook.provider_mappings:
- return await self._get_provider_audiobook_chapters(
- provider_mapping.item_id, provider_mapping.provider_instance
- )
- return await self._get_provider_audiobook_chapters(item_id, provider_instance_id_or_domain)
-
async def versions(
self,
item_id: str,
"metadata": serialize_to_json(item.metadata),
"external_ids": serialize_to_json(item.external_ids),
"publisher": item.publisher,
- "total_chapters": item.total_chapters,
"authors": serialize_to_json(item.authors),
"narrators": serialize_to_json(item.narrators),
+ "duration": item.duration,
},
)
# update/set provider_mappings table
update.external_ids if overwrite else cur_item.external_ids
),
"publisher": cur_item.publisher or update.publisher,
- "total_chapters": cur_item.total_chapters or update.total_chapters,
"authors": serialize_to_json(
update.authors if overwrite else cur_item.authors or update.authors
),
"narrators": serialize_to_json(
update.narrators if overwrite else cur_item.narrators or update.narrators
),
+ "duration": update.duration or update.duration,
},
)
# update/set provider_mappings table
await self._set_provider_mappings(db_id, provider_mappings, overwrite)
self.logger.debug("updated %s in database: (id %s)", update.name, db_id)
- async def _get_provider_audiobook_chapters(
- self, item_id: str, provider_instance_id_or_domain: str
- ) -> list[Chapter]:
- """Return audiobook chapters for the given provider audiobook id."""
- prov: MusicProvider = self.mass.get_provider(provider_instance_id_or_domain)
- if prov is None:
- return []
- # grab the chapters from the provider
- # note that we do not cache any of this because its
- # always a rather small list and we want fresh resume info
- items = await prov.get_audiobook_chapters(item_id)
-
- async def set_resume_position(chapter: Chapter) -> None:
- if chapter.fully_played is not None or chapter.resume_position_ms:
- return
- # TODO: inject resume position info here for providers that do not natively provide it
- resume_info_db_row = await self.mass.music.database.get_row(
- DB_TABLE_PLAYLOG,
- {
- "item_id": chapter.item_id,
- "provider": prov.lookup_key,
- "media_type": MediaType.CHAPTER,
- },
- )
- if resume_info_db_row is None:
- return
- if resume_info_db_row["seconds_played"]:
- chapter.resume_position_ms = int(resume_info_db_row["seconds_played"] * 1000)
- if resume_info_db_row["fully_played"] is not None:
- chapter.fully_played = resume_info_db_row["fully_played"]
-
- await asyncio.gather(*[set_resume_position(chapter) for chapter in items])
- return items
-
async def radio_mode_base_tracks(
self,
item_id: str,
from music_assistant_models.enums import MediaType, ProviderFeature
from music_assistant_models.errors import InvalidDataError
-from music_assistant_models.media_items import Artist, Episode, Podcast, UniqueList
+from music_assistant_models.media_items import (
+ Artist,
+ Podcast,
+ PodcastEpisode,
+ UniqueList,
+)
from music_assistant.constants import DB_TABLE_PLAYLOG, DB_TABLE_PODCASTS
from music_assistant.controllers.media.base import MediaControllerBase
# register (extra) api handlers
api_base = self.api_base
self.mass.register_api_command(f"music/{api_base}/podcast_episodes", self.episodes)
+ self.mass.register_api_command(f"music/{api_base}/podcast_episode", self.episode)
self.mass.register_api_command(f"music/{api_base}/podcast_versions", self.versions)
async def library_items(
self,
item_id: str,
provider_instance_id_or_domain: str,
- ) -> UniqueList[Episode]:
+ ) -> UniqueList[PodcastEpisode]:
"""Return podcast episodes for the given provider podcast id."""
# always check if we have a library item for this podcast
if library_podcast := await self.get_library_item_by_prov_id(
):
# return items from first/only provider
for provider_mapping in library_podcast.provider_mappings:
- return await self._get_provider_podcast_episodes(
+ episodes = await self._get_provider_podcast_episodes(
provider_mapping.item_id, provider_mapping.provider_instance
)
- return await self._get_provider_podcast_episodes(item_id, provider_instance_id_or_domain)
+ return sorted(episodes, key=lambda x: x.position)
+ episodes = await self._get_provider_podcast_episodes(
+ item_id, provider_instance_id_or_domain
+ )
+ return sorted(episodes, key=lambda x: x.position)
+
+ async def episode(
+ self,
+ item_id: str,
+ provider_instance_id_or_domain: str,
+ ) -> UniqueList[PodcastEpisode]:
+ """Return single podcast episode by the given provider podcast id."""
+ prov: MusicProvider = self.mass.get_provider(provider_instance_id_or_domain)
+ if not prov:
+ raise InvalidDataError("Provider not found")
+ return await prov.get_podcast_episode(item_id)
async def versions(
self,
async def _get_provider_podcast_episodes(
self, item_id: str, provider_instance_id_or_domain: str
- ) -> list[Episode]:
+ ) -> list[PodcastEpisode]:
"""Return podcast episodes for the given provider podcast id."""
prov: MusicProvider = self.mass.get_provider(provider_instance_id_or_domain)
if prov is None:
# always a rather small list and we want fresh resume info
items = await prov.get_podcast_episodes(item_id)
- async def set_resume_position(episode: Episode) -> None:
+ async def set_resume_position(episode: PodcastEpisode) -> None:
if episode.fully_played is not None or episode.resume_position_ms:
return
# TODO: inject resume position info here for providers that do not natively provide it
{
"item_id": episode.item_id,
"provider": prov.lookup_key,
- "media_type": MediaType.EPISODE,
+ "media_type": MediaType.PODCAST_EPISODE,
},
)
if resume_info_db_row is None:
CONF_SYNC_INTERVAL = "sync_interval"
CONF_DELETED_PROVIDERS = "deleted_providers"
CONF_ADD_LIBRARY_ON_PLAY = "add_library_on_play"
-DB_SCHEMA_VERSION: Final[int] = 12
+DB_SCHEMA_VERSION: Final[int] = 13
class MusicController(CoreController):
else:
back_path = f"{provider_instance}://" + "/".join(sub_path.split("/")[:-1])
prepend_items.append(
- BrowseFolder(item_id="back", provider=provider_instance, path=back_path, name="..")
+ BrowseFolder(
+ item_id="back",
+ provider=provider_instance,
+ path=back_path,
+ name="..",
+ )
)
# limit -1 to account for the prepended items
prov_items = await prov.browse(path=path)
if provider_instance_id_or_domain == "builtin":
# handle special case of 'builtin' MusicProvider which allows us to play regular url's
return await self.mass.get_provider("builtin").parse_item(item_id)
+ if media_type == MediaType.PODCAST_EPISODE:
+ # special case for podcast episodes
+ return await self.podcasts.episode(item_id, provider_instance_id_or_domain)
ctrl = self.get_controller(media_type)
return await ctrl.get(
item_id=item_id,
continue
with suppress(MediaNotFoundError):
media_item = await ctrl.get_provider_item(
- prov_mapping.item_id, prov_mapping.provider_instance, force_refresh=True
+ prov_mapping.item_id,
+ prov_mapping.provider_instance,
+ force_refresh=True,
)
provider = media_item.provider
item_id = media_item.item_id
return self.playlists
if media_type == MediaType.AUDIOBOOK:
return self.audiobooks
- if media_type == MediaType.CHAPTER:
- return self.audiobooks
- if media_type == MediaType.EPISODE:
- return self.podcasts
if media_type == MediaType.PODCAST:
return self.podcasts
- if media_type == MediaType.EPISODE:
+ if media_type == MediaType.PODCAST_EPISODE:
return self.podcasts
return None
# cleanup media items from db matched to deleted provider
self.logger.info(
- "Removing provider %s from library, this can take a a while...", provider_instance
+ "Removing provider %s from library, this can take a a while...",
+ provider_instance,
)
errors = 0
for ctrl in (
DB_TABLE_PLAYLOG, f"timestamp < strftime('%s','now') - {3600 * 24 * 90}"
)
# db tables cleanup
- for ctrl in (self.albums, self.artists, self.tracks, self.playlists, self.radio):
+ for ctrl in (
+ self.albums,
+ self.artists,
+ self.tracks,
+ self.playlists,
+ self.radio,
+ ):
# Provider mappings where the db item is removed
query = (
f"item_id not in (SELECT item_id from {ctrl.db_table}) "
await self.database.execute("DROP TABLE IF EXISTS track_loudness")
if prev_version <= 10:
- # recreate db tables for audiobooks and podcasts due to some mistakes in early version
- await self.database.execute(f"DROP TABLE IF EXISTS {DB_TABLE_AUDIOBOOKS}")
- await self.database.execute(f"DROP TABLE IF EXISTS {DB_TABLE_PODCASTS}")
- await self.__create_database_tables()
+ # add new columns to playlog table
try:
await self.database.execute(
f"ALTER TABLE {DB_TABLE_PLAYLOG} ADD COLUMN fully_played BOOLEAN"
if "duplicate column" not in str(err):
raise
- if prev_version <= 11:
+ if prev_version <= 12:
# Need to drop the NOT NULL requirement on podcasts.publisher and audiobooks.publisher
# However, because there is no ALTER COLUMN support in sqlite, we will need
# to create the tables again.
[version] TEXT,
[favorite] BOOLEAN DEFAULT 0,
[publisher] TEXT,
- [total_chapters] INTEGER,
[authors] json NOT NULL,
[narrators] json NOT NULL,
[metadata] json NOT NULL,
+ [duration] INTEGER,
[external_ids] json NOT NULL,
[play_count] INTEGER DEFAULT 0,
[last_played] INTEGER DEFAULT 0,
UnsupportedFeaturedException,
)
from music_assistant_models.media_items import (
- Chapter,
- Episode,
MediaItemType,
PlayableMediaItemType,
Playlist,
+ PodcastEpisode,
media_from_dict,
)
from music_assistant_models.player import PlayerMedia
from music_assistant_models.player_queue import PlayerQueue
from music_assistant_models.queue_item import QueueItem
-from music_assistant.constants import CONF_CROSSFADE, CONF_FLOW_MODE, MASS_LOGO_ONLINE
+from music_assistant.constants import (
+ CONF_CROSSFADE,
+ CONF_FLOW_MODE,
+ DB_TABLE_PLAYLOG,
+ MASS_LOGO_ONLINE,
+)
from music_assistant.helpers.api import api_command
from music_assistant.helpers.audio import get_stream_details
from music_assistant.helpers.throttle_retry import BYPASS_THROTTLER
CONF_DEFAULT_ENQUEUE_OPTION_RADIO = "default_enqueue_option_radio"
CONF_DEFAULT_ENQUEUE_OPTION_PLAYLIST = "default_enqueue_option_playlist"
CONF_DEFAULT_ENQUEUE_OPTION_AUDIOBOOK = "default_enqueue_option_audiobook"
-CONF_DEFAULT_ENQUEUE_OPTION_CHAPTER = "default_enqueue_option_chapter"
CONF_DEFAULT_ENQUEUE_OPTION_PODCAST = "default_enqueue_option_podcast"
-CONF_DEFAULT_ENQUEUE_OPTION_EPISODE = "default_enqueue_option_episode"
+CONF_DEFAULT_ENQUEUE_OPTION_PODCAST_EPISODE = "default_enqueue_option_podcast_episode"
CONF_DEFAULT_ENQUEUE_OPTION_FOLDER = "default_enqueue_option_folder"
CONF_DEFAULT_ENQUEUE_OPTION_UNKNOWN = "default_enqueue_option_unknown"
RADIO_TRACK_MAX_DURATION_SECS = 20 * 60 # 20 minutes
hidden=True,
),
ConfigEntry(
- key=CONF_DEFAULT_ENQUEUE_OPTION_CHAPTER,
- type=ConfigEntryType.STRING,
- default_value=QueueOption.REPLACE.value,
- label="Default enqueue option for Audiobook-chapter item(s).",
- options=enqueue_options,
- hidden=True,
- ),
- ConfigEntry(
- key=CONF_DEFAULT_ENQUEUE_OPTION_EPISODE,
+ key=CONF_DEFAULT_ENQUEUE_OPTION_PODCAST_EPISODE,
type=ConfigEntryType.STRING,
default_value=QueueOption.REPLACE.value,
label="Default enqueue option for Podcast-episode item(s).",
result.append(playlist_track)
return result
- async def get_next_audio_book_chapters(
- self, audio_book: Audiobook | None, chapter: Chapter | str | None
- ) -> UniqueList[Chapter]:
- """Return (next) chapter(s) and resume point for given audio book."""
- if audio_book is None and isinstance(chapter, str | NoneType):
- raise InvalidDataError("Either audio_book or chapter must be provided")
- if audio_book is None:
- audio_book = chapter.audiobook
+ async def get_audiobook_resume_point(
+ self, audio_book: Audiobook, chapter: str | int | None = None
+ ) -> int:
+ """Return resume point (in milliseconds) for given audio book."""
self.logger.debug(
- "Fetching chapter(s) and resume point to play for audio book %s",
+ "Fetching resume point to play for audio book %s",
audio_book.name,
)
- all_chapters = await self.mass.music.audiobooks.chapters(
- audio_book.item_id, audio_book.provider
- )
- # if a chapter was provided, a user explicitly selected a chapter to play
- # so we need to find the index of the chapter in the list
- if isinstance(chapter, Chapter):
- chapter = next((x for x in all_chapters if x.uri == chapter.uri), None)
- elif isinstance(chapter, str):
- chapter = next((x for x in all_chapters if x.uri == chapter), None)
- else:
- # get first chapter that is not fully played
- chapter = next((x for x in all_chapters if not x.fully_played), None)
- if chapter is None:
- # no chapters found that are not fully played, so we start at the beginning
- chapter = next((x for x in all_chapters), None)
- if chapter is None:
+ if chapter is not None:
+ # user explicitly selected a chapter to play
+ if isinstance(chapter, str):
+ start_chapter = int(chapter)
+ if chapters := audio_book.metadata.chapters:
+ if _chapter := next((x for x in chapters if x.position == start_chapter), None):
+ return _chapter.start * 1000
raise InvalidDataError(
- f"Unable to resolve chapter to play for Audio Book {audio_book.name}"
+ f"Unable to resolve chapter to play for Audiobook {audio_book.name}"
)
- # get the index of the chapter
- chapter_index = all_chapters.index(chapter)
- # return the (remaining) chapter(s) to play
- return all_chapters[chapter_index:]
+ # prefer the resume point from the provider's item
+ for prov_mapping in audio_book.provider_mappings:
+ if not (provider := self.mass.get_provider(prov_mapping.provider_instance)):
+ continue
+ if provider_item := await provider.get_audiobook(prov_mapping.item_id):
+ if provider_item.fully_played:
+ return 0
+ if provider_item.resume_position_ms is not None:
+ return provider_item.resume_position_ms
+ # fallback to the resume point from the playlog (if available)
+ resume_info_db_row = await self.mass.music.database.get_row(
+ DB_TABLE_PLAYLOG,
+ {
+ "item_id": prov_mapping.item_id,
+ "provider": provider.lookup_key,
+ "media_type": MediaType.AUDIOBOOK,
+ },
+ )
+ if resume_info_db_row is None:
+ continue
+ if resume_info_db_row["fully_played"]:
+ return 0
+ if resume_info_db_row["seconds_played"]:
+ return int(resume_info_db_row["seconds_played"] * 1000)
+ return 0
async def get_next_podcast_episodes(
- self, podcast: Podcast | None, episode: Episode | str | None
- ) -> UniqueList[Episode]:
+ self, podcast: Podcast | None, episode: PodcastEpisode | str | None
+ ) -> UniqueList[PodcastEpisode]:
"""Return (next) episode(s) and resume point for given podcast."""
if podcast is None and isinstance(episode, str | NoneType):
raise InvalidDataError("Either podcast or episode must be provided")
all_episodes = await self.mass.music.podcasts.episodes(podcast.item_id, podcast.provider)
# if a episode was provided, a user explicitly selected a episode to play
# so we need to find the index of the episode in the list
- if isinstance(episode, Episode):
+ if isinstance(episode, PodcastEpisode):
episode = next((x for x in all_episodes if x.uri == episode.uri), None)
elif isinstance(episode, str):
- episode = next((x for x in all_episodes if x.uri == episode), None)
+ episode = next((x for x in all_episodes if episode in (x.uri, x.item_id)), None)
else:
# get first episode that is not fully played
episode = next((x for x in all_episodes if not x.fully_played), None)
)
return await self.get_album_tracks(media_item, start_item)
if media_item.media_type == MediaType.AUDIOBOOK:
- self.mass.create_task(
- self.mass.music.mark_item_played(
- media_item.media_type, media_item.item_id, media_item.provider
- )
- )
- return await self.get_next_audio_book_chapters(media_item, start_item)
- if media_item.media_type == MediaType.CHAPTER:
- return await self.get_next_audio_book_chapters(None, media_item)
+ if resume_point := await self.get_audiobook_resume_point(media_item, start_item):
+ media_item.resume_position_ms = resume_point
+ return [media_item]
if media_item.media_type == MediaType.PODCAST:
self.mass.create_task(
self.mass.music.mark_item_played(
)
)
return await self.get_next_podcast_episodes(media_item, start_item or media_item)
- if media_item.media_type == MediaType.EPISODE:
+ if media_item.media_type == MediaType.PODCAST_EPISODE:
return await self.get_next_podcast_episodes(None, media_item)
# all other: single track or radio item
return [media_item]
# NOTE: 'seconds_streamed' can actually be 0 if there was a stream error!
play_log_entry.seconds_streamed
if play_log_entry.seconds_streamed is not None
- else play_log_entry.duration
+ else play_log_entry.duration or 3600 * 24 * 7
)
if elapsed_time_queue_total > (queue_item_duration + played_time):
# total elapsed time is more than (streamed) track duration
if not strict and (isinstance(base_item, ItemMapping) or isinstance(compare_item, ItemMapping)):
return True
# for strict matching we REQUIRE both items to be a real Podcast object
- assert isinstance(base_item, Audiobook)
- assert isinstance(compare_item, Audiobook)
+ assert isinstance(base_item, Podcast)
+ assert isinstance(compare_item, Podcast)
# compare publisher
return not (
base_item.publisher
base_versions = sorted(base_version.lower().split(" "))
compare_versions = sorted(compare_version.lower().split(" "))
# filter out words we can ignore (such as 'version')
- ignore_words = [*IGNORE_VERSIONS, "version", "edition", "variant", "versie", "versione"]
+ ignore_words = [
+ *IGNORE_VERSIONS,
+ "version",
+ "edition",
+ "variant",
+ "versie",
+ "versione",
+ ]
base_versions = [x for x in base_versions if x not in ignore_words]
compare_versions = [x for x in compare_versions if x not in ignore_words]
Artist,
Audiobook,
BrowseFolder,
- Chapter,
- Episode,
ItemMapping,
MediaItemType,
Playlist,
Podcast,
+ PodcastEpisode,
Radio,
SearchResults,
Track,
if ProviderFeature.LIBRARY_PODCASTS in self.supported_features:
raise NotImplementedError
+ async def get_podcast_episode(self, prov_episode_id: str) -> PodcastEpisode:
+ """Get (full) podcast episode details by id."""
+ if ProviderFeature.LIBRARY_PODCASTS in self.supported_features:
+ raise NotImplementedError
+
async def get_album_tracks(
self,
prov_album_id: str, # type: ignore[return]
if ProviderFeature.LIBRARY_PLAYLISTS in self.supported_features:
raise NotImplementedError
- async def get_audiobook_chapters(
- self,
- prov_audiobook_id: str,
- ) -> list[Chapter]:
- """Get all Chapters for given audiobook id."""
- if ProviderFeature.LIBRARY_AUDIOBOOKS in self.supported_features:
- raise NotImplementedError
-
async def get_podcast_episodes(
self,
prov_podcast_id: str,
- ) -> list[Episode]:
- """Get all Episodes for given podcast id."""
+ ) -> list[PodcastEpisode]:
+ """Get all PodcastEpisodes for given podcast id."""
if ProviderFeature.LIBRARY_PODCASTS in self.supported_features:
raise NotImplementedError
return await self.get_audiobook(prov_item_id)
if media_type == MediaType.PODCAST:
return await self.get_podcast(prov_item_id)
- if media_type == MediaType.CHAPTER:
- return await self.get_chapter(prov_item_id)
- if media_type == MediaType.EPISODE:
- return await self.get_episode(prov_item_id)
+ if media_type == MediaType.PODCAST_EPISODE:
+ return await self.get_podcast_episode(prov_item_id)
return await self.get_track(prov_item_id)
async def browse(self, path: str) -> Sequence[MediaItemType | ItemMapping]: # noqa: PLR0915
Album,
Artist,
AudioFormat,
- Episode,
ItemMapping,
MediaItemImage,
Playlist,
Podcast,
+ PodcastEpisode,
ProviderMapping,
SearchResults,
Track,
return podcast
- def _parse_epsiode(self, sonic_episode: SonicEpisode, sonic_channel: SonicPodcast) -> Episode:
+ def _parse_epsiode(
+ self, sonic_episode: SonicEpisode, sonic_channel: SonicPodcast
+ ) -> PodcastEpisode:
eid = f"{sonic_episode.channel_id}{EP_CHAN_SEP}{sonic_episode.id}"
pos = 1
for ep in sonic_channel.episodes:
break
pos += 1
- episode = Episode(
+ episode = PodcastEpisode(
item_id=eid,
provider=self.domain,
name=sonic_episode.title,
raise MediaNotFoundError(msg) from e
return self._parse_playlist(sonic_playlist)
+ async def get_podcast_episode(self, prov_episode_id: str) -> PodcastEpisode:
+ """Get (full) podcast episode details by id."""
+ podcast_id, _ = prov_episode_id.split(EP_CHAN_SEP)
+ for episode in await self.get_podcast_episodes(podcast_id):
+ if episode.item_id == prov_episode_id:
+ return episode
+ msg = f"Episode {prov_episode_id} not found"
+ raise MediaNotFoundError(msg)
+
async def get_podcast_episodes(
self,
prov_podcast_id: str,
- ) -> list[Episode]:
+ ) -> list[PodcastEpisode]:
"""Get all Episodes for given podcast id."""
if not self._enable_podcasts:
return []
)
self.mass.create_task(self._report_playback_started(item_id))
- elif media_type == MediaType.EPISODE:
+ elif media_type == MediaType.PODCAST_EPISODE:
item: SonicEpisode = await self._get_podcast_episode(item_id)
self.logger.debug(
from music_assistant_models.errors import InvalidProviderURI, MediaNotFoundError
from music_assistant_models.media_items import (
AudioFormat,
- Episode,
ItemMapping,
MediaItemImage,
Podcast,
+ PodcastEpisode,
ProviderMapping,
)
from music_assistant_models.streamdetails import StreamDetails
@property
def supported_features(self) -> set[ProviderFeature]:
"""Return the features supported by this Provider."""
- return {ProviderFeature.BROWSE, ProviderFeature.SEARCH, ProviderFeature.LIBRARY_PODCASTS}
+ return {
+ ProviderFeature.BROWSE,
+ ProviderFeature.LIBRARY_PODCASTS,
+ }
async def handle_async_init(self) -> None:
"""Handle async initialization of the provider."""
else:
raise Exception(f"Podcast id not in provider: {prov_podcast_id}")
- async def get_episode(self, prov_episode_id: str) -> Episode:
+ async def get_podcast_episode(self, prov_episode_id: str) -> PodcastEpisode:
"""Get (full) podcast episode details by id."""
for episode in self.parsed["episodes"]:
- if prov_episode_id in episode["guid"]:
+ if prov_episode_id == episode["guid"]:
return await self._parse_episode(episode)
raise MediaNotFoundError("Track not found")
async def get_podcast_episodes(
self,
prov_podcast_id: str,
- ) -> list[Episode]:
+ ) -> list[PodcastEpisode]:
"""List all episodes for the podcast."""
episodes = []
-
for episode in self.parsed["episodes"]:
- episodes.append(await self._parse_episode(episode, prov_podcast_id))
-
+ episodes.append(await self._parse_episode(episode))
return episodes
async def get_stream_details(
) -> StreamDetails:
"""Get streamdetails for a track/radio."""
for episode in self.parsed["episodes"]:
- if item_id in episode["guid"]:
+ if item_id == episode["guid"]:
return StreamDetails(
provider=self.instance_id,
item_id=item_id,
# hard coded to unknown, so ffmpeg figures out
content_type=ContentType.UNKNOWN,
),
- media_type=MediaType.PODCAST,
+ media_type=MediaType.PODCAST_EPISODE,
stream_type=StreamType.HTTP,
path=episode["enclosures"][0]["url"],
)
total_episodes=len(self.parsed["episodes"]),
provider_mappings={
ProviderMapping(
- item_id=self.parsed["title"],
+ item_id=self.podcast_id,
provider_domain=self.domain,
provider_instance=self.instance_id,
)
return podcast
- async def _parse_episode(self, track_obj: dict, prov_podcast_id: str) -> Episode:
- name = track_obj["title"]
- track_id = track_obj["guid"]
- episode = Episode(
- item_id=track_id,
+ async def _parse_episode(self, episode_obj: dict) -> PodcastEpisode:
+ name = episode_obj["title"]
+ item_id = episode_obj["guid"]
+ episode = PodcastEpisode(
+ item_id=item_id,
provider=self.domain,
name=name,
- duration=track_obj["total_time"],
+ duration=episode_obj["total_time"],
+ position=episode_obj["number"],
podcast=ItemMapping(
- item_id=prov_podcast_id,
+ item_id=self.podcast_id,
provider=self.instance_id,
name=self.parsed["title"],
media_type=MediaType.PODCAST,
),
provider_mappings={
ProviderMapping(
- item_id=track_id,
+ item_id=item_id,
provider_domain=self.domain,
provider_instance=self.instance_id,
audio_format=AudioFormat(
content_type=ContentType.MP3,
),
- url=track_obj["link"],
+ url=episode_obj["link"],
)
},
)
- if "episode_art_url" in track_obj:
+ if "episode_art_url" in episode_obj:
episode.metadata.images = [
MediaItemImage(
type=ImageType.THUMB,
- path=track_obj["episode_art_url"],
+ path=episode_obj["episode_art_url"],
provider=self.lookup_key,
remotely_accessible=True,
)
]
- episode.metadata.description = track_obj["description"]
- episode.metadata.explicit = track_obj["explicit"]
+ episode.metadata.description = episode_obj["description"]
+ episode.metadata.explicit = episode_obj["explicit"]
return episode
Artist,
Audiobook,
AudioFormat,
- Chapter,
- Episode,
ItemMapping,
+ MediaItemChapter,
MediaItemImage,
MediaItemMetadata,
Podcast,
+ PodcastEpisode,
ProviderMapping,
Track,
UniqueList,
item_id=prov_audiobook_id,
provider=self.instance_id,
name=f"Test Audiobook {prov_audiobook_id}",
- metadata=MediaItemMetadata(images=UniqueList([DEFAULT_THUMB])),
+ metadata=MediaItemMetadata(
+ images=UniqueList([DEFAULT_THUMB]),
+ description="This is a description for Test Audiobook",
+ chapters=[
+ MediaItemChapter(position=1, name="Chapter 1", start=10, end=20),
+ MediaItemChapter(position=2, name="Chapter 2", start=20, end=40),
+ MediaItemChapter(position=2, name="Chapter 3", start=40),
+ ],
+ ),
provider_mappings={
ProviderMapping(
item_id=prov_audiobook_id,
)
},
publisher="Test Publisher",
- total_chapters=10,
authors=UniqueList(["AudioBook Author"]),
narrators=UniqueList(["AudioBook Narrator"]),
+ duration=60,
)
async def get_library_artists(self) -> AsyncGenerator[Artist, None]:
track_item_id = f"{artist_idx}_{album_idx}_{track_idx}"
yield await self.get_track(track_item_id)
- async def get_library_podcasts(self) -> AsyncGenerator[Track, None]:
+ async def get_library_podcasts(self) -> AsyncGenerator[Podcast, None]:
"""Retrieve library tracks from the provider."""
num_podcasts = self.config.get_value(CONF_KEY_NUM_PODCASTS)
for podcast_idx in range(num_podcasts):
for audiobook_idx in range(num_audiobooks):
yield await self.get_audiobook(str(audiobook_idx))
- async def get_audiobook_chapters(
- self,
- prov_audiobook_id: str,
- ) -> list[Chapter]:
- """Get all Chapters for given audiobook id."""
- num_chapters = 25
- return [
- Chapter(
- item_id=f"{prov_audiobook_id}_{chapter_idx}",
- provider=self.instance_id,
- name=f"Test Chapter {prov_audiobook_id}-{chapter_idx}",
- duration=60,
- audiobook=ItemMapping(
- item_id=prov_audiobook_id,
- provider=self.instance_id,
- name=f"Test Audiobook {prov_audiobook_id}",
- media_type=MediaType.AUDIOBOOK,
- image=DEFAULT_THUMB,
- ),
- provider_mappings={
- ProviderMapping(
- item_id=f"{prov_audiobook_id}_{chapter_idx}",
- provider_domain=self.domain,
- provider_instance=self.instance_id,
- )
- },
- metadata=MediaItemMetadata(
- description="This is a description for "
- f"Test Chapter {chapter_idx} of Test Audiobook {prov_audiobook_id}"
- ),
- position=chapter_idx,
- )
- for chapter_idx in range(num_chapters)
- ]
-
async def get_podcast_episodes(
self,
prov_podcast_id: str,
- ) -> list[Episode]:
- """Get all Episodes for given podcast id."""
+ ) -> list[PodcastEpisode]:
+ """Get all PodcastEpisodes for given podcast id."""
num_episodes = 25
return [
- Episode(
- item_id=f"{prov_podcast_id}_{episode_idx}",
- provider=self.instance_id,
- name=f"Test Episode {prov_podcast_id}-{episode_idx}",
- duration=60,
- podcast=ItemMapping(
- item_id=prov_podcast_id,
- provider=self.instance_id,
- name=f"Test Podcast {prov_podcast_id}",
- media_type=MediaType.PODCAST,
- image=DEFAULT_THUMB,
- ),
- provider_mappings={
- ProviderMapping(
- item_id=f"{prov_podcast_id}_{episode_idx}",
- provider_domain=self.domain,
- provider_instance=self.instance_id,
- )
- },
- metadata=MediaItemMetadata(
- description="This is a description for "
- f"Test Episode {episode_idx} of Test Podcast {prov_podcast_id}"
- ),
- position=episode_idx,
- )
+ await self.get_podcast_episode(f"{prov_podcast_id}_{episode_idx}")
for episode_idx in range(num_episodes)
]
+ async def get_podcast_episode(self, prov_episode_id: str) -> PodcastEpisode:
+ """Get (full) podcast episode details by id."""
+ podcast_id, episode_idx = prov_episode_id.split("_", 2)
+ return PodcastEpisode(
+ item_id=prov_episode_id,
+ provider=self.instance_id,
+ name=f"Test PodcastEpisode {podcast_id}-{episode_idx}",
+ duration=60,
+ podcast=ItemMapping(
+ item_id=podcast_id,
+ provider=self.instance_id,
+ name=f"Test Podcast {podcast_id}",
+ media_type=MediaType.PODCAST,
+ image=DEFAULT_THUMB,
+ ),
+ provider_mappings={
+ ProviderMapping(
+ item_id=prov_episode_id,
+ provider_domain=self.domain,
+ provider_instance=self.instance_id,
+ )
+ },
+ metadata=MediaItemMetadata(
+ description="This is a description for "
+ f"Test PodcastEpisode {episode_idx} of Test Podcast {podcast_id}"
+ ),
+ position=int(episode_idx),
+ )
+
async def get_stream_details(
self, item_id: str, media_type: MediaType = MediaType.TRACK
) -> StreamDetails: