import socket
import uuid
from asyncio import TaskGroup
-from typing import TYPE_CHECKING, Any
+from collections.abc import AsyncGenerator
+from aiojellyfin import MediaItem as JellyMediaItem
+from aiojellyfin import MediaLibrary as JellyMediaLibrary
from aiojellyfin import SessionConfiguration, authenticate_by_name
-if TYPE_CHECKING:
- from collections.abc import AsyncGenerator, Callable, Coroutine
-
from music_assistant.common.models.config_entries import (
ConfigEntry,
ConfigValueType,
Artist,
AudioFormat,
ItemMapping,
- MediaItem,
MediaItemImage,
Playlist,
ProviderMapping,
SearchResults,
Track,
+ UniqueList,
)
+from music_assistant.common.models.provider import ProviderManifest
from music_assistant.common.models.streamdetails import StreamDetails
-
-if TYPE_CHECKING:
- from music_assistant.common.models.provider import ProviderManifest
-
-from music_assistant.constants import VARIOUS_ARTISTS_NAME
-
-if TYPE_CHECKING:
- from music_assistant.server import MusicAssistant
-if TYPE_CHECKING:
- from music_assistant.server.models import ProviderInstanceType
-
+from music_assistant.server.models import ProviderInstanceType
from music_assistant.server.models.music_provider import MusicProvider
+from music_assistant.server.server import MusicAssistant
from .const import (
CLIENT_VERSION,
"""Initialize provider(instance) with given configuration."""
session_config = SessionConfiguration(
session=self.mass.http_session,
- url=self.config.get_value(CONF_URL),
+ url=str(self.config.get_value(CONF_URL)),
verify_ssl=False,
app_name=USER_APP_NAME,
app_version=CLIENT_VERSION,
try:
self._client = await authenticate_by_name(
session_config,
- self.config.get_value(CONF_USERNAME),
- self.config.get_value(CONF_PASSWORD),
+ str(self.config.get_value(CONF_USERNAME)),
+ str(self.config.get_value(CONF_PASSWORD)),
)
except Exception as err:
raise LoginFailed(f"Authentication failed: {err}") from err
name=name,
)
- async def _parse(self, jellyfin_media) -> MediaItem | None:
- if jellyfin_media.type == "artist":
- return await self._parse_artist(jellyfin_media)
- elif jellyfin_media.type == "album":
- return await self._parse_album(jellyfin_media)
- elif jellyfin_media.type == "track":
- return await self._parse_track(jellyfin_media)
- elif jellyfin_media.type == "playlist":
- return await self._parse_playlist(jellyfin_media)
- return None
-
- async def _search_track(self, search_query, limit) -> list[dict[str, Any]]:
+ async def _search_track(self, search_query: str, limit: int) -> list[Track]:
resultset = await self._client.search_media_items(
term=search_query,
media=ITEM_TYPE_AUDIO,
limit=limit,
)
- return resultset["Items"]
+ tracks = []
+ for item in resultset["Items"]:
+ tracks.append(await self._parse_track(item))
+ return tracks
- async def _search_album(self, search_query, limit) -> list[dict[str, Any]]:
+ async def _search_album(self, search_query: str, limit: int) -> list[Album]:
if "-" in search_query:
searchterms = search_query.split(" - ")
albumname = searchterms[1]
media=ITEM_TYPE_ALBUM,
limit=limit,
)
- return resultset["Items"]
+ albums = []
+ for item in resultset["Items"]:
+ albums.append(await self._parse_album(item))
+ return albums
- async def _search_artist(self, search_query, limit) -> list[dict[str, Any]]:
+ async def _search_artist(self, search_query: str, limit: int) -> list[Artist]:
resultset = await self._client.search_media_items(
term=search_query,
media=ITEM_TYPE_ARTIST,
limit=limit,
)
- return resultset["Items"]
+ artists = []
+ for item in resultset["Items"]:
+ artists.append(await self._parse_artist(item))
+ return artists
- async def _search_playlist(self, search_query, limit) -> list[dict[str, Any]]:
+ async def _search_playlist(self, search_query: str, limit: int) -> list[Playlist]:
resultset = await self._client.search_media_items(
term=search_query,
media="Playlist",
limit=limit,
)
- return resultset["Items"]
+ playlists = []
+ for item in resultset["Items"]:
+ playlists.append(await self._parse_playlist(item))
+ return playlists
- async def _search_and_parse(
- self, search_coro: Coroutine, parse_coro: Callable
- ) -> list[MediaItem]:
- task_results = []
- async with TaskGroup() as tg:
- for item in await search_coro:
- task_results.append(tg.create_task(parse_coro(item)))
-
- results = []
- for task in task_results:
- results.append(task.result())
-
- return results
-
- async def _parse_album(self, jellyfin_album: dict[str, Any]) -> Album:
+ async def _parse_album(self, jellyfin_album: JellyMediaItem) -> Album:
"""Parse a Jellyfin Album response to an Album model object."""
album_id = jellyfin_album[ITEM_KEY_ID]
album = Album(
if ITEM_KEY_PRODUCTION_YEAR in current_jellyfin_album:
album.year = current_jellyfin_album[ITEM_KEY_PRODUCTION_YEAR]
if thumb := self._get_thumbnail_url(jellyfin_album):
- album.metadata.images = [
- MediaItemImage(
- type=ImageType.THUMB,
- path=thumb,
- provider=self.instance_id,
- remotely_accessible=False,
- )
- ]
+ album.metadata.images = UniqueList(
+ [
+ MediaItemImage(
+ type=ImageType.THUMB,
+ path=thumb,
+ provider=self.instance_id,
+ remotely_accessible=False,
+ )
+ ]
+ )
if ITEM_KEY_OVERVIEW in current_jellyfin_album:
album.metadata.description = current_jellyfin_album[ITEM_KEY_OVERVIEW]
if ITEM_KEY_MUSICBRAINZ_RELEASE_GROUP in current_jellyfin_album[ITEM_KEY_PROVIDER_IDS]:
if ITEM_KEY_SORT_NAME in current_jellyfin_album:
album.sort_name = current_jellyfin_album[ITEM_KEY_SORT_NAME]
if ITEM_KEY_ALBUM_ARTIST in current_jellyfin_album:
- album.artists.append(
- self._get_item_mapping(
- MediaType.ARTIST,
- current_jellyfin_album[ITEM_KEY_ALBUM_ARTISTS][0].get(ITEM_KEY_ID),
- current_jellyfin_album[ITEM_KEY_ALBUM_ARTIST],
+ for album_artist in current_jellyfin_album[ITEM_KEY_ALBUM_ARTISTS]:
+ album.artists.append(
+ self._get_item_mapping(
+ MediaType.ARTIST,
+ album_artist[ITEM_KEY_ID],
+ album_artist[ITEM_KEY_NAME],
+ )
)
- )
elif len(current_jellyfin_album.get(ITEM_KEY_ARTIST_ITEMS, [])) >= 1:
for artist_item in current_jellyfin_album[ITEM_KEY_ARTIST_ITEMS]:
album.artists.append(
album.favorite = user_data.get(USER_DATA_KEY_IS_FAVORITE, False)
return album
- async def _parse_artist(self, jellyfin_artist: dict[str, Any]) -> Artist:
+ async def _parse_artist(self, jellyfin_artist: JellyMediaItem) -> Artist:
"""Parse a Jellyfin Artist response to Artist model object."""
artist_id = jellyfin_artist[ITEM_KEY_ID]
current_artist = await self._client.get_item(artist_id)
if ITEM_KEY_SORT_NAME in current_artist:
artist.sort_name = current_artist[ITEM_KEY_SORT_NAME]
if thumb := self._get_thumbnail_url(jellyfin_artist):
- artist.metadata.images = [
- MediaItemImage(
- type=ImageType.THUMB,
- path=thumb,
- provider=self.instance_id,
- remotely_accessible=False,
- )
- ]
+ artist.metadata.images = UniqueList(
+ [
+ MediaItemImage(
+ type=ImageType.THUMB,
+ path=thumb,
+ provider=self.instance_id,
+ remotely_accessible=False,
+ )
+ ]
+ )
user_data = current_artist.get(ITEM_KEY_USER_DATA, {})
artist.favorite = user_data.get(USER_DATA_KEY_IS_FAVORITE, False)
return artist
- async def _parse_track(self, jellyfin_track: dict[str, Any]) -> Track:
+ async def _parse_track(self, jellyfin_track: JellyMediaItem) -> Track:
"""Parse a Jellyfin Track response to a Track model object."""
current_jellyfin_track = await self._client.get_item(jellyfin_track[ITEM_KEY_ID])
available = False
track.position = track_idx
if thumb := self._get_thumbnail_url(jellyfin_track):
- track.metadata.images = [
- MediaItemImage(
- type=ImageType.THUMB,
- path=thumb,
- provider=self.instance_id,
- remotely_accessible=False,
- )
- ]
+ track.metadata.images = UniqueList(
+ [
+ MediaItemImage(
+ type=ImageType.THUMB,
+ path=thumb,
+ provider=self.instance_id,
+ remotely_accessible=False,
+ )
+ ]
+ )
if current_jellyfin_track[ITEM_KEY_ARTIST_ITEMS]:
for artist_item in current_jellyfin_track[ITEM_KEY_ARTIST_ITEMS]:
artist_item[ITEM_KEY_NAME],
)
)
- else:
- track.artists.append(await self._parse_artist(name=VARIOUS_ARTISTS_NAME))
- else:
- track.artists.append(await self._parse_artist(name=VARIOUS_ARTISTS_NAME))
+
if ITEM_KEY_ALBUM_ID in current_jellyfin_track and ITEM_KEY_ALBUM in current_jellyfin_track:
track.album = self._get_item_mapping(
MediaType.ALBUM,
track.favorite = user_data.get(USER_DATA_KEY_IS_FAVORITE, False)
return track
- async def _parse_playlist(self, jellyfin_playlist: dict[str, Any]) -> Playlist:
+ async def _parse_playlist(self, jellyfin_playlist: JellyMediaItem) -> Playlist:
"""Parse a Jellyfin Playlist response to a Playlist object."""
playlistid = jellyfin_playlist[ITEM_KEY_ID]
playlist = Playlist(
if ITEM_KEY_OVERVIEW in jellyfin_playlist:
playlist.metadata.description = jellyfin_playlist[ITEM_KEY_OVERVIEW]
if thumb := self._get_thumbnail_url(jellyfin_playlist):
- playlist.metadata.images = [
- MediaItemImage(
- type=ImageType.THUMB,
- path=thumb,
- provider=self.instance_id,
- remotely_accessible=False,
- )
- ]
+ playlist.metadata.images = UniqueList(
+ [
+ MediaItemImage(
+ type=ImageType.THUMB,
+ path=thumb,
+ provider=self.instance_id,
+ remotely_accessible=False,
+ )
+ ]
+ )
user_data = jellyfin_playlist.get(ITEM_KEY_USER_DATA, {})
playlist.favorite = user_data.get(USER_DATA_KEY_IS_FAVORITE, False)
playlist.is_editable = False
:param media_types: A list of media_types to include. All types if None.
:param limit: Number of items to return in the search (per type).
"""
- tasks = {}
+ artists = None
+ albums = None
+ tracks = None
+ playlists = None
async with TaskGroup() as tg:
- for media_type in media_types:
- if media_type == MediaType.ARTIST:
- tasks[MediaType.ARTIST] = tg.create_task(
- self._search_and_parse(
- self._search_artist(search_query, limit), self._parse_artist
- )
- )
- elif media_type == MediaType.ALBUM:
- tasks[MediaType.ALBUM] = tg.create_task(
- self._search_and_parse(
- self._search_album(search_query, limit), self._parse_album
- )
- )
- elif media_type == MediaType.TRACK:
- tasks[MediaType.TRACK] = tg.create_task(
- self._search_and_parse(
- self._search_track(search_query, limit), self._parse_track
- )
- )
- elif media_type == MediaType.PLAYLIST:
- tasks[MediaType.PLAYLIST] = tg.create_task(
- self._search_and_parse(
- self._search_playlist(search_query, limit), self._parse_playlist
- )
- )
+ if MediaType.ARTIST in media_types:
+ artists = tg.create_task(self._search_artist(search_query, limit))
+ if MediaType.ALBUM in media_types:
+ albums = tg.create_task(self._search_album(search_query, limit))
+ if MediaType.TRACK in media_types:
+ tracks = tg.create_task(self._search_track(search_query, limit))
+ if MediaType.PLAYLIST in media_types:
+ playlists = tg.create_task(self._search_playlist(search_query, limit))
search_results = SearchResults()
- for media_type, task in tasks.items():
- if media_type == MediaType.ARTIST:
- search_results.artists = task.result()
- elif media_type == MediaType.ALBUM:
- search_results.albums = task.result()
- elif media_type == MediaType.TRACK:
- search_results.tracks = task.result()
- elif media_type == MediaType.PLAYLIST:
- search_results.playlists = task.result()
+ if artists:
+ search_results.artists += artists.result()
+ if albums:
+ search_results.albums += albums.result()
+ if tracks:
+ search_results.tracks += tracks.result()
+ if playlists:
+ search_results.playlists += playlists.result()
return search_results
else: # emby playlists are only audio type
yield await self._parse_playlist(playlist)
- async def get_album(self, prov_album_id) -> Album:
+ async def get_album(self, prov_album_id: str) -> Album:
"""Get full album details by id."""
if jellyfin_album := await self._client.get_item(prov_album_id):
return await self._parse_album(jellyfin_album)
for jellyfin_album_track in jellyfin_album_tracks
]
- async def get_artist(self, prov_artist_id) -> Artist:
+ async def get_artist(self, prov_artist_id: str) -> Artist:
"""Get full artist details by id."""
if prov_artist_id.startswith(FAKE_ARTIST_PREFIX):
# This artist does not exist in jellyfin, so we can just load it from DB.
msg = f"Item {prov_artist_id} not found"
raise MediaNotFoundError(msg)
- async def get_track(self, prov_track_id) -> Track:
+ async def get_track(self, prov_track_id: str) -> Track:
"""Get full track details by id."""
if jellyfin_track := await self._client.get_item(prov_track_id):
return await self._parse_track(jellyfin_track)
msg = f"Item {prov_track_id} not found"
raise MediaNotFoundError(msg)
- async def get_playlist(self, prov_playlist_id) -> Playlist:
+ async def get_playlist(self, prov_playlist_id: str) -> Playlist:
"""Get full playlist details by id."""
if jellyfin_playlist := await self._client.get_item(prov_playlist_id):
return await self._parse_playlist(jellyfin_playlist)
)
return result
- async def get_artist_albums(self, prov_artist_id) -> list[Album]:
+ async def get_artist_albums(self, prov_artist_id: str) -> list[Album]:
"""Get a list of albums for the given artist."""
if not prov_artist_id.startswith(FAKE_ARTIST_PREFIX):
artists_obj = await self._get_children(prov_artist_id, ITEM_TYPE_ARTIST)
if ITEM_KEY_MEDIA_CODEC in media_stream:
content_type = ContentType.try_parse(media_stream[ITEM_KEY_MEDIA_CODEC])
else:
- content_type = ContentType.try_parse(mimetype)
+ content_type = ContentType.try_parse(mimetype) if mimetype else ContentType.UNKNOWN
return StreamDetails(
item_id=jellyfin_track[ITEM_KEY_ID],
provider=self.instance_id,
path=url,
)
- def _get_thumbnail_url(self, media_item: dict[str, Any]) -> str | None:
+ def _get_thumbnail_url(self, media_item: JellyMediaItem) -> str | None:
"""Return the URL for the primary image of a media item if available."""
image_tags = media_item[ITEM_KEY_IMAGE_TAGS]
def _get_stream_url(self, media_item: str) -> str:
"""Return the stream URL for a media item."""
- return self._client.audio_url(media_item) # type: ignore[no-any-return]
+ return self._client.audio_url(media_item)
- async def _get_children(self, parent_id: str, item_type: str) -> list[dict[str, Any]]:
+ async def _get_children(self, parent_id: str, item_type: str) -> list[JellyMediaItem]:
"""Return all children for the parent_id whose item type is item_type."""
- params = {
+ params: dict[str, str | int] = {
"Recursive": "true",
ITEM_KEY_PARENT_ID: parent_id,
}
result = await self._client.user_items("", params)
return result["Items"]
- async def _get_music_libraries(self) -> list[dict[str, Any]]:
+ async def _get_music_libraries(self) -> list[JellyMediaLibrary]:
"""Return all supported libraries a user has access to."""
response = await self._client.get_media_folders()
libraries = response["Items"]
result.append(library)
return result
- async def _get_playlists(self) -> list[dict[str, Any]]:
+ async def _get_playlists(self) -> list[JellyMediaLibrary]:
"""Return all supported libraries a user has access to."""
response = await self._client.get_media_folders()
libraries = response["Items"]
result.append(library)
return result
- def _media_mime_type(self, media_item: dict[str, Any]) -> str | None:
+ def _media_mime_type(self, media_item: JellyMediaItem) -> str | None:
"""Return the mime type of a media item."""
if not media_item.get(ITEM_KEY_MEDIA_SOURCES):
return None