from asyncio import TaskGroup
from collections.abc import AsyncGenerator
+from aiojellyfin import Album as JellyAlbum
+from aiojellyfin import Artist as JellyArtist
from aiojellyfin import MediaItem as JellyMediaItem
from aiojellyfin import MediaLibrary as JellyMediaLibrary
+from aiojellyfin import Playlist as JellyPlaylist
from aiojellyfin import SessionConfiguration, authenticate_by_name
+from aiojellyfin import Track as JellyTrack
from music_assistant.common.models.config_entries import (
ConfigEntry,
ITEM_KEY_MUSICBRAINZ_TRACK,
ITEM_KEY_NAME,
ITEM_KEY_OVERVIEW,
- ITEM_KEY_PARENT_ID,
ITEM_KEY_PARENT_INDEX_NUM,
ITEM_KEY_PRODUCTION_YEAR,
ITEM_KEY_PROVIDER_IDS,
ITEM_KEY_RUNTIME_TICKS,
ITEM_KEY_SORT_NAME,
ITEM_KEY_USER_DATA,
- ITEM_TYPE_ALBUM,
- ITEM_TYPE_ARTIST,
- ITEM_TYPE_AUDIO,
- ITEM_TYPE_MUSICARTISTS,
MAX_IMAGE_WIDTH,
SUPPORTED_CONTAINER_FORMATS,
TRACK_FIELDS,
)
async def _search_track(self, search_query: str, limit: int) -> list[Track]:
- resultset = await self._client.search_media_items(
- term=search_query,
- media=ITEM_TYPE_AUDIO,
+ resultset = await self._client.tracks(
+ search_term=search_query,
limit=limit,
enable_user_data=True,
fields=TRACK_FIELDS,
albumname = searchterms[1]
else:
albumname = search_query
- resultset = await self._client.search_media_items(
- term=albumname,
- media=ITEM_TYPE_ALBUM,
+ resultset = await self._client.albums(
+ search_term=albumname,
limit=limit,
enable_user_data=True,
fields=ALBUM_FIELDS,
return albums
async def _search_artist(self, search_query: str, limit: int) -> list[Artist]:
- resultset = await self._client.search_media_items(
- term=search_query,
- media=ITEM_TYPE_ARTIST,
+ resultset = await self._client.artists(
+ search_term=search_query,
limit=limit,
enable_user_data=True,
fields=ARTIST_FIELDS,
return artists
async def _search_playlist(self, search_query: str, limit: int) -> list[Playlist]:
- resultset = await self._client.search_media_items(
- term=search_query,
- media="Playlist",
+ resultset = await self._client.playlists(
+ search_term=search_query,
limit=limit,
+ enable_user_data=True,
)
playlists = []
for item in resultset["Items"]:
playlists.append(self._parse_playlist(item))
return playlists
- def _parse_album(self, jellyfin_album: JellyMediaItem) -> Album:
+ def _parse_album(self, jellyfin_album: JellyAlbum) -> Album:
"""Parse a Jellyfin Album response to an Album model object."""
album_id = jellyfin_album[ITEM_KEY_ID]
album = Album(
album.favorite = user_data.get(USER_DATA_KEY_IS_FAVORITE, False)
return album
- def _parse_artist(self, jellyfin_artist: JellyMediaItem) -> Artist:
+ def _parse_artist(self, jellyfin_artist: JellyArtist) -> Artist:
"""Parse a Jellyfin Artist response to Artist model object."""
artist_id = jellyfin_artist[ITEM_KEY_ID]
artist = Artist(
artist.favorite = user_data.get(USER_DATA_KEY_IS_FAVORITE, False)
return artist
- def _parse_track(self, jellyfin_track: JellyMediaItem) -> Track:
+ def _parse_track(self, jellyfin_track: JellyTrack) -> Track:
"""Parse a Jellyfin Track response to a Track model object."""
available = False
content = None
track.favorite = user_data.get(USER_DATA_KEY_IS_FAVORITE, False)
return track
- def _parse_playlist(self, jellyfin_playlist: JellyMediaItem) -> Playlist:
+ def _parse_playlist(self, jellyfin_playlist: JellyPlaylist) -> Playlist:
"""Parse a Jellyfin Playlist response to a Playlist object."""
playlistid = jellyfin_playlist[ITEM_KEY_ID]
playlist = Playlist(
"""Retrieve all library playlists from the provider."""
playlist_libraries = await self._get_playlists()
for playlist_library in playlist_libraries:
- playlists_obj = await self._get_children(playlist_library[ITEM_KEY_ID], "Playlist")
- for playlist in playlists_obj:
+ playlists_obj = await self._client.playlists(playlist_library[ITEM_KEY_ID])
+ for playlist in playlists_obj["Items"]:
if "MediaType" in playlist: # Only jellyfin has this property
if playlist["MediaType"] == "Audio":
yield self._parse_playlist(playlist)
async def get_album(self, prov_album_id: str) -> Album:
"""Get full album details by id."""
- if jellyfin_album := await self._client.get_item(prov_album_id):
+ if jellyfin_album := await self._client.get_album(prov_album_id):
return self._parse_album(jellyfin_album)
msg = f"Item {prov_album_id} not found"
raise MediaNotFoundError(msg)
async def get_album_tracks(self, prov_album_id: str) -> list[Track]:
"""Get album tracks for given album id."""
- jellyfin_album_tracks = await self._get_children(prov_album_id, ITEM_TYPE_AUDIO)
+ jellyfin_album_tracks = await self._client.tracks(
+ prov_album_id, enable_user_data=True, fields=TRACK_FIELDS
+ )
return [
self._parse_track(jellyfin_album_track)
- for jellyfin_album_track in jellyfin_album_tracks
+ for jellyfin_album_track in jellyfin_album_tracks["Items"]
]
async def get_artist(self, prov_artist_id: str) -> Artist:
artist.mbid = UNKNOWN_ARTIST_ID_MBID
return artist
- if jellyfin_artist := await self._client.get_item(prov_artist_id):
+ if jellyfin_artist := await self._client.get_artist(prov_artist_id):
return self._parse_artist(jellyfin_artist)
msg = f"Item {prov_artist_id} not found"
raise MediaNotFoundError(msg)
async def get_track(self, prov_track_id: str) -> Track:
"""Get full track details by id."""
- if jellyfin_track := await self._client.get_item(prov_track_id):
+ if jellyfin_track := await self._client.get_track(prov_track_id):
return self._parse_track(jellyfin_track)
msg = f"Item {prov_track_id} not found"
raise MediaNotFoundError(msg)
async def get_playlist(self, prov_playlist_id: str) -> Playlist:
"""Get full playlist details by id."""
- if jellyfin_playlist := await self._client.get_item(prov_playlist_id):
+ if jellyfin_playlist := await self._client.get_playlist(prov_playlist_id):
return self._parse_playlist(jellyfin_playlist)
msg = f"Item {prov_playlist_id} not found"
raise MediaNotFoundError(msg)
# paging not supported, we always return the whole list at once
return []
# TODO: Does Jellyfin support paging here?
- jellyfin_playlist = await self._client.get_item(prov_playlist_id)
- playlist_items = await self._get_children(jellyfin_playlist[ITEM_KEY_ID], ITEM_TYPE_AUDIO)
+ jellyfin_playlist = await self._client.get_playlist(prov_playlist_id)
+ playlist_items = await self._client.tracks(
+ jellyfin_playlist[ITEM_KEY_ID], enable_user_data=True, fields=TRACK_FIELDS
+ )
if not playlist_items:
return result
- for index, jellyfin_track in enumerate(playlist_items, 1):
+ for index, jellyfin_track in enumerate(playlist_items["Items"], 1):
try:
if track := self._parse_track(jellyfin_track):
if not track.position:
async def get_stream_details(self, item_id: str) -> StreamDetails:
"""Return the content details for the given track when it will be streamed."""
- jellyfin_track = await self._client.get_item(item_id)
+ jellyfin_track = await self._client.get_track(item_id)
mimetype = self._media_mime_type(jellyfin_track)
media_stream = jellyfin_track[ITEM_KEY_MEDIA_STREAMS][0]
url = self._client.audio_url(jellyfin_track[ITEM_KEY_ID], SUPPORTED_CONTAINER_FORMATS)
"""Return the stream URL for a media item."""
return self._client.audio_url(media_item)
- async def _get_children(self, parent_id: str, item_type: str) -> list[JellyMediaItem]:
- """Return all children for the parent_id whose item type is item_type."""
- params: dict[str, str | int] = {
- "Recursive": "true",
- ITEM_KEY_PARENT_ID: parent_id,
- }
- if item_type in ITEM_TYPE_ARTIST:
- params["IncludeItemTypes"] = f"{ITEM_TYPE_MUSICARTISTS},{ITEM_TYPE_ARTIST}"
- else:
- params["IncludeItemTypes"] = item_type
- if item_type in ITEM_TYPE_AUDIO:
- params["Fields"] = ",".join(TRACK_FIELDS)
-
- result = await self._client.user_items("", params)
- return result["Items"]
-
async def _get_music_libraries(self) -> list[JellyMediaLibrary]:
"""Return all supported libraries a user has access to."""
response = await self._client.get_media_folders()
result.append(library)
return result
- def _media_mime_type(self, media_item: JellyMediaItem) -> str | None:
+ def _media_mime_type(self, media_item: JellyTrack) -> str | None:
"""Return the mime type of a media item."""
if not media_item.get(ITEM_KEY_MEDIA_SOURCES):
return None