--- /dev/null
+"""Jellyfin support for MusicAssistant."""\r
+\r
+from __future__ import annotations\r
+\r
+import mimetypes\r
+import socket\r
+import uuid\r
+from asyncio import TaskGroup\r
+from typing import TYPE_CHECKING, Any\r
+\r
+if TYPE_CHECKING:\r
+ from collections.abc import AsyncGenerator, Callable, Coroutine\r
+\r
+from aiohttp import ClientTimeout\r
+from jellyfin_apiclient_python import JellyfinClient\r
+from jellyfin_apiclient_python.api import API\r
+\r
+from music_assistant.common.models.config_entries import (\r
+ ConfigEntry,\r
+ ConfigValueType,\r
+ ProviderConfig,\r
+)\r
+from music_assistant.common.models.enums import (\r
+ ConfigEntryType,\r
+ ContentType,\r
+ ImageType,\r
+ MediaType,\r
+ ProviderFeature,\r
+)\r
+from music_assistant.common.models.errors import (\r
+ InvalidDataError,\r
+ LoginFailed,\r
+ MediaNotFoundError,\r
+ MusicAssistantError,\r
+)\r
+from music_assistant.common.models.media_items import (\r
+ Album,\r
+ AlbumTrack,\r
+ Artist,\r
+ AudioFormat,\r
+ ItemMapping,\r
+ MediaItem,\r
+ MediaItemImage,\r
+ Playlist,\r
+ PlaylistTrack,\r
+ ProviderMapping,\r
+ SearchResults,\r
+ StreamDetails,\r
+ Track,\r
+)\r
+from music_assistant.common.models.media_items import (\r
+ Album as JellyfinAlbum,\r
+)\r
+from music_assistant.common.models.media_items import (\r
+ Artist as JellyfinArtist,\r
+)\r
+from music_assistant.common.models.media_items import (\r
+ Playlist as JellyfinPlaylist,\r
+)\r
+from music_assistant.common.models.media_items import (\r
+ Track as JellyfinTrack,\r
+)\r
+\r
+if TYPE_CHECKING:\r
+ from music_assistant.common.models.provider import ProviderManifest\r
+from music_assistant.constants import VARIOUS_ARTISTS_NAME\r
+\r
+if TYPE_CHECKING:\r
+ from music_assistant.server import MusicAssistant\r
+if TYPE_CHECKING:\r
+ from music_assistant.server.models import ProviderInstanceType\r
+from music_assistant.server.models.music_provider import MusicProvider\r
+\r
+from .const import (\r
+ CLIENT_VERSION,\r
+ ITEM_KEY_ALBUM,\r
+ ITEM_KEY_ALBUM_ARTIST,\r
+ ITEM_KEY_ARTIST_ITEMS,\r
+ ITEM_KEY_CAN_DOWNLOAD,\r
+ ITEM_KEY_COLLECTION_TYPE,\r
+ ITEM_KEY_ID,\r
+ ITEM_KEY_IMAGE_TAGS,\r
+ ITEM_KEY_INDEX_NUMBER,\r
+ ITEM_KEY_MEDIA_CHANNELS,\r
+ ITEM_KEY_MEDIA_CODEC,\r
+ ITEM_KEY_MEDIA_SOURCES,\r
+ ITEM_KEY_MEDIA_STREAMS,\r
+ ITEM_KEY_MUSICBRAINZ_ARTIST,\r
+ ITEM_KEY_MUSICBRAINZ_RELEASE_GROUP,\r
+ ITEM_KEY_MUSICBRAINZ_TRACK,\r
+ ITEM_KEY_NAME,\r
+ ITEM_KEY_OVERVIEW,\r
+ ITEM_KEY_PARENT_ID,\r
+ ITEM_KEY_PARENT_INDEX_NUM,\r
+ ITEM_KEY_PRODUCTION_YEAR,\r
+ ITEM_KEY_PROVIDER_IDS,\r
+ ITEM_KEY_RUNTIME_TICKS,\r
+ ITEM_KEY_SORT_NAME,\r
+ ITEM_TYPE_ALBUM,\r
+ ITEM_TYPE_ARTIST,\r
+ ITEM_TYPE_AUDIO,\r
+ MAX_IMAGE_WIDTH,\r
+ USER_APP_NAME,\r
+)\r
+\r
+CONF_URL = "url"\r
+CONF_USERNAME = "username"\r
+CONF_PASSWORD = "password"\r
+FAKE_ARTIST_PREFIX = "_fake://"\r
+\r
+\r
+async def setup(\r
+ mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig\r
+) -> ProviderInstanceType:\r
+ """Initialize provider(instance) with given configuration."""\r
+ prov = JellyfinProvider(mass, manifest, config)\r
+ await prov.handle_setup()\r
+ return prov\r
+\r
+\r
+async def get_config_entries(\r
+ mass: MusicAssistant,\r
+ instance_id: str | None = None, # pylint: disable=W0613\r
+ action: str | None = None,\r
+ values: dict[str, ConfigValueType] | None = None,\r
+) -> tuple[ConfigEntry, ...]:\r
+ """\r
+ Return Config entries to setup this provider.\r
+\r
+ instance_id: id of an existing provider instance (None if new instance setup).\r
+ action: [optional] action key called from config entries UI.\r
+ values: the (intermediate) raw values for config entries sent with the action.\r
+ """\r
+ # config flow auth action/step (authenticate button clicked)\r
+ # ruff: noqa: ARG001\r
+ return (\r
+ ConfigEntry(\r
+ key=CONF_URL,\r
+ type=ConfigEntryType.STRING,\r
+ label="Server",\r
+ required=True,\r
+ description="The url of the Jellyfin server to connect to.",\r
+ ),\r
+ ConfigEntry(\r
+ key=CONF_USERNAME,\r
+ type=ConfigEntryType.STRING,\r
+ label="Username",\r
+ required=True,\r
+ description="The username to authenticate to the remote server."\r
+ "the remote host, For example 'media'.",\r
+ ),\r
+ ConfigEntry(\r
+ key=CONF_PASSWORD,\r
+ type=ConfigEntryType.SECURE_STRING,\r
+ label="Password",\r
+ required=True,\r
+ description="The password to authenticate to the remote server.",\r
+ ),\r
+ )\r
+\r
+\r
+class JellyfinProvider(MusicProvider):\r
+ """Provider for a jellyfin music library."""\r
+\r
+ # _jellyfin_server : JellyfinClient = None\r
+\r
+ async def handle_setup(self) -> None:\r
+ """Initialize provider(instance) with given configuration."""\r
+\r
+ def connect() -> JellyfinClient:\r
+ try:\r
+ client = JellyfinClient()\r
+ device_name = socket.gethostname()\r
+ device_id = str(uuid.uuid4())\r
+ client.config.app(USER_APP_NAME, CLIENT_VERSION, device_name, device_id)\r
+ if CONF_URL.startswith("https://"):\r
+ JellyfinClient.config.data["auth.ssl"] = True\r
+ else:\r
+ client.config.data["auth.ssl"] = False\r
+ jellyfin_server_url = self.config.get_value(CONF_URL)\r
+ jellyfin_server_user = self.config.get_value(CONF_USERNAME)\r
+ jellyfin_server_password = self.config.get_value(CONF_PASSWORD)\r
+ client.auth.connect_to_address(jellyfin_server_url)\r
+ client.auth.login(\r
+ jellyfin_server_url, jellyfin_server_user, jellyfin_server_password\r
+ )\r
+ credentials = client.auth.credentials.get_credentials()\r
+ server = credentials["Servers"][0]\r
+ server["username"] = jellyfin_server_user\r
+ _jellyfin_server = client\r
+ # json.dumps(server)\r
+ except MusicAssistantError as err:\r
+ msg = "Authentication failed: %s", str(err)\r
+ raise LoginFailed(msg)\r
+ return _jellyfin_server\r
+\r
+ self._jellyfin_server = await self._run_async(connect)\r
+\r
+ @property\r
+ def supported_features(self) -> tuple[ProviderFeature, ...]:\r
+ """Return a list of supported features."""\r
+ return (\r
+ ProviderFeature.LIBRARY_ARTISTS,\r
+ ProviderFeature.LIBRARY_ALBUMS,\r
+ ProviderFeature.LIBRARY_TRACKS,\r
+ ProviderFeature.LIBRARY_PLAYLISTS,\r
+ ProviderFeature.BROWSE,\r
+ ProviderFeature.SEARCH,\r
+ ProviderFeature.ARTIST_ALBUMS,\r
+ )\r
+\r
+ @property\r
+ def is_unique(self) -> bool:\r
+ """\r
+ Return True if the (non user related) data in this provider instance is unique.\r
+\r
+ For example on a global streaming provider (like Spotify),\r
+ the data on all instances is the same.\r
+ For a file provider each instance has other items.\r
+ Setting this to False will only query one instance of the provider for search and lookups.\r
+ Setting this to True will query all instances of this provider for search and lookups.\r
+ """\r
+ return True\r
+\r
+ async def _run_async(self, call: Callable, *args, **kwargs):\r
+ return await self.mass.create_task(call, *args, **kwargs)\r
+\r
+ async def resolve_image(self, path: str) -> str | bytes | AsyncGenerator[bytes, None]:\r
+ """Return the full image URL including the auth token."""\r
+ return path\r
+\r
+ def _get_item_mapping(self, media_type: MediaType, key: str, name: str) -> ItemMapping:\r
+ return ItemMapping(\r
+ media_type=media_type,\r
+ item_id=key,\r
+ provider=self.instance_id,\r
+ name=name,\r
+ )\r
+\r
+ async def _parse(self, jellyfin_media) -> MediaItem | None:\r
+ if jellyfin_media.type == "artist":\r
+ return await self._parse_artist(jellyfin_media)\r
+ elif jellyfin_media.type == "album":\r
+ return await self._parse_album(jellyfin_media)\r
+ elif jellyfin_media.type == "track":\r
+ return await self._parse_track(jellyfin_media)\r
+ elif jellyfin_media.type == "playlist":\r
+ return await self._parse_playlist(jellyfin_media)\r
+ return None\r
+\r
+ async def _search_track(self, search_query, limit) -> list[JellyfinTrack]:\r
+ resultset = await self._run_async(\r
+ API.search_media_items,\r
+ self._jellyfin_server.jellyfin,\r
+ term=search_query,\r
+ media=ITEM_TYPE_AUDIO,\r
+ limit=limit,\r
+ )\r
+ return resultset["Items"]\r
+\r
+ async def _search_album(self, search_query, limit) -> list[JellyfinAlbum]:\r
+ if "-" in search_query:\r
+ searchterms = search_query.split(" - ")\r
+ albumname = searchterms[1]\r
+ else:\r
+ albumname = search_query\r
+ resultset = await self._run_async(\r
+ API.search_media_items,\r
+ self._jellyfin_server.jellyfin,\r
+ term=albumname,\r
+ media=ITEM_TYPE_ALBUM,\r
+ limit=limit,\r
+ )\r
+ return resultset["Items"]\r
+\r
+ async def _search_artist(self, search_query, limit) -> list[JellyfinArtist]:\r
+ resultset = await self._run_async(\r
+ API.search_media_items,\r
+ self._jellyfin_server.jellyfin,\r
+ term=search_query,\r
+ media=ITEM_TYPE_ARTIST,\r
+ limit=limit,\r
+ )\r
+ return resultset["Items"]\r
+\r
+ async def _search_playlist(self, search_query, limit) -> list[JellyfinPlaylist]:\r
+ resultset = await self._run_async(\r
+ API.search_media_items,\r
+ self._jellyfin_server.jellyfin,\r
+ term=search_query,\r
+ media="Playlist",\r
+ limit=limit,\r
+ )\r
+ return resultset["Items"]\r
+\r
+ async def _search_and_parse(\r
+ self, search_coro: Coroutine, parse_coro: Callable\r
+ ) -> list[MediaItem]:\r
+ task_results = []\r
+ async with TaskGroup() as tg:\r
+ for item in await search_coro:\r
+ task_results.append(tg.create_task(parse_coro(item)))\r
+\r
+ results = []\r
+ for task in task_results:\r
+ results.append(task.result())\r
+\r
+ return results\r
+\r
+ async def _parse_album(self, jellyfin_album: dict[str, Any]) -> Album:\r
+ """Parse a Jellyfin Album response to an Album model object."""\r
+ album_id = jellyfin_album[ITEM_KEY_ID]\r
+ album = Album(\r
+ item_id=album_id,\r
+ provider=self.domain,\r
+ name=jellyfin_album[ITEM_KEY_NAME],\r
+ provider_mappings={\r
+ ProviderMapping(\r
+ item_id=str(album_id),\r
+ provider_domain=self.domain,\r
+ provider_instance=self.instance_id,\r
+ )\r
+ },\r
+ )\r
+ current_jellyfin_album = API.get_item(self._jellyfin_server.jellyfin, album_id)\r
+ if ITEM_KEY_PRODUCTION_YEAR in current_jellyfin_album:\r
+ album.year = current_jellyfin_album[ITEM_KEY_PRODUCTION_YEAR]\r
+ if thumb := self._get_thumbnail_url(self._jellyfin_server, jellyfin_album):\r
+ album.metadata.images = [\r
+ MediaItemImage(type=ImageType.THUMB, path=thumb, provider=self.instance_id)\r
+ ]\r
+ if ITEM_KEY_OVERVIEW in current_jellyfin_album:\r
+ album.metadata.description = current_jellyfin_album[ITEM_KEY_OVERVIEW]\r
+ if ITEM_KEY_MUSICBRAINZ_RELEASE_GROUP in current_jellyfin_album[ITEM_KEY_PROVIDER_IDS]:\r
+ musicbrainzid = current_jellyfin_album[ITEM_KEY_PROVIDER_IDS][\r
+ ITEM_KEY_MUSICBRAINZ_RELEASE_GROUP\r
+ ]\r
+ if len(musicbrainzid.split("-")) == 5:\r
+ album.mbid = musicbrainzid\r
+ if ITEM_KEY_SORT_NAME in current_jellyfin_album:\r
+ album.sort_name = current_jellyfin_album[ITEM_KEY_SORT_NAME]\r
+ if ITEM_KEY_ALBUM_ARTIST in current_jellyfin_album:\r
+ album.artists.append(\r
+ self._get_item_mapping(\r
+ MediaType.ARTIST,\r
+ current_jellyfin_album[ITEM_KEY_PARENT_ID],\r
+ current_jellyfin_album[ITEM_KEY_ALBUM_ARTIST],\r
+ )\r
+ )\r
+ elif len(current_jellyfin_album[ITEM_KEY_ARTIST_ITEMS]) >= 1:\r
+ num_artists = len(current_jellyfin_album[ITEM_KEY_ARTIST_ITEMS])\r
+ for i in range(num_artists):\r
+ album.artists.append(\r
+ self._get_item_mapping(\r
+ MediaType.ARTIST,\r
+ current_jellyfin_album[ITEM_KEY_ARTIST_ITEMS][i][ITEM_KEY_ID],\r
+ current_jellyfin_album[ITEM_KEY_ARTIST_ITEMS][i][ITEM_KEY_NAME],\r
+ )\r
+ )\r
+ return album\r
+\r
+ async def _parse_artist(self, jellyfin_artist: dict[str, Any]) -> Artist:\r
+ """Parse a Jellyfin Artist response to Artist model object."""\r
+ artist_id = jellyfin_artist[ITEM_KEY_ID]\r
+ current_artist = API.get_item(self._jellyfin_server.jellyfin, artist_id)\r
+ if not artist_id:\r
+ msg = "Artist does not have a valid ID"\r
+ raise InvalidDataError(msg)\r
+ artist = Artist(\r
+ item_id=artist_id,\r
+ name=jellyfin_artist[ITEM_KEY_NAME],\r
+ provider=self.domain,\r
+ provider_mappings={\r
+ ProviderMapping(\r
+ item_id=str(artist_id),\r
+ provider_domain=self.domain,\r
+ provider_instance=self.instance_id,\r
+ )\r
+ },\r
+ )\r
+ if ITEM_KEY_OVERVIEW in current_artist:\r
+ artist.metadata.description = current_artist[ITEM_KEY_OVERVIEW]\r
+ if ITEM_KEY_MUSICBRAINZ_ARTIST in current_artist[ITEM_KEY_PROVIDER_IDS]:\r
+ artist.mbid = current_artist[ITEM_KEY_PROVIDER_IDS][ITEM_KEY_MUSICBRAINZ_ARTIST]\r
+ if ITEM_KEY_SORT_NAME in current_artist:\r
+ artist.sort_name = current_artist[ITEM_KEY_SORT_NAME]\r
+ if thumb := self._get_thumbnail_url(self._jellyfin_server, jellyfin_artist):\r
+ artist.metadata.images = [\r
+ MediaItemImage(type=ImageType.THUMB, path=thumb, provider=self.instance_id)\r
+ ]\r
+ return artist\r
+\r
+ async def _parse_track(\r
+ self, jellyfin_track: dict[str, Any], extra_init_kwargs: dict[str, Any] | None = None\r
+ ) -> Track | AlbumTrack | PlaylistTrack:\r
+ """Parse a Jellyfin Track response to a Track model object."""\r
+ if extra_init_kwargs and "position" in extra_init_kwargs:\r
+ track_class = PlaylistTrack\r
+ elif (\r
+ extra_init_kwargs\r
+ and "disc_number" in extra_init_kwargs\r
+ and "track_number" in extra_init_kwargs\r
+ ):\r
+ track_class = AlbumTrack\r
+ else:\r
+ track_class = Track\r
+ current_jellyfin_track = API.get_item(\r
+ self._jellyfin_server.jellyfin, jellyfin_track[ITEM_KEY_ID]\r
+ )\r
+ available = False\r
+ content = None\r
+ available = current_jellyfin_track[ITEM_KEY_CAN_DOWNLOAD]\r
+ content = current_jellyfin_track[ITEM_KEY_MEDIA_STREAMS][0][ITEM_KEY_MEDIA_CODEC]\r
+ track = track_class(\r
+ item_id=jellyfin_track[ITEM_KEY_ID],\r
+ provider=self.instance_id,\r
+ name=jellyfin_track[ITEM_KEY_NAME],\r
+ **extra_init_kwargs or {},\r
+ provider_mappings={\r
+ ProviderMapping(\r
+ item_id=jellyfin_track[ITEM_KEY_ID],\r
+ provider_domain=self.domain,\r
+ provider_instance=self.instance_id,\r
+ available=available,\r
+ audio_format=AudioFormat(\r
+ content_type=(\r
+ ContentType.try_parse(content) if content else ContentType.UNKNOWN\r
+ ),\r
+ ),\r
+ url=self._get_stream_url(self._jellyfin_server, jellyfin_track[ITEM_KEY_ID]),\r
+ )\r
+ },\r
+ )\r
+\r
+ if thumb := self._get_thumbnail_url(self._jellyfin_server, jellyfin_track):\r
+ track.metadata.images = [\r
+ MediaItemImage(type=ImageType.THUMB, path=thumb, provider=self.instance_id)\r
+ ]\r
+ if len(current_jellyfin_track[ITEM_KEY_ARTIST_ITEMS]) >= 1:\r
+ track.artists.append(\r
+ self._get_item_mapping(\r
+ MediaType.ARTIST,\r
+ current_jellyfin_track[ITEM_KEY_ARTIST_ITEMS][0][ITEM_KEY_ID],\r
+ current_jellyfin_track[ITEM_KEY_ARTIST_ITEMS][0][ITEM_KEY_NAME],\r
+ )\r
+ )\r
+ num_artists = len(current_jellyfin_track[ITEM_KEY_ARTIST_ITEMS])\r
+ for i in range(num_artists):\r
+ track.artists.append(\r
+ self._get_item_mapping(\r
+ MediaType.ARTIST,\r
+ current_jellyfin_track[ITEM_KEY_ARTIST_ITEMS][i][ITEM_KEY_ID],\r
+ current_jellyfin_track[ITEM_KEY_ARTIST_ITEMS][i][ITEM_KEY_NAME],\r
+ )\r
+ )\r
+ elif ITEM_KEY_PARENT_ID in current_jellyfin_track:\r
+ parent_album = API.get_item(\r
+ self._jellyfin_server.jellyfin, current_jellyfin_track[ITEM_KEY_PARENT_ID]\r
+ )\r
+ track.artists.append(\r
+ self._get_item_mapping(\r
+ MediaType.ARTIST,\r
+ parent_album[ITEM_KEY_PARENT_ID],\r
+ parent_album[ITEM_KEY_ALBUM_ARTIST],\r
+ )\r
+ )\r
+ track.artists.append(\r
+ self._get_item_mapping(\r
+ MediaType.ARTIST,\r
+ parent_album[ITEM_KEY_PARENT_ID],\r
+ parent_album[ITEM_KEY_ALBUM_ARTIST],\r
+ )\r
+ )\r
+ else:\r
+ track.artists.append(await self._parse_artist(name=VARIOUS_ARTISTS_NAME))\r
+ if ITEM_KEY_PARENT_ID in current_jellyfin_track:\r
+ track.album = self._get_item_mapping(\r
+ MediaType.ALBUM,\r
+ current_jellyfin_track[ITEM_KEY_PARENT_ID],\r
+ current_jellyfin_track[ITEM_KEY_ALBUM],\r
+ )\r
+ if ITEM_KEY_PARENT_INDEX_NUM in current_jellyfin_track:\r
+ track.disc_number = current_jellyfin_track[ITEM_KEY_PARENT_INDEX_NUM]\r
+ if ITEM_KEY_RUNTIME_TICKS in current_jellyfin_track:\r
+ track.duration = int(\r
+ current_jellyfin_track[ITEM_KEY_RUNTIME_TICKS] / 10000000\r
+ ) # 10000000 ticks per millisecond\r
+ track.track_number = current_jellyfin_track.get(ITEM_KEY_INDEX_NUMBER, 99)\r
+ if ITEM_KEY_MUSICBRAINZ_TRACK in current_jellyfin_track[ITEM_KEY_PROVIDER_IDS]:\r
+ track.mbid = current_jellyfin_track[ITEM_KEY_PROVIDER_IDS][ITEM_KEY_MUSICBRAINZ_TRACK]\r
+ return track\r
+\r
+ async def _parse_playlist(self, jellyfin_playlist: JellyfinPlaylist) -> Playlist:\r
+ """Parse a Jellyfin Playlist response to a Playlist object."""\r
+ playlistid = jellyfin_playlist[ITEM_KEY_ID]\r
+ playlist = Playlist(\r
+ item_id=playlistid,\r
+ provider=self.domain,\r
+ name=jellyfin_playlist[ITEM_KEY_NAME],\r
+ provider_mappings={\r
+ ProviderMapping(\r
+ item_id=playlistid,\r
+ provider_domain=self.domain,\r
+ provider_instance=self.instance_id,\r
+ )\r
+ },\r
+ )\r
+ if ITEM_KEY_OVERVIEW in jellyfin_playlist:\r
+ playlist.metadata.description = jellyfin_playlist[ITEM_KEY_OVERVIEW]\r
+ if thumb := self._get_thumbnail_url(self._jellyfin_server, jellyfin_playlist):\r
+ playlist.metadata.images = [\r
+ MediaItemImage(type=ImageType.THUMB, path=thumb, provider=self.instance_id)\r
+ ]\r
+ playlist.is_editable = False\r
+ return playlist\r
+\r
+ async def search(\r
+ self,\r
+ search_query: str,\r
+ media_types: list[MediaType] | None = None,\r
+ limit: int = 20,\r
+ ) -> SearchResults:\r
+ """Perform search on the plex library.\r
+\r
+ :param search_query: Search query.\r
+ :param media_types: A list of media_types to include. All types if None.\r
+ :param limit: Number of items to return in the search (per type).\r
+ """\r
+ if not media_types:\r
+ media_types = [MediaType.ARTIST, MediaType.ALBUM, MediaType.TRACK, MediaType.PLAYLIST]\r
+\r
+ tasks = {}\r
+\r
+ async with TaskGroup() as tg:\r
+ for media_type in media_types:\r
+ if media_type == MediaType.ARTIST:\r
+ tasks[MediaType.ARTIST] = tg.create_task(\r
+ self._search_and_parse(\r
+ self._search_artist(search_query, limit), self._parse_artist\r
+ )\r
+ )\r
+ elif media_type == MediaType.ALBUM:\r
+ tasks[MediaType.ALBUM] = tg.create_task(\r
+ self._search_and_parse(\r
+ self._search_album(search_query, limit), self._parse_album\r
+ )\r
+ )\r
+ elif media_type == MediaType.TRACK:\r
+ tasks[MediaType.TRACK] = tg.create_task(\r
+ self._search_and_parse(\r
+ self._search_track(search_query, limit), self._parse_track\r
+ )\r
+ )\r
+ elif media_type == MediaType.PLAYLIST:\r
+ tasks[MediaType.PLAYLIST] = tg.create_task(\r
+ self._search_and_parse(\r
+ self._search_playlist(search_query, limit), self._parse_playlist\r
+ )\r
+ )\r
+\r
+ search_results = SearchResults()\r
+\r
+ for media_type, task in tasks.items():\r
+ if media_type == MediaType.ARTIST:\r
+ search_results.artists = task.result()\r
+ elif media_type == MediaType.ALBUM:\r
+ search_results.albums = task.result()\r
+ elif media_type == MediaType.TRACK:\r
+ search_results.tracks = task.result()\r
+ elif media_type == MediaType.PLAYLIST:\r
+ search_results.playlists = task.result()\r
+\r
+ return search_results\r
+\r
+ async def get_library_artists(self) -> AsyncGenerator[Artist, None]:\r
+ """Retrieve all library artists from Jellyfin Music."""\r
+ jellyfin_libraries = await self._get_music_libraries(self._jellyfin_server)\r
+ for jellyfin_library in jellyfin_libraries:\r
+ artists_obj = await self._get_children(\r
+ self._jellyfin_server, jellyfin_library[ITEM_KEY_ID], ITEM_TYPE_ARTIST\r
+ )\r
+ for artist in artists_obj:\r
+ yield await self._parse_artist(artist)\r
+\r
+ async def get_library_albums(self) -> AsyncGenerator[Album, None]:\r
+ """Retrieve all library albums from Jellyfin Music."""\r
+ jellyfin_libraries = await self._get_music_libraries(self._jellyfin_server)\r
+ for jellyfin_library in jellyfin_libraries:\r
+ artists_obj = await self._get_children(\r
+ self._jellyfin_server, jellyfin_library[ITEM_KEY_ID], ITEM_TYPE_ARTIST\r
+ )\r
+ for artist in artists_obj:\r
+ albums_obj = await self._get_children(\r
+ self._jellyfin_server, artist[ITEM_KEY_ID], ITEM_TYPE_ALBUM\r
+ )\r
+ for album in albums_obj:\r
+ yield await self._parse_album(album)\r
+\r
+ async def get_library_tracks(self) -> AsyncGenerator[Track, None]:\r
+ """Retrieve library tracks from Jellyfin Music."""\r
+ jellyfin_libraries = await self._get_music_libraries(self._jellyfin_server)\r
+ self._jellyfin_server.default_timeout = 120\r
+ for jellyfin_library in jellyfin_libraries:\r
+ artists_obj = await self._get_children(\r
+ self._jellyfin_server, jellyfin_library[ITEM_KEY_ID], ITEM_TYPE_ARTIST\r
+ )\r
+ for artist in artists_obj:\r
+ albums_obj = await self._get_children(\r
+ self._jellyfin_server, artist[ITEM_KEY_ID], ITEM_TYPE_ALBUM\r
+ )\r
+ for album in albums_obj:\r
+ tracks_obj = await self._get_children(\r
+ self._jellyfin_server, album[ITEM_KEY_ID], ITEM_TYPE_AUDIO\r
+ )\r
+ for track in tracks_obj:\r
+ yield await self._parse_track(track)\r
+\r
+ async def get_library_playlists(self) -> AsyncGenerator[Playlist, None]:\r
+ """Retrieve all library playlists from the provider."""\r
+ playlist_libraries = await self._get_playlists(self._jellyfin_server)\r
+ for playlist_library in playlist_libraries:\r
+ playlists_obj = await self._get_children(\r
+ self._jellyfin_server, playlist_library[ITEM_KEY_ID], "Playlist"\r
+ )\r
+ for playlist in playlists_obj:\r
+ if playlist["MediaType"] == "Audio":\r
+ yield await self._parse_playlist(playlist)\r
+\r
+ async def get_album(self, prov_album_id) -> Album:\r
+ """Get full album details by id."""\r
+ if jellyfin_album := API.get_item(self._jellyfin_server.jellyfin, prov_album_id):\r
+ return await self._run_async(self._parse_album(jellyfin_album))\r
+ msg = f"Item {prov_album_id} not found"\r
+ raise MediaNotFoundError(msg)\r
+\r
+ async def get_album_tracks(self, prov_album_id: str) -> list[Track]:\r
+ """Get album tracks for given album id."""\r
+ jellyfin_album_tracks = await self._get_children(\r
+ self._jellyfin_server, prov_album_id, ITEM_TYPE_AUDIO\r
+ )\r
+ tracks = []\r
+ for jellyfin_album_track in jellyfin_album_tracks:\r
+ discnum = jellyfin_album_track.get(ITEM_KEY_PARENT_INDEX_NUM, 1)\r
+ if "IndexNumber" in jellyfin_album_track:\r
+ if jellyfin_album_track["IndexNumber"] >= 1:\r
+ tracknum = jellyfin_album_track["IndexNumber"]\r
+ else:\r
+ tracknum = jellyfin_album_track["IndexNumber"]\r
+ else:\r
+ tracknum = 99\r
+ track = await self._parse_track(\r
+ jellyfin_album_track,\r
+ {\r
+ "disc_number": discnum,\r
+ "track_number": tracknum,\r
+ },\r
+ )\r
+ tracks.append(track)\r
+ return tracks\r
+\r
+ async def get_artist(self, prov_artist_id) -> Artist:\r
+ """Get full artist details by id."""\r
+ if prov_artist_id.startswith(FAKE_ARTIST_PREFIX):\r
+ # This artist does not exist in jellyfin, so we can just load it from DB.\r
+\r
+ if db_artist := await self.mass.music.artists.get_db_item_by_prov_id(\r
+ prov_artist_id, self.instance_id\r
+ ):\r
+ return db_artist\r
+ msg = f"Artist not found: {prov_artist_id}"\r
+ raise MediaNotFoundError(msg)\r
+\r
+ if jellyfin_artist := API.get_item(self._jellyfin_server.jellyfin, prov_artist_id):\r
+ return await self._parse_artist(jellyfin_artist)\r
+ msg = f"Item {prov_artist_id} not found"\r
+ raise MediaNotFoundError(msg)\r
+\r
+ async def get_track(self, prov_track_id) -> Track:\r
+ """Get full track details by id."""\r
+ if jellyfin_track := API.get_item(self._jellyfin_server.jellyfin, prov_track_id):\r
+ return await self._parse_track(jellyfin_track)\r
+ msg = f"Item {prov_track_id} not found"\r
+ raise MediaNotFoundError(msg)\r
+\r
+ async def get_playlist(self, prov_playlist_id) -> Playlist:\r
+ """Get full playlist details by id."""\r
+ if jellyfin_playlist := API.get_item(self._jellyfin_server.jellyfin, prov_playlist_id):\r
+ return await self._parse_playlist(jellyfin_playlist)\r
+ msg = f"Item {prov_playlist_id} not found"\r
+ raise MediaNotFoundError(msg)\r
+\r
+ async def get_playlist_tracks( # type: ignore[return]\r
+ self, prov_playlist_id: str\r
+ ) -> AsyncGenerator[Track, None]:\r
+ """Get all playlist tracks for given playlist id."""\r
+ jellyfin_playlist = API.get_item(self._jellyfin_server.jellyfin, prov_playlist_id)\r
+\r
+ playlist_items = await self._get_children(\r
+ self._jellyfin_server, jellyfin_playlist[ITEM_KEY_ID], ITEM_TYPE_AUDIO\r
+ )\r
+\r
+ if not playlist_items:\r
+ yield None\r
+ for index, jellyfin_track in enumerate(playlist_items):\r
+ if track := await self._parse_track(jellyfin_track, {"position": index + 1}):\r
+ yield track\r
+\r
+ async def get_artist_albums(self, prov_artist_id) -> list[Album]:\r
+ """Get a list of albums for the given artist."""\r
+ if not prov_artist_id.startswith(FAKE_ARTIST_PREFIX):\r
+ artists_obj = await self._get_children(\r
+ self._jellyfin_server, prov_artist_id, ITEM_TYPE_ARTIST\r
+ )\r
+ for artist in artists_obj:\r
+ jellyfin_albums = await self._get_children(\r
+ self._jellyfin_server, artist[ITEM_KEY_ID], ITEM_TYPE_ALBUM\r
+ )\r
+ if jellyfin_albums:\r
+ albums = []\r
+ for album_obj in jellyfin_albums:\r
+ albums.append(await self._parse_album(album_obj))\r
+ return albums\r
+ return []\r
+\r
+ async def get_stream_details(self, item_id: str) -> StreamDetails:\r
+ """Return the content details for the given track when it will be streamed."""\r
+ jellyfin_track = API.get_item(self._jellyfin_server.jellyfin, item_id)\r
+ mimetype = self._media_mime_type(jellyfin_track)\r
+ media_stream = jellyfin_track[ITEM_KEY_MEDIA_STREAMS][0]\r
+ if ITEM_KEY_MEDIA_CODEC in media_stream:\r
+ media_type = ContentType.try_parse(media_stream[ITEM_KEY_MEDIA_CODEC])\r
+ else:\r
+ media_type = ContentType.try_parse(mimetype)\r
+ return StreamDetails(\r
+ item_id=jellyfin_track[ITEM_KEY_ID],\r
+ provider=self.instance_id,\r
+ audio_format=AudioFormat(\r
+ content_type=media_type,\r
+ channels=jellyfin_track[ITEM_KEY_MEDIA_STREAMS][0][ITEM_KEY_MEDIA_CHANNELS],\r
+ ),\r
+ duration=int(\r
+ jellyfin_track[ITEM_KEY_RUNTIME_TICKS] / 10000000\r
+ ), # 10000000 ticks per millisecond)\r
+ data=jellyfin_track,\r
+ )\r
+\r
+ def _get_thumbnail_url(self, client: JellyfinClient, media_item: dict[str, Any]) -> str | None:\r
+ """Return the URL for the primary image of a media item if available."""\r
+ image_tags = media_item[ITEM_KEY_IMAGE_TAGS]\r
+\r
+ if "Primary" not in image_tags:\r
+ return None\r
+\r
+ item_id = media_item[ITEM_KEY_ID]\r
+ return API.artwork(client.jellyfin, item_id, "Primary", MAX_IMAGE_WIDTH)\r
+\r
+ def _get_stream_url(self, client: JellyfinClient, media_item: str) -> str:\r
+ """Return the stream URL for a media item."""\r
+ return API.audio_url(client.jellyfin, media_item) # type: ignore[no-any-return]\r
+\r
+ async def _get_children(\r
+ self, client: JellyfinClient, parent_id: str, item_type: str\r
+ ) -> list[dict[str, Any]]:\r
+ """Return all children for the parent_id whose item type is item_type."""\r
+ params = {\r
+ "Recursive": "true",\r
+ ITEM_KEY_PARENT_ID: parent_id,\r
+ "IncludeItemTypes": item_type,\r
+ }\r
+ if item_type in ITEM_TYPE_AUDIO:\r
+ params["Fields"] = ITEM_KEY_MEDIA_SOURCES\r
+\r
+ result = client.jellyfin.user_items("", params)\r
+ return result["Items"]\r
+\r
+ async def _get_music_libraries(self, client: JellyfinClient) -> list[dict[str, Any]]:\r
+ """Return all supported libraries a user has access to."""\r
+ response = API.get_media_folders(client.jellyfin)\r
+ libraries = response["Items"]\r
+ result = []\r
+ for library in libraries:\r
+ if ITEM_KEY_COLLECTION_TYPE in library and library[ITEM_KEY_COLLECTION_TYPE] in "music":\r
+ result.append(library)\r
+ return result\r
+\r
+ async def _get_playlists(self, client: JellyfinClient) -> list[dict[str, Any]]:\r
+ """Return all supported libraries a user has access to."""\r
+ response = API.get_media_folders(client.jellyfin)\r
+ libraries = response["Items"]\r
+ result = []\r
+ for library in libraries:\r
+ if (\r
+ ITEM_KEY_COLLECTION_TYPE in library\r
+ and library[ITEM_KEY_COLLECTION_TYPE] in "playlists"\r
+ ):\r
+ result.append(library)\r
+ return result\r
+\r
+ def _media_mime_type(self, media_item: dict[str, Any]) -> str | None:\r
+ """Return the mime type of a media item."""\r
+ if not media_item.get(ITEM_KEY_MEDIA_SOURCES):\r
+ return None\r
+\r
+ media_source = media_item[ITEM_KEY_MEDIA_SOURCES][0]\r
+\r
+ if "Path" not in media_source:\r
+ return None\r
+\r
+ path = media_source["Path"]\r
+ mime_type, _ = mimetypes.guess_type(path)\r
+\r
+ return mime_type\r
+\r
+ async def get_audio_stream(self, streamdetails: StreamDetails) -> AsyncGenerator[bytes, None]:\r
+ """Return the audio stream for the provider item."""\r
+ url = API.audio_url(self._jellyfin_server.jellyfin, streamdetails.item_id)\r
+\r
+ timeout = ClientTimeout(total=0, connect=30, sock_read=600)\r
+ async with self.mass.http_session.get(url, timeout=timeout) as resp:\r
+ async for chunk in resp.content.iter_any():\r
+ yield chunk\r