from __future__ import annotations
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Any
from aiohttp import ClientSession
from ibroadcastaio import IBroadcastClient
ConfigEntryType,
ContentType,
ImageType,
+ MediaType,
ProviderFeature,
StreamType,
)
AudioFormat,
ItemMapping,
MediaItemImage,
- MediaType,
Playlist,
ProviderMapping,
Track,
+ UniqueList,
)
from music_assistant_models.streamdetails import StreamDetails
from music_assistant.helpers.util import parse_title_and_version
from music_assistant.models.music_provider import MusicProvider
-SUPPORTED_FEATURES = (
+SUPPORTED_FEATURES = {
ProviderFeature.LIBRARY_ARTISTS,
ProviderFeature.LIBRARY_TRACKS,
ProviderFeature.LIBRARY_ALBUMS,
ProviderFeature.LIBRARY_PLAYLISTS,
ProviderFeature.BROWSE,
ProviderFeature.ARTIST_ALBUMS,
-)
+}
if TYPE_CHECKING:
from collections.abc import AsyncGenerator
from music_assistant_models.config_entries import ProviderConfig
+ from music_assistant_models.provider import ProviderManifest
- from music_assistant import MusicAssistant
+ from music_assistant.mass import MusicAssistant
from music_assistant.models import ProviderInstanceType
- from music_assistant.models.provider import ProviderManifest
async def setup(
class IBroadcastProvider(MusicProvider):
"""Provider for iBroadcast."""
- _user_id = None
- _client = None
- _token = None
+ _user_id: str
+ _client: IBroadcastClient
+ _token: str
async def handle_async_init(self) -> None:
"""Set up the iBroadcast provider."""
async with ClientSession() as session:
self._client = IBroadcastClient(session)
status = await self._client.login(
- self.config.get_value(CONF_USERNAME), self.config.get_value(CONF_PASSWORD)
+ self.config.get_value(CONF_USERNAME),
+ self.config.get_value(CONF_PASSWORD),
)
self._user_id = status["user"]["id"]
self._token = status["user"]["token"]
await self._client.refresh_library()
@property
- def supported_features(self) -> tuple[ProviderFeature, ...]:
+ def supported_features(self) -> set[ProviderFeature]:
"""Return the features supported by this Provider."""
return SUPPORTED_FEATURES
albums = []
for album in albums_objs:
try:
- albums.append(self._parse_album(album))
+ albums.append(await self._parse_album(album))
except (KeyError, TypeError, InvalidDataError, IndexError) as error:
self.logger.debug("Parse album failed: %s", album, exc_info=error)
continue
self.logger.debug("Parse track failed: %s", track, exc_info=error)
continue
- def _get_artist_item_mapping(self, artist_id, artist_obj: dict) -> ItemMapping:
- if (not artist_id and artist_obj["name"] == "Various Artists") or artist_id == 0:
+ def _get_artist_item_mapping(self, artist_id: str, artist_obj: dict[str, Any]) -> ItemMapping:
+ if (not artist_id and artist_obj["name"] == "Various Artists") or artist_id == "0":
artist_id = VARIOUS_ARTISTS_MBID
- return self._get_item_mapping(MediaType.ARTIST, artist_id, artist_obj.get("name"))
+ return self._get_item_mapping(MediaType.ARTIST, artist_id, str(artist_obj.get("name")))
def _get_item_mapping(self, media_type: MediaType, key: str, name: str) -> ItemMapping:
return ItemMapping(
return tracks
return await self._get_tracks(playlist_obj["tracks"], True)
- async def get_stream_details(self, item_id: str) -> StreamDetails:
+ async def get_stream_details(
+ self, item_id: str, media_type: MediaType = MediaType.TRACK
+ ) -> StreamDetails:
"""Return the content details for the given track when it will be streamed."""
# How to buildup a stream url:
# [streaming_server]/[url]?Expires=[now]&Signature=[user token]&file_id=[file ID]
tracks.append(track)
return tracks
- async def _parse_artist(self, artist_obj: dict) -> Artist:
+ async def _parse_artist(self, artist_obj: dict[str, Any]) -> Artist:
"""Parse a iBroadcast user response to Artist model object."""
artist_id = artist_obj["artist_id"]
artist = Artist(
)
# Artwork
if "artwork_id" in artist_obj:
- artist.metadata.images = [
- MediaItemImage(
- type=ImageType.THUMB,
- path=await self._client.get_artist_artwork_url(artist_id),
- provider=self.instance_id,
- remotely_accessible=True,
- )
- ]
+ artist.metadata.images = UniqueList(
+ [
+ MediaItemImage(
+ type=ImageType.THUMB,
+ path=await self._client.get_artist_artwork_url(artist_id),
+ provider=self.instance_id,
+ remotely_accessible=True,
+ )
+ ]
+ )
return artist
- async def _parse_album(self, album_obj: dict) -> Album:
+ async def _parse_album(self, album_obj: dict[str, Any]) -> Album:
"""Parse ibroadcast album object to generic layout."""
album_id = album_obj["album_id"]
name, version = parse_title_and_version(album_obj["name"])
)
},
)
- artist = None
if album_obj["artist_id"] == 0:
artist = Artist(
item_id=VARIOUS_ARTISTS_MBID,
)
},
)
+ album.artists.append(artist)
else:
- artist = self._get_item_mapping(
+ artist_mapping = self._get_item_mapping(
MediaType.ARTIST,
album_obj["artist_id"],
(await self._client.get_artist(album_obj["artist_id"]))["name"]
if await self._client.get_artist(album_obj["artist_id"])
else UNKNOWN_ARTIST,
)
- album.artists.append(artist)
+ album.artists.append(artist_mapping)
if "rating" in album_obj and album_obj["rating"] == 5:
album.favorite = True
# There is only an artwork in the tracks, lets get the first track one
artwork_url = await self._client.get_album_artwork_url(album_id)
if artwork_url:
- album.metadata.images = [self._get_artwork_object(artwork_url)]
+ album.metadata.images = UniqueList([self._get_artwork_object(artwork_url)])
return album
def _get_artwork_object(self, url: str) -> MediaItemImage:
remotely_accessible=True,
)
- async def _parse_track(self, track_obj: dict) -> Track:
+ async def _parse_track(self, track_obj: dict[str, Any]) -> Track:
"""Parse an iBroadcast track object to a Track model object."""
track = Track(
item_id=track_obj["track_id"],
# Track artists
if "artist_id" in track_obj:
artist_id = track_obj["artist_id"]
- track.artists = [
- self._get_artist_item_mapping(artist_id, await self._client.get_artist(artist_id))
- ]
+ track.artists = UniqueList(
+ [self._get_artist_item_mapping(artist_id, await self._client.get_artist(artist_id))]
+ )
# additional artists structure: 'artists_additional': [[artist id, phrase, type]]
track.artists.extend(
[
self._get_artist_item_mapping(
- additional_artist[0], await self._client.get_artist(additional_artist[0])
+ additional_artist[0],
+ await self._client.get_artist(additional_artist[0]),
)
for additional_artist in track_obj["artists_additional"]
if additional_artist[0]
raise InvalidDataError(msg)
# Artwork
- track.metadata.images = [
- self._get_artwork_object(
- await self._client.get_track_artwork_url(track_obj["track_id"])
- )
- ]
+ track.metadata.images = UniqueList(
+ [
+ self._get_artwork_object(
+ await self._client.get_track_artwork_url(track_obj["track_id"])
+ )
+ ]
+ )
# Genre
- genres = []
+ genres: set[str] = set()
if track_obj["genre"]:
- genres = [track_obj["genre"]]
+ genres.add(track_obj["genre"])
if track_obj["genres_additional"]:
- genres.extend(track_obj["genres_additional"])
+ genres.add(track_obj["genres_additional"])
track.metadata.genres = genres
if track_obj["album_id"]:
album = await self._client.get_album(track_obj["album_id"])
)
return track
- async def _parse_playlist(self, playlist_obj: dict) -> Playlist:
+ async def _parse_playlist(self, playlist_obj: dict[str, Any]) -> Playlist:
"""Parse an iBroadcast Playlist response to a Playlist object."""
playlist_id = str(playlist_obj["playlist_id"])
playlist = Playlist(
)
# Can be supported in future, the API has options available
playlist.is_editable = False
- playlist.metadata.images = [
- self._get_artwork_object(await self._client.get_playlist_artwork_url(int(playlist_id)))
- ]
+ playlist.metadata.images = UniqueList(
+ [
+ self._get_artwork_object(
+ await self._client.get_playlist_artwork_url(int(playlist_id))
+ )
+ ]
+ )
if "description" in playlist_obj:
playlist.metadata.description = playlist_obj["description"]
return playlist