From 1af5293e318108f7d47930ed6b69830d531db25e Mon Sep 17 00:00:00 2001 From: OzGav Date: Mon, 10 Nov 2025 19:43:22 +1000 Subject: [PATCH] Typing fixes for the Qobuz provider (#2610) --- music_assistant/providers/qobuz/__init__.py | 130 ++++++++++++-------- pyproject.toml | 1 - 2 files changed, 78 insertions(+), 53 deletions(-) diff --git a/music_assistant/providers/qobuz/__init__.py b/music_assistant/providers/qobuz/__init__.py index a3e96ff9..80a97d5e 100644 --- a/music_assistant/providers/qobuz/__init__.py +++ b/music_assistant/providers/qobuz/__init__.py @@ -6,7 +6,7 @@ import datetime import hashlib import time from contextlib import suppress -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING, Any, cast from aiohttp import client_exceptions from music_assistant_models.config_entries import ConfigEntry, ConfigValueType @@ -127,7 +127,7 @@ async def get_config_entries( class QobuzProvider(MusicProvider): """Provider for the Qobux music service.""" - _user_auth_info: str | None = None + _user_auth_info: dict[str, Any] | None = None # rate limiter needs to be specified on provider-level, # so make it an instance attribute throttler = ThrottlerManager(rate_limit=1, period=2) @@ -161,7 +161,7 @@ class QobuzProvider(MusicProvider): ] if not media_types: return result - params = {"query": search_query, "limit": limit} + params: dict[str, Any] = {"query": search_query, "limit": limit} if len(media_types) == 1: # qobuz does not support multiple searchtypes, falls back to all if no type given if media_types[0] == MediaType.ARTIST: @@ -174,25 +174,25 @@ class QobuzProvider(MusicProvider): params["type"] = "playlists" if searchresult := await self._get_data("catalog/search", **params): if "artists" in searchresult and MediaType.ARTIST in media_types: - result.artists += [ + result.artists = [ self._parse_artist(item) for item in searchresult["artists"]["items"] if (item and item["id"]) ] if "albums" in searchresult and MediaType.ALBUM in media_types: - result.albums += [ + result.albums = [ await self._parse_album(item) for item in searchresult["albums"]["items"] if (item and item["id"]) ] if "tracks" in searchresult and MediaType.TRACK in media_types: - result.tracks += [ + result.tracks = [ await self._parse_track(item) for item in searchresult["tracks"]["items"] if (item and item["id"]) ] if "playlists" in searchresult and MediaType.PLAYLIST in media_types: - result.playlists += [ + result.playlists = [ self._parse_playlist(item) for item in searchresult["playlists"]["items"] if (item and item["id"]) @@ -230,8 +230,9 @@ class QobuzProvider(MusicProvider): @use_cache(3600 * 24 * 30) # Cache for 30 days async def get_artist(self, prov_artist_id: str) -> Artist: """Get full artist details by id.""" - params = {"artist_id": prov_artist_id} - if (artist_obj := await self._get_data("artist/get", **params)) and artist_obj["id"]: + params: dict[str, Any] = {"artist_id": prov_artist_id} + artist_obj = await self._get_data("artist/get", **params) + if artist_obj and artist_obj.get("id"): return self._parse_artist(artist_obj) msg = f"Item {prov_artist_id} not found" raise MediaNotFoundError(msg) @@ -239,8 +240,9 @@ class QobuzProvider(MusicProvider): @use_cache(3600 * 24 * 30) # Cache for 30 days async def get_album(self, prov_album_id: str) -> Album: """Get full album details by id.""" - params = {"album_id": prov_album_id} - if (album_obj := await self._get_data("album/get", **params)) and album_obj["id"]: + params: dict[str, Any] = {"album_id": prov_album_id} + album_obj = await self._get_data("album/get", **params) + if album_obj and album_obj.get("id"): return await self._parse_album(album_obj) msg = f"Item {prov_album_id} not found" raise MediaNotFoundError(msg) @@ -248,8 +250,9 @@ class QobuzProvider(MusicProvider): @use_cache(3600 * 24 * 30) # Cache for 30 days async def get_track(self, prov_track_id: str) -> Track: """Get full track details by id.""" - params = {"track_id": prov_track_id} - if (track_obj := await self._get_data("track/get", **params)) and track_obj["id"]: + params: dict[str, Any] = {"track_id": prov_track_id} + track_obj = await self._get_data("track/get", **params) + if track_obj and track_obj.get("id"): return await self._parse_track(track_obj) msg = f"Item {prov_track_id} not found" raise MediaNotFoundError(msg) @@ -257,8 +260,9 @@ class QobuzProvider(MusicProvider): @use_cache(3600 * 24 * 30) # Cache for 30 days async def get_playlist(self, prov_playlist_id: str) -> Playlist: """Get full playlist details by id.""" - params = {"playlist_id": prov_playlist_id} - if (playlist_obj := await self._get_data("playlist/get", **params)) and playlist_obj["id"]: + params: dict[str, Any] = {"playlist_id": prov_playlist_id} + playlist_obj = await self._get_data("playlist/get", **params) + if playlist_obj and playlist_obj.get("id"): return self._parse_playlist(playlist_obj) msg = f"Item {prov_playlist_id} not found" raise MediaNotFoundError(msg) @@ -301,6 +305,9 @@ class QobuzProvider(MusicProvider): offset=offset, limit=page_size, ) + if not qobuz_result: + return result + for index, track_obj in enumerate(qobuz_result["tracks"]["items"], 1): if not (track_obj and track_obj["id"]): continue @@ -319,6 +326,8 @@ class QobuzProvider(MusicProvider): offset=0, limit=100, ) + if not result: + return [] return [ await self._parse_album(item) for item in result["albums"]["items"] @@ -335,7 +344,7 @@ class QobuzProvider(MusicProvider): offset=0, limit=25, ) - if result and result["playlists"]: + if result and result.get("playlists"): return [ await self._parse_track(item) for item in result["playlists"][0]["tracks"]["items"] @@ -346,6 +355,9 @@ class QobuzProvider(MusicProvider): searchresult = await self._get_data( "catalog/search", query=artist.name, limit=25, type="tracks" ) + if not searchresult: + return [] + return [ await self._parse_track(item) for item in searchresult["tracks"]["items"] @@ -357,11 +369,11 @@ class QobuzProvider(MusicProvider): ) ] - async def get_similar_artists(self, prov_artist_id) -> None: + async def get_similar_artists(self, prov_artist_id: str) -> None: """Get similar artists for given artist.""" # https://www.qobuz.com/api.json/0.2/artist/getSimilarArtists?artist_id=220020&offset=0&limit=3 - async def library_add(self, item: MediaItemType): + async def library_add(self, item: MediaItemType) -> bool: """Add item to library.""" result = None if item.media_type == MediaType.ARTIST: @@ -372,9 +384,9 @@ class QobuzProvider(MusicProvider): result = await self._get_data("favorite/create", track_ids=item.item_id) elif item.media_type == MediaType.PLAYLIST: result = await self._get_data("playlist/subscribe", playlist_id=item.item_id) - return result + return result is not None - async def library_remove(self, prov_item_id, media_type: MediaType): + async def library_remove(self, prov_item_id: str, media_type: MediaType) -> bool: """Remove item from library.""" result = None if media_type == MediaType.ARTIST: @@ -389,11 +401,11 @@ class QobuzProvider(MusicProvider): result = await self._get_data("playlist/delete", playlist_id=prov_item_id) else: result = await self._get_data("playlist/unsubscribe", playlist_id=prov_item_id) - return result + return result is not None async def add_playlist_tracks(self, prov_playlist_id: str, prov_track_ids: list[str]) -> None: """Add track(s) to playlist.""" - return await self._get_data( + await self._get_data( "playlist/addTracks", playlist_id=prov_playlist_id, track_ids=",".join(prov_track_ids), @@ -401,8 +413,8 @@ class QobuzProvider(MusicProvider): ) async def remove_playlist_tracks( - self, prov_playlist_id: str, positions_to_remove: tuple[int] - ) -> Any: + self, prov_playlist_id: str, positions_to_remove: tuple[int, ...] + ) -> None: """Remove track(s) from playlist.""" playlist_track_ids = set() for pos in positions_to_remove: @@ -420,7 +432,7 @@ class QobuzProvider(MusicProvider): playlist_track_id = qobuz_result["tracks"]["items"][0]["playlist_track_id"] playlist_track_ids.add(str(playlist_track_id)) - return await self._get_data( + await self._get_data( "playlist/deleteTracks", playlist_id=prov_playlist_id, playlist_track_ids=",".join(playlist_track_ids), @@ -428,7 +440,7 @@ class QobuzProvider(MusicProvider): async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails: """Return the content details for the given track when it will be streamed.""" - streamdata = None + streamdata: dict[str, Any] | None = None for format_id in [27, 7, 6, 5]: # it seems that simply requesting for highest available quality does not work # from time to time the api response is empty for this request ?! @@ -469,12 +481,13 @@ class QobuzProvider(MusicProvider): allow_seek=True, ) - async def _report_playback_started(self, streamdata: dict) -> None: + async def _report_playback_started(self, streamdata: dict[str, Any]) -> None: """Report playback start to qobuz.""" # TODO: need to figure out if the streamed track is purchased by user # https://www.qobuz.com/api.json/0.2/purchase/getUserPurchasesIds?limit=5000&user_id=xxxxxxx # {"albums":{"total":0,"items":[]}, # "tracks":{"total":0,"items":[]},"user":{"id":xxxx,"login":"xxxxx"}} + assert self._user_auth_info is not None # for type checking device_id = self._user_auth_info["user"]["device"]["id"] credential_id = self._user_auth_info["user"]["credential"]["id"] user_id = self._user_auth_info["user"]["id"] @@ -503,6 +516,9 @@ class QobuzProvider(MusicProvider): streamdetails: StreamDetails, ) -> None: """Handle callback when an item completed streaming.""" + if self._user_auth_info is None: + msg = "User auth info not available" + raise LoginFailed(msg) user_id = self._user_auth_info["user"]["id"] async with self.throttler.bypass(): await self._get_data( @@ -512,7 +528,7 @@ class QobuzProvider(MusicProvider): duration=try_parse_int(streamdetails.seconds_streamed), ) - def _parse_artist(self, artist_obj: dict) -> Artist: + def _parse_artist(self, artist_obj: dict[str, Any]) -> Artist: """Parse qobuz artist object to generic layout.""" artist = Artist( item_id=str(artist_obj["id"]), @@ -531,14 +547,14 @@ class QobuzProvider(MusicProvider): artist.mbid = VARIOUS_ARTISTS_MBID artist.name = VARIOUS_ARTISTS_NAME if img := self.__get_image(artist_obj): - artist.metadata.images = [ + artist.metadata.add_image( MediaItemImage( type=ImageType.THUMB, path=img, provider=self.lookup_key, remotely_accessible=True, ) - ] + ) if artist_obj.get("biography"): artist.metadata.description = artist_obj["biography"].get("content") return artist @@ -617,7 +633,7 @@ class QobuzProvider(MusicProvider): album.metadata.explicit = True return album - async def _parse_track(self, track_obj: dict) -> Track: + async def _parse_track(self, track_obj: dict[str, Any]) -> Track: """Parse qobuz track object to generic layout.""" name, version = parse_title_and_version(track_obj["title"], track_obj.get("version")) track = Track( @@ -690,19 +706,22 @@ class QobuzProvider(MusicProvider): if track_obj.get("parental_warning"): track.metadata.explicit = True if img := self.__get_image(track_obj): - track.metadata.images = [ + track.metadata.add_image( MediaItemImage( type=ImageType.THUMB, path=img, provider=self.lookup_key, remotely_accessible=True, ) - ] - + ) return track - def _parse_playlist(self, playlist_obj: str) -> Playlist: + def _parse_playlist(self, playlist_obj: dict[str, Any]) -> Playlist: """Parse qobuz playlist object to generic layout.""" + if self._user_auth_info is None: + msg = "User auth info not available" + raise LoginFailed(msg) + is_editable = ( playlist_obj["owner"]["id"] == self._user_auth_info["user"]["id"] or playlist_obj["is_collaborative"] @@ -723,22 +742,22 @@ class QobuzProvider(MusicProvider): is_editable=is_editable, ) if img := self.__get_image(playlist_obj): - playlist.metadata.images = [ + playlist.metadata.add_image( MediaItemImage( type=ImageType.THUMB, path=img, provider=self.lookup_key, remotely_accessible=True, ) - ] + ) return playlist @lock - async def _auth_token(self) -> None: + async def _auth_token(self) -> str | None: """Login to qobuz and store the token.""" if self._user_auth_info: - return self._user_auth_info["user_auth_token"] - params = { + return str(self._user_auth_info["user_auth_token"]) + params: dict[str, Any] = { "username": self.config.get_value(CONF_USERNAME), "password": self.config.get_value(CONF_PASSWORD), "device_manufacturer_id": "music_assistant", @@ -750,14 +769,16 @@ class QobuzProvider(MusicProvider): "Successfully logged in to Qobuz as %s", details["user"]["display_name"] ) self.mass.metadata.set_default_preferred_language(details["user"]["country_code"]) - return details["user_auth_token"] + return str(details["user_auth_token"]) return None - async def _get_all_items(self, endpoint, key="tracks", **kwargs) -> list[dict]: + async def _get_all_items( + self, endpoint: str, key: str = "tracks", **kwargs: Any + ) -> list[dict[str, Any]]: """Get all items from a paged list.""" limit = 50 offset = 0 - all_items = [] + all_items: list[dict[str, Any]] = [] while True: kwargs["limit"] = limit kwargs["offset"] = offset @@ -776,7 +797,7 @@ class QobuzProvider(MusicProvider): @throttle_with_retries async def _get_data( self, endpoint: str, sign_request: bool = False, **kwargs: Any - ) -> dict | None: + ) -> dict[str, Any] | None: """Get data from api.""" self.logger.debug("Handling GET request to %s", endpoint) url = f"http://www.qobuz.com/api.json/0.2/{endpoint}" @@ -818,7 +839,7 @@ class QobuzProvider(MusicProvider): raise MediaNotFoundError(f"{endpoint} not found") response.raise_for_status() try: - return await response.json(loads=json_loads) + return cast("dict[str, Any]", await response.json(loads=json_loads)) except client_exceptions.ContentTypeError as err: text = err.message or await response.text() or err.status msg = f"Error while handling {endpoint}: {text}" @@ -829,7 +850,7 @@ class QobuzProvider(MusicProvider): self, endpoint: str, params: dict[str, Any] | None = None, - data: dict[str, Any] | None = None, + data: dict[str, Any] | list[dict[str, Any]] | None = None, ) -> dict[str, Any]: """Post data to api.""" self.logger.debug("Handling POST request to %s", endpoint) @@ -839,7 +860,11 @@ class QobuzProvider(MusicProvider): data = {} url = f"http://www.qobuz.com/api.json/0.2/{endpoint}" params["app_id"] = app_var(0) - params["user_auth_token"] = await self._auth_token() + auth_token = await self._auth_token() + if auth_token is None: + msg = "Authentication token is required" + raise LoginFailed(msg) + params["user_auth_token"] = auth_token async with self.mass.http_session.post( url, params=params, json=data, ssl=False ) as response: @@ -854,19 +879,20 @@ class QobuzProvider(MusicProvider): if response.status == 404: raise MediaNotFoundError(f"{endpoint} not found") response.raise_for_status() - return await response.json(loads=json_loads) + return cast("dict[str, Any]", await response.json(loads=json_loads)) - def __get_image(self, obj: dict) -> str | None: + def __get_image(self, obj: dict[str, Any]) -> str | None: """Try to parse image from Qobuz media object.""" if obj.get("image"): for key in ["extralarge", "large", "medium", "small"]: if obj["image"].get(key): - if "2a96cbd8b46e442fc41c2b86b821562f" in obj["image"][key]: + img_value: str = obj["image"][key] + if "2a96cbd8b46e442fc41c2b86b821562f" in img_value: continue - return obj["image"][key] + return img_value if obj.get("images300"): # playlists seem to use this strange format - return obj["images300"][0] + return str(obj["images300"][0]) if obj.get("album"): return self.__get_image(obj["album"]) if obj.get("artist"): diff --git a/pyproject.toml b/pyproject.toml index 21e9e907..5aeefab1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -144,7 +144,6 @@ exclude = [ '^music_assistant/providers/apple_music/.*$', '^music_assistant/providers/bluesound/.*$', '^music_assistant/providers/chromecast/.*$', - '^music_assistant/providers/qobuz/.*$', '^music_assistant/providers/siriusxm/.*$', '^music_assistant/providers/sonos/.*$', '^music_assistant/providers/snapcast/.*$', -- 2.34.1