-"""Jellyfin support for MusicAssistant."""\r
-\r
-from __future__ import annotations\r
-\r
-import asyncio\r
-import logging\r
-import mimetypes\r
-import socket\r
-import uuid\r
-from asyncio import TaskGroup\r
-from typing import TYPE_CHECKING, Any\r
-\r
-if TYPE_CHECKING:\r
- from collections.abc import AsyncGenerator, Callable, Coroutine\r
-\r
-from jellyfin_apiclient_python import JellyfinClient\r
-from jellyfin_apiclient_python.api import API\r
-\r
-from music_assistant.common.models.config_entries import (\r
- ConfigEntry,\r
- ConfigValueType,\r
- ProviderConfig,\r
-)\r
-from music_assistant.common.models.enums import (\r
- ConfigEntryType,\r
- ContentType,\r
- ImageType,\r
- MediaType,\r
- ProviderFeature,\r
- StreamType,\r
-)\r
-from music_assistant.common.models.errors import InvalidDataError, LoginFailed, MediaNotFoundError\r
-from music_assistant.common.models.media_items import (\r
- Album,\r
- Artist,\r
- AudioFormat,\r
- ItemMapping,\r
- MediaItem,\r
- MediaItemImage,\r
- Playlist,\r
- ProviderMapping,\r
- SearchResults,\r
- Track,\r
-)\r
-from music_assistant.common.models.streamdetails import StreamDetails\r
-\r
-if TYPE_CHECKING:\r
- from music_assistant.common.models.provider import ProviderManifest\r
-\r
-from music_assistant.constants import VARIOUS_ARTISTS_NAME\r
-\r
-if TYPE_CHECKING:\r
- from music_assistant.server import MusicAssistant\r
-if TYPE_CHECKING:\r
- from music_assistant.server.models import ProviderInstanceType\r
-\r
-from music_assistant.server.models.music_provider import MusicProvider\r
-\r
-from .const import (\r
- CLIENT_VERSION,\r
- ITEM_KEY_ALBUM,\r
- ITEM_KEY_ALBUM_ARTIST,\r
- ITEM_KEY_ALBUM_ARTISTS,\r
- ITEM_KEY_ALBUM_ID,\r
- ITEM_KEY_ARTIST_ITEMS,\r
- ITEM_KEY_CAN_DOWNLOAD,\r
- ITEM_KEY_COLLECTION_TYPE,\r
- ITEM_KEY_ID,\r
- ITEM_KEY_IMAGE_TAGS,\r
- ITEM_KEY_INDEX_NUMBER,\r
- ITEM_KEY_MEDIA_CHANNELS,\r
- ITEM_KEY_MEDIA_CODEC,\r
- ITEM_KEY_MEDIA_SOURCES,\r
- ITEM_KEY_MEDIA_STREAMS,\r
- ITEM_KEY_MUSICBRAINZ_ARTIST,\r
- ITEM_KEY_MUSICBRAINZ_RELEASE_GROUP,\r
- ITEM_KEY_MUSICBRAINZ_TRACK,\r
- ITEM_KEY_NAME,\r
- ITEM_KEY_OVERVIEW,\r
- ITEM_KEY_PARENT_ID,\r
- ITEM_KEY_PARENT_INDEX_NUM,\r
- ITEM_KEY_PRODUCTION_YEAR,\r
- ITEM_KEY_PROVIDER_IDS,\r
- ITEM_KEY_RUNTIME_TICKS,\r
- ITEM_KEY_SORT_NAME,\r
- ITEM_KEY_USER_DATA,\r
- ITEM_TYPE_ALBUM,\r
- ITEM_TYPE_ARTIST,\r
- ITEM_TYPE_AUDIO,\r
- ITEM_TYPE_MUSICARTISTS,\r
- MAX_IMAGE_WIDTH,\r
- SUPPORTED_CONTAINER_FORMATS,\r
- USER_APP_NAME,\r
- USER_DATA_KEY_IS_FAVORITE,\r
-)\r
-\r
-CONF_URL = "url"\r
-CONF_USERNAME = "username"\r
-CONF_PASSWORD = "password"\r
-FAKE_ARTIST_PREFIX = "_fake://"\r
-\r
-\r
-async def setup(\r
- mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig\r
-) -> ProviderInstanceType:\r
- """Initialize provider(instance) with given configuration."""\r
- prov = JellyfinProvider(mass, manifest, config)\r
- await prov.handle_async_init()\r
- return prov\r
-\r
-\r
-async def get_config_entries(\r
- mass: MusicAssistant,\r
- instance_id: str | None = None, # pylint: disable=W0613\r
- action: str | None = None,\r
- values: dict[str, ConfigValueType] | None = None,\r
-) -> tuple[ConfigEntry, ...]:\r
- """\r
- Return Config entries to setup this provider.\r
-\r
- instance_id: id of an existing provider instance (None if new instance setup).\r
- action: [optional] action key called from config entries UI.\r
- values: the (intermediate) raw values for config entries sent with the action.\r
- """\r
- # config flow auth action/step (authenticate button clicked)\r
- # ruff: noqa: ARG001\r
- return (\r
- ConfigEntry(\r
- key=CONF_URL,\r
- type=ConfigEntryType.STRING,\r
- label="Server",\r
- required=True,\r
- description="The url of the Jellyfin server to connect to.",\r
- ),\r
- ConfigEntry(\r
- key=CONF_USERNAME,\r
- type=ConfigEntryType.STRING,\r
- label="Username",\r
- required=True,\r
- description="The username to authenticate to the remote server."\r
- "the remote host, For example 'media'.",\r
- ),\r
- ConfigEntry(\r
- key=CONF_PASSWORD,\r
- type=ConfigEntryType.SECURE_STRING,\r
- label="Password",\r
- required=False,\r
- description="The password to authenticate to the remote server.",\r
- ),\r
- )\r
-\r
-\r
-class JellyfinProvider(MusicProvider):\r
- """Provider for a jellyfin music library."""\r
-\r
- # _jellyfin_server : JellyfinClient = None\r
-\r
- async def handle_async_init(self) -> None:\r
- """Initialize provider(instance) with given configuration."""\r
-\r
- def connect() -> JellyfinClient:\r
- try:\r
- client = JellyfinClient()\r
- device_name = socket.gethostname()\r
- device_id = str(uuid.uuid4())\r
- client.config.app(USER_APP_NAME, CLIENT_VERSION, device_name, device_id)\r
- if CONF_URL.startswith("https://"):\r
- JellyfinClient.config.data["auth.ssl"] = True\r
- else:\r
- client.config.data["auth.ssl"] = False\r
- jellyfin_server_url = self.config.get_value(CONF_URL)\r
- jellyfin_server_user = self.config.get_value(CONF_USERNAME)\r
- jellyfin_server_password = self.config.get_value(CONF_PASSWORD)\r
- client.auth.connect_to_address(jellyfin_server_url)\r
- client.auth.login(\r
- jellyfin_server_url, jellyfin_server_user, jellyfin_server_password\r
- )\r
- credentials = client.auth.credentials.get_credentials()\r
- if not credentials["Servers"]:\r
- raise IndexError("No servers found")\r
- server = credentials["Servers"][0]\r
- server["username"] = jellyfin_server_user\r
- _jellyfin_server = client\r
- # json.dumps(server)\r
- except Exception as err:\r
- msg = f"Authentication failed: {err}"\r
- raise LoginFailed(msg) from err\r
- return _jellyfin_server\r
-\r
- self._jellyfin_server = await self._run_async(connect)\r
-\r
- @property\r
- def supported_features(self) -> tuple[ProviderFeature, ...]:\r
- """Return a list of supported features."""\r
- return (\r
- ProviderFeature.LIBRARY_ARTISTS,\r
- ProviderFeature.LIBRARY_ALBUMS,\r
- ProviderFeature.LIBRARY_TRACKS,\r
- ProviderFeature.LIBRARY_PLAYLISTS,\r
- ProviderFeature.BROWSE,\r
- ProviderFeature.SEARCH,\r
- ProviderFeature.ARTIST_ALBUMS,\r
- )\r
-\r
- @property\r
- def is_streaming_provider(self) -> bool:\r
- """Return True if the provider is a streaming provider."""\r
- return False\r
-\r
- async def _run_async(self, call: Callable, *args, **kwargs):\r
- return await self.mass.create_task(call, *args, **kwargs)\r
-\r
- def _get_item_mapping(self, media_type: MediaType, key: str, name: str) -> ItemMapping:\r
- return ItemMapping(\r
- media_type=media_type,\r
- item_id=key,\r
- provider=self.instance_id,\r
- name=name,\r
- )\r
-\r
- async def _parse(self, jellyfin_media) -> MediaItem | None:\r
- if jellyfin_media.type == "artist":\r
- return await self._parse_artist(jellyfin_media)\r
- elif jellyfin_media.type == "album":\r
- return await self._parse_album(jellyfin_media)\r
- elif jellyfin_media.type == "track":\r
- return await self._parse_track(jellyfin_media)\r
- elif jellyfin_media.type == "playlist":\r
- return await self._parse_playlist(jellyfin_media)\r
- return None\r
-\r
- async def _search_track(self, search_query, limit) -> list[dict[str, Any]]:\r
- resultset = await self._run_async(\r
- API.search_media_items,\r
- self._jellyfin_server.jellyfin,\r
- term=search_query,\r
- media=ITEM_TYPE_AUDIO,\r
- limit=limit,\r
- )\r
- return resultset["Items"]\r
-\r
- async def _search_album(self, search_query, limit) -> list[dict[str, Any]]:\r
- if "-" in search_query:\r
- searchterms = search_query.split(" - ")\r
- albumname = searchterms[1]\r
- else:\r
- albumname = search_query\r
- resultset = await self._run_async(\r
- API.search_media_items,\r
- self._jellyfin_server.jellyfin,\r
- term=albumname,\r
- media=ITEM_TYPE_ALBUM,\r
- limit=limit,\r
- )\r
- return resultset["Items"]\r
-\r
- async def _search_artist(self, search_query, limit) -> list[dict[str, Any]]:\r
- resultset = await self._run_async(\r
- API.search_media_items,\r
- self._jellyfin_server.jellyfin,\r
- term=search_query,\r
- media=ITEM_TYPE_ARTIST,\r
- limit=limit,\r
- )\r
- return resultset["Items"]\r
-\r
- async def _search_playlist(self, search_query, limit) -> list[dict[str, Any]]:\r
- resultset = await self._run_async(\r
- API.search_media_items,\r
- self._jellyfin_server.jellyfin,\r
- term=search_query,\r
- media="Playlist",\r
- limit=limit,\r
- )\r
- return resultset["Items"]\r
-\r
- async def _search_and_parse(\r
- self, search_coro: Coroutine, parse_coro: Callable\r
- ) -> list[MediaItem]:\r
- task_results = []\r
- async with TaskGroup() as tg:\r
- for item in await search_coro:\r
- task_results.append(tg.create_task(parse_coro(item)))\r
-\r
- results = []\r
- for task in task_results:\r
- results.append(task.result())\r
-\r
- return results\r
-\r
- async def _parse_album(self, jellyfin_album: dict[str, Any]) -> Album:\r
- """Parse a Jellyfin Album response to an Album model object."""\r
- album_id = jellyfin_album[ITEM_KEY_ID]\r
- album = Album(\r
- item_id=album_id,\r
- provider=self.domain,\r
- name=jellyfin_album[ITEM_KEY_NAME],\r
- provider_mappings={\r
- ProviderMapping(\r
- item_id=str(album_id),\r
- provider_domain=self.domain,\r
- provider_instance=self.instance_id,\r
- )\r
- },\r
- )\r
- current_jellyfin_album = API.get_item(self._jellyfin_server.jellyfin, album_id)\r
- if ITEM_KEY_PRODUCTION_YEAR in current_jellyfin_album:\r
- album.year = current_jellyfin_album[ITEM_KEY_PRODUCTION_YEAR]\r
- if thumb := self._get_thumbnail_url(self._jellyfin_server, jellyfin_album):\r
- album.metadata.images = [\r
- MediaItemImage(\r
- type=ImageType.THUMB,\r
- path=thumb,\r
- provider=self.instance_id,\r
- remotely_accessible=False,\r
- )\r
- ]\r
- if ITEM_KEY_OVERVIEW in current_jellyfin_album:\r
- album.metadata.description = current_jellyfin_album[ITEM_KEY_OVERVIEW]\r
- if ITEM_KEY_MUSICBRAINZ_RELEASE_GROUP in current_jellyfin_album[ITEM_KEY_PROVIDER_IDS]:\r
- try:\r
- album.mbid = current_jellyfin_album[ITEM_KEY_PROVIDER_IDS][\r
- ITEM_KEY_MUSICBRAINZ_RELEASE_GROUP\r
- ]\r
- except InvalidDataError as error:\r
- self.logger.warning(\r
- "Jellyfin has an invalid musicbrainz id for album %s",\r
- album.name,\r
- exc_info=error if self.logger.isEnabledFor(logging.DEBUG) else None,\r
- )\r
- if ITEM_KEY_SORT_NAME in current_jellyfin_album:\r
- album.sort_name = current_jellyfin_album[ITEM_KEY_SORT_NAME]\r
- if ITEM_KEY_ALBUM_ARTIST in current_jellyfin_album:\r
- album.artists.append(\r
- self._get_item_mapping(\r
- MediaType.ARTIST,\r
- current_jellyfin_album[ITEM_KEY_ALBUM_ARTISTS][0].get(ITEM_KEY_ID),\r
- current_jellyfin_album[ITEM_KEY_ALBUM_ARTIST],\r
- )\r
- )\r
- elif len(current_jellyfin_album.get(ITEM_KEY_ARTIST_ITEMS, [])) >= 1:\r
- for artist_item in current_jellyfin_album[ITEM_KEY_ARTIST_ITEMS]:\r
- album.artists.append(\r
- self._get_item_mapping(\r
- MediaType.ARTIST,\r
- artist_item[ITEM_KEY_ID],\r
- artist_item[ITEM_KEY_NAME],\r
- )\r
- )\r
- user_data = current_jellyfin_album.get(ITEM_KEY_USER_DATA, {})\r
- album.favorite = user_data.get(USER_DATA_KEY_IS_FAVORITE, False)\r
- return album\r
-\r
- async def _parse_artist(self, jellyfin_artist: dict[str, Any]) -> Artist:\r
- """Parse a Jellyfin Artist response to Artist model object."""\r
- artist_id = jellyfin_artist[ITEM_KEY_ID]\r
- current_artist = API.get_item(self._jellyfin_server.jellyfin, artist_id)\r
- if not artist_id:\r
- msg = "Artist does not have a valid ID"\r
- raise InvalidDataError(msg)\r
- artist = Artist(\r
- item_id=artist_id,\r
- name=jellyfin_artist[ITEM_KEY_NAME],\r
- provider=self.domain,\r
- provider_mappings={\r
- ProviderMapping(\r
- item_id=str(artist_id),\r
- provider_domain=self.domain,\r
- provider_instance=self.instance_id,\r
- )\r
- },\r
- )\r
- if ITEM_KEY_OVERVIEW in current_artist:\r
- artist.metadata.description = current_artist[ITEM_KEY_OVERVIEW]\r
- if ITEM_KEY_MUSICBRAINZ_ARTIST in current_artist[ITEM_KEY_PROVIDER_IDS]:\r
- try:\r
- artist.mbid = current_artist[ITEM_KEY_PROVIDER_IDS][ITEM_KEY_MUSICBRAINZ_ARTIST]\r
- except InvalidDataError as error:\r
- self.logger.warning(\r
- "Jellyfin has an invalid musicbrainz id for artist %s",\r
- artist.name,\r
- exc_info=error if self.logger.isEnabledFor(logging.DEBUG) else None,\r
- )\r
- if ITEM_KEY_SORT_NAME in current_artist:\r
- artist.sort_name = current_artist[ITEM_KEY_SORT_NAME]\r
- if thumb := self._get_thumbnail_url(self._jellyfin_server, jellyfin_artist):\r
- artist.metadata.images = [\r
- MediaItemImage(\r
- type=ImageType.THUMB,\r
- path=thumb,\r
- provider=self.instance_id,\r
- remotely_accessible=False,\r
- )\r
- ]\r
- user_data = current_artist.get(ITEM_KEY_USER_DATA, {})\r
- artist.favorite = user_data.get(USER_DATA_KEY_IS_FAVORITE, False)\r
- return artist\r
-\r
- async def _parse_track(self, jellyfin_track: dict[str, Any]) -> Track:\r
- """Parse a Jellyfin Track response to a Track model object."""\r
- current_jellyfin_track = await asyncio.to_thread(\r
- API.get_item, self._jellyfin_server.jellyfin, jellyfin_track[ITEM_KEY_ID]\r
- )\r
- available = False\r
- content = None\r
- available = current_jellyfin_track[ITEM_KEY_CAN_DOWNLOAD]\r
- content = current_jellyfin_track[ITEM_KEY_MEDIA_STREAMS][0][ITEM_KEY_MEDIA_CODEC]\r
- track = Track(\r
- item_id=jellyfin_track[ITEM_KEY_ID],\r
- provider=self.instance_id,\r
- name=jellyfin_track[ITEM_KEY_NAME],\r
- provider_mappings={\r
- ProviderMapping(\r
- item_id=jellyfin_track[ITEM_KEY_ID],\r
- provider_domain=self.domain,\r
- provider_instance=self.instance_id,\r
- available=available,\r
- audio_format=AudioFormat(\r
- content_type=(\r
- ContentType.try_parse(content) if content else ContentType.UNKNOWN\r
- ),\r
- ),\r
- url=self._get_stream_url(self._jellyfin_server, jellyfin_track[ITEM_KEY_ID]),\r
- )\r
- },\r
- )\r
-\r
- track.disc_number = current_jellyfin_track.get(ITEM_KEY_PARENT_INDEX_NUM, 1)\r
- if "IndexNumber" in current_jellyfin_track:\r
- if current_jellyfin_track["IndexNumber"] >= 1:\r
- track_idx = current_jellyfin_track["IndexNumber"]\r
- track.track_number = track_idx\r
- track.position = track_idx\r
-\r
- if thumb := self._get_thumbnail_url(self._jellyfin_server, jellyfin_track):\r
- track.metadata.images = [\r
- MediaItemImage(\r
- type=ImageType.THUMB,\r
- path=thumb,\r
- provider=self.instance_id,\r
- remotely_accessible=False,\r
- )\r
- ]\r
-\r
- if current_jellyfin_track[ITEM_KEY_ARTIST_ITEMS]:\r
- for artist_item in current_jellyfin_track[ITEM_KEY_ARTIST_ITEMS]:\r
- track.artists.append(\r
- self._get_item_mapping(\r
- MediaType.ARTIST,\r
- artist_item[ITEM_KEY_ID],\r
- artist_item[ITEM_KEY_NAME],\r
- )\r
- )\r
- elif ITEM_KEY_ALBUM_ID in current_jellyfin_track:\r
- parent_album = API.get_item(\r
- self._jellyfin_server.jellyfin, current_jellyfin_track[ITEM_KEY_ALBUM_ID]\r
- )\r
- if ITEM_KEY_ALBUM_ARTISTS in parent_album:\r
- for artist_item in parent_album[ITEM_KEY_ALBUM_ARTISTS]:\r
- track.artists.append(\r
- self._get_item_mapping(\r
- MediaType.ARTIST,\r
- artist_item[ITEM_KEY_ID],\r
- artist_item[ITEM_KEY_NAME],\r
- )\r
- )\r
- else:\r
- track.artists.append(await self._parse_artist(name=VARIOUS_ARTISTS_NAME))\r
- else:\r
- track.artists.append(await self._parse_artist(name=VARIOUS_ARTISTS_NAME))\r
- if ITEM_KEY_ALBUM_ID in current_jellyfin_track and ITEM_KEY_ALBUM in current_jellyfin_track:\r
- track.album = self._get_item_mapping(\r
- MediaType.ALBUM,\r
- current_jellyfin_track[ITEM_KEY_ALBUM_ID],\r
- current_jellyfin_track[ITEM_KEY_ALBUM],\r
- )\r
- elif ITEM_KEY_ALBUM_ID in current_jellyfin_track:\r
- parent_album = API.get_item(\r
- self._jellyfin_server.jellyfin, current_jellyfin_track[ITEM_KEY_ALBUM_ID]\r
- )\r
- track.album = self._get_item_mapping(\r
- MediaType.ALBUM,\r
- parent_album[ITEM_KEY_ID],\r
- parent_album[ITEM_KEY_NAME],\r
- )\r
- if ITEM_KEY_PARENT_INDEX_NUM in current_jellyfin_track:\r
- track.disc_number = current_jellyfin_track[ITEM_KEY_PARENT_INDEX_NUM]\r
- if ITEM_KEY_RUNTIME_TICKS in current_jellyfin_track:\r
- track.duration = int(\r
- current_jellyfin_track[ITEM_KEY_RUNTIME_TICKS] / 10000000\r
- ) # 10000000 ticks per millisecond\r
- track.track_number = current_jellyfin_track.get(ITEM_KEY_INDEX_NUMBER, 99)\r
- if ITEM_KEY_MUSICBRAINZ_TRACK in current_jellyfin_track[ITEM_KEY_PROVIDER_IDS]:\r
- track_mbid = current_jellyfin_track[ITEM_KEY_PROVIDER_IDS][ITEM_KEY_MUSICBRAINZ_TRACK]\r
- try:\r
- track.mbid = track_mbid\r
- except InvalidDataError as error:\r
- self.logger.warning(\r
- "Jellyfin has an invalid musicbrainz id for track %s",\r
- track.name,\r
- exc_info=error if self.logger.isEnabledFor(logging.DEBUG) else None,\r
- )\r
- user_data = current_jellyfin_track.get(ITEM_KEY_USER_DATA, {})\r
- track.favorite = user_data.get(USER_DATA_KEY_IS_FAVORITE, False)\r
- return track\r
-\r
- async def _parse_playlist(self, jellyfin_playlist: dict[str, Any]) -> Playlist:\r
- """Parse a Jellyfin Playlist response to a Playlist object."""\r
- playlistid = jellyfin_playlist[ITEM_KEY_ID]\r
- playlist = Playlist(\r
- item_id=playlistid,\r
- provider=self.domain,\r
- name=jellyfin_playlist[ITEM_KEY_NAME],\r
- provider_mappings={\r
- ProviderMapping(\r
- item_id=playlistid,\r
- provider_domain=self.domain,\r
- provider_instance=self.instance_id,\r
- )\r
- },\r
- )\r
- if ITEM_KEY_OVERVIEW in jellyfin_playlist:\r
- playlist.metadata.description = jellyfin_playlist[ITEM_KEY_OVERVIEW]\r
- if thumb := self._get_thumbnail_url(self._jellyfin_server, jellyfin_playlist):\r
- playlist.metadata.images = [\r
- MediaItemImage(\r
- type=ImageType.THUMB,\r
- path=thumb,\r
- provider=self.instance_id,\r
- remotely_accessible=False,\r
- )\r
- ]\r
- user_data = jellyfin_playlist.get(ITEM_KEY_USER_DATA, {})\r
- playlist.favorite = user_data.get(USER_DATA_KEY_IS_FAVORITE, False)\r
- playlist.is_editable = False\r
- return playlist\r
-\r
- async def search(\r
- self,\r
- search_query: str,\r
- media_types: list[MediaType],\r
- limit: int = 20,\r
- ) -> SearchResults:\r
- """Perform search on the plex library.\r
-\r
- :param search_query: Search query.\r
- :param media_types: A list of media_types to include. All types if None.\r
- :param limit: Number of items to return in the search (per type).\r
- """\r
- tasks = {}\r
-\r
- async with TaskGroup() as tg:\r
- for media_type in media_types:\r
- if media_type == MediaType.ARTIST:\r
- tasks[MediaType.ARTIST] = tg.create_task(\r
- self._search_and_parse(\r
- self._search_artist(search_query, limit), self._parse_artist\r
- )\r
- )\r
- elif media_type == MediaType.ALBUM:\r
- tasks[MediaType.ALBUM] = tg.create_task(\r
- self._search_and_parse(\r
- self._search_album(search_query, limit), self._parse_album\r
- )\r
- )\r
- elif media_type == MediaType.TRACK:\r
- tasks[MediaType.TRACK] = tg.create_task(\r
- self._search_and_parse(\r
- self._search_track(search_query, limit), self._parse_track\r
- )\r
- )\r
- elif media_type == MediaType.PLAYLIST:\r
- tasks[MediaType.PLAYLIST] = tg.create_task(\r
- self._search_and_parse(\r
- self._search_playlist(search_query, limit), self._parse_playlist\r
- )\r
- )\r
-\r
- search_results = SearchResults()\r
-\r
- for media_type, task in tasks.items():\r
- if media_type == MediaType.ARTIST:\r
- search_results.artists = task.result()\r
- elif media_type == MediaType.ALBUM:\r
- search_results.albums = task.result()\r
- elif media_type == MediaType.TRACK:\r
- search_results.tracks = task.result()\r
- elif media_type == MediaType.PLAYLIST:\r
- search_results.playlists = task.result()\r
-\r
- return search_results\r
-\r
- async def get_library_artists(self) -> AsyncGenerator[Artist, None]:\r
- """Retrieve all library artists from Jellyfin Music."""\r
- jellyfin_libraries = await self._get_music_libraries(self._jellyfin_server)\r
- for jellyfin_library in jellyfin_libraries:\r
- response = API._get(\r
- self._jellyfin_server.jellyfin,\r
- "Artists",\r
- {\r
- ITEM_KEY_PARENT_ID: jellyfin_library[ITEM_KEY_ID],\r
- "ArtistType": "Artist,AlbumArtist",\r
- },\r
- )\r
- artists_obj = response["Items"]\r
- for artist in artists_obj:\r
- yield await self._parse_artist(artist)\r
-\r
- async def get_library_albums(self) -> AsyncGenerator[Album, None]:\r
- """Retrieve all library albums from Jellyfin Music."""\r
- jellyfin_libraries = await self._get_music_libraries(self._jellyfin_server)\r
- for jellyfin_library in jellyfin_libraries:\r
- artists_obj = await self._get_children(\r
- self._jellyfin_server, jellyfin_library[ITEM_KEY_ID], ITEM_TYPE_ARTIST\r
- )\r
- for artist in artists_obj:\r
- albums_obj = await self._get_children(\r
- self._jellyfin_server, artist[ITEM_KEY_ID], ITEM_TYPE_ALBUM\r
- )\r
- for album in albums_obj:\r
- yield await self._parse_album(album)\r
-\r
- async def get_library_tracks(self) -> AsyncGenerator[Track, None]:\r
- """Retrieve library tracks from Jellyfin Music."""\r
- jellyfin_libraries = await self._get_music_libraries(self._jellyfin_server)\r
- self._jellyfin_server.default_timeout = 120\r
- for jellyfin_library in jellyfin_libraries:\r
- artists_obj = await self._get_children(\r
- self._jellyfin_server, jellyfin_library[ITEM_KEY_ID], ITEM_TYPE_ARTIST\r
- )\r
- for artist in artists_obj:\r
- albums_obj = await self._get_children(\r
- self._jellyfin_server, artist[ITEM_KEY_ID], ITEM_TYPE_ALBUM\r
- )\r
- for album in albums_obj:\r
- tracks_obj = await self._get_children(\r
- self._jellyfin_server, album[ITEM_KEY_ID], ITEM_TYPE_AUDIO\r
- )\r
- for track in tracks_obj:\r
- yield await self._parse_track(track)\r
-\r
- async def get_library_playlists(self) -> AsyncGenerator[Playlist, None]:\r
- """Retrieve all library playlists from the provider."""\r
- playlist_libraries = await self._get_playlists(self._jellyfin_server)\r
- for playlist_library in playlist_libraries:\r
- playlists_obj = await self._get_children(\r
- self._jellyfin_server, playlist_library[ITEM_KEY_ID], "Playlist"\r
- )\r
- for playlist in playlists_obj:\r
- if "MediaType" in playlist: # Only jellyfin has this property\r
- if playlist["MediaType"] == "Audio":\r
- yield await self._parse_playlist(playlist)\r
- else: # emby playlists are only audio type\r
- yield await self._parse_playlist(playlist)\r
-\r
- async def get_album(self, prov_album_id) -> Album:\r
- """Get full album details by id."""\r
- if jellyfin_album := API.get_item(self._jellyfin_server.jellyfin, prov_album_id):\r
- return await self._run_async(self._parse_album(jellyfin_album))\r
- msg = f"Item {prov_album_id} not found"\r
- raise MediaNotFoundError(msg)\r
-\r
- async def get_album_tracks(self, prov_album_id: str) -> list[Track]:\r
- """Get album tracks for given album id."""\r
- jellyfin_album_tracks = await self._get_children(\r
- self._jellyfin_server, prov_album_id, ITEM_TYPE_AUDIO\r
- )\r
- return [\r
- await self._parse_track(jellyfin_album_track)\r
- for jellyfin_album_track in jellyfin_album_tracks\r
- ]\r
-\r
- async def get_artist(self, prov_artist_id) -> Artist:\r
- """Get full artist details by id."""\r
- if prov_artist_id.startswith(FAKE_ARTIST_PREFIX):\r
- # This artist does not exist in jellyfin, so we can just load it from DB.\r
-\r
- if db_artist := await self.mass.music.artists.get_library_item_by_prov_id(\r
- prov_artist_id, self.instance_id\r
- ):\r
- return db_artist\r
- msg = f"Artist not found: {prov_artist_id}"\r
- raise MediaNotFoundError(msg)\r
-\r
- if jellyfin_artist := API.get_item(self._jellyfin_server.jellyfin, prov_artist_id):\r
- return await self._parse_artist(jellyfin_artist)\r
- msg = f"Item {prov_artist_id} not found"\r
- raise MediaNotFoundError(msg)\r
-\r
- async def get_track(self, prov_track_id) -> Track:\r
- """Get full track details by id."""\r
- if jellyfin_track := API.get_item(self._jellyfin_server.jellyfin, prov_track_id):\r
- return await self._parse_track(jellyfin_track)\r
- msg = f"Item {prov_track_id} not found"\r
- raise MediaNotFoundError(msg)\r
-\r
- async def get_playlist(self, prov_playlist_id) -> Playlist:\r
- """Get full playlist details by id."""\r
- if jellyfin_playlist := API.get_item(self._jellyfin_server.jellyfin, prov_playlist_id):\r
- return await self._parse_playlist(jellyfin_playlist)\r
- msg = f"Item {prov_playlist_id} not found"\r
- raise MediaNotFoundError(msg)\r
-\r
- async def get_playlist_tracks(\r
- self, prov_playlist_id: str, offset: int, limit: int\r
- ) -> list[Track]:\r
- """Get playlist tracks."""\r
- result: list[Track] = []\r
- if offset:\r
- # paging not supported, we always return the whole list at once\r
- return []\r
- # TODO: Does Jellyfin support paging here?\r
- jellyfin_playlist = API.get_item(self._jellyfin_server.jellyfin, prov_playlist_id)\r
- playlist_items = await self._get_children(\r
- self._jellyfin_server, jellyfin_playlist[ITEM_KEY_ID], ITEM_TYPE_AUDIO\r
- )\r
- if not playlist_items:\r
- return result\r
- for index, jellyfin_track in enumerate(playlist_items, 1):\r
- try:\r
- if track := await self._parse_track(jellyfin_track):\r
- if not track.position:\r
- track.position = offset + index\r
- result.append(track)\r
- except (KeyError, ValueError) as err:\r
- self.logger.error(\r
- "Skipping track %s: %s", jellyfin_track.get(ITEM_KEY_NAME, index), str(err)\r
- )\r
- return result\r
-\r
- async def get_artist_albums(self, prov_artist_id) -> list[Album]:\r
- """Get a list of albums for the given artist."""\r
- if not prov_artist_id.startswith(FAKE_ARTIST_PREFIX):\r
- artists_obj = await self._get_children(\r
- self._jellyfin_server, prov_artist_id, ITEM_TYPE_ARTIST\r
- )\r
- for artist in artists_obj:\r
- jellyfin_albums = await self._get_children(\r
- self._jellyfin_server, artist[ITEM_KEY_ID], ITEM_TYPE_ALBUM\r
- )\r
- if jellyfin_albums:\r
- albums = []\r
- for album_obj in jellyfin_albums:\r
- albums.append(await self._parse_album(album_obj))\r
- return albums\r
- return []\r
-\r
- async def get_stream_details(self, item_id: str) -> StreamDetails:\r
- """Return the content details for the given track when it will be streamed."""\r
- jellyfin_track = API.get_item(self._jellyfin_server.jellyfin, item_id)\r
- mimetype = self._media_mime_type(jellyfin_track)\r
- media_stream = jellyfin_track[ITEM_KEY_MEDIA_STREAMS][0]\r
- url = API.audio_url(\r
- self._jellyfin_server.jellyfin, jellyfin_track[ITEM_KEY_ID], SUPPORTED_CONTAINER_FORMATS\r
- )\r
- if ITEM_KEY_MEDIA_CODEC in media_stream:\r
- content_type = ContentType.try_parse(media_stream[ITEM_KEY_MEDIA_CODEC])\r
- else:\r
- content_type = ContentType.try_parse(mimetype)\r
- return StreamDetails(\r
- item_id=jellyfin_track[ITEM_KEY_ID],\r
- provider=self.instance_id,\r
- audio_format=AudioFormat(\r
- content_type=content_type,\r
- channels=jellyfin_track[ITEM_KEY_MEDIA_STREAMS][0][ITEM_KEY_MEDIA_CHANNELS],\r
- ),\r
- stream_type=StreamType.HTTP,\r
- duration=int(\r
- jellyfin_track[ITEM_KEY_RUNTIME_TICKS] / 10000000\r
- ), # 10000000 ticks per millisecond)\r
- path=url,\r
- )\r
-\r
- def _get_thumbnail_url(self, client: JellyfinClient, media_item: dict[str, Any]) -> str | None:\r
- """Return the URL for the primary image of a media item if available."""\r
- image_tags = media_item[ITEM_KEY_IMAGE_TAGS]\r
-\r
- if "Primary" not in image_tags:\r
- return None\r
-\r
- item_id = media_item[ITEM_KEY_ID]\r
- return API.artwork(client.jellyfin, item_id, "Primary", MAX_IMAGE_WIDTH)\r
-\r
- def _get_stream_url(self, client: JellyfinClient, media_item: str) -> str:\r
- """Return the stream URL for a media item."""\r
- return API.audio_url(client.jellyfin, media_item) # type: ignore[no-any-return]\r
-\r
- async def _get_children(\r
- self, client: JellyfinClient, parent_id: str, item_type: str\r
- ) -> list[dict[str, Any]]:\r
- """Return all children for the parent_id whose item type is item_type."""\r
- params = {\r
- "Recursive": "true",\r
- ITEM_KEY_PARENT_ID: parent_id,\r
- }\r
- if item_type in ITEM_TYPE_ARTIST:\r
- params["IncludeItemTypes"] = f"{ITEM_TYPE_MUSICARTISTS},{ITEM_TYPE_ARTIST}"\r
- else:\r
- params["IncludeItemTypes"] = item_type\r
- if item_type in ITEM_TYPE_AUDIO:\r
- params["Fields"] = ITEM_KEY_MEDIA_SOURCES\r
-\r
- result = client.jellyfin.user_items("", params)\r
- return result["Items"]\r
-\r
- async def _get_music_libraries(self, client: JellyfinClient) -> list[dict[str, Any]]:\r
- """Return all supported libraries a user has access to."""\r
- response = API.get_media_folders(client.jellyfin)\r
- libraries = response["Items"]\r
- result = []\r
- for library in libraries:\r
- if ITEM_KEY_COLLECTION_TYPE in library and library[ITEM_KEY_COLLECTION_TYPE] in "music":\r
- result.append(library)\r
- return result\r
-\r
- async def _get_playlists(self, client: JellyfinClient) -> list[dict[str, Any]]:\r
- """Return all supported libraries a user has access to."""\r
- response = API.get_media_folders(client.jellyfin)\r
- libraries = response["Items"]\r
- result = []\r
- for library in libraries:\r
- if (\r
- ITEM_KEY_COLLECTION_TYPE in library\r
- and library[ITEM_KEY_COLLECTION_TYPE] in "playlists"\r
- ):\r
- result.append(library)\r
- return result\r
-\r
- def _media_mime_type(self, media_item: dict[str, Any]) -> str | None:\r
- """Return the mime type of a media item."""\r
- if not media_item.get(ITEM_KEY_MEDIA_SOURCES):\r
- return None\r
-\r
- media_source = media_item[ITEM_KEY_MEDIA_SOURCES][0]\r
-\r
- if "Path" not in media_source:\r
- return None\r
-\r
- path = media_source["Path"]\r
- mime_type, _ = mimetypes.guess_type(path)\r
-\r
- return mime_type\r
+"""Jellyfin support for MusicAssistant."""
+
+from __future__ import annotations
+
+import asyncio
+import logging
+import mimetypes
+import socket
+import uuid
+from asyncio import TaskGroup
+from typing import TYPE_CHECKING, Any
+
+if TYPE_CHECKING:
+ from collections.abc import AsyncGenerator, Callable, Coroutine
+
+from jellyfin_apiclient_python import JellyfinClient
+from jellyfin_apiclient_python.api import API
+
+from music_assistant.common.models.config_entries import (
+ ConfigEntry,
+ ConfigValueType,
+ ProviderConfig,
+)
+from music_assistant.common.models.enums import (
+ ConfigEntryType,
+ ContentType,
+ ImageType,
+ MediaType,
+ ProviderFeature,
+ StreamType,
+)
+from music_assistant.common.models.errors import InvalidDataError, LoginFailed, MediaNotFoundError
+from music_assistant.common.models.media_items import (
+ Album,
+ Artist,
+ AudioFormat,
+ ItemMapping,
+ MediaItem,
+ MediaItemImage,
+ Playlist,
+ ProviderMapping,
+ SearchResults,
+ Track,
+)
+from music_assistant.common.models.streamdetails import StreamDetails
+
+if TYPE_CHECKING:
+ from music_assistant.common.models.provider import ProviderManifest
+
+from music_assistant.constants import VARIOUS_ARTISTS_NAME
+
+if TYPE_CHECKING:
+ from music_assistant.server import MusicAssistant
+if TYPE_CHECKING:
+ from music_assistant.server.models import ProviderInstanceType
+
+from music_assistant.server.models.music_provider import MusicProvider
+
+from .const import (
+ CLIENT_VERSION,
+ ITEM_KEY_ALBUM,
+ ITEM_KEY_ALBUM_ARTIST,
+ ITEM_KEY_ALBUM_ARTISTS,
+ ITEM_KEY_ALBUM_ID,
+ ITEM_KEY_ARTIST_ITEMS,
+ ITEM_KEY_CAN_DOWNLOAD,
+ ITEM_KEY_COLLECTION_TYPE,
+ ITEM_KEY_ID,
+ ITEM_KEY_IMAGE_TAGS,
+ ITEM_KEY_INDEX_NUMBER,
+ ITEM_KEY_MEDIA_CHANNELS,
+ ITEM_KEY_MEDIA_CODEC,
+ ITEM_KEY_MEDIA_SOURCES,
+ ITEM_KEY_MEDIA_STREAMS,
+ ITEM_KEY_MUSICBRAINZ_ARTIST,
+ ITEM_KEY_MUSICBRAINZ_RELEASE_GROUP,
+ ITEM_KEY_MUSICBRAINZ_TRACK,
+ ITEM_KEY_NAME,
+ ITEM_KEY_OVERVIEW,
+ ITEM_KEY_PARENT_ID,
+ ITEM_KEY_PARENT_INDEX_NUM,
+ ITEM_KEY_PRODUCTION_YEAR,
+ ITEM_KEY_PROVIDER_IDS,
+ ITEM_KEY_RUNTIME_TICKS,
+ ITEM_KEY_SORT_NAME,
+ ITEM_KEY_USER_DATA,
+ ITEM_TYPE_ALBUM,
+ ITEM_TYPE_ARTIST,
+ ITEM_TYPE_AUDIO,
+ ITEM_TYPE_MUSICARTISTS,
+ MAX_IMAGE_WIDTH,
+ SUPPORTED_CONTAINER_FORMATS,
+ USER_APP_NAME,
+ USER_DATA_KEY_IS_FAVORITE,
+)
+
+CONF_URL = "url"
+CONF_USERNAME = "username"
+CONF_PASSWORD = "password"
+FAKE_ARTIST_PREFIX = "_fake://"
+
+
+async def setup(
+ mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
+) -> ProviderInstanceType:
+ """Initialize provider(instance) with given configuration."""
+ prov = JellyfinProvider(mass, manifest, config)
+ await prov.handle_async_init()
+ return prov
+
+
+async def get_config_entries(
+ mass: MusicAssistant,
+ instance_id: str | None = None, # pylint: disable=W0613
+ action: str | None = None,
+ values: dict[str, ConfigValueType] | None = None,
+) -> tuple[ConfigEntry, ...]:
+ """
+ Return Config entries to setup this provider.
+
+ instance_id: id of an existing provider instance (None if new instance setup).
+ action: [optional] action key called from config entries UI.
+ values: the (intermediate) raw values for config entries sent with the action.
+ """
+ # config flow auth action/step (authenticate button clicked)
+ # ruff: noqa: ARG001
+ return (
+ ConfigEntry(
+ key=CONF_URL,
+ type=ConfigEntryType.STRING,
+ label="Server",
+ required=True,
+ description="The url of the Jellyfin server to connect to.",
+ ),
+ ConfigEntry(
+ key=CONF_USERNAME,
+ type=ConfigEntryType.STRING,
+ label="Username",
+ required=True,
+ description="The username to authenticate to the remote server."
+ "the remote host, For example 'media'.",
+ ),
+ ConfigEntry(
+ key=CONF_PASSWORD,
+ type=ConfigEntryType.SECURE_STRING,
+ label="Password",
+ required=False,
+ description="The password to authenticate to the remote server.",
+ ),
+ )
+
+
+class JellyfinProvider(MusicProvider):
+ """Provider for a jellyfin music library."""
+
+ # _jellyfin_server : JellyfinClient = None
+
+ async def handle_async_init(self) -> None:
+ """Initialize provider(instance) with given configuration."""
+
+ def connect() -> JellyfinClient:
+ try:
+ client = JellyfinClient()
+ device_name = socket.gethostname()
+ device_id = str(uuid.uuid4())
+ client.config.app(USER_APP_NAME, CLIENT_VERSION, device_name, device_id)
+ if CONF_URL.startswith("https://"):
+ JellyfinClient.config.data["auth.ssl"] = True
+ else:
+ client.config.data["auth.ssl"] = False
+ jellyfin_server_url = self.config.get_value(CONF_URL)
+ jellyfin_server_user = self.config.get_value(CONF_USERNAME)
+ jellyfin_server_password = self.config.get_value(CONF_PASSWORD)
+ client.auth.connect_to_address(jellyfin_server_url)
+ client.auth.login(
+ jellyfin_server_url, jellyfin_server_user, jellyfin_server_password
+ )
+ credentials = client.auth.credentials.get_credentials()
+ if not credentials["Servers"]:
+ raise IndexError("No servers found")
+ server = credentials["Servers"][0]
+ server["username"] = jellyfin_server_user
+ _jellyfin_server = client
+ # json.dumps(server)
+ except Exception as err:
+ msg = f"Authentication failed: {err}"
+ raise LoginFailed(msg) from err
+ return _jellyfin_server
+
+ self._jellyfin_server = await self._run_async(connect)
+
+ @property
+ def supported_features(self) -> tuple[ProviderFeature, ...]:
+ """Return a list of supported features."""
+ return (
+ ProviderFeature.LIBRARY_ARTISTS,
+ ProviderFeature.LIBRARY_ALBUMS,
+ ProviderFeature.LIBRARY_TRACKS,
+ ProviderFeature.LIBRARY_PLAYLISTS,
+ ProviderFeature.BROWSE,
+ ProviderFeature.SEARCH,
+ ProviderFeature.ARTIST_ALBUMS,
+ )
+
+ @property
+ def is_streaming_provider(self) -> bool:
+ """Return True if the provider is a streaming provider."""
+ return False
+
+ async def _run_async(self, call: Callable, *args, **kwargs):
+ return await self.mass.create_task(call, *args, **kwargs)
+
+ def _get_item_mapping(self, media_type: MediaType, key: str, name: str) -> ItemMapping:
+ return ItemMapping(
+ media_type=media_type,
+ item_id=key,
+ provider=self.instance_id,
+ name=name,
+ )
+
+ async def _parse(self, jellyfin_media) -> MediaItem | None:
+ if jellyfin_media.type == "artist":
+ return await self._parse_artist(jellyfin_media)
+ elif jellyfin_media.type == "album":
+ return await self._parse_album(jellyfin_media)
+ elif jellyfin_media.type == "track":
+ return await self._parse_track(jellyfin_media)
+ elif jellyfin_media.type == "playlist":
+ return await self._parse_playlist(jellyfin_media)
+ return None
+
+ async def _search_track(self, search_query, limit) -> list[dict[str, Any]]:
+ resultset = await self._run_async(
+ API.search_media_items,
+ self._jellyfin_server.jellyfin,
+ term=search_query,
+ media=ITEM_TYPE_AUDIO,
+ limit=limit,
+ )
+ return resultset["Items"]
+
+ async def _search_album(self, search_query, limit) -> list[dict[str, Any]]:
+ if "-" in search_query:
+ searchterms = search_query.split(" - ")
+ albumname = searchterms[1]
+ else:
+ albumname = search_query
+ resultset = await self._run_async(
+ API.search_media_items,
+ self._jellyfin_server.jellyfin,
+ term=albumname,
+ media=ITEM_TYPE_ALBUM,
+ limit=limit,
+ )
+ return resultset["Items"]
+
+ async def _search_artist(self, search_query, limit) -> list[dict[str, Any]]:
+ resultset = await self._run_async(
+ API.search_media_items,
+ self._jellyfin_server.jellyfin,
+ term=search_query,
+ media=ITEM_TYPE_ARTIST,
+ limit=limit,
+ )
+ return resultset["Items"]
+
+ async def _search_playlist(self, search_query, limit) -> list[dict[str, Any]]:
+ resultset = await self._run_async(
+ API.search_media_items,
+ self._jellyfin_server.jellyfin,
+ term=search_query,
+ media="Playlist",
+ limit=limit,
+ )
+ return resultset["Items"]
+
+ async def _search_and_parse(
+ self, search_coro: Coroutine, parse_coro: Callable
+ ) -> list[MediaItem]:
+ task_results = []
+ async with TaskGroup() as tg:
+ for item in await search_coro:
+ task_results.append(tg.create_task(parse_coro(item)))
+
+ results = []
+ for task in task_results:
+ results.append(task.result())
+
+ return results
+
+ async def _parse_album(self, jellyfin_album: dict[str, Any]) -> Album:
+ """Parse a Jellyfin Album response to an Album model object."""
+ album_id = jellyfin_album[ITEM_KEY_ID]
+ album = Album(
+ item_id=album_id,
+ provider=self.domain,
+ name=jellyfin_album[ITEM_KEY_NAME],
+ provider_mappings={
+ ProviderMapping(
+ item_id=str(album_id),
+ provider_domain=self.domain,
+ provider_instance=self.instance_id,
+ )
+ },
+ )
+ current_jellyfin_album = API.get_item(self._jellyfin_server.jellyfin, album_id)
+ if ITEM_KEY_PRODUCTION_YEAR in current_jellyfin_album:
+ album.year = current_jellyfin_album[ITEM_KEY_PRODUCTION_YEAR]
+ if thumb := self._get_thumbnail_url(self._jellyfin_server, jellyfin_album):
+ album.metadata.images = [
+ MediaItemImage(
+ type=ImageType.THUMB,
+ path=thumb,
+ provider=self.instance_id,
+ remotely_accessible=False,
+ )
+ ]
+ if ITEM_KEY_OVERVIEW in current_jellyfin_album:
+ album.metadata.description = current_jellyfin_album[ITEM_KEY_OVERVIEW]
+ if ITEM_KEY_MUSICBRAINZ_RELEASE_GROUP in current_jellyfin_album[ITEM_KEY_PROVIDER_IDS]:
+ try:
+ album.mbid = current_jellyfin_album[ITEM_KEY_PROVIDER_IDS][
+ ITEM_KEY_MUSICBRAINZ_RELEASE_GROUP
+ ]
+ except InvalidDataError as error:
+ self.logger.warning(
+ "Jellyfin has an invalid musicbrainz id for album %s",
+ album.name,
+ exc_info=error if self.logger.isEnabledFor(logging.DEBUG) else None,
+ )
+ if ITEM_KEY_SORT_NAME in current_jellyfin_album:
+ album.sort_name = current_jellyfin_album[ITEM_KEY_SORT_NAME]
+ if ITEM_KEY_ALBUM_ARTIST in current_jellyfin_album:
+ album.artists.append(
+ self._get_item_mapping(
+ MediaType.ARTIST,
+ current_jellyfin_album[ITEM_KEY_ALBUM_ARTISTS][0].get(ITEM_KEY_ID),
+ current_jellyfin_album[ITEM_KEY_ALBUM_ARTIST],
+ )
+ )
+ elif len(current_jellyfin_album.get(ITEM_KEY_ARTIST_ITEMS, [])) >= 1:
+ for artist_item in current_jellyfin_album[ITEM_KEY_ARTIST_ITEMS]:
+ album.artists.append(
+ self._get_item_mapping(
+ MediaType.ARTIST,
+ artist_item[ITEM_KEY_ID],
+ artist_item[ITEM_KEY_NAME],
+ )
+ )
+ user_data = current_jellyfin_album.get(ITEM_KEY_USER_DATA, {})
+ album.favorite = user_data.get(USER_DATA_KEY_IS_FAVORITE, False)
+ return album
+
+ async def _parse_artist(self, jellyfin_artist: dict[str, Any]) -> Artist:
+ """Parse a Jellyfin Artist response to Artist model object."""
+ artist_id = jellyfin_artist[ITEM_KEY_ID]
+ current_artist = API.get_item(self._jellyfin_server.jellyfin, artist_id)
+ if not artist_id:
+ msg = "Artist does not have a valid ID"
+ raise InvalidDataError(msg)
+ artist = Artist(
+ item_id=artist_id,
+ name=jellyfin_artist[ITEM_KEY_NAME],
+ provider=self.domain,
+ provider_mappings={
+ ProviderMapping(
+ item_id=str(artist_id),
+ provider_domain=self.domain,
+ provider_instance=self.instance_id,
+ )
+ },
+ )
+ if ITEM_KEY_OVERVIEW in current_artist:
+ artist.metadata.description = current_artist[ITEM_KEY_OVERVIEW]
+ if ITEM_KEY_MUSICBRAINZ_ARTIST in current_artist[ITEM_KEY_PROVIDER_IDS]:
+ try:
+ artist.mbid = current_artist[ITEM_KEY_PROVIDER_IDS][ITEM_KEY_MUSICBRAINZ_ARTIST]
+ except InvalidDataError as error:
+ self.logger.warning(
+ "Jellyfin has an invalid musicbrainz id for artist %s",
+ artist.name,
+ exc_info=error if self.logger.isEnabledFor(logging.DEBUG) else None,
+ )
+ if ITEM_KEY_SORT_NAME in current_artist:
+ artist.sort_name = current_artist[ITEM_KEY_SORT_NAME]
+ if thumb := self._get_thumbnail_url(self._jellyfin_server, jellyfin_artist):
+ artist.metadata.images = [
+ MediaItemImage(
+ type=ImageType.THUMB,
+ path=thumb,
+ provider=self.instance_id,
+ remotely_accessible=False,
+ )
+ ]
+ user_data = current_artist.get(ITEM_KEY_USER_DATA, {})
+ artist.favorite = user_data.get(USER_DATA_KEY_IS_FAVORITE, False)
+ return artist
+
+ async def _parse_track(self, jellyfin_track: dict[str, Any]) -> Track:
+ """Parse a Jellyfin Track response to a Track model object."""
+ current_jellyfin_track = await asyncio.to_thread(
+ API.get_item, self._jellyfin_server.jellyfin, jellyfin_track[ITEM_KEY_ID]
+ )
+ available = False
+ content = None
+ available = current_jellyfin_track[ITEM_KEY_CAN_DOWNLOAD]
+ content = current_jellyfin_track[ITEM_KEY_MEDIA_STREAMS][0][ITEM_KEY_MEDIA_CODEC]
+ track = Track(
+ item_id=jellyfin_track[ITEM_KEY_ID],
+ provider=self.instance_id,
+ name=jellyfin_track[ITEM_KEY_NAME],
+ provider_mappings={
+ ProviderMapping(
+ item_id=jellyfin_track[ITEM_KEY_ID],
+ provider_domain=self.domain,
+ provider_instance=self.instance_id,
+ available=available,
+ audio_format=AudioFormat(
+ content_type=(
+ ContentType.try_parse(content) if content else ContentType.UNKNOWN
+ ),
+ ),
+ url=self._get_stream_url(self._jellyfin_server, jellyfin_track[ITEM_KEY_ID]),
+ )
+ },
+ )
+
+ track.disc_number = current_jellyfin_track.get(ITEM_KEY_PARENT_INDEX_NUM, 1)
+ if "IndexNumber" in current_jellyfin_track:
+ if current_jellyfin_track["IndexNumber"] >= 1:
+ track_idx = current_jellyfin_track["IndexNumber"]
+ track.track_number = track_idx
+ track.position = track_idx
+
+ if thumb := self._get_thumbnail_url(self._jellyfin_server, jellyfin_track):
+ track.metadata.images = [
+ MediaItemImage(
+ type=ImageType.THUMB,
+ path=thumb,
+ provider=self.instance_id,
+ remotely_accessible=False,
+ )
+ ]
+
+ if current_jellyfin_track[ITEM_KEY_ARTIST_ITEMS]:
+ for artist_item in current_jellyfin_track[ITEM_KEY_ARTIST_ITEMS]:
+ track.artists.append(
+ self._get_item_mapping(
+ MediaType.ARTIST,
+ artist_item[ITEM_KEY_ID],
+ artist_item[ITEM_KEY_NAME],
+ )
+ )
+ elif ITEM_KEY_ALBUM_ID in current_jellyfin_track:
+ parent_album = API.get_item(
+ self._jellyfin_server.jellyfin, current_jellyfin_track[ITEM_KEY_ALBUM_ID]
+ )
+ if ITEM_KEY_ALBUM_ARTISTS in parent_album:
+ for artist_item in parent_album[ITEM_KEY_ALBUM_ARTISTS]:
+ track.artists.append(
+ self._get_item_mapping(
+ MediaType.ARTIST,
+ artist_item[ITEM_KEY_ID],
+ artist_item[ITEM_KEY_NAME],
+ )
+ )
+ else:
+ track.artists.append(await self._parse_artist(name=VARIOUS_ARTISTS_NAME))
+ else:
+ track.artists.append(await self._parse_artist(name=VARIOUS_ARTISTS_NAME))
+ if ITEM_KEY_ALBUM_ID in current_jellyfin_track and ITEM_KEY_ALBUM in current_jellyfin_track:
+ track.album = self._get_item_mapping(
+ MediaType.ALBUM,
+ current_jellyfin_track[ITEM_KEY_ALBUM_ID],
+ current_jellyfin_track[ITEM_KEY_ALBUM],
+ )
+ elif ITEM_KEY_ALBUM_ID in current_jellyfin_track:
+ parent_album = API.get_item(
+ self._jellyfin_server.jellyfin, current_jellyfin_track[ITEM_KEY_ALBUM_ID]
+ )
+ track.album = self._get_item_mapping(
+ MediaType.ALBUM,
+ parent_album[ITEM_KEY_ID],
+ parent_album[ITEM_KEY_NAME],
+ )
+ if ITEM_KEY_PARENT_INDEX_NUM in current_jellyfin_track:
+ track.disc_number = current_jellyfin_track[ITEM_KEY_PARENT_INDEX_NUM]
+ if ITEM_KEY_RUNTIME_TICKS in current_jellyfin_track:
+ track.duration = int(
+ current_jellyfin_track[ITEM_KEY_RUNTIME_TICKS] / 10000000
+ ) # 10000000 ticks per millisecond
+ track.track_number = current_jellyfin_track.get(ITEM_KEY_INDEX_NUMBER, 99)
+ if ITEM_KEY_MUSICBRAINZ_TRACK in current_jellyfin_track[ITEM_KEY_PROVIDER_IDS]:
+ track_mbid = current_jellyfin_track[ITEM_KEY_PROVIDER_IDS][ITEM_KEY_MUSICBRAINZ_TRACK]
+ try:
+ track.mbid = track_mbid
+ except InvalidDataError as error:
+ self.logger.warning(
+ "Jellyfin has an invalid musicbrainz id for track %s",
+ track.name,
+ exc_info=error if self.logger.isEnabledFor(logging.DEBUG) else None,
+ )
+ user_data = current_jellyfin_track.get(ITEM_KEY_USER_DATA, {})
+ track.favorite = user_data.get(USER_DATA_KEY_IS_FAVORITE, False)
+ return track
+
+ async def _parse_playlist(self, jellyfin_playlist: dict[str, Any]) -> Playlist:
+ """Parse a Jellyfin Playlist response to a Playlist object."""
+ playlistid = jellyfin_playlist[ITEM_KEY_ID]
+ playlist = Playlist(
+ item_id=playlistid,
+ provider=self.domain,
+ name=jellyfin_playlist[ITEM_KEY_NAME],
+ provider_mappings={
+ ProviderMapping(
+ item_id=playlistid,
+ provider_domain=self.domain,
+ provider_instance=self.instance_id,
+ )
+ },
+ )
+ if ITEM_KEY_OVERVIEW in jellyfin_playlist:
+ playlist.metadata.description = jellyfin_playlist[ITEM_KEY_OVERVIEW]
+ if thumb := self._get_thumbnail_url(self._jellyfin_server, jellyfin_playlist):
+ playlist.metadata.images = [
+ MediaItemImage(
+ type=ImageType.THUMB,
+ path=thumb,
+ provider=self.instance_id,
+ remotely_accessible=False,
+ )
+ ]
+ user_data = jellyfin_playlist.get(ITEM_KEY_USER_DATA, {})
+ playlist.favorite = user_data.get(USER_DATA_KEY_IS_FAVORITE, False)
+ playlist.is_editable = False
+ return playlist
+
+ async def search(
+ self,
+ search_query: str,
+ media_types: list[MediaType],
+ limit: int = 20,
+ ) -> SearchResults:
+ """Perform search on the plex library.
+
+ :param search_query: Search query.
+ :param media_types: A list of media_types to include. All types if None.
+ :param limit: Number of items to return in the search (per type).
+ """
+ tasks = {}
+
+ async with TaskGroup() as tg:
+ for media_type in media_types:
+ if media_type == MediaType.ARTIST:
+ tasks[MediaType.ARTIST] = tg.create_task(
+ self._search_and_parse(
+ self._search_artist(search_query, limit), self._parse_artist
+ )
+ )
+ elif media_type == MediaType.ALBUM:
+ tasks[MediaType.ALBUM] = tg.create_task(
+ self._search_and_parse(
+ self._search_album(search_query, limit), self._parse_album
+ )
+ )
+ elif media_type == MediaType.TRACK:
+ tasks[MediaType.TRACK] = tg.create_task(
+ self._search_and_parse(
+ self._search_track(search_query, limit), self._parse_track
+ )
+ )
+ elif media_type == MediaType.PLAYLIST:
+ tasks[MediaType.PLAYLIST] = tg.create_task(
+ self._search_and_parse(
+ self._search_playlist(search_query, limit), self._parse_playlist
+ )
+ )
+
+ search_results = SearchResults()
+
+ for media_type, task in tasks.items():
+ if media_type == MediaType.ARTIST:
+ search_results.artists = task.result()
+ elif media_type == MediaType.ALBUM:
+ search_results.albums = task.result()
+ elif media_type == MediaType.TRACK:
+ search_results.tracks = task.result()
+ elif media_type == MediaType.PLAYLIST:
+ search_results.playlists = task.result()
+
+ return search_results
+
+ async def get_library_artists(self) -> AsyncGenerator[Artist, None]:
+ """Retrieve all library artists from Jellyfin Music."""
+ jellyfin_libraries = await self._get_music_libraries(self._jellyfin_server)
+ for jellyfin_library in jellyfin_libraries:
+ response = API._get(
+ self._jellyfin_server.jellyfin,
+ "Artists",
+ {
+ ITEM_KEY_PARENT_ID: jellyfin_library[ITEM_KEY_ID],
+ "ArtistType": "Artist,AlbumArtist",
+ },
+ )
+ artists_obj = response["Items"]
+ for artist in artists_obj:
+ yield await self._parse_artist(artist)
+
+ async def get_library_albums(self) -> AsyncGenerator[Album, None]:
+ """Retrieve all library albums from Jellyfin Music."""
+ jellyfin_libraries = await self._get_music_libraries(self._jellyfin_server)
+ for jellyfin_library in jellyfin_libraries:
+ artists_obj = await self._get_children(
+ self._jellyfin_server, jellyfin_library[ITEM_KEY_ID], ITEM_TYPE_ARTIST
+ )
+ for artist in artists_obj:
+ albums_obj = await self._get_children(
+ self._jellyfin_server, artist[ITEM_KEY_ID], ITEM_TYPE_ALBUM
+ )
+ for album in albums_obj:
+ yield await self._parse_album(album)
+
+ async def get_library_tracks(self) -> AsyncGenerator[Track, None]:
+ """Retrieve library tracks from Jellyfin Music."""
+ jellyfin_libraries = await self._get_music_libraries(self._jellyfin_server)
+ self._jellyfin_server.default_timeout = 120
+ for jellyfin_library in jellyfin_libraries:
+ artists_obj = await self._get_children(
+ self._jellyfin_server, jellyfin_library[ITEM_KEY_ID], ITEM_TYPE_ARTIST
+ )
+ for artist in artists_obj:
+ albums_obj = await self._get_children(
+ self._jellyfin_server, artist[ITEM_KEY_ID], ITEM_TYPE_ALBUM
+ )
+ for album in albums_obj:
+ tracks_obj = await self._get_children(
+ self._jellyfin_server, album[ITEM_KEY_ID], ITEM_TYPE_AUDIO
+ )
+ for track in tracks_obj:
+ yield await self._parse_track(track)
+
+ async def get_library_playlists(self) -> AsyncGenerator[Playlist, None]:
+ """Retrieve all library playlists from the provider."""
+ playlist_libraries = await self._get_playlists(self._jellyfin_server)
+ for playlist_library in playlist_libraries:
+ playlists_obj = await self._get_children(
+ self._jellyfin_server, playlist_library[ITEM_KEY_ID], "Playlist"
+ )
+ for playlist in playlists_obj:
+ if "MediaType" in playlist: # Only jellyfin has this property
+ if playlist["MediaType"] == "Audio":
+ yield await self._parse_playlist(playlist)
+ else: # emby playlists are only audio type
+ yield await self._parse_playlist(playlist)
+
+ async def get_album(self, prov_album_id) -> Album:
+ """Get full album details by id."""
+ if jellyfin_album := API.get_item(self._jellyfin_server.jellyfin, prov_album_id):
+ return await self._run_async(self._parse_album(jellyfin_album))
+ msg = f"Item {prov_album_id} not found"
+ raise MediaNotFoundError(msg)
+
+ async def get_album_tracks(self, prov_album_id: str) -> list[Track]:
+ """Get album tracks for given album id."""
+ jellyfin_album_tracks = await self._get_children(
+ self._jellyfin_server, prov_album_id, ITEM_TYPE_AUDIO
+ )
+ return [
+ await self._parse_track(jellyfin_album_track)
+ for jellyfin_album_track in jellyfin_album_tracks
+ ]
+
+ async def get_artist(self, prov_artist_id) -> Artist:
+ """Get full artist details by id."""
+ if prov_artist_id.startswith(FAKE_ARTIST_PREFIX):
+ # This artist does not exist in jellyfin, so we can just load it from DB.
+
+ if db_artist := await self.mass.music.artists.get_library_item_by_prov_id(
+ prov_artist_id, self.instance_id
+ ):
+ return db_artist
+ msg = f"Artist not found: {prov_artist_id}"
+ raise MediaNotFoundError(msg)
+
+ if jellyfin_artist := API.get_item(self._jellyfin_server.jellyfin, prov_artist_id):
+ return await self._parse_artist(jellyfin_artist)
+ msg = f"Item {prov_artist_id} not found"
+ raise MediaNotFoundError(msg)
+
+ async def get_track(self, prov_track_id) -> Track:
+ """Get full track details by id."""
+ if jellyfin_track := API.get_item(self._jellyfin_server.jellyfin, prov_track_id):
+ return await self._parse_track(jellyfin_track)
+ msg = f"Item {prov_track_id} not found"
+ raise MediaNotFoundError(msg)
+
+ async def get_playlist(self, prov_playlist_id) -> Playlist:
+ """Get full playlist details by id."""
+ if jellyfin_playlist := API.get_item(self._jellyfin_server.jellyfin, prov_playlist_id):
+ return await self._parse_playlist(jellyfin_playlist)
+ msg = f"Item {prov_playlist_id} not found"
+ raise MediaNotFoundError(msg)
+
+ async def get_playlist_tracks(
+ self, prov_playlist_id: str, offset: int, limit: int
+ ) -> list[Track]:
+ """Get playlist tracks."""
+ result: list[Track] = []
+ if offset:
+ # paging not supported, we always return the whole list at once
+ return []
+ # TODO: Does Jellyfin support paging here?
+ jellyfin_playlist = API.get_item(self._jellyfin_server.jellyfin, prov_playlist_id)
+ playlist_items = await self._get_children(
+ self._jellyfin_server, jellyfin_playlist[ITEM_KEY_ID], ITEM_TYPE_AUDIO
+ )
+ if not playlist_items:
+ return result
+ for index, jellyfin_track in enumerate(playlist_items, 1):
+ try:
+ if track := await self._parse_track(jellyfin_track):
+ if not track.position:
+ track.position = offset + index
+ result.append(track)
+ except (KeyError, ValueError) as err:
+ self.logger.error(
+ "Skipping track %s: %s", jellyfin_track.get(ITEM_KEY_NAME, index), str(err)
+ )
+ return result
+
+ async def get_artist_albums(self, prov_artist_id) -> list[Album]:
+ """Get a list of albums for the given artist."""
+ if not prov_artist_id.startswith(FAKE_ARTIST_PREFIX):
+ artists_obj = await self._get_children(
+ self._jellyfin_server, prov_artist_id, ITEM_TYPE_ARTIST
+ )
+ for artist in artists_obj:
+ jellyfin_albums = await self._get_children(
+ self._jellyfin_server, artist[ITEM_KEY_ID], ITEM_TYPE_ALBUM
+ )
+ if jellyfin_albums:
+ albums = []
+ for album_obj in jellyfin_albums:
+ albums.append(await self._parse_album(album_obj))
+ return albums
+ return []
+
+ async def get_stream_details(self, item_id: str) -> StreamDetails:
+ """Return the content details for the given track when it will be streamed."""
+ jellyfin_track = API.get_item(self._jellyfin_server.jellyfin, item_id)
+ mimetype = self._media_mime_type(jellyfin_track)
+ media_stream = jellyfin_track[ITEM_KEY_MEDIA_STREAMS][0]
+ url = API.audio_url(
+ self._jellyfin_server.jellyfin, jellyfin_track[ITEM_KEY_ID], SUPPORTED_CONTAINER_FORMATS
+ )
+ if ITEM_KEY_MEDIA_CODEC in media_stream:
+ content_type = ContentType.try_parse(media_stream[ITEM_KEY_MEDIA_CODEC])
+ else:
+ content_type = ContentType.try_parse(mimetype)
+ return StreamDetails(
+ item_id=jellyfin_track[ITEM_KEY_ID],
+ provider=self.instance_id,
+ audio_format=AudioFormat(
+ content_type=content_type,
+ channels=jellyfin_track[ITEM_KEY_MEDIA_STREAMS][0][ITEM_KEY_MEDIA_CHANNELS],
+ ),
+ stream_type=StreamType.HTTP,
+ duration=int(
+ jellyfin_track[ITEM_KEY_RUNTIME_TICKS] / 10000000
+ ), # 10000000 ticks per millisecond)
+ path=url,
+ )
+
+ def _get_thumbnail_url(self, client: JellyfinClient, media_item: dict[str, Any]) -> str | None:
+ """Return the URL for the primary image of a media item if available."""
+ image_tags = media_item[ITEM_KEY_IMAGE_TAGS]
+
+ if "Primary" not in image_tags:
+ return None
+
+ item_id = media_item[ITEM_KEY_ID]
+ return API.artwork(client.jellyfin, item_id, "Primary", MAX_IMAGE_WIDTH)
+
+ def _get_stream_url(self, client: JellyfinClient, media_item: str) -> str:
+ """Return the stream URL for a media item."""
+ return API.audio_url(client.jellyfin, media_item) # type: ignore[no-any-return]
+
+ async def _get_children(
+ self, client: JellyfinClient, parent_id: str, item_type: str
+ ) -> list[dict[str, Any]]:
+ """Return all children for the parent_id whose item type is item_type."""
+ params = {
+ "Recursive": "true",
+ ITEM_KEY_PARENT_ID: parent_id,
+ }
+ if item_type in ITEM_TYPE_ARTIST:
+ params["IncludeItemTypes"] = f"{ITEM_TYPE_MUSICARTISTS},{ITEM_TYPE_ARTIST}"
+ else:
+ params["IncludeItemTypes"] = item_type
+ if item_type in ITEM_TYPE_AUDIO:
+ params["Fields"] = ITEM_KEY_MEDIA_SOURCES
+
+ result = client.jellyfin.user_items("", params)
+ return result["Items"]
+
+ async def _get_music_libraries(self, client: JellyfinClient) -> list[dict[str, Any]]:
+ """Return all supported libraries a user has access to."""
+ response = API.get_media_folders(client.jellyfin)
+ libraries = response["Items"]
+ result = []
+ for library in libraries:
+ if ITEM_KEY_COLLECTION_TYPE in library and library[ITEM_KEY_COLLECTION_TYPE] in "music":
+ result.append(library)
+ return result
+
+ async def _get_playlists(self, client: JellyfinClient) -> list[dict[str, Any]]:
+ """Return all supported libraries a user has access to."""
+ response = API.get_media_folders(client.jellyfin)
+ libraries = response["Items"]
+ result = []
+ for library in libraries:
+ if (
+ ITEM_KEY_COLLECTION_TYPE in library
+ and library[ITEM_KEY_COLLECTION_TYPE] in "playlists"
+ ):
+ result.append(library)
+ return result
+
+ def _media_mime_type(self, media_item: dict[str, Any]) -> str | None:
+ """Return the mime type of a media item."""
+ if not media_item.get(ITEM_KEY_MEDIA_SOURCES):
+ return None
+
+ media_source = media_item[ITEM_KEY_MEDIA_SOURCES][0]
+
+ if "Path" not in media_source:
+ return None
+
+ path = media_source["Path"]
+ mime_type, _ = mimetypes.guess_type(path)
+
+ return mime_type