ContentType,
ExternalID,
ImageType,
+ MediaType,
ProviderFeature,
StreamType,
)
AudioFormat,
MediaItemImage,
MediaItemType,
- MediaType,
Playlist,
ProviderMapping,
SearchResults,
VARIOUS_ARTISTS_NAME,
)
from music_assistant.controllers.cache import use_cache
-from music_assistant.helpers.app_vars import app_var
+from music_assistant.helpers.app_vars import app_var # type: ignore[attr-defined]
from music_assistant.helpers.json import json_loads
from music_assistant.helpers.throttle_retry import ThrottlerManager, throttle_with_retries
from music_assistant.helpers.util import (
ProviderFeature.LIBRARY_PLAYLISTS_EDIT,
ProviderFeature.LIBRARY_TRACKS_EDIT,
ProviderFeature.PLAYLIST_TRACKS_EDIT,
+ ProviderFeature.PLAYLIST_CREATE,
ProviderFeature.BROWSE,
ProviderFeature.SEARCH,
ProviderFeature.ARTIST_ALBUMS,
yield self._parse_playlist(item)
@use_cache(3600 * 24 * 30) # Cache for 30 days
- async def get_artist(self, prov_artist_id) -> Artist:
+ async def get_artist(self, prov_artist_id: str) -> Artist:
"""Get full artist details by id."""
params = {"artist_id": prov_artist_id}
if (artist_obj := await self._get_data("artist/get", **params)) and artist_obj["id"]:
raise MediaNotFoundError(msg)
@use_cache(3600 * 24 * 30) # Cache for 30 days
- async def get_album(self, prov_album_id) -> Album:
+ async def get_album(self, prov_album_id: str) -> Album:
"""Get full album details by id."""
params = {"album_id": prov_album_id}
if (album_obj := await self._get_data("album/get", **params)) and album_obj["id"]:
raise MediaNotFoundError(msg)
@use_cache(3600 * 24 * 30) # Cache for 30 days
- async def get_track(self, prov_track_id) -> Track:
+ async def get_track(self, prov_track_id: str) -> Track:
"""Get full track details by id."""
params = {"track_id": prov_track_id}
if (track_obj := await self._get_data("track/get", **params)) and track_obj["id"]:
raise MediaNotFoundError(msg)
@use_cache(3600 * 24 * 30) # Cache for 30 days
- async def get_playlist(self, prov_playlist_id) -> Playlist:
+ async def get_playlist(self, prov_playlist_id: str) -> Playlist:
"""Get full playlist details by id."""
params = {"playlist_id": prov_playlist_id}
if (playlist_obj := await self._get_data("playlist/get", **params)) and playlist_obj["id"]:
msg = f"Item {prov_playlist_id} not found"
raise MediaNotFoundError(msg)
+ async def create_playlist(self, name: str) -> Playlist:
+ """Create a new playlist on Qobuz with the given name."""
+ playlist_obj = await self._get_data(
+ "playlist/create",
+ name=name,
+ description="",
+ is_public=0,
+ is_collaborative=0,
+ )
+ if not playlist_obj or not playlist_obj.get("id"):
+ msg = f"Failed to create playlist: {name}"
+ raise InvalidDataError(msg)
+ return self._parse_playlist(playlist_obj)
+
@use_cache(3600 * 24 * 30) # Cache for 30 days
- async def get_album_tracks(self, prov_album_id) -> list[Track]:
+ async def get_album_tracks(self, prov_album_id: str) -> list[Track]:
"""Get all album tracks for given album id."""
params = {"album_id": prov_album_id}
return [
return result
@use_cache(3600 * 24 * 14) # Cache for 14 days
- async def get_artist_albums(self, prov_artist_id) -> list[Album]:
+ async def get_artist_albums(self, prov_artist_id: str) -> list[Album]:
"""Get a list of albums for the given artist."""
result = await self._get_data(
"artist/get",
]
@use_cache(3600 * 24 * 14) # Cache for 14 days
- async def get_artist_toptracks(self, prov_artist_id) -> list[Track]:
+ async def get_artist_toptracks(self, prov_artist_id: str) -> list[Track]:
"""Get a list of most popular tracks for the given artist."""
result = await self._get_data(
"artist/get",
async def remove_playlist_tracks(
self, prov_playlist_id: str, positions_to_remove: tuple[int]
- ) -> None:
+ ) -> Any:
"""Remove track(s) from playlist."""
playlist_track_ids = set()
for pos in positions_to_remove:
duration=try_parse_int(streamdetails.seconds_streamed),
)
- def _parse_artist(self, artist_obj: dict):
+ def _parse_artist(self, artist_obj: dict) -> Artist:
"""Parse qobuz artist object to generic layout."""
artist = Artist(
item_id=str(artist_obj["id"]),
return track
- def _parse_playlist(self, playlist_obj):
+ def _parse_playlist(self, playlist_obj: str) -> Playlist:
"""Parse qobuz playlist object to generic layout."""
is_editable = (
playlist_obj["owner"]["id"] == self._user_auth_info["user"]["id"]
return playlist
@lock
- async def _auth_token(self):
+ async def _auth_token(self) -> None:
"""Login to qobuz and store the token."""
if self._user_auth_info:
return self._user_auth_info["user_auth_token"]
return details["user_auth_token"]
return None
- async def _get_all_items(self, endpoint, key="tracks", **kwargs):
+ async def _get_all_items(self, endpoint, key="tracks", **kwargs) -> list[dict]:
"""Get all items from a paged list."""
limit = 50
offset = 0
return all_items
@throttle_with_retries
- async def _get_data(self, endpoint, sign_request=False, **kwargs):
+ async def _get_data(
+ self, endpoint: str, sign_request: bool = False, **kwargs: Any
+ ) -> dict | None:
"""Get data from api."""
self.logger.debug("Handling GET request to %s", endpoint)
url = f"http://www.qobuz.com/api.json/0.2/{endpoint}"
raise InvalidDataError(msg)
@throttle_with_retries
- async def _post_data(self, endpoint, params=None, data=None):
+ async def _post_data(
+ self,
+ endpoint: str,
+ params: dict[str, Any] | None = None,
+ data: dict[str, Any] | None = None,
+ ) -> dict[str, Any]:
"""Post data to api."""
self.logger.debug("Handling POST request to %s", endpoint)
if not params: