from __future__ import annotations
-import asyncio
import logging
import mimetypes
import socket
from asyncio import TaskGroup
from typing import TYPE_CHECKING, Any
+from aiojellyfin import SessionConfiguration, authenticate_by_name
+
if TYPE_CHECKING:
from collections.abc import AsyncGenerator, Callable, Coroutine
-from jellyfin_apiclient_python import JellyfinClient
-from jellyfin_apiclient_python.api import API
-
from music_assistant.common.models.config_entries import (
ConfigEntry,
ConfigValueType,
class JellyfinProvider(MusicProvider):
"""Provider for a jellyfin music library."""
- # _jellyfin_server : JellyfinClient = None
-
async def handle_async_init(self) -> None:
"""Initialize provider(instance) with given configuration."""
+ session_config = SessionConfiguration(
+ session=self.mass.http_session,
+ url=self.config.get_value(CONF_URL),
+ verify_ssl=False,
+ app_name=USER_APP_NAME,
+ app_version=CLIENT_VERSION,
+ device_name=socket.gethostname(),
+ device_id=str(uuid.uuid4()),
+ )
- def connect() -> JellyfinClient:
- try:
- client = JellyfinClient()
- device_name = socket.gethostname()
- device_id = str(uuid.uuid4())
- client.config.app(USER_APP_NAME, CLIENT_VERSION, device_name, device_id)
- if CONF_URL.startswith("https://"):
- JellyfinClient.config.data["auth.ssl"] = True
- else:
- client.config.data["auth.ssl"] = False
- jellyfin_server_url = self.config.get_value(CONF_URL)
- jellyfin_server_user = self.config.get_value(CONF_USERNAME)
- jellyfin_server_password = self.config.get_value(CONF_PASSWORD)
- client.auth.connect_to_address(jellyfin_server_url)
- client.auth.login(
- jellyfin_server_url, jellyfin_server_user, jellyfin_server_password
- )
- credentials = client.auth.credentials.get_credentials()
- if not credentials["Servers"]:
- raise IndexError("No servers found")
- server = credentials["Servers"][0]
- server["username"] = jellyfin_server_user
- _jellyfin_server = client
- # json.dumps(server)
- except Exception as err:
- msg = f"Authentication failed: {err}"
- raise LoginFailed(msg) from err
- return _jellyfin_server
-
- self._jellyfin_server = await self._run_async(connect)
+ try:
+ self._client = await authenticate_by_name(
+ session_config,
+ self.config.get_value(CONF_USERNAME),
+ self.config.get_value(CONF_PASSWORD),
+ )
+ except Exception as err:
+ raise LoginFailed(f"Authentication failed: {err}") from err
@property
def supported_features(self) -> tuple[ProviderFeature, ...]:
"""Return True if the provider is a streaming provider."""
return False
- async def _run_async(self, call: Callable, *args, **kwargs):
- return await self.mass.create_task(call, *args, **kwargs)
-
def _get_item_mapping(self, media_type: MediaType, key: str, name: str) -> ItemMapping:
return ItemMapping(
media_type=media_type,
return None
async def _search_track(self, search_query, limit) -> list[dict[str, Any]]:
- resultset = await self._run_async(
- API.search_media_items,
- self._jellyfin_server.jellyfin,
+ resultset = await self._client.search_media_items(
term=search_query,
media=ITEM_TYPE_AUDIO,
limit=limit,
albumname = searchterms[1]
else:
albumname = search_query
- resultset = await self._run_async(
- API.search_media_items,
- self._jellyfin_server.jellyfin,
+ resultset = await self._client.search_media_items(
term=albumname,
media=ITEM_TYPE_ALBUM,
limit=limit,
return resultset["Items"]
async def _search_artist(self, search_query, limit) -> list[dict[str, Any]]:
- resultset = await self._run_async(
- API.search_media_items,
- self._jellyfin_server.jellyfin,
+ resultset = await self._client.search_media_items(
term=search_query,
media=ITEM_TYPE_ARTIST,
limit=limit,
return resultset["Items"]
async def _search_playlist(self, search_query, limit) -> list[dict[str, Any]]:
- resultset = await self._run_async(
- API.search_media_items,
- self._jellyfin_server.jellyfin,
+ resultset = await self._client.search_media_items(
term=search_query,
media="Playlist",
limit=limit,
)
},
)
- current_jellyfin_album = API.get_item(self._jellyfin_server.jellyfin, album_id)
+ current_jellyfin_album = await self._client.get_item(album_id)
if ITEM_KEY_PRODUCTION_YEAR in current_jellyfin_album:
album.year = current_jellyfin_album[ITEM_KEY_PRODUCTION_YEAR]
- if thumb := self._get_thumbnail_url(self._jellyfin_server, jellyfin_album):
+ if thumb := self._get_thumbnail_url(jellyfin_album):
album.metadata.images = [
MediaItemImage(
type=ImageType.THUMB,
async def _parse_artist(self, jellyfin_artist: dict[str, Any]) -> Artist:
"""Parse a Jellyfin Artist response to Artist model object."""
artist_id = jellyfin_artist[ITEM_KEY_ID]
- current_artist = API.get_item(self._jellyfin_server.jellyfin, artist_id)
+ current_artist = await self._client.get_item(artist_id)
if not artist_id:
msg = "Artist does not have a valid ID"
raise InvalidDataError(msg)
)
if ITEM_KEY_SORT_NAME in current_artist:
artist.sort_name = current_artist[ITEM_KEY_SORT_NAME]
- if thumb := self._get_thumbnail_url(self._jellyfin_server, jellyfin_artist):
+ if thumb := self._get_thumbnail_url(jellyfin_artist):
artist.metadata.images = [
MediaItemImage(
type=ImageType.THUMB,
async def _parse_track(self, jellyfin_track: dict[str, Any]) -> Track:
"""Parse a Jellyfin Track response to a Track model object."""
- current_jellyfin_track = await asyncio.to_thread(
- API.get_item, self._jellyfin_server.jellyfin, jellyfin_track[ITEM_KEY_ID]
- )
+ current_jellyfin_track = await self._client.get_item(jellyfin_track[ITEM_KEY_ID])
available = False
content = None
available = current_jellyfin_track[ITEM_KEY_CAN_DOWNLOAD]
ContentType.try_parse(content) if content else ContentType.UNKNOWN
),
),
- url=self._get_stream_url(self._jellyfin_server, jellyfin_track[ITEM_KEY_ID]),
+ url=self._get_stream_url(jellyfin_track[ITEM_KEY_ID]),
)
},
)
track.track_number = track_idx
track.position = track_idx
- if thumb := self._get_thumbnail_url(self._jellyfin_server, jellyfin_track):
+ if thumb := self._get_thumbnail_url(jellyfin_track):
track.metadata.images = [
MediaItemImage(
type=ImageType.THUMB,
)
)
elif ITEM_KEY_ALBUM_ID in current_jellyfin_track:
- parent_album = API.get_item(
- self._jellyfin_server.jellyfin, current_jellyfin_track[ITEM_KEY_ALBUM_ID]
- )
+ parent_album = await self._client.get_item(current_jellyfin_track[ITEM_KEY_ALBUM_ID])
if ITEM_KEY_ALBUM_ARTISTS in parent_album:
for artist_item in parent_album[ITEM_KEY_ALBUM_ARTISTS]:
track.artists.append(
current_jellyfin_track[ITEM_KEY_ALBUM],
)
elif ITEM_KEY_ALBUM_ID in current_jellyfin_track:
- parent_album = API.get_item(
- self._jellyfin_server.jellyfin, current_jellyfin_track[ITEM_KEY_ALBUM_ID]
- )
+ parent_album = await self._client.get_item(current_jellyfin_track[ITEM_KEY_ALBUM_ID])
track.album = self._get_item_mapping(
MediaType.ALBUM,
parent_album[ITEM_KEY_ID],
)
if ITEM_KEY_OVERVIEW in jellyfin_playlist:
playlist.metadata.description = jellyfin_playlist[ITEM_KEY_OVERVIEW]
- if thumb := self._get_thumbnail_url(self._jellyfin_server, jellyfin_playlist):
+ if thumb := self._get_thumbnail_url(jellyfin_playlist):
playlist.metadata.images = [
MediaItemImage(
type=ImageType.THUMB,
async def get_library_artists(self) -> AsyncGenerator[Artist, None]:
"""Retrieve all library artists from Jellyfin Music."""
- jellyfin_libraries = await self._get_music_libraries(self._jellyfin_server)
+ jellyfin_libraries = await self._get_music_libraries()
for jellyfin_library in jellyfin_libraries:
- response = API._get(
- self._jellyfin_server.jellyfin,
- "Artists",
- {
- ITEM_KEY_PARENT_ID: jellyfin_library[ITEM_KEY_ID],
- "ArtistType": "Artist,AlbumArtist",
- },
- )
+ response = await self._client.artists(jellyfin_library[ITEM_KEY_ID])
artists_obj = response["Items"]
for artist in artists_obj:
yield await self._parse_artist(artist)
async def get_library_albums(self) -> AsyncGenerator[Album, None]:
"""Retrieve all library albums from Jellyfin Music."""
- jellyfin_libraries = await self._get_music_libraries(self._jellyfin_server)
+ jellyfin_libraries = await self._get_music_libraries()
for jellyfin_library in jellyfin_libraries:
- artists_obj = await self._get_children(
- self._jellyfin_server, jellyfin_library[ITEM_KEY_ID], ITEM_TYPE_ARTIST
- )
+ artists_obj = await self._get_children(jellyfin_library[ITEM_KEY_ID], ITEM_TYPE_ARTIST)
for artist in artists_obj:
- albums_obj = await self._get_children(
- self._jellyfin_server, artist[ITEM_KEY_ID], ITEM_TYPE_ALBUM
- )
+ albums_obj = await self._get_children(artist[ITEM_KEY_ID], ITEM_TYPE_ALBUM)
for album in albums_obj:
yield await self._parse_album(album)
async def get_library_tracks(self) -> AsyncGenerator[Track, None]:
"""Retrieve library tracks from Jellyfin Music."""
- jellyfin_libraries = await self._get_music_libraries(self._jellyfin_server)
- self._jellyfin_server.default_timeout = 120
+ jellyfin_libraries = await self._get_music_libraries()
for jellyfin_library in jellyfin_libraries:
- artists_obj = await self._get_children(
- self._jellyfin_server, jellyfin_library[ITEM_KEY_ID], ITEM_TYPE_ARTIST
- )
+ artists_obj = await self._get_children(jellyfin_library[ITEM_KEY_ID], ITEM_TYPE_ARTIST)
for artist in artists_obj:
- albums_obj = await self._get_children(
- self._jellyfin_server, artist[ITEM_KEY_ID], ITEM_TYPE_ALBUM
- )
+ albums_obj = await self._get_children(artist[ITEM_KEY_ID], ITEM_TYPE_ALBUM)
for album in albums_obj:
- tracks_obj = await self._get_children(
- self._jellyfin_server, album[ITEM_KEY_ID], ITEM_TYPE_AUDIO
- )
+ tracks_obj = await self._get_children(album[ITEM_KEY_ID], ITEM_TYPE_AUDIO)
for track in tracks_obj:
yield await self._parse_track(track)
async def get_library_playlists(self) -> AsyncGenerator[Playlist, None]:
"""Retrieve all library playlists from the provider."""
- playlist_libraries = await self._get_playlists(self._jellyfin_server)
+ playlist_libraries = await self._get_playlists()
for playlist_library in playlist_libraries:
- playlists_obj = await self._get_children(
- self._jellyfin_server, playlist_library[ITEM_KEY_ID], "Playlist"
- )
+ playlists_obj = await self._get_children(playlist_library[ITEM_KEY_ID], "Playlist")
for playlist in playlists_obj:
if "MediaType" in playlist: # Only jellyfin has this property
if playlist["MediaType"] == "Audio":
async def get_album(self, prov_album_id) -> Album:
"""Get full album details by id."""
- if jellyfin_album := API.get_item(self._jellyfin_server.jellyfin, prov_album_id):
- return await self._run_async(self._parse_album(jellyfin_album))
+ if jellyfin_album := await self._client.get_item(prov_album_id):
+ return await self._parse_album(jellyfin_album)
msg = f"Item {prov_album_id} not found"
raise MediaNotFoundError(msg)
async def get_album_tracks(self, prov_album_id: str) -> list[Track]:
"""Get album tracks for given album id."""
- jellyfin_album_tracks = await self._get_children(
- self._jellyfin_server, prov_album_id, ITEM_TYPE_AUDIO
- )
+ jellyfin_album_tracks = await self._get_children(prov_album_id, ITEM_TYPE_AUDIO)
return [
await self._parse_track(jellyfin_album_track)
for jellyfin_album_track in jellyfin_album_tracks
msg = f"Artist not found: {prov_artist_id}"
raise MediaNotFoundError(msg)
- if jellyfin_artist := API.get_item(self._jellyfin_server.jellyfin, prov_artist_id):
+ if jellyfin_artist := await self._client.get_item(prov_artist_id):
return await self._parse_artist(jellyfin_artist)
msg = f"Item {prov_artist_id} not found"
raise MediaNotFoundError(msg)
async def get_track(self, prov_track_id) -> Track:
"""Get full track details by id."""
- if jellyfin_track := API.get_item(self._jellyfin_server.jellyfin, prov_track_id):
+ if jellyfin_track := await self._client.get_item(prov_track_id):
return await self._parse_track(jellyfin_track)
msg = f"Item {prov_track_id} not found"
raise MediaNotFoundError(msg)
async def get_playlist(self, prov_playlist_id) -> Playlist:
"""Get full playlist details by id."""
- if jellyfin_playlist := API.get_item(self._jellyfin_server.jellyfin, prov_playlist_id):
+ if jellyfin_playlist := await self._client.get_item(prov_playlist_id):
return await self._parse_playlist(jellyfin_playlist)
msg = f"Item {prov_playlist_id} not found"
raise MediaNotFoundError(msg)
# paging not supported, we always return the whole list at once
return []
# TODO: Does Jellyfin support paging here?
- jellyfin_playlist = API.get_item(self._jellyfin_server.jellyfin, prov_playlist_id)
- playlist_items = await self._get_children(
- self._jellyfin_server, jellyfin_playlist[ITEM_KEY_ID], ITEM_TYPE_AUDIO
- )
+ jellyfin_playlist = await self._client.get_item(prov_playlist_id)
+ playlist_items = await self._get_children(jellyfin_playlist[ITEM_KEY_ID], ITEM_TYPE_AUDIO)
if not playlist_items:
return result
for index, jellyfin_track in enumerate(playlist_items, 1):
async def get_artist_albums(self, prov_artist_id) -> list[Album]:
"""Get a list of albums for the given artist."""
if not prov_artist_id.startswith(FAKE_ARTIST_PREFIX):
- artists_obj = await self._get_children(
- self._jellyfin_server, prov_artist_id, ITEM_TYPE_ARTIST
- )
+ artists_obj = await self._get_children(prov_artist_id, ITEM_TYPE_ARTIST)
for artist in artists_obj:
- jellyfin_albums = await self._get_children(
- self._jellyfin_server, artist[ITEM_KEY_ID], ITEM_TYPE_ALBUM
- )
+ jellyfin_albums = await self._get_children(artist[ITEM_KEY_ID], ITEM_TYPE_ALBUM)
if jellyfin_albums:
albums = []
for album_obj in jellyfin_albums:
async def get_stream_details(self, item_id: str) -> StreamDetails:
"""Return the content details for the given track when it will be streamed."""
- jellyfin_track = API.get_item(self._jellyfin_server.jellyfin, item_id)
+ jellyfin_track = await self._client.get_item(item_id)
mimetype = self._media_mime_type(jellyfin_track)
media_stream = jellyfin_track[ITEM_KEY_MEDIA_STREAMS][0]
- url = API.audio_url(
- self._jellyfin_server.jellyfin, jellyfin_track[ITEM_KEY_ID], SUPPORTED_CONTAINER_FORMATS
- )
+ url = self._client.audio_url(jellyfin_track[ITEM_KEY_ID], SUPPORTED_CONTAINER_FORMATS)
if ITEM_KEY_MEDIA_CODEC in media_stream:
content_type = ContentType.try_parse(media_stream[ITEM_KEY_MEDIA_CODEC])
else:
path=url,
)
- def _get_thumbnail_url(self, client: JellyfinClient, media_item: dict[str, Any]) -> str | None:
+ def _get_thumbnail_url(self, media_item: dict[str, Any]) -> str | None:
"""Return the URL for the primary image of a media item if available."""
image_tags = media_item[ITEM_KEY_IMAGE_TAGS]
return None
item_id = media_item[ITEM_KEY_ID]
- return API.artwork(client.jellyfin, item_id, "Primary", MAX_IMAGE_WIDTH)
+ return self._client.artwork(item_id, "Primary", MAX_IMAGE_WIDTH)
- def _get_stream_url(self, client: JellyfinClient, media_item: str) -> str:
+ def _get_stream_url(self, media_item: str) -> str:
"""Return the stream URL for a media item."""
- return API.audio_url(client.jellyfin, media_item) # type: ignore[no-any-return]
+ return self._client.audio_url(media_item) # type: ignore[no-any-return]
- async def _get_children(
- self, client: JellyfinClient, parent_id: str, item_type: str
- ) -> list[dict[str, Any]]:
+ async def _get_children(self, parent_id: str, item_type: str) -> list[dict[str, Any]]:
"""Return all children for the parent_id whose item type is item_type."""
params = {
"Recursive": "true",
if item_type in ITEM_TYPE_AUDIO:
params["Fields"] = ITEM_KEY_MEDIA_SOURCES
- result = client.jellyfin.user_items("", params)
+ result = await self._client.user_items("", params)
return result["Items"]
- async def _get_music_libraries(self, client: JellyfinClient) -> list[dict[str, Any]]:
+ async def _get_music_libraries(self) -> list[dict[str, Any]]:
"""Return all supported libraries a user has access to."""
- response = API.get_media_folders(client.jellyfin)
+ response = await self._client.get_media_folders()
libraries = response["Items"]
result = []
for library in libraries:
result.append(library)
return result
- async def _get_playlists(self, client: JellyfinClient) -> list[dict[str, Any]]:
+ async def _get_playlists(self) -> list[dict[str, Any]]:
"""Return all supported libraries a user has access to."""
- response = API.get_media_folders(client.jellyfin)
+ response = await self._client.get_media_folders()
libraries = response["Items"]
result = []
for library in libraries: