from __future__ import annotations
-import asyncio
import time
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Any
from music_assistant_models.config_entries import ConfigEntry, ConfigValueType
from music_assistant_models.enums import (
ConfigEntryType,
ContentType,
ImageType,
+ MediaType,
ProviderFeature,
StreamType,
)
Artist,
AudioFormat,
MediaItemImage,
- MediaType,
Playlist,
ProviderMapping,
SearchResults,
Track,
+ UniqueList,
)
from music_assistant_models.streamdetails import StreamDetails
from soundcloudpy import SoundcloudAsyncAPI
if TYPE_CHECKING:
- from collections.abc import AsyncGenerator, Callable
+ from collections.abc import AsyncGenerator
from music_assistant_models.config_entries import ProviderConfig
from music_assistant_models.provider import ProviderManifest
- from music_assistant import MusicAssistant
+ from music_assistant.mass import MusicAssistant
from music_assistant.models import ProviderInstanceType
class SoundcloudMusicProvider(MusicProvider):
"""Provider for Soundcloud."""
- _headers = None
- _context = None
- _cookies = None
- _signature_timestamp = 0
- _cipher = None
- _user_id = None
- _soundcloud = None
- _me = None
+ _user_id: str = ""
+ _soundcloud: SoundcloudAsyncAPI = None
+ _me: dict[str, Any] = {}
async def handle_async_init(self) -> None:
"""Set up the Soundcloud provider."""
"""Return the features supported by this Provider."""
return SUPPORTED_FEATURES
- @classmethod
- async def _run_async(cls, call: Callable, *args, **kwargs): # noqa: ANN206
- return await asyncio.to_thread(call, *args, **kwargs)
-
async def search(
- self, search_query: str, media_types=list[MediaType], limit: int = 10
+ self, search_query: str, media_types: list[MediaType], limit: int = 10
) -> SearchResults:
"""Perform search on musicprovider.
for item in searchresult["collection"]:
media_type = item["kind"]
if media_type == "user" and MediaType.ARTIST in media_types:
- result.artists.append(await self._parse_artist(item))
+ result.artists = [*result.artists, await self._parse_artist(item)]
elif media_type == "track" and MediaType.TRACK in media_types:
if item.get("duration") == item.get("full_duration"):
# skip if it's a preview track (e.g. in case of free accounts)
- result.tracks.append(await self._parse_track(item))
+ result.tracks = [*result.tracks, await self._parse_track(item)]
elif media_type == "playlist" and MediaType.PLAYLIST in media_types:
- result.playlists.append(await self._parse_playlist(item))
+ result.playlists = [*result.playlists, await self._parse_playlist(item)]
return result
async def get_library_tracks(self) -> AsyncGenerator[Track, None]:
"""Retrieve library tracks from Soundcloud."""
time_start = time.time()
- async for item in self._soundcloud.get_tracks_liked():
- track = await self._soundcloud.get_track_details(item)
+ async for track in self._soundcloud.get_track_details_liked(self._user_id):
try:
- yield await self._parse_track(track[0])
- except IndexError:
- continue
- except (KeyError, TypeError, InvalidDataError) as error:
- self.logger.debug("Parse track failed: %s", track, exc_info=error)
+ yield await self._parse_track(track)
+ except (KeyError, TypeError, InvalidDataError, IndexError) as error:
+ # somehow certain track id's don't exist (anymore)
+ self.logger.debug(
+ "%s: Parse track with id %s failed: %s",
+ type(error).__name__,
+ track["id"],
+ track,
+ )
continue
self.logger.debug(
round(time.time() - time_start, 2),
)
- async def get_artist(self, prov_artist_id) -> Artist:
+ async def get_artist(self, prov_artist_id: str) -> Artist:
"""Get full artist details by id."""
- artist_obj = await self._soundcloud.get_user_details(user_id=prov_artist_id)
+ artist_obj = await self._soundcloud.get_user_details(prov_artist_id)
try:
- artist = await self._parse_artist(artist_obj=artist_obj) if artist_obj else None
+ if artist_obj:
+ artist = await self._parse_artist(artist_obj)
except (KeyError, TypeError, InvalidDataError, IndexError) as error:
self.logger.debug("Parse artist failed: %s", artist_obj, exc_info=error)
return artist
- async def get_track(self, prov_track_id) -> Track:
+ async def get_track(self, prov_track_id: str) -> Track:
"""Get full track details by id."""
- track_obj = await self._soundcloud.get_track_details(track_id=prov_track_id)
+ track_obj = await self._soundcloud.get_track_details(prov_track_id)
try:
track = await self._parse_track(track_obj[0])
except (KeyError, TypeError, InvalidDataError, IndexError) as error:
self.logger.debug("Parse track failed: %s", track_obj, exc_info=error)
return track
- async def get_playlist(self, prov_playlist_id) -> Playlist:
+ async def get_playlist(self, prov_playlist_id: str) -> Playlist:
"""Get full playlist details by id."""
- playlist_obj = await self._soundcloud.get_playlist_details(playlist_id=prov_playlist_id)
+ playlist_obj = await self._soundcloud.get_playlist_details(prov_playlist_id)
try:
playlist = await self._parse_playlist(playlist_obj)
except (KeyError, TypeError, InvalidDataError, IndexError) as error:
if page > 0:
# TODO: soundcloud doesn't seem to support paging for playlist tracks ?!
return result
- playlist_obj = await self._soundcloud.get_playlist_details(playlist_id=prov_playlist_id)
+ playlist_obj = await self._soundcloud.get_playlist_details(prov_playlist_id)
if "tracks" not in playlist_obj:
return result
for index, item in enumerate(playlist_obj["tracks"], 1):
- # TODO: is it really needed to grab the entire track with an api call ?
- song = await self._soundcloud.get_track_details(item["id"])
try:
- if track := await self._parse_track(song[0], index):
- result.append(track)
+ # Skip some ugly "tracks" entries, example:
+ # {'id': 123, 'kind': 'track', 'monetization_model': 'NOT_APPLICABLE',
+ # 'policy': 'ALLOW'}
+ if "title" in item:
+ if track := await self._parse_track(item, index):
+ result.append(track)
except (KeyError, TypeError, InvalidDataError, IndexError) as error:
- self.logger.debug("Parse track failed: %s", song, exc_info=error)
+ self.logger.debug("Parse track failed: %s", item, exc_info=error)
continue
return result
- async def get_artist_toptracks(self, prov_artist_id) -> list[Track]:
+ async def get_artist_toptracks(self, prov_artist_id: str) -> list[Track]:
"""Get a list of 25 most popular tracks for the given artist."""
- tracks_obj = await self._soundcloud.get_popular_tracks_user(
- user_id=prov_artist_id, limit=25
- )
+ tracks_obj = await self._soundcloud.get_popular_tracks_user(prov_artist_id, 25)
tracks = []
for item in tracks_obj["collection"]:
song = await self._soundcloud.get_track_details(item["id"])
continue
return tracks
- async def get_similar_tracks(self, prov_track_id, limit=25) -> list[Track]:
+ async def get_similar_tracks(self, prov_track_id: str, limit: int = 25) -> list[Track]:
"""Retrieve a dynamic list of tracks based on the provided item."""
- tracks_obj = await self._soundcloud.get_recommended(track_id=prov_track_id, limit=limit)
+ tracks_obj = await self._soundcloud.get_recommended(prov_track_id, limit)
tracks = []
for item in tracks_obj["collection"]:
song = await self._soundcloud.get_track_details(item["id"])
path=url,
)
- async def _parse_artist(self, artist_obj: dict) -> Artist:
+ async def _parse_artist(self, artist_obj: dict[str, Any]) -> Artist:
"""Parse a Soundcloud user response to Artist model object."""
artist_id = None
permalink = artist_obj["permalink"]
if artist_obj.get("description"):
artist.metadata.description = artist_obj["description"]
if artist_obj.get("avatar_url"):
- img_url = artist_obj["avatar_url"]
- artist.metadata.images = [
- MediaItemImage(
- type=ImageType.THUMB,
- path=img_url,
- provider=self.lookup_key,
- remotely_accessible=True,
- )
- ]
+ img_url = self._transform_artwork_url(artist_obj["avatar_url"])
+ artist.metadata.images = UniqueList(
+ [
+ MediaItemImage(
+ type=ImageType.THUMB,
+ path=img_url,
+ provider=self.lookup_key,
+ remotely_accessible=True,
+ )
+ ]
+ )
return artist
- async def _parse_playlist(self, playlist_obj: dict) -> Playlist:
+ async def _parse_playlist(self, playlist_obj: dict[str, Any]) -> Playlist:
"""Parse a Soundcloud Playlist response to a Playlist object."""
playlist_id = str(playlist_obj["id"])
playlist = Playlist(
if playlist_obj.get("description"):
playlist.metadata.description = playlist_obj["description"]
if playlist_obj.get("artwork_url"):
- playlist.metadata.images = [
- MediaItemImage(
- type=ImageType.THUMB,
- path=self._transform_artwork_url(playlist_obj["artwork_url"]),
- provider=self.lookup_key,
- remotely_accessible=True,
- )
- ]
+ playlist.metadata.images = UniqueList(
+ [
+ MediaItemImage(
+ type=ImageType.THUMB,
+ path=self._transform_artwork_url(playlist_obj["artwork_url"]),
+ provider=self.lookup_key,
+ remotely_accessible=True,
+ )
+ ]
+ )
if playlist_obj.get("genre"):
playlist.metadata.genres = playlist_obj["genre"]
if playlist_obj.get("tag_list"):
playlist.metadata.style = playlist_obj["tag_list"]
return playlist
- async def _parse_track(self, track_obj: dict, playlist_position: int = 0) -> Track:
+ async def _parse_track(self, track_obj: dict[str, Any], playlist_position: int = 0) -> Track:
"""Parse a Soundcloud Track response to a Track model object."""
name, version = parse_title_and_version(track_obj["title"])
track_id = str(track_obj["id"])
track.artists.append(artist)
if track_obj.get("artwork_url"):
- track.metadata.images = [
- MediaItemImage(
- type=ImageType.THUMB,
- path=self._transform_artwork_url(track_obj["artwork_url"]),
- provider=self.lookup_key,
- remotely_accessible=True,
- )
- ]
+ track.metadata.images = UniqueList(
+ [
+ MediaItemImage(
+ type=ImageType.THUMB,
+ path=self._transform_artwork_url(track_obj["artwork_url"]),
+ provider=self.lookup_key,
+ remotely_accessible=True,
+ )
+ ]
+ )
+
if track_obj.get("description"):
track.metadata.description = track_obj["description"]
if track_obj.get("genre"):
- track.metadata.genres = [track_obj["genre"]]
+ track.metadata.genres = {track_obj["genre"]}
if track_obj.get("tag_list"):
track.metadata.style = track_obj["tag_list"]
return track