)
from music_assistant.common.models.provider import ProviderManifest
from music_assistant.common.models.streamdetails import StreamDetails
+from music_assistant.constants import UNKNOWN_ARTIST_ID_MBID
from music_assistant.server.models import ProviderInstanceType
from music_assistant.server.models.music_provider import MusicProvider
from music_assistant.server.server import MusicAssistant
MAX_IMAGE_WIDTH,
SUPPORTED_CONTAINER_FORMATS,
TRACK_FIELDS,
+ UNKNOWN_ARTIST_MAPPING,
USER_APP_NAME,
USER_DATA_KEY_IS_FAVORITE,
)
)
tracks = []
for item in resultset["Items"]:
- tracks.append(await self._parse_track(item))
+ tracks.append(self._parse_track(item))
return tracks
async def _search_album(self, search_query: str, limit: int) -> list[Album]:
artist_item[ITEM_KEY_NAME],
)
)
+ else:
+ album.artists.append(UNKNOWN_ARTIST_MAPPING)
+
user_data = jellyfin_album.get(ITEM_KEY_USER_DATA, {})
album.favorite = user_data.get(USER_DATA_KEY_IS_FAVORITE, False)
return album
artist.favorite = user_data.get(USER_DATA_KEY_IS_FAVORITE, False)
return artist
- async def _parse_track(self, jellyfin_track: JellyMediaItem) -> Track:
+ def _parse_track(self, jellyfin_track: JellyMediaItem) -> Track:
"""Parse a Jellyfin Track response to a Track model object."""
available = False
content = None
},
)
- if disc_number := jellyfin_track.get(ITEM_KEY_PARENT_INDEX_NUM):
- track.disc_number = disc_number
- if "IndexNumber" in jellyfin_track:
- if jellyfin_track["IndexNumber"] >= 1:
- track_idx = jellyfin_track["IndexNumber"]
- track.track_number = track_idx
- track.position = track_idx
+ track.disc_number = jellyfin_track.get(ITEM_KEY_PARENT_INDEX_NUM, 0)
+ track.track_number = jellyfin_track.get("IndexNumber", 0)
+ if track.track_number >= 0:
+ track.position = track.track_number
if thumb := self._get_thumbnail_url(jellyfin_track):
track.metadata.images = UniqueList(
artist_item[ITEM_KEY_NAME],
)
)
- elif ITEM_KEY_ALBUM_ID in jellyfin_track:
- parent_album = await self._client.get_item(jellyfin_track[ITEM_KEY_ALBUM_ID])
- if ITEM_KEY_ALBUM_ARTISTS in parent_album:
- for artist_item in parent_album[ITEM_KEY_ALBUM_ARTISTS]:
- track.artists.append(
- self._get_item_mapping(
- MediaType.ARTIST,
- artist_item[ITEM_KEY_ID],
- artist_item[ITEM_KEY_NAME],
- )
- )
+ else:
+ track.artists.append(UNKNOWN_ARTIST_MAPPING)
- if ITEM_KEY_ALBUM_ID in jellyfin_track and ITEM_KEY_ALBUM in jellyfin_track:
+ if ITEM_KEY_ALBUM_ID in jellyfin_track:
+ if not (album_name := jellyfin_track.get(ITEM_KEY_ALBUM)):
+ self.logger.debug("Track %s has AlbumID but no AlbumName", track.name)
+ album_name = f"Unknown Album ({jellyfin_track[ITEM_KEY_ALBUM_ID]})"
track.album = self._get_item_mapping(
MediaType.ALBUM,
jellyfin_track[ITEM_KEY_ALBUM_ID],
- jellyfin_track[ITEM_KEY_ALBUM],
- )
- elif ITEM_KEY_ALBUM_ID in jellyfin_track:
- parent_album = await self._client.get_item(jellyfin_track[ITEM_KEY_ALBUM_ID])
- track.album = self._get_item_mapping(
- MediaType.ALBUM,
- parent_album[ITEM_KEY_ID],
- parent_album[ITEM_KEY_NAME],
+ album_name,
)
if ITEM_KEY_RUNTIME_TICKS in jellyfin_track:
"""Retrieve library tracks from Jellyfin Music."""
jellyfin_libraries = await self._get_music_libraries()
for jellyfin_library in jellyfin_libraries:
- albums = await self._client.albums(jellyfin_library[ITEM_KEY_ID])
- for album in albums["Items"]:
- tracks_obj = await self._get_children(album[ITEM_KEY_ID], ITEM_TYPE_AUDIO)
- for track in tracks_obj:
- yield await self._parse_track(track)
+ offset = 0
+ limit = 100
+
+ response = await self._client.tracks(
+ jellyfin_library[ITEM_KEY_ID],
+ start_index=offset,
+ limit=limit,
+ enable_user_data=True,
+ fields=TRACK_FIELDS,
+ )
+ for track in response["Items"]:
+ yield self._parse_track(track)
+
+ while offset < response["TotalRecordCount"]:
+ response = await self._client.tracks(
+ jellyfin_library[ITEM_KEY_ID],
+ start_index=offset,
+ limit=limit,
+ enable_user_data=True,
+ fields=TRACK_FIELDS,
+ )
+ for track in response["Items"]:
+ yield self._parse_track(track)
+
+ offset += limit
async def get_library_playlists(self) -> AsyncGenerator[Playlist, None]:
"""Retrieve all library playlists from the provider."""
"""Get album tracks for given album id."""
jellyfin_album_tracks = await self._get_children(prov_album_id, ITEM_TYPE_AUDIO)
return [
- await self._parse_track(jellyfin_album_track)
+ self._parse_track(jellyfin_album_track)
for jellyfin_album_track in jellyfin_album_tracks
]
async def get_artist(self, prov_artist_id: str) -> Artist:
"""Get full artist details by id."""
- if prov_artist_id.startswith(FAKE_ARTIST_PREFIX):
- # This artist does not exist in jellyfin, so we can just load it from DB.
-
- if db_artist := await self.mass.music.artists.get_library_item_by_prov_id(
- prov_artist_id, self.instance_id
- ):
- return db_artist
- msg = f"Artist not found: {prov_artist_id}"
- raise MediaNotFoundError(msg)
+ if prov_artist_id == UNKNOWN_ARTIST_MAPPING.item_id:
+ artist = Artist(
+ item_id=UNKNOWN_ARTIST_MAPPING.item_id,
+ name=UNKNOWN_ARTIST_MAPPING.name,
+ provider=self.domain,
+ provider_mappings={
+ ProviderMapping(
+ item_id=UNKNOWN_ARTIST_MAPPING.item_id,
+ provider_domain=self.domain,
+ provider_instance=self.instance_id,
+ )
+ },
+ )
+ artist.mbid = UNKNOWN_ARTIST_ID_MBID
+ return artist
if jellyfin_artist := await self._client.get_item(prov_artist_id):
return self._parse_artist(jellyfin_artist)
async def get_track(self, prov_track_id: str) -> Track:
"""Get full track details by id."""
if jellyfin_track := await self._client.get_item(prov_track_id):
- return await self._parse_track(jellyfin_track)
+ return self._parse_track(jellyfin_track)
msg = f"Item {prov_track_id} not found"
raise MediaNotFoundError(msg)
return result
for index, jellyfin_track in enumerate(playlist_items, 1):
try:
- if track := await self._parse_track(jellyfin_track):
+ if track := self._parse_track(jellyfin_track):
if not track.position:
track.position = offset + index
result.append(track)
"""Get a list of albums for the given artist."""
if not prov_artist_id.startswith(FAKE_ARTIST_PREFIX):
return []
- albums = await self._client.album(
+ albums = await self._client.albums(
prov_artist_id, fields=ALBUM_FIELDS, enable_user_data=True
)
return [self._parse_album(album) for album in albums["Items"]]
else:
params["IncludeItemTypes"] = item_type
if item_type in ITEM_TYPE_AUDIO:
- params["Fields"] = TRACK_FIELDS
+ params["Fields"] = ",".join(TRACK_FIELDS)
result = await self._client.user_items("", params)
return result["Items"]