DB_TABLE_TRACKS: Final[str] = "tracks"
DB_TABLE_PLAYLISTS: Final[str] = "playlists"
DB_TABLE_RADIOS: Final[str] = "radios"
+DB_TABLE_AUDIOBOOKS: Final[str] = "audiobooks"
+DB_TABLE_PODCASTS: Final[str] = "podcasts"
DB_TABLE_CACHE: Final[str] = "cache"
DB_TABLE_SETTINGS: Final[str] = "settings"
DB_TABLE_THUMBS: Final[str] = "thumbnails"
# store (serializable items) in cache
if prov.is_streaming_provider:
self.mass.create_task(
- self.mass.cache.set(cache_key, [x.to_dict() for x in items]),
- category=cache_category,
- base_key=cache_base_key,
+ self.mass.cache.set(
+ cache_key,
+ [x.to_dict() for x in items],
+ category=cache_category,
+ base_key=cache_base_key,
+ ),
)
for item in items:
# if this is a complete track object, pre-cache it as
--- /dev/null
+"""Manage MediaItems of type Audiobook."""
+
+from __future__ import annotations
+
+from typing import TYPE_CHECKING, Any
+
+from music_assistant_models.enums import MediaType, ProviderFeature
+from music_assistant_models.errors import InvalidDataError
+from music_assistant_models.media_items import Artist, Audiobook, Chapter, UniqueList
+
+from music_assistant.constants import DB_TABLE_AUDIOBOOKS
+from music_assistant.controllers.media.base import MediaControllerBase
+from music_assistant.helpers.compare import (
+ compare_audiobook,
+ compare_media_item,
+ loose_compare_strings,
+)
+from music_assistant.helpers.json import serialize_to_json
+
+if TYPE_CHECKING:
+ from music_assistant_models.media_items import Track
+
+ from music_assistant.models.music_provider import MusicProvider
+
+
+class AudiobooksController(MediaControllerBase[Audiobook]):
+ """Controller managing MediaItems of type Audiobook."""
+
+ db_table = DB_TABLE_AUDIOBOOKS
+ media_type = MediaType.AUDIOBOOK
+ item_cls = Audiobook
+
+ def __init__(self, *args, **kwargs) -> None:
+ """Initialize class."""
+ super().__init__(*args, **kwargs)
+ self.base_query = """
+ SELECT
+ audiobooks.*,
+ (SELECT JSON_GROUP_ARRAY(
+ json_object(
+ 'item_id', provider_mappings.provider_item_id,
+ 'provider_domain', provider_mappings.provider_domain,
+ 'provider_instance', provider_mappings.provider_instance,
+ 'available', provider_mappings.available,
+ 'audio_format', json(provider_mappings.audio_format),
+ 'url', provider_mappings.url,
+ 'details', provider_mappings.details
+ )) FROM provider_mappings WHERE provider_mappings.item_id = audiobooks.item_id AND media_type = 'audiobook') AS provider_mappings
+ FROM audiobooks""" # noqa: E501
+ # register (extra) api handlers
+ api_base = self.api_base
+ self.mass.register_api_command(f"music/{api_base}/audiobook_chapters", self.chapters)
+ self.mass.register_api_command(f"music/{api_base}/audiobook_versions", self.versions)
+
+ async def library_items(
+ self,
+ favorite: bool | None = None,
+ search: str | None = None,
+ limit: int = 500,
+ offset: int = 0,
+ order_by: str = "sort_name",
+ provider: str | None = None,
+ extra_query: str | None = None,
+ extra_query_params: dict[str, Any] | None = None,
+ ) -> list[Artist]:
+ """Get in-database audiobooks."""
+ extra_query_params: dict[str, Any] = extra_query_params or {}
+ extra_query_parts: list[str] = [extra_query] if extra_query else []
+ result = await self._get_library_items_by_query(
+ favorite=favorite,
+ search=search,
+ limit=limit,
+ offset=offset,
+ order_by=order_by,
+ provider=provider,
+ extra_query_parts=extra_query_parts,
+ extra_query_params=extra_query_params,
+ )
+ if search and len(result) < 25 and not offset:
+ # append author items to result
+ extra_query_parts = [
+ "WHERE audiobooks.authors LIKE :search OR audiobooks.name LIKE :search",
+ ]
+ extra_query_params["search"] = f"%{search}%"
+ return result + await self._get_library_items_by_query(
+ favorite=favorite,
+ search=None,
+ limit=limit,
+ order_by=order_by,
+ provider=provider,
+ extra_query_parts=extra_query_parts,
+ extra_query_params=extra_query_params,
+ )
+ return result
+
+ async def chapters(
+ self,
+ item_id: str,
+ provider_instance_id_or_domain: str,
+ ) -> UniqueList[Chapter]:
+ """Return audiobook chapters for the given provider audiobook id."""
+ # always check if we have a library item for this audiobook
+ library_audiobook = await self.get_library_item_by_prov_id(
+ item_id, provider_instance_id_or_domain
+ )
+ if not library_audiobook:
+ return await self._get_provider_audiobook_chapters(
+ item_id, provider_instance_id_or_domain
+ )
+ # return items from first/only provider
+ for provider_mapping in library_audiobook.provider_mappings:
+ return await self._get_provider_audiobook_chapters(
+ provider_mapping.item_id, provider_mapping.provider_instance
+ )
+ return UniqueList()
+
+ async def versions(
+ self,
+ item_id: str,
+ provider_instance_id_or_domain: str,
+ ) -> UniqueList[Audiobook]:
+ """Return all versions of an audiobook we can find on all providers."""
+ audiobook = await self.get_provider_item(item_id, provider_instance_id_or_domain)
+ search_query = audiobook.name
+ result: UniqueList[Audiobook] = UniqueList()
+ for provider_id in self.mass.music.get_unique_providers():
+ provider = self.mass.get_provider(provider_id)
+ if not provider:
+ continue
+ if not provider.library_supported(MediaType.AUDIOBOOK):
+ continue
+ result.extend(
+ prov_item
+ for prov_item in await self.search(search_query, provider_id)
+ if loose_compare_strings(audiobook.name, prov_item.name)
+ # make sure that the 'base' version is NOT included
+ and not audiobook.provider_mappings.intersection(prov_item.provider_mappings)
+ )
+ return result
+
+ async def _add_library_item(self, item: Audiobook) -> int:
+ """Add a new record to the database."""
+ if not isinstance(item, Audiobook):
+ msg = "Not a valid Audiobook object (ItemMapping can not be added to db)"
+ raise InvalidDataError(msg)
+ db_id = await self.mass.music.database.insert(
+ self.db_table,
+ {
+ "name": item.name,
+ "sort_name": item.sort_name,
+ "version": item.version,
+ "favorite": item.favorite,
+ "metadata": serialize_to_json(item.metadata),
+ "external_ids": serialize_to_json(item.external_ids),
+ "publisher": item.publisher,
+ "total_chapters": item.total_chapters,
+ "authors": item.authors,
+ "narrators": item.narrators,
+ },
+ )
+ # update/set provider_mappings table
+ await self._set_provider_mappings(db_id, item.provider_mappings)
+ self.logger.debug("added %s to database (id: %s)", item.name, db_id)
+ return db_id
+
+ async def _update_library_item(
+ self, item_id: str | int, update: Audiobook, overwrite: bool = False
+ ) -> None:
+ """Update existing record in the database."""
+ db_id = int(item_id) # ensure integer
+ cur_item = await self.get_library_item(db_id)
+ metadata = update.metadata if overwrite else cur_item.metadata.update(update.metadata)
+ cur_item.external_ids.update(update.external_ids)
+ provider_mappings = (
+ update.provider_mappings
+ if overwrite
+ else {*cur_item.provider_mappings, *update.provider_mappings}
+ )
+ await self.mass.music.database.update(
+ self.db_table,
+ {"item_id": db_id},
+ {
+ "name": update.name if overwrite else cur_item.name,
+ "sort_name": update.sort_name
+ if overwrite
+ else cur_item.sort_name or update.sort_name,
+ "version": update.version if overwrite else cur_item.version or update.version,
+ "metadata": serialize_to_json(metadata),
+ "external_ids": serialize_to_json(
+ update.external_ids if overwrite else cur_item.external_ids
+ ),
+ "publisher": cur_item.publisher or update.publisher,
+ "total_chapters": cur_item.total_chapters or update.total_chapters,
+ "authors": update.authors if overwrite else cur_item.authors or update.authors,
+ "narrators": update.narrators
+ if overwrite
+ else cur_item.narrators or update.narrators,
+ },
+ )
+ # update/set provider_mappings table
+ await self._set_provider_mappings(db_id, provider_mappings, overwrite)
+ self.logger.debug("updated %s in database: (id %s)", update.name, db_id)
+
+ async def _get_provider_audiobook_chapters(
+ self, item_id: str, provider_instance_id_or_domain: str
+ ) -> list[Chapter]:
+ """Return audiobook chapters for the given provider audiobook id."""
+ prov: MusicProvider = self.mass.get_provider(provider_instance_id_or_domain)
+ if prov is None:
+ return []
+ # prefer cache items (if any) - for streaming providers only
+ cache_base_key = prov.lookup_key
+ cache_key = f"audiobook.{item_id}"
+ if (
+ prov.is_streaming_provider
+ and (cache := await self.mass.cache.get(cache_key, base_key=cache_base_key)) is not None
+ ):
+ return [Chapter.from_dict(x) for x in cache]
+ # no items in cache - get listing from provider
+ items = await prov.get_audiobook_chapters(item_id)
+ # store (serializable items) in cache
+ if prov.is_streaming_provider:
+ self.mass.create_task(
+ self.mass.cache.set(
+ cache_key,
+ [x.to_dict() for x in items],
+ expiration=3600,
+ base_key=cache_base_key,
+ ),
+ )
+
+ return items
+
+ async def _get_provider_dynamic_base_tracks(
+ self,
+ item_id: str,
+ provider_instance_id_or_domain: str,
+ limit: int = 25,
+ ) -> list[Track]:
+ """Get the list of base tracks from the controller used to calculate the dynamic radio."""
+ msg = "Dynamic tracks not supported for Radio MediaItem"
+ raise NotImplementedError(msg)
+
+ async def _get_dynamic_tracks(self, media_item: Audiobook, limit: int = 25) -> list[Track]:
+ """Get dynamic list of tracks for given item, fallback/default implementation."""
+ msg = "Dynamic tracks not supported for Audiobook MediaItem"
+ raise NotImplementedError(msg)
+
+ async def match_providers(self, db_audiobook: Audiobook) -> None:
+ """Try to find match on all (streaming) providers for the provided (database) audiobook.
+
+ This is used to link objects of different providers/qualities together.
+ """
+ if db_audiobook.provider != "library":
+ return # Matching only supported for database items
+ if not db_audiobook.authors:
+ return # guard
+ author_name = db_audiobook.authors[0]
+
+ async def find_prov_match(provider: MusicProvider):
+ self.logger.debug(
+ "Trying to match audiobook %s on provider %s", db_audiobook.name, provider.name
+ )
+ match_found = False
+ search_str = f"{author_name} - {db_audiobook.name}"
+ search_result = await self.search(search_str, provider.instance_id)
+ for search_result_item in search_result:
+ if not search_result_item.available:
+ continue
+ if not compare_media_item(db_audiobook, search_result_item):
+ continue
+ # we must fetch the full audiobook version, search results can be simplified objects
+ prov_audiobook = await self.get_provider_item(
+ search_result_item.item_id,
+ search_result_item.provider,
+ fallback=search_result_item,
+ )
+ if compare_audiobook(db_audiobook, prov_audiobook):
+ # 100% match, we update the db with the additional provider mapping(s)
+ match_found = True
+ for provider_mapping in search_result_item.provider_mappings:
+ await self.add_provider_mapping(db_audiobook.item_id, provider_mapping)
+ db_audiobook.provider_mappings.add(provider_mapping)
+ return match_found
+
+ # try to find match on all providers
+ cur_provider_domains = {x.provider_domain for x in db_audiobook.provider_mappings}
+ for provider in self.mass.music.providers:
+ if provider.domain in cur_provider_domains:
+ continue
+ if ProviderFeature.SEARCH not in provider.supported_features:
+ continue
+ if not provider.library_supported(MediaType.AUDIOBOOK):
+ continue
+ if not provider.is_streaming_provider:
+ # matching on unique providers is pointless as they push (all) their content to MA
+ continue
+ if await find_prov_match(provider):
+ cur_provider_domains.add(provider.domain)
+ else:
+ self.logger.debug(
+ "Could not find match for Audiobook %s on provider %s",
+ db_audiobook.name,
+ provider.name,
+ )
--- /dev/null
+"""Manage MediaItems of type Podcast."""
+
+from __future__ import annotations
+
+from typing import TYPE_CHECKING, Any
+
+from music_assistant_models.enums import MediaType, ProviderFeature
+from music_assistant_models.errors import InvalidDataError
+from music_assistant_models.media_items import Artist, Episode, Podcast, UniqueList
+
+from music_assistant.constants import DB_TABLE_PODCASTS
+from music_assistant.controllers.media.base import MediaControllerBase
+from music_assistant.helpers.compare import (
+ compare_media_item,
+ compare_podcast,
+ loose_compare_strings,
+)
+from music_assistant.helpers.json import serialize_to_json
+
+if TYPE_CHECKING:
+ from music_assistant_models.media_items import Track
+
+ from music_assistant.models.music_provider import MusicProvider
+
+
+class PodcastsController(MediaControllerBase[Podcast]):
+ """Controller managing MediaItems of type Podcast."""
+
+ db_table = DB_TABLE_PODCASTS
+ media_type = MediaType.PODCAST
+ item_cls = Podcast
+
+ def __init__(self, *args, **kwargs) -> None:
+ """Initialize class."""
+ super().__init__(*args, **kwargs)
+ self.base_query = """
+ SELECT
+ podcasts.*,
+ (SELECT JSON_GROUP_ARRAY(
+ json_object(
+ 'item_id', provider_mappings.provider_item_id,
+ 'provider_domain', provider_mappings.provider_domain,
+ 'provider_instance', provider_mappings.provider_instance,
+ 'available', provider_mappings.available,
+ 'audio_format', json(provider_mappings.audio_format),
+ 'url', provider_mappings.url,
+ 'details', provider_mappings.details
+ )) FROM provider_mappings WHERE provider_mappings.item_id = podcasts.item_id AND media_type = 'podcast') AS provider_mappings
+ FROM podcasts""" # noqa: E501
+ # register (extra) api handlers
+ api_base = self.api_base
+ self.mass.register_api_command(f"music/{api_base}/podcast_episodes", self.episodes)
+ self.mass.register_api_command(f"music/{api_base}/podcast_versions", self.versions)
+
+ async def library_items(
+ self,
+ favorite: bool | None = None,
+ search: str | None = None,
+ limit: int = 500,
+ offset: int = 0,
+ order_by: str = "sort_name",
+ provider: str | None = None,
+ extra_query: str | None = None,
+ extra_query_params: dict[str, Any] | None = None,
+ ) -> list[Artist]:
+ """Get in-database podcasts."""
+ extra_query_params: dict[str, Any] = extra_query_params or {}
+ extra_query_parts: list[str] = [extra_query] if extra_query else []
+ result = await self._get_library_items_by_query(
+ favorite=favorite,
+ search=search,
+ limit=limit,
+ offset=offset,
+ order_by=order_by,
+ provider=provider,
+ extra_query_parts=extra_query_parts,
+ extra_query_params=extra_query_params,
+ )
+ if search and len(result) < 25 and not offset:
+ # append publisher items to result
+ extra_query_parts = [
+ "WHERE podcasts.publisher LIKE :search OR podcasts.name LIKE :search",
+ ]
+ extra_query_params["search"] = f"%{search}%"
+ return result + await self._get_library_items_by_query(
+ favorite=favorite,
+ search=None,
+ limit=limit,
+ order_by=order_by,
+ provider=provider,
+ extra_query_parts=extra_query_parts,
+ extra_query_params=extra_query_params,
+ )
+ return result
+
+ async def episodes(
+ self,
+ item_id: str,
+ provider_instance_id_or_domain: str,
+ ) -> UniqueList[Episode]:
+ """Return podcast episodes for the given provider podcast id."""
+ # always check if we have a library item for this podcast
+ library_podcast = await self.get_library_item_by_prov_id(
+ item_id, provider_instance_id_or_domain
+ )
+ if not library_podcast:
+ return await self._get_provider_podcast_episodes(
+ item_id, provider_instance_id_or_domain
+ )
+ # return items from first/only provider
+ for provider_mapping in library_podcast.provider_mappings:
+ return await self._get_provider_podcast_episodes(
+ provider_mapping.item_id, provider_mapping.provider_instance
+ )
+ return UniqueList()
+
+ async def versions(
+ self,
+ item_id: str,
+ provider_instance_id_or_domain: str,
+ ) -> UniqueList[Podcast]:
+ """Return all versions of an podcast we can find on all providers."""
+ podcast = await self.get_provider_item(item_id, provider_instance_id_or_domain)
+ search_query = podcast.name
+ result: UniqueList[Podcast] = UniqueList()
+ for provider_id in self.mass.music.get_unique_providers():
+ provider = self.mass.get_provider(provider_id)
+ if not provider:
+ continue
+ if not provider.library_supported(MediaType.PODCAST):
+ continue
+ result.extend(
+ prov_item
+ for prov_item in await self.search(search_query, provider_id)
+ if loose_compare_strings(podcast.name, prov_item.name)
+ # make sure that the 'base' version is NOT included
+ and not podcast.provider_mappings.intersection(prov_item.provider_mappings)
+ )
+ return result
+
+ async def _add_library_item(self, item: Podcast) -> int:
+ """Add a new record to the database."""
+ if not isinstance(item, Podcast):
+ msg = "Not a valid Podcast object (ItemMapping can not be added to db)"
+ raise InvalidDataError(msg)
+ db_id = await self.mass.music.database.insert(
+ self.db_table,
+ {
+ "name": item.name,
+ "sort_name": item.sort_name,
+ "version": item.version,
+ "favorite": item.favorite,
+ "metadata": serialize_to_json(item.metadata),
+ "external_ids": serialize_to_json(item.external_ids),
+ "publisher": item.publisher,
+ "total_episodes": item.total_episodes,
+ },
+ )
+ # update/set provider_mappings table
+ await self._set_provider_mappings(db_id, item.provider_mappings)
+ self.logger.debug("added %s to database (id: %s)", item.name, db_id)
+ return db_id
+
+ async def _update_library_item(
+ self, item_id: str | int, update: Podcast, overwrite: bool = False
+ ) -> None:
+ """Update existing record in the database."""
+ db_id = int(item_id) # ensure integer
+ cur_item = await self.get_library_item(db_id)
+ metadata = update.metadata if overwrite else cur_item.metadata.update(update.metadata)
+ cur_item.external_ids.update(update.external_ids)
+ provider_mappings = (
+ update.provider_mappings
+ if overwrite
+ else {*cur_item.provider_mappings, *update.provider_mappings}
+ )
+ await self.mass.music.database.update(
+ self.db_table,
+ {"item_id": db_id},
+ {
+ "name": update.name if overwrite else cur_item.name,
+ "sort_name": update.sort_name
+ if overwrite
+ else cur_item.sort_name or update.sort_name,
+ "version": update.version if overwrite else cur_item.version or update.version,
+ "metadata": serialize_to_json(metadata),
+ "external_ids": serialize_to_json(
+ update.external_ids if overwrite else cur_item.external_ids
+ ),
+ "publisher": cur_item.publisher or update.publisher,
+ "total_episodes": cur_item.total_episodes or update.total_episodes,
+ },
+ )
+ # update/set provider_mappings table
+ await self._set_provider_mappings(db_id, provider_mappings, overwrite)
+ self.logger.debug("updated %s in database: (id %s)", update.name, db_id)
+
+ async def _get_provider_podcast_episodes(
+ self, item_id: str, provider_instance_id_or_domain: str
+ ) -> list[Episode]:
+ """Return podcast episodes for the given provider podcast id."""
+ prov: MusicProvider = self.mass.get_provider(provider_instance_id_or_domain)
+ if prov is None:
+ return []
+ # prefer cache items (if any) - for streaming providers only
+ cache_base_key = prov.lookup_key
+ cache_key = f"podcast.{item_id}"
+ if (
+ prov.is_streaming_provider
+ and (cache := await self.mass.cache.get(cache_key, base_key=cache_base_key)) is not None
+ ):
+ return [Episode.from_dict(x) for x in cache]
+ # no items in cache - get listing from provider
+ items = await prov.get_podcast_episodes(item_id)
+ # store (serializable items) in cache
+ if prov.is_streaming_provider:
+ self.mass.create_task(
+ self.mass.cache.set(
+ cache_key,
+ [x.to_dict() for x in items],
+ expiration=3600,
+ base_key=cache_base_key,
+ ),
+ )
+
+ return items
+
+ async def _get_provider_dynamic_base_tracks(
+ self,
+ item_id: str,
+ provider_instance_id_or_domain: str,
+ limit: int = 25,
+ ) -> list[Track]:
+ """Get the list of base tracks from the controller used to calculate the dynamic radio."""
+ msg = "Dynamic tracks not supported for Podcast MediaItem"
+ raise NotImplementedError(msg)
+
+ async def _get_dynamic_tracks(self, media_item: Podcast, limit: int = 25) -> list[Track]:
+ """Get dynamic list of tracks for given item, fallback/default implementation."""
+ msg = "Dynamic tracks not supported for Podcast MediaItem"
+ raise NotImplementedError(msg)
+
+ async def match_providers(self, db_podcast: Podcast) -> None:
+ """Try to find match on all (streaming) providers for the provided (database) podcast.
+
+ This is used to link objects of different providers/qualities together.
+ """
+ if db_podcast.provider != "library":
+ return # Matching only supported for database items
+
+ async def find_prov_match(provider: MusicProvider):
+ self.logger.debug(
+ "Trying to match podcast %s on provider %s", db_podcast.name, provider.name
+ )
+ match_found = False
+ search_str = db_podcast.name
+ search_result = await self.search(search_str, provider.instance_id)
+ for search_result_item in search_result:
+ if not search_result_item.available:
+ continue
+ if not compare_media_item(db_podcast, search_result_item):
+ continue
+ # we must fetch the full podcast version, search results can be simplified objects
+ prov_podcast = await self.get_provider_item(
+ search_result_item.item_id,
+ search_result_item.provider,
+ fallback=search_result_item,
+ )
+ if compare_podcast(db_podcast, prov_podcast):
+ # 100% match, we update the db with the additional provider mapping(s)
+ match_found = True
+ for provider_mapping in search_result_item.provider_mappings:
+ await self.add_provider_mapping(db_podcast.item_id, provider_mapping)
+ db_podcast.provider_mappings.add(provider_mapping)
+ return match_found
+
+ # try to find match on all providers
+ cur_provider_domains = {x.provider_domain for x in db_podcast.provider_mappings}
+ for provider in self.mass.music.providers:
+ if provider.domain in cur_provider_domains:
+ continue
+ if ProviderFeature.SEARCH not in provider.supported_features:
+ continue
+ if not provider.library_supported(MediaType.PODCAST):
+ continue
+ if not provider.is_streaming_provider:
+ # matching on unique providers is pointless as they push (all) their content to MA
+ continue
+ if await find_prov_match(provider):
+ cur_provider_domains.add(provider.domain)
+ else:
+ self.logger.debug(
+ "Could not find match for Podcast %s on provider %s",
+ db_podcast.name,
+ provider.name,
+ )
DB_TABLE_ALBUM_TRACKS,
DB_TABLE_ALBUMS,
DB_TABLE_ARTISTS,
+ DB_TABLE_AUDIOBOOKS,
DB_TABLE_LOUDNESS_MEASUREMENTS,
DB_TABLE_PLAYLISTS,
DB_TABLE_PLAYLOG,
+ DB_TABLE_PODCASTS,
DB_TABLE_PROVIDER_MAPPINGS,
DB_TABLE_RADIOS,
DB_TABLE_SETTINGS,
from .media.albums import AlbumsController
from .media.artists import ArtistsController
+from .media.audiobooks import AudiobooksController
from .media.playlists import PlaylistController
+from .media.podcasts import PodcastsController
from .media.radio import RadioController
from .media.tracks import TracksController
self.tracks = TracksController(self.mass)
self.radio = RadioController(self.mass)
self.playlists = PlaylistController(self.mass)
+ self.audiobooks = AudiobooksController(self.mass)
+ self.podcasts = PodcastsController(self.mass)
self.in_progress_syncs: list[SyncTask] = []
self._sync_lock = asyncio.Lock()
self.manifest.name = "Music controller"
return SearchResults(tracks=[item])
elif media_type == MediaType.PLAYLIST:
return SearchResults(playlists=[item])
+ elif media_type == MediaType.AUDIOBOOK:
+ return SearchResults(audiobooks=[item])
+ elif media_type == MediaType.PODCAST:
+ return SearchResults(podcasts=[item])
else:
return SearchResults()
for item in sublist
if item is not None
][:limit],
+ audiobooks=[
+ item
+ for sublist in zip_longest(*[x.audiobooks for x in results_per_provider])
+ for item in sublist
+ if item is not None
+ ][:limit],
+ podcasts=[
+ item
+ for sublist in zip_longest(*[x.podcasts for x in results_per_provider])
+ for item in sublist
+ if item is not None
+ ][:limit],
)
async def search_provider(
result.playlists = search_results
elif media_type == MediaType.RADIO:
result.radio = search_results
+ elif media_type == MediaType.AUDIOBOOK:
+ result.audiobooks = search_results
+ elif media_type == MediaType.PODCAST:
+ result.podcasts = search_results
return result
@api_command("music/browse")
result = searchresult.tracks
elif media_item.media_type == MediaType.PLAYLIST:
result = searchresult.playlists
+ elif media_item.media_type == MediaType.AUDIOBOOK:
+ result = searchresult.audiobooks
+ elif media_item.media_type == MediaType.PODCAST:
+ result = searchresult.podcasts
else:
result = searchresult.radio
for item in result:
| TracksController
| RadioController
| PlaylistController
+ | AudiobooksController
+ | PodcastsController
):
"""Return controller for MediaType."""
if media_type == MediaType.ARTIST:
return self.radio
if media_type == MediaType.PLAYLIST:
return self.playlists
+ if media_type == MediaType.AUDIOBOOK:
+ return self.audiobooks
+ if media_type == MediaType.PODCAST:
+ return self.podcasts
return None
def get_unique_providers(self) -> set[str]:
DB_TABLE_ARTISTS,
DB_TABLE_PLAYLISTS,
DB_TABLE_RADIOS,
+ DB_TABLE_AUDIOBOOKS,
+ DB_TABLE_PODCASTS,
DB_TABLE_ALBUM_TRACKS,
DB_TABLE_PLAYLOG,
DB_TABLE_PROVIDER_MAPPINGS,
[timestamp_modified] INTEGER
);"""
)
+ await self.database.execute(
+ f"""
+ CREATE TABLE IF NOT EXISTS {DB_TABLE_AUDIOBOOKS}(
+ [item_id] INTEGER PRIMARY KEY AUTOINCREMENT,
+ [name] TEXT NOT NULL,
+ [sort_name] TEXT NOT NULL,
+ [favorite] BOOLEAN DEFAULT 0,
+ [publisher] TEXT NOT NULL,
+ [total_chapters] INTEGER NOT NULL,
+ [authors] json NOT NULL,
+ [narrators] json NOT NULL,
+ [metadata] json NOT NULL,
+ [external_ids] json NOT NULL,
+ [play_count] INTEGER DEFAULT 0,
+ [last_played] INTEGER DEFAULT 0,
+ [timestamp_added] INTEGER DEFAULT (cast(strftime('%s','now') as int)),
+ [timestamp_modified] INTEGER
+ );"""
+ )
+ await self.database.execute(
+ f"""
+ CREATE TABLE IF NOT EXISTS {DB_TABLE_PODCASTS}(
+ [item_id] INTEGER PRIMARY KEY AUTOINCREMENT,
+ [name] TEXT NOT NULL,
+ [sort_name] TEXT NOT NULL,
+ [favorite] BOOLEAN DEFAULT 0,
+ [publisher] TEXT NOT NULL,
+ [total_episodes] INTEGER NOT NULL,
+ [metadata] json NOT NULL,
+ [external_ids] json NOT NULL,
+ [play_count] INTEGER DEFAULT 0,
+ [last_played] INTEGER DEFAULT 0,
+ [timestamp_added] INTEGER DEFAULT (cast(strftime('%s','now') as int)),
+ [timestamp_modified] INTEGER
+ );"""
+ )
await self.database.execute(
f"""
CREATE TABLE IF NOT EXISTS {DB_TABLE_ALBUM_TRACKS}(
DB_TABLE_TRACKS,
DB_TABLE_PLAYLISTS,
DB_TABLE_RADIOS,
+ DB_TABLE_AUDIOBOOKS,
+ DB_TABLE_PODCASTS,
):
# index on favorite column
await self.database.execute(
async def __create_database_triggers(self) -> None:
"""Create database triggers."""
# triggers to auto update timestamps
- for db_table in ("artists", "albums", "tracks", "playlists", "radios"):
+ for db_table in (
+ "artists",
+ "albums",
+ "tracks",
+ "playlists",
+ "radios",
+ "audiobooks",
+ "podcasts",
+ ):
await self.database.execute(
f"""
CREATE TRIGGER IF NOT EXISTS update_{db_table}_timestamp
from music_assistant_models.media_items import (
Album,
Artist,
+ Audiobook,
ItemMapping,
MediaItem,
MediaItemMetadata,
MediaItemType,
Playlist,
+ Podcast,
Radio,
Track,
)
return compare_playlist(base_item, compare_item, strict)
if base_item.media_type == MediaType.RADIO and compare_item.media_type == MediaType.RADIO:
return compare_radio(base_item, compare_item, strict)
+ if (
+ base_item.media_type == MediaType.AUDIOBOOK
+ and compare_item.media_type == MediaType.AUDIOBOOK
+ ):
+ return compare_audiobook(base_item, compare_item, strict)
+ if base_item.media_type == MediaType.PODCAST and compare_item.media_type == MediaType.PODCAST:
+ return compare_podcast(base_item, compare_item, strict)
return compare_item_mapping(base_item, compare_item, strict)
return compare_strings(base_item.name, compare_item.name, strict=strict)
+def compare_audiobook(
+ base_item: Audiobook | ItemMapping | None,
+ compare_item: Audiobook | ItemMapping | None,
+ strict: bool = True,
+) -> bool | None:
+ """Compare two Audiobook items and return True if they match."""
+ if base_item is None or compare_item is None:
+ return False
+ # return early on exact item_id match
+ if compare_item_ids(base_item, compare_item):
+ return True
+
+ # return early on (un)matched external id
+ for ext_id in (
+ ExternalID.ASIN,
+ ExternalID.BARCODE,
+ ):
+ external_id_match = compare_external_ids(
+ base_item.external_ids, compare_item.external_ids, ext_id
+ )
+ if external_id_match is not None:
+ return external_id_match
+
+ # compare version
+ if not compare_version(base_item.version, compare_item.version):
+ return False
+ # compare name
+ if not compare_strings(base_item.name, compare_item.name, strict=True):
+ return False
+ if not strict and (isinstance(base_item, ItemMapping) or isinstance(compare_item, ItemMapping)):
+ return True
+ # for strict matching we REQUIRE both items to be a real Audiobook object
+ assert isinstance(base_item, Audiobook)
+ assert isinstance(compare_item, Audiobook)
+ # compare publisher
+ if (
+ base_item.publisher
+ and compare_item.publisher
+ and not compare_strings(base_item.publisher, compare_item.publisher, strict=True)
+ ):
+ return False
+ # compare author(s)
+ for author in base_item.authors:
+ author_safe = create_safe_string(author)
+ if author_safe in [create_safe_string(x) for x in compare_item.authors]:
+ return True
+ return False
+
+
+def compare_podcast(
+ base_item: Podcast | ItemMapping | None,
+ compare_item: Podcast | ItemMapping | None,
+ strict: bool = True,
+) -> bool | None:
+ """Compare two Podcast items and return True if they match."""
+ if base_item is None or compare_item is None:
+ return False
+ # return early on exact item_id match
+ if compare_item_ids(base_item, compare_item):
+ return True
+
+ # return early on (un)matched external id
+ for ext_id in (
+ ExternalID.ASIN,
+ ExternalID.BARCODE,
+ ):
+ external_id_match = compare_external_ids(
+ base_item.external_ids, compare_item.external_ids, ext_id
+ )
+ if external_id_match is not None:
+ return external_id_match
+
+ # compare version
+ if not compare_version(base_item.version, compare_item.version):
+ return False
+ # compare name
+ if not compare_strings(base_item.name, compare_item.name, strict=True):
+ return False
+ if not strict and (isinstance(base_item, ItemMapping) or isinstance(compare_item, ItemMapping)):
+ return True
+ # for strict matching we REQUIRE both items to be a real Podcast object
+ assert isinstance(base_item, Audiobook)
+ assert isinstance(compare_item, Audiobook)
+ # compare publisher
+ return not (
+ base_item.publisher
+ and compare_item.publisher
+ and not compare_strings(base_item.publisher, compare_item.publisher, strict=True)
+ )
+
+
def compare_item_mapping(
base_item: ItemMapping,
compare_item: ItemMapping,
from music_assistant_models.media_items import (
Album,
Artist,
+ Audiobook,
BrowseFolder,
+ Chapter,
+ Episode,
ItemMapping,
MediaItemType,
Playlist,
raise NotImplementedError
yield # type: ignore
+ async def get_library_audiobooks(self) -> AsyncGenerator[Audiobook, None]:
+ """Retrieve library/subscribed audiobooks from the provider."""
+ if ProviderFeature.LIBRARY_AUDIOBOOKS in self.supported_features:
+ raise NotImplementedError
+ yield # type: ignore
+
+ async def get_library_podcasts(self) -> AsyncGenerator[Audiobook, None]:
+ """Retrieve library/subscribed podcasts from the provider."""
+ if ProviderFeature.LIBRARY_AUDIOBOOKS in self.supported_features:
+ raise NotImplementedError
+ yield # type: ignore
+
async def get_artist(self, prov_artist_id: str) -> Artist:
"""Get full artist details by id."""
raise NotImplementedError
if ProviderFeature.LIBRARY_RADIOS in self.supported_features:
raise NotImplementedError
+ async def get_audiobook(self, prov_audiobook_id: str) -> Audiobook: # type: ignore[return]
+ """Get full audiobook details by id."""
+ if ProviderFeature.LIBRARY_AUDIOBOOKS in self.supported_features:
+ raise NotImplementedError
+
+ async def get_podcast(self, prov_podcast_id: str) -> Audiobook: # type: ignore[return]
+ """Get full audiobook details by id."""
+ if ProviderFeature.LIBRARY_PODCASTS in self.supported_features:
+ raise NotImplementedError
+
+ async def get_chapter(self, prov_chapter_id: str) -> Chapter: # type: ignore[return]
+ """Get (full) audiobook chapter details by id."""
+ if ProviderFeature.LIBRARY_AUDIOBOOKS in self.supported_features:
+ raise NotImplementedError
+
+ async def get_episode(self, prov_episode_id: str) -> Chapter: # type: ignore[return]
+ """Get (full) podcast episode details by id."""
+ if ProviderFeature.LIBRARY_PODCASTS in self.supported_features:
+ raise NotImplementedError
+
async def get_album_tracks(
self,
prov_album_id: str, # type: ignore[return]
if ProviderFeature.LIBRARY_PLAYLISTS in self.supported_features:
raise NotImplementedError
+ async def get_audiobook_chapters(
+ self,
+ prov_audiobook_id: str,
+ ) -> list[Chapter]:
+ """Get all Chapters for given audiobook id."""
+ if ProviderFeature.LIBRARY_AUDIOBOOKS in self.supported_features:
+ raise NotImplementedError
+
+ async def get_podcast_episodes(
+ self,
+ prov_podcast_id: str,
+ ) -> list[Episode]:
+ """Get all Episodes for given podcast id."""
+ if ProviderFeature.LIBRARY_PODCASTS in self.supported_features:
+ raise NotImplementedError
+
async def library_add(self, item: MediaItemType) -> bool:
"""Add item to provider's library. Return true on success."""
if (
and ProviderFeature.LIBRARY_RADIOS_EDIT in self.supported_features
):
raise NotImplementedError
+ if (
+ item.media_type == MediaType.AUDIOBOOK
+ and ProviderFeature.LIBRARY_AUDIOBOOKS_EDIT in self.supported_features
+ ):
+ raise NotImplementedError
+ if (
+ item.media_type == MediaType.PODCAST
+ and ProviderFeature.LIBRARY_PODCASTS_EDIT in self.supported_features
+ ):
+ raise NotImplementedError
self.logger.info(
"Provider %s does not support library edit, "
"the action will only be performed in the local database.",
and ProviderFeature.LIBRARY_RADIOS_EDIT in self.supported_features
):
raise NotImplementedError
+ if (
+ media_type == MediaType.AUDIOBOOK
+ and ProviderFeature.LIBRARY_AUDIOBOOKS_EDIT in self.supported_features
+ ):
+ raise NotImplementedError
+ if (
+ media_type == MediaType.PODCAST
+ and ProviderFeature.LIBRARY_PODCASTS_EDIT in self.supported_features
+ ):
+ raise NotImplementedError
self.logger.info(
"Provider %s does not support library edit, "
"the action will only be performed in the local database.",
if ProviderFeature.SIMILAR_TRACKS in self.supported_features:
raise NotImplementedError
- async def get_stream_details(self, item_id: str) -> StreamDetails:
- """Get streamdetails for a track/radio."""
+ async def get_stream_details(
+ self, item_id: str, media_type: MediaType = MediaType.TRACK
+ ) -> StreamDetails:
+ """Get streamdetails for a track/radio/chapter/episode."""
raise NotImplementedError
async def get_audio_stream( # type: ignore[return]
return await self.get_playlist(prov_item_id)
if media_type == MediaType.RADIO:
return await self.get_radio(prov_item_id)
+ if media_type == MediaType.AUDIOBOOK:
+ return await self.get_audiobook(prov_item_id)
+ if media_type == MediaType.PODCAST:
+ return await self.get_podcast(prov_item_id)
+ if media_type == MediaType.CHAPTER:
+ return await self.get_chapter(prov_item_id)
+ if media_type == MediaType.EPISODE:
+ return await self.get_episode(prov_item_id)
return await self.get_track(prov_item_id)
- async def browse(self, path: str) -> Sequence[MediaItemType | ItemMapping]:
+ async def browse(self, path: str) -> Sequence[MediaItemType | ItemMapping]: # noqa: PLR0915
"""Browse this provider's items.
:param path: The path to browse, (e.g. provider_id://artists).
return await self.mass.music.playlists.library_items(
extra_query=query, extra_query_params=query_params
)
+ if subpath == "audiobooks":
+ library_items = await self.mass.cache.get(
+ "audiobook",
+ default=[],
+ category=CacheCategory.LIBRARY_ITEMS,
+ base_key=self.instance_id,
+ )
+ library_items = cast(list[int], library_items)
+ query = "audiobooks.item_id in :ids"
+ query_params = {"ids": library_items}
+ return await self.mass.music.audiobooks.library_items(
+ extra_query=query, extra_query_params=query_params
+ )
+ if subpath == "podcasts":
+ library_items = await self.mass.cache.get(
+ "podcast",
+ default=[],
+ category=CacheCategory.LIBRARY_ITEMS,
+ base_key=self.instance_id,
+ )
+ library_items = cast(list[int], library_items)
+ query = "podcasts.item_id in :ids"
+ query_params = {"ids": library_items}
+ return await self.mass.music.podcasts.library_items(
+ extra_query=query, extra_query_params=query_params
+ )
if subpath:
# unknown path
msg = "Invalid subpath"
label="radios",
)
)
+ if ProviderFeature.LIBRARY_AUDIOBOOKS in self.supported_features:
+ items.append(
+ BrowseFolder(
+ item_id="audiobooks",
+ provider=self.domain,
+ path=path + "audiobooks",
+ name="",
+ label="audiobooks",
+ )
+ )
+ if ProviderFeature.LIBRARY_PODCASTS in self.supported_features:
+ items.append(
+ BrowseFolder(
+ item_id="podcasts",
+ provider=self.domain,
+ path=path + "podcasts",
+ name="",
+ label="podcasts",
+ )
+ )
return items
async def recommendations(self) -> list[MediaItemType]:
return ProviderFeature.LIBRARY_PLAYLISTS in self.supported_features
if media_type == MediaType.RADIO:
return ProviderFeature.LIBRARY_RADIOS in self.supported_features
+ if media_type == MediaType.AUDIOBOOK:
+ return ProviderFeature.LIBRARY_AUDIOBOOKS in self.supported_features
+ if media_type == MediaType.PODCAST:
+ return ProviderFeature.LIBRARY_PODCASTS in self.supported_features
return False
def library_edit_supported(self, media_type: MediaType) -> bool:
return ProviderFeature.LIBRARY_PLAYLISTS_EDIT in self.supported_features
if media_type == MediaType.RADIO:
return ProviderFeature.LIBRARY_RADIOS_EDIT in self.supported_features
+ if media_type == MediaType.AUDIOBOOK:
+ return ProviderFeature.LIBRARY_AUDIOBOOKS_EDIT in self.supported_features
return False
def _get_library_gen(self, media_type: MediaType) -> AsyncGenerator[MediaItemType, None]:
return self.get_library_playlists()
if media_type == MediaType.RADIO:
return self.get_library_radios()
+ if media_type == MediaType.AUDIOBOOK:
+ return self.get_library_audiobooks()
+ if media_type == MediaType.PODCAST:
+ return self.get_library_podcasts()
raise NotImplementedError
# Get a list of similar tracks based on the provided track.
# This is only called if the provider supports the SIMILAR_TRACKS feature.
- async def get_stream_details(self, item_id: str) -> StreamDetails:
+ async def get_stream_details(
+ self, item_id: str, media_type: MediaType = MediaType.TRACK
+ ) -> StreamDetails:
"""Get streamdetails for a track/radio."""
# Get stream details for a track or radio.
# Implementing this method is MANDATORY to allow playback.
found_tracks.append(self._parse_track(track))
return found_tracks
- async def get_stream_details(self, item_id: str) -> StreamDetails:
+ async def get_stream_details(
+ self, item_id: str, media_type: MediaType = MediaType.TRACK
+ ) -> StreamDetails:
"""Return the content details for the given track when it will be streamed."""
stream_metadata = await self._fetch_song_stream_metadata(item_id)
license_url = stream_metadata["hls-key-server-url"]
)
return media_info
- async def get_stream_details(self, item_id: str) -> StreamDetails:
+ async def get_stream_details(
+ self, item_id: str, media_type: MediaType = MediaType.TRACK
+ ) -> StreamDetails:
"""Get streamdetails for a track/radio."""
media_info = await self._get_media_info(item_id)
is_radio = media_info.get("icy-name") or not media_info.duration
]["data"][:limit]
return [await self.get_track(track["SNG_ID"]) for track in tracks]
- async def get_stream_details(self, item_id: str) -> StreamDetails:
+ async def get_stream_details(
+ self, item_id: str, media_type: MediaType = MediaType.TRACK
+ ) -> StreamDetails:
"""Return the content details for the given track when it will be streamed."""
url_details, song_data = await self.gw_client.get_deezer_track_urls(item_id)
url = url_details["sources"][0]["url"]
await _file.write("#EXTM3U\n")
return await self.get_playlist(filename)
- async def get_stream_details(self, item_id: str) -> StreamDetails:
+ async def get_stream_details(
+ self, item_id: str, media_type: MediaType = MediaType.TRACK
+ ) -> StreamDetails:
"""Return the content details for the given track when it will be streamed."""
library_item = await self.mass.music.tracks.get_library_item_by_prov_id(
item_id, self.instance_id
for album in albums["Items"]
]
- async def get_stream_details(self, item_id: str) -> StreamDetails:
+ async def get_stream_details(
+ self, item_id: str, media_type: MediaType = MediaType.TRACK
+ ) -> StreamDetails:
"""Return the content details for the given track when it will be streamed."""
jellyfin_track = await self._client.get_track(item_id)
mimetype = self._media_mime_type(jellyfin_track)
msg = f"Failed to remove songs from {prov_playlist_id}, check your permissions."
raise ProviderPermissionDenied(msg) from ex
- async def get_stream_details(self, item_id: str) -> StreamDetails:
+ async def get_stream_details(
+ self, item_id: str, media_type: MediaType = MediaType.TRACK
+ ) -> StreamDetails:
"""Get the details needed to process a specified track."""
try:
sonic_song: SonicSong = await self._run_async(self._conn.getSong, item_id)
return albums
return []
- async def get_stream_details(self, item_id: str) -> StreamDetails:
+ async def get_stream_details(
+ self, item_id: str, media_type: MediaType = MediaType.TRACK
+ ) -> StreamDetails:
"""Get streamdetails for a track."""
plex_track = await self._get_data(item_id, PlexTrack)
if not plex_track or not plex_track.media:
playlist_track_ids=",".join(playlist_track_ids),
)
- async def get_stream_details(self, item_id: str) -> StreamDetails:
+ async def get_stream_details(
+ self, item_id: str, media_type: MediaType = MediaType.TRACK
+ ) -> StreamDetails:
"""Return the content details for the given track when it will be streamed."""
streamdata = None
for format_id in [27, 7, 6, 5]:
return radio
- async def get_stream_details(self, item_id: str) -> StreamDetails:
+ async def get_stream_details(
+ self, item_id: str, media_type: MediaType = MediaType.RADIO
+ ) -> StreamDetails:
"""Get streamdetails for a radio station."""
stream = await self.radios.station(uuid=item_id)
if not stream:
return self._parse_radio(self._channels_by_id[prov_radio_id])
- async def get_stream_details(self, item_id: str) -> StreamDetails:
+ async def get_stream_details(
+ self, item_id: str, media_type: MediaType = MediaType.RADIO
+ ) -> StreamDetails:
"""Get streamdetails for a track/radio."""
hls_path = f"http://{self._base_url}/{item_id}.m3u8"
return tracks
- async def get_stream_details(self, item_id: str) -> StreamDetails:
+ async def get_stream_details(
+ self, item_id: str, media_type: MediaType = MediaType.TRACK
+ ) -> StreamDetails:
"""Return the content details for the given track when it will be streamed."""
url: str = await self._soundcloud.get_stream_url(track_id=item_id)
return StreamDetails(
items = await self._get_data(endpoint, seed_tracks=prov_track_id, limit=limit)
return [self._parse_track(item) for item in items["tracks"] if (item and item["id"])]
- async def get_stream_details(self, item_id: str) -> StreamDetails:
+ async def get_stream_details(
+ self, item_id: str, media_type: MediaType = MediaType.TRACK
+ ) -> StreamDetails:
"""Return the content details for the given track when it will be streamed."""
return StreamDetails(
item_id=item_id,
track_item_id = f"{artist_idx}_{album_idx}_{track_idx}"
yield await self.get_track(track_item_id)
- async def get_stream_details(self, item_id: str) -> StreamDetails:
+ async def get_stream_details(
+ self, item_id: str, media_type: MediaType = MediaType.TRACK
+ ) -> StreamDetails:
"""Get streamdetails for a track/radio."""
return StreamDetails(
provider=self.instance_id,
)
return self._parse_playlist(playlist_obj)
- async def get_stream_details(self, item_id: str) -> StreamDetails:
+ async def get_stream_details(
+ self, item_id: str, media_type: MediaType = MediaType.TRACK
+ ) -> StreamDetails:
"""Return the content details for the given track when it will be streamed."""
tidal_session = await self._get_tidal_session()
# make sure a valid track is requested.
await self.mass.cache.set(preset_id, result, base_key=cache_base_key)
return result
- async def get_stream_details(self, item_id: str) -> StreamDetails:
+ async def get_stream_details(
+ self, item_id: str, media_type: MediaType = MediaType.RADIO
+ ) -> StreamDetails:
"""Get streamdetails for a radio station."""
if item_id.startswith("http"):
# custom url
return tracks
return []
- async def get_stream_details(self, item_id: str) -> StreamDetails:
+ async def get_stream_details(
+ self, item_id: str, media_type: MediaType = MediaType.TRACK
+ ) -> StreamDetails:
"""Return the content details for the given track when it will be streamed."""
stream_format = await self._get_stream_format(item_id=item_id)
self.logger.debug("Found stream_format: %s for song %s", stream_format["format"], item_id)