import hashlib
import time
from contextlib import suppress
-from typing import TYPE_CHECKING, Any
+from typing import TYPE_CHECKING, Any, cast
from aiohttp import client_exceptions
from music_assistant_models.config_entries import ConfigEntry, ConfigValueType
class QobuzProvider(MusicProvider):
"""Provider for the Qobux music service."""
- _user_auth_info: str | None = None
+ _user_auth_info: dict[str, Any] | None = None
# rate limiter needs to be specified on provider-level,
# so make it an instance attribute
throttler = ThrottlerManager(rate_limit=1, period=2)
]
if not media_types:
return result
- params = {"query": search_query, "limit": limit}
+ params: dict[str, Any] = {"query": search_query, "limit": limit}
if len(media_types) == 1:
# qobuz does not support multiple searchtypes, falls back to all if no type given
if media_types[0] == MediaType.ARTIST:
params["type"] = "playlists"
if searchresult := await self._get_data("catalog/search", **params):
if "artists" in searchresult and MediaType.ARTIST in media_types:
- result.artists += [
+ result.artists = [
self._parse_artist(item)
for item in searchresult["artists"]["items"]
if (item and item["id"])
]
if "albums" in searchresult and MediaType.ALBUM in media_types:
- result.albums += [
+ result.albums = [
await self._parse_album(item)
for item in searchresult["albums"]["items"]
if (item and item["id"])
]
if "tracks" in searchresult and MediaType.TRACK in media_types:
- result.tracks += [
+ result.tracks = [
await self._parse_track(item)
for item in searchresult["tracks"]["items"]
if (item and item["id"])
]
if "playlists" in searchresult and MediaType.PLAYLIST in media_types:
- result.playlists += [
+ result.playlists = [
self._parse_playlist(item)
for item in searchresult["playlists"]["items"]
if (item and item["id"])
@use_cache(3600 * 24 * 30) # Cache for 30 days
async def get_artist(self, prov_artist_id: str) -> Artist:
"""Get full artist details by id."""
- params = {"artist_id": prov_artist_id}
- if (artist_obj := await self._get_data("artist/get", **params)) and artist_obj["id"]:
+ params: dict[str, Any] = {"artist_id": prov_artist_id}
+ artist_obj = await self._get_data("artist/get", **params)
+ if artist_obj and artist_obj.get("id"):
return self._parse_artist(artist_obj)
msg = f"Item {prov_artist_id} not found"
raise MediaNotFoundError(msg)
@use_cache(3600 * 24 * 30) # Cache for 30 days
async def get_album(self, prov_album_id: str) -> Album:
"""Get full album details by id."""
- params = {"album_id": prov_album_id}
- if (album_obj := await self._get_data("album/get", **params)) and album_obj["id"]:
+ params: dict[str, Any] = {"album_id": prov_album_id}
+ album_obj = await self._get_data("album/get", **params)
+ if album_obj and album_obj.get("id"):
return await self._parse_album(album_obj)
msg = f"Item {prov_album_id} not found"
raise MediaNotFoundError(msg)
@use_cache(3600 * 24 * 30) # Cache for 30 days
async def get_track(self, prov_track_id: str) -> Track:
"""Get full track details by id."""
- params = {"track_id": prov_track_id}
- if (track_obj := await self._get_data("track/get", **params)) and track_obj["id"]:
+ params: dict[str, Any] = {"track_id": prov_track_id}
+ track_obj = await self._get_data("track/get", **params)
+ if track_obj and track_obj.get("id"):
return await self._parse_track(track_obj)
msg = f"Item {prov_track_id} not found"
raise MediaNotFoundError(msg)
@use_cache(3600 * 24 * 30) # Cache for 30 days
async def get_playlist(self, prov_playlist_id: str) -> Playlist:
"""Get full playlist details by id."""
- params = {"playlist_id": prov_playlist_id}
- if (playlist_obj := await self._get_data("playlist/get", **params)) and playlist_obj["id"]:
+ params: dict[str, Any] = {"playlist_id": prov_playlist_id}
+ playlist_obj = await self._get_data("playlist/get", **params)
+ if playlist_obj and playlist_obj.get("id"):
return self._parse_playlist(playlist_obj)
msg = f"Item {prov_playlist_id} not found"
raise MediaNotFoundError(msg)
offset=offset,
limit=page_size,
)
+ if not qobuz_result:
+ return result
+
for index, track_obj in enumerate(qobuz_result["tracks"]["items"], 1):
if not (track_obj and track_obj["id"]):
continue
offset=0,
limit=100,
)
+ if not result:
+ return []
return [
await self._parse_album(item)
for item in result["albums"]["items"]
offset=0,
limit=25,
)
- if result and result["playlists"]:
+ if result and result.get("playlists"):
return [
await self._parse_track(item)
for item in result["playlists"][0]["tracks"]["items"]
searchresult = await self._get_data(
"catalog/search", query=artist.name, limit=25, type="tracks"
)
+ if not searchresult:
+ return []
+
return [
await self._parse_track(item)
for item in searchresult["tracks"]["items"]
)
]
- async def get_similar_artists(self, prov_artist_id) -> None:
+ async def get_similar_artists(self, prov_artist_id: str) -> None:
"""Get similar artists for given artist."""
# https://www.qobuz.com/api.json/0.2/artist/getSimilarArtists?artist_id=220020&offset=0&limit=3
- async def library_add(self, item: MediaItemType):
+ async def library_add(self, item: MediaItemType) -> bool:
"""Add item to library."""
result = None
if item.media_type == MediaType.ARTIST:
result = await self._get_data("favorite/create", track_ids=item.item_id)
elif item.media_type == MediaType.PLAYLIST:
result = await self._get_data("playlist/subscribe", playlist_id=item.item_id)
- return result
+ return result is not None
- async def library_remove(self, prov_item_id, media_type: MediaType):
+ async def library_remove(self, prov_item_id: str, media_type: MediaType) -> bool:
"""Remove item from library."""
result = None
if media_type == MediaType.ARTIST:
result = await self._get_data("playlist/delete", playlist_id=prov_item_id)
else:
result = await self._get_data("playlist/unsubscribe", playlist_id=prov_item_id)
- return result
+ return result is not None
async def add_playlist_tracks(self, prov_playlist_id: str, prov_track_ids: list[str]) -> None:
"""Add track(s) to playlist."""
- return await self._get_data(
+ await self._get_data(
"playlist/addTracks",
playlist_id=prov_playlist_id,
track_ids=",".join(prov_track_ids),
)
async def remove_playlist_tracks(
- self, prov_playlist_id: str, positions_to_remove: tuple[int]
- ) -> Any:
+ self, prov_playlist_id: str, positions_to_remove: tuple[int, ...]
+ ) -> None:
"""Remove track(s) from playlist."""
playlist_track_ids = set()
for pos in positions_to_remove:
playlist_track_id = qobuz_result["tracks"]["items"][0]["playlist_track_id"]
playlist_track_ids.add(str(playlist_track_id))
- return await self._get_data(
+ await self._get_data(
"playlist/deleteTracks",
playlist_id=prov_playlist_id,
playlist_track_ids=",".join(playlist_track_ids),
async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails:
"""Return the content details for the given track when it will be streamed."""
- streamdata = None
+ streamdata: dict[str, Any] | None = None
for format_id in [27, 7, 6, 5]:
# it seems that simply requesting for highest available quality does not work
# from time to time the api response is empty for this request ?!
allow_seek=True,
)
- async def _report_playback_started(self, streamdata: dict) -> None:
+ async def _report_playback_started(self, streamdata: dict[str, Any]) -> None:
"""Report playback start to qobuz."""
# TODO: need to figure out if the streamed track is purchased by user
# https://www.qobuz.com/api.json/0.2/purchase/getUserPurchasesIds?limit=5000&user_id=xxxxxxx
# {"albums":{"total":0,"items":[]},
# "tracks":{"total":0,"items":[]},"user":{"id":xxxx,"login":"xxxxx"}}
+ assert self._user_auth_info is not None # for type checking
device_id = self._user_auth_info["user"]["device"]["id"]
credential_id = self._user_auth_info["user"]["credential"]["id"]
user_id = self._user_auth_info["user"]["id"]
streamdetails: StreamDetails,
) -> None:
"""Handle callback when an item completed streaming."""
+ if self._user_auth_info is None:
+ msg = "User auth info not available"
+ raise LoginFailed(msg)
user_id = self._user_auth_info["user"]["id"]
async with self.throttler.bypass():
await self._get_data(
duration=try_parse_int(streamdetails.seconds_streamed),
)
- def _parse_artist(self, artist_obj: dict) -> Artist:
+ def _parse_artist(self, artist_obj: dict[str, Any]) -> Artist:
"""Parse qobuz artist object to generic layout."""
artist = Artist(
item_id=str(artist_obj["id"]),
artist.mbid = VARIOUS_ARTISTS_MBID
artist.name = VARIOUS_ARTISTS_NAME
if img := self.__get_image(artist_obj):
- artist.metadata.images = [
+ artist.metadata.add_image(
MediaItemImage(
type=ImageType.THUMB,
path=img,
provider=self.lookup_key,
remotely_accessible=True,
)
- ]
+ )
if artist_obj.get("biography"):
artist.metadata.description = artist_obj["biography"].get("content")
return artist
album.metadata.explicit = True
return album
- async def _parse_track(self, track_obj: dict) -> Track:
+ async def _parse_track(self, track_obj: dict[str, Any]) -> Track:
"""Parse qobuz track object to generic layout."""
name, version = parse_title_and_version(track_obj["title"], track_obj.get("version"))
track = Track(
if track_obj.get("parental_warning"):
track.metadata.explicit = True
if img := self.__get_image(track_obj):
- track.metadata.images = [
+ track.metadata.add_image(
MediaItemImage(
type=ImageType.THUMB,
path=img,
provider=self.lookup_key,
remotely_accessible=True,
)
- ]
-
+ )
return track
- def _parse_playlist(self, playlist_obj: str) -> Playlist:
+ def _parse_playlist(self, playlist_obj: dict[str, Any]) -> Playlist:
"""Parse qobuz playlist object to generic layout."""
+ if self._user_auth_info is None:
+ msg = "User auth info not available"
+ raise LoginFailed(msg)
+
is_editable = (
playlist_obj["owner"]["id"] == self._user_auth_info["user"]["id"]
or playlist_obj["is_collaborative"]
is_editable=is_editable,
)
if img := self.__get_image(playlist_obj):
- playlist.metadata.images = [
+ playlist.metadata.add_image(
MediaItemImage(
type=ImageType.THUMB,
path=img,
provider=self.lookup_key,
remotely_accessible=True,
)
- ]
+ )
return playlist
@lock
- async def _auth_token(self) -> None:
+ async def _auth_token(self) -> str | None:
"""Login to qobuz and store the token."""
if self._user_auth_info:
- return self._user_auth_info["user_auth_token"]
- params = {
+ return str(self._user_auth_info["user_auth_token"])
+ params: dict[str, Any] = {
"username": self.config.get_value(CONF_USERNAME),
"password": self.config.get_value(CONF_PASSWORD),
"device_manufacturer_id": "music_assistant",
"Successfully logged in to Qobuz as %s", details["user"]["display_name"]
)
self.mass.metadata.set_default_preferred_language(details["user"]["country_code"])
- return details["user_auth_token"]
+ return str(details["user_auth_token"])
return None
- async def _get_all_items(self, endpoint, key="tracks", **kwargs) -> list[dict]:
+ async def _get_all_items(
+ self, endpoint: str, key: str = "tracks", **kwargs: Any
+ ) -> list[dict[str, Any]]:
"""Get all items from a paged list."""
limit = 50
offset = 0
- all_items = []
+ all_items: list[dict[str, Any]] = []
while True:
kwargs["limit"] = limit
kwargs["offset"] = offset
@throttle_with_retries
async def _get_data(
self, endpoint: str, sign_request: bool = False, **kwargs: Any
- ) -> dict | None:
+ ) -> dict[str, Any] | None:
"""Get data from api."""
self.logger.debug("Handling GET request to %s", endpoint)
url = f"http://www.qobuz.com/api.json/0.2/{endpoint}"
raise MediaNotFoundError(f"{endpoint} not found")
response.raise_for_status()
try:
- return await response.json(loads=json_loads)
+ return cast("dict[str, Any]", await response.json(loads=json_loads))
except client_exceptions.ContentTypeError as err:
text = err.message or await response.text() or err.status
msg = f"Error while handling {endpoint}: {text}"
self,
endpoint: str,
params: dict[str, Any] | None = None,
- data: dict[str, Any] | None = None,
+ data: dict[str, Any] | list[dict[str, Any]] | None = None,
) -> dict[str, Any]:
"""Post data to api."""
self.logger.debug("Handling POST request to %s", endpoint)
data = {}
url = f"http://www.qobuz.com/api.json/0.2/{endpoint}"
params["app_id"] = app_var(0)
- params["user_auth_token"] = await self._auth_token()
+ auth_token = await self._auth_token()
+ if auth_token is None:
+ msg = "Authentication token is required"
+ raise LoginFailed(msg)
+ params["user_auth_token"] = auth_token
async with self.mass.http_session.post(
url, params=params, json=data, ssl=False
) as response:
if response.status == 404:
raise MediaNotFoundError(f"{endpoint} not found")
response.raise_for_status()
- return await response.json(loads=json_loads)
+ return cast("dict[str, Any]", await response.json(loads=json_loads))
- def __get_image(self, obj: dict) -> str | None:
+ def __get_image(self, obj: dict[str, Any]) -> str | None:
"""Try to parse image from Qobuz media object."""
if obj.get("image"):
for key in ["extralarge", "large", "medium", "small"]:
if obj["image"].get(key):
- if "2a96cbd8b46e442fc41c2b86b821562f" in obj["image"][key]:
+ img_value: str = obj["image"][key]
+ if "2a96cbd8b46e442fc41c2b86b821562f" in img_value:
continue
- return obj["image"][key]
+ return img_value
if obj.get("images300"):
# playlists seem to use this strange format
- return obj["images300"][0]
+ return str(obj["images300"][0])
if obj.get("album"):
return self.__get_image(obj["album"])
if obj.get("artist"):