--- /dev/null
+"""Soundcloud support for MusicAssistant."""
+import asyncio
+from collections.abc import AsyncGenerator, Callable
+
+from music_assistant.common.helpers.util import parse_title_and_version
+from music_assistant.common.models.enums import ProviderFeature
+from music_assistant.common.models.errors import InvalidDataError, LoginFailed
+from music_assistant.common.models.media_items import (
+ Artist,
+ ContentType,
+ ImageType,
+ MediaItemImage,
+ MediaType,
+ Playlist,
+ ProviderMapping,
+ SearchResults,
+ StreamDetails,
+ Track,
+)
+from music_assistant.server.models.music_provider import MusicProvider
+
+from .soundcloudpy.asyncsoundcloudpy import SoundcloudAsync
+
+CONF_CLIENT_ID = "client_id"
+CONF_AUTHORIZATION = "authorization"
+
+SUPPORTED_FEATURES = (
+ ProviderFeature.LIBRARY_ARTISTS,
+ ProviderFeature.LIBRARY_TRACKS,
+ ProviderFeature.LIBRARY_PLAYLISTS,
+ ProviderFeature.BROWSE,
+ ProviderFeature.SEARCH,
+ ProviderFeature.ARTIST_TOPTRACKS,
+ ProviderFeature.SIMILAR_TRACKS,
+)
+
+
+class SoundcloudMusicProvider(MusicProvider):
+ """Provider for Soundcloud."""
+
+ _headers = None
+ _context = None
+ _cookies = None
+ _signature_timestamp = 0
+ _cipher = None
+ _user_id = None
+ _soundcloud = None
+ _me = None
+
+ async def setup(self) -> None:
+ """Set up the Soundcloud provider."""
+ if not self.config.get_value(CONF_CLIENT_ID) or not self.config.get_value(
+ CONF_AUTHORIZATION
+ ):
+ raise LoginFailed("Invalid login credentials")
+
+ client_id = self.config.get_value(CONF_CLIENT_ID)
+ auth_token = self.config.get_value(CONF_AUTHORIZATION)
+
+ async with SoundcloudAsync(auth_token, client_id) as account:
+ await account.login()
+
+ self._soundcloud = account
+ self._me = await account.get_account_details()
+ self._user_id = self._me["id"]
+
+ @property
+ def supported_features(self) -> tuple[ProviderFeature, ...]:
+ """Return the features supported by this Provider."""
+ return SUPPORTED_FEATURES
+
+ @classmethod
+ async def _run_async(cls, call: Callable, *args, **kwargs):
+ return await asyncio.to_thread(call, *args, **kwargs)
+
+ async def search(
+ self, search_query: str, media_types=list[MediaType] | None, limit: int = 10
+ ) -> SearchResults:
+ """Perform search on musicprovider.
+
+ :param search_query: Search query.
+ :param media_types: A list of media_types to include. All types if None.
+ :param limit: Number of items to return in the search (per type).
+ """
+ result = SearchResults()
+ searchtypes = []
+ if MediaType.ARTIST in media_types:
+ searchtypes.append("artist")
+ if MediaType.TRACK in media_types:
+ searchtypes.append("track")
+ if MediaType.PLAYLIST in media_types:
+ searchtypes.append("playlist")
+
+ searchresult = await self._soundcloud.search(search_query, limit)
+
+ for item in searchresult["collection"]:
+ media_type = item["kind"]
+ if media_type == "user":
+ result.artists.append(await self._parse_artist(item))
+ elif media_type == "track":
+ result.tracks.append(await self._parse_track(item))
+ elif media_type == "playlist":
+ result.playlists.append(await self._parse_playlist(item))
+ return result
+
+ async def get_library_artists(self) -> AsyncGenerator[Artist, None]:
+ """Retrieve all library artists from Soundcloud."""
+ following = await self._soundcloud.get_following(self._user_id)
+ for artist in following["collection"]:
+ try:
+ yield await self._parse_artist(artist)
+ except (KeyError, TypeError, InvalidDataError, IndexError) as error:
+ self.logger.debug("Parse artist failed: %s", artist, exc_info=error)
+ continue
+
+ async def get_library_playlists(self) -> AsyncGenerator[Playlist, None]:
+ """Retrieve all library playlists from Soundcloud."""
+ playlists = await self._soundcloud.get_account_playlists()
+ for item in playlists["collection"]:
+ playlist_id = item["playlist"]["id"]
+ playlist_obj = await self._soundcloud.get_playlist_details(playlist_id=playlist_id)
+ try:
+ yield await self._parse_playlist(playlist_obj)
+ except (KeyError, TypeError, InvalidDataError, IndexError) as error:
+ self.logger.debug("Parse playlist failed: %s", playlist_obj, exc_info=error)
+ continue
+
+ async def get_library_tracks(self) -> AsyncGenerator[Track, None]:
+ """Retrieve library tracks from Soundcloud."""
+ tracks = await self._soundcloud.get_tracks_liked()
+ for item in tracks["collection"]:
+ track = await self._soundcloud.get_track_details(item)
+ try:
+ yield await self._parse_track(track[0])
+ except IndexError:
+ continue
+ except (KeyError, TypeError, InvalidDataError) as error:
+ self.logger.debug("Parse track failed: %s", track, exc_info=error)
+ continue
+
+ async def get_artist(self, prov_artist_id) -> Artist:
+ """Get full artist details by id."""
+ artist_obj = await self._soundcloud.get_user_details(user_id=prov_artist_id)
+ try:
+ artist = await self._parse_artist(artist_obj=artist_obj) if artist_obj else None
+ except (KeyError, TypeError, InvalidDataError, IndexError) as error:
+ self.logger.debug("Parse artist failed: %s", artist_obj, exc_info=error)
+ return artist
+
+ async def get_track(self, prov_track_id) -> Track:
+ """Get full track details by id."""
+ track_obj = await self._soundcloud.get_track_details(track_id=prov_track_id)
+ try:
+ track = await self._parse_track(track_obj[0])
+ except (KeyError, TypeError, InvalidDataError, IndexError) as error:
+ self.logger.debug("Parse track failed: %s", track_obj, exc_info=error)
+ return track
+
+ async def get_playlist(self, prov_playlist_id) -> Playlist:
+ """Get full playlist details by id."""
+ playlist_obj = await self._soundcloud.get_playlist_details(playlist_id=prov_playlist_id)
+ try:
+ playlist = await self._parse_playlist(playlist_obj)
+ except (KeyError, TypeError, InvalidDataError, IndexError) as error:
+ self.logger.debug("Parse playlist failed: %s", playlist_obj, exc_info=error)
+ return playlist
+
+ async def get_playlist_tracks(self, prov_playlist_id) -> list[Track]:
+ """Get all playlist tracks for given playlist id."""
+ playlist_obj = await self._soundcloud.get_playlist_details(playlist_id=prov_playlist_id)
+ if "tracks" not in playlist_obj:
+ return []
+ tracks = []
+ for index, item in enumerate(playlist_obj["tracks"]):
+ song = await self._soundcloud.get_track_details(item["id"])
+ try:
+ track = await self._parse_track(song[0])
+ if track:
+ track.position = index
+ tracks.append(track)
+ except (KeyError, TypeError, InvalidDataError, IndexError) as error:
+ self.logger.debug("Parse track failed: %s", song, exc_info=error)
+ continue
+ return tracks
+
+ async def get_artist_toptracks(self, prov_artist_id) -> list[Track]:
+ """Get a list of 25 most popular tracks for the given artist."""
+ tracks_obj = await self._soundcloud.get_popular_tracks_user(
+ user_id=prov_artist_id, limit=25
+ )
+ tracks = []
+ for item in tracks_obj["collection"]:
+ song = await self._soundcloud.get_track_details(item["id"])
+ try:
+ track = await self._parse_track(song[0])
+ tracks.append(track)
+ except (KeyError, TypeError, InvalidDataError, IndexError) as error:
+ self.logger.debug("Parse track failed: %s", song, exc_info=error)
+ continue
+ return tracks
+
+ async def get_similar_tracks(self, prov_track_id, limit=25) -> list[Track]:
+ """Retrieve a dynamic list of tracks based on the provided item."""
+ tracks_obj = await self._soundcloud.get_recommended(track_id=prov_track_id, limit=limit)
+ tracks = []
+ for item in tracks_obj["collection"]:
+ song = await self._soundcloud.get_track_details(item["id"])
+ try:
+ track = await self._parse_track(song[0])
+ tracks.append(track)
+ except (KeyError, TypeError, InvalidDataError, IndexError) as error:
+ self.logger.debug("Parse track failed: %s", song, exc_info=error)
+ continue
+
+ return tracks
+
+ async def get_stream_details(self, item_id: str) -> StreamDetails:
+ """Return the content details for the given track when it will be streamed."""
+ track_details = await self._soundcloud.get_track_details(track_id=item_id)
+ stream_format = track_details[0]["media"]["transcodings"][0]["format"]["mime_type"]
+ url = await self._soundcloud.get_stream_url(track_id=item_id)
+ return StreamDetails(
+ provider=self.domain,
+ item_id=item_id,
+ content_type=ContentType.try_parse(stream_format),
+ direct=url,
+ )
+
+ async def _parse_artist(self, artist_obj: dict) -> Artist:
+ """Parse a Soundcloud user response to Artist model object."""
+ artist_id = None
+ permalink = artist_obj["permalink"]
+ if "id" in artist_obj and artist_obj["id"]:
+ artist_id = artist_obj["id"]
+ if not artist_id:
+ raise InvalidDataError("Artist does not have a valid ID")
+ artist = Artist(item_id=artist_id, name=artist_obj["username"], provider=self.domain)
+ if artist_obj.get("description"):
+ artist.metadata.description = artist_obj["description"]
+ if artist_obj.get("avatar_url"):
+ img_url = artist_obj["avatar_url"]
+ artist.metadata.images = [MediaItemImage(ImageType.THUMB, img_url)]
+ artist.add_provider_mapping(
+ ProviderMapping(
+ item_id=str(artist_id),
+ provider_domain=self.domain,
+ provider_instance=self.instance_id,
+ url=f"https://soundcloud.com/{permalink}",
+ )
+ )
+ return artist
+
+ async def _parse_playlist(self, playlist_obj: dict) -> Playlist:
+ """Parse a Soundcloud Playlist response to a Playlist object."""
+ playlist = Playlist(
+ item_id=playlist_obj["id"],
+ provider=self.domain,
+ name=playlist_obj["title"],
+ )
+ playlist.add_provider_mapping(
+ ProviderMapping(
+ item_id=playlist_obj["id"],
+ provider_domain=self.domain,
+ provider_instance=self.instance_id,
+ )
+ )
+ playlist.is_editable = False
+ if playlist_obj.get("description"):
+ playlist.metadata.description = playlist_obj["description"]
+ if playlist_obj.get("artwork_url"):
+ playlist.metadata.images = [
+ MediaItemImage(ImageType.THUMB, playlist_obj["artwork_url"])
+ ]
+ if playlist_obj.get("genre"):
+ playlist.metadata.genres = playlist_obj["genre"]
+ if playlist_obj.get("tag_list"):
+ playlist.metadata.style = playlist_obj["tag_list"]
+ return playlist
+
+ async def _parse_track(self, track_obj: dict) -> Track:
+ """Parse a Soundcloud Track response to a Track model object."""
+ name, version = parse_title_and_version(track_obj["title"])
+ track = Track(
+ item_id=track_obj["id"],
+ provider=self.domain,
+ name=name,
+ version=version,
+ duration=track_obj["duration"] / 1000,
+ )
+ user_id = track_obj["user"]["id"]
+ user = await self._soundcloud.get_user_details(user_id)
+ artist = await self._parse_artist(user)
+ if artist and artist.item_id not in {x.item_id for x in track.artists}:
+ track.artists.append(artist)
+
+ if track_obj.get("artwork_url"):
+ track.metadata.images = [MediaItemImage(ImageType.THUMB, track_obj["artwork_url"])]
+ if track_obj.get("description"):
+ track.metadata.description = track_obj["description"]
+ if track_obj.get("genre"):
+ track.metadata.genres = track_obj["genre"]
+ if track_obj.get("tag_list"):
+ track.metadata.style = track_obj["tag_list"]
+ track.add_provider_mapping(
+ ProviderMapping(
+ item_id=track_obj["id"],
+ provider_domain=self.domain,
+ provider_instance=self.instance_id,
+ content_type=ContentType.MP3,
+ url=track_obj["permalink_url"],
+ )
+ )
+ return track
--- /dev/null
+"""Async helpers for connecting to the Soundcloud API.
+
+This file is based on soundcloudpy from Naím Rodríguez https://github.com/naim-prog
+Original package https://github.com/naim-prog/soundcloud-py
+"""
+from __future__ import annotations
+
+import aiohttp
+from aiohttp.client import ClientSession
+from attr import dataclass
+
+BASE_URL = "https://api-v2.soundcloud.com"
+
+
+@dataclass
+class SoundcloudAsync:
+ """Soundcloud."""
+
+ o_auth: str
+ client_id: str
+ headers = None
+ app_version = None
+ firefox_version = None
+ request_timeout: float = 8.0
+
+ session: ClientSession | None = None
+
+ async def get(self, url, headers=None, params=None):
+ """Async get."""
+ async with aiohttp.ClientSession(headers=headers) as session:
+ async with session.get(url=url, params=params) as response:
+ return await response.json()
+
+ async def login(self):
+ """Login to soundcloud."""
+ if len(self.client_id) != 32:
+ raise ValueError("Not valid client id")
+
+ # To get the last version of Firefox to prevent some type of deprecated version
+ json_versions = await self.get(
+ # self,
+ url="https://product-details.mozilla.org/1.0/firefox_versions.json",
+ )
+
+ self.firefox_version = dict(json_versions).get("LATEST_FIREFOX_VERSION")
+
+ #: Default headers that work properly for the API
+ #: User-Agent as if it was requested through Firefox Browser
+ self.headers = {
+ "Authorization": self.o_auth,
+ "Accept": "application/json",
+ "User-Agent": (
+ f"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:{self.firefox_version})"
+ f" Gecko/20100101 Firefox/{self.firefox_version}"
+ ),
+ }
+
+ # Version of soundcloud app
+ app_json = await self.get(url="https://soundcloud.com/versions.json")
+ self.app_version = dict(app_json).get("app")
+
+ # ---------------- USER ----------------
+
+ async def get_account_details(self):
+ """Get account details."""
+ return await self.get(url=f"{BASE_URL}/me", headers=self.headers)
+
+ async def get_user_details(self, user_id):
+ """:param user_id: id of the user requested"""
+ return await self.get(
+ url=f"{BASE_URL}/users/{user_id}?client_id={self.client_id}",
+ headers=self.headers,
+ )
+
+ async def get_following(self, own_user_id, limit=120):
+ """:param limit: max numbers of following accounts"""
+ return await self.get(
+ f"{BASE_URL}/users/{own_user_id}/followings?client_id={self.client_id}"
+ f"&limit={limit}&offset=0&linked_partitioning=1&app_version={self.app_version}",
+ headers=self.headers,
+ )
+
+ async def get_recommended_users(self, limit=5):
+ """:param limit: max numbers of follower accounts to get"""
+ return await self.get(
+ f"{BASE_URL}/me/suggested/users/who_to_follow?view=recommended-first&client_id="
+ f"{self.client_id}&limit={limit}&offset=0&linked_partitioning=1&app_version="
+ f"{self.app_version}",
+ headers=self.headers,
+ )
+
+ # ---------------- TRACKS ----------------
+
+ async def get_last_track_info(self, limit=1):
+ """:param limit: number of last tracks reproduced"""
+ return await self.get(
+ f"{BASE_URL}/me/play-history/tracks?client_id={self.client_id}&limit={limit}",
+ headers=self.headers,
+ )
+
+ async def get_track_likes_users(self, track_id):
+ """:param track_id: track id"""
+ return await self.get(
+ f"{BASE_URL}/tracks/{track_id}/likers?client_id={self.client_id}",
+ headers=self.headers,
+ )
+
+ async def get_track_details(self, track_id):
+ """:param track_id: track id"""
+ return await self.get(
+ f"{BASE_URL}/tracks?ids={track_id}&client_id={self.client_id}",
+ headers=self.headers,
+ )
+
+ async def get_tracks_liked(self, limit=50):
+ """:param limit: number of tracks to get"""
+ return await self.get(
+ f"{BASE_URL}/me/track_likes/ids?client_id={self.client_id}&"
+ f"limit={limit}&app_version={self.app_version}",
+ headers=self.headers,
+ )
+
+ async def get_track_by_genre_recent(self, genre, limit=10):
+ """Get track by genre recent.
+
+ :param genre: string of the genre to get tracks
+ :param limit: limit of playlists to get
+ """
+ return await self.get(
+ f"{BASE_URL}/recent-tracks/{genre}?client_id={self.client_id}"
+ f"&limit={limit}&offset=0&linked_partitioning=1&app_version={self.app_version}",
+ headers=self.headers,
+ )
+
+ async def get_track_by_genre_popular(self, genre, limit=10):
+ """Get track by genre popular.
+
+ :param genre: string of the genre to get tracks
+ :param limit: limit of playlists to get
+ """
+ return await self.get(
+ f"{BASE_URL}/search/tracks?q=&filter.genre_or_tag={genre}&sort=popular&client_id="
+ f"{self.client_id}&limit={limit}&offset=0&linked_partitioning=1&app_version="
+ f"{self.app_version}",
+ headers=self.headers,
+ )
+
+ async def get_tracks_from_user(self, user_id, limit=10):
+ """Get tracks from user.
+
+ :param user_id: id of the user to get his tracks
+ :param limit: number of tracks to get from user
+ """
+ return await self.get(
+ f"{BASE_URL}/users/{user_id}/tracks?representation=&client_id="
+ f"{self.client_id}&limit={limit}&offset=0&linked_partitioning=1&app_version="
+ f"{self.app_version}",
+ headers=self.headers,
+ )
+
+ async def get_popular_tracks_user(self, user_id, limit=12):
+ """Get popular track from user.
+
+ :param user_id: id of the user to get his tracks
+ :param limit: number of tracks to get from user
+ """
+ return await self.get(
+ f"{BASE_URL}/users/{user_id}/toptracks?client_id={self.client_id}"
+ f"&limit={limit}&offset=0&linked_partitioning=1&app_version={self.app_version}",
+ headers=self.headers,
+ )
+
+ # ---------------- PLAYLISTS ----------------
+
+ async def get_account_playlists(self):
+ """Get account playlists."""
+ return await self.get(
+ f"{BASE_URL}/me/library/all?client_id{self.client_id}",
+ headers=self.headers,
+ )
+
+ async def get_playlist_details(self, playlist_id):
+ """:param playlist_id: playlist id"""
+ return await self.get(
+ f"{BASE_URL}/playlists/{playlist_id}?representation=full&client_id={self.client_id}",
+ headers=self.headers,
+ )
+
+ async def get_playlists_by_genre(self, genre, limit=10):
+ """Get playlists by genre.
+
+ :param genre: string of the genre to get tracks
+ :param limit: limit of playlists to get
+ """
+ return await self.get(
+ f"{BASE_URL}/playlists/discovery?tag={genre}&client_id="
+ f"{self.client_id}&limit={limit}&offset=0&linked_partitioning=1&"
+ f"app_version={self.app_version}",
+ headers=self.headers,
+ )
+
+ async def get_playlists_from_user(self, user_id, limit=10):
+ """Get playlists from user.
+
+ :param user_id: user id to get his playlists
+ :param limit: limit of playlists to get
+ """
+ return await self.get(
+ f"{BASE_URL}/users/{user_id}/playlists_without_albums?client_id="
+ f"{self.client_id}&limit={limit}&offset=0&linked_partitioning=1&"
+ f"app_version={self.app_version}",
+ headers=self.headers,
+ )
+
+ # ---------------- MISCELLANEOUS ----------------
+
+ async def get_recommended(self, track_id):
+ """:param track_id: track id to get recommended tracks from this"""
+ return await self.get(
+ f"{BASE_URL}/tracks/{track_id}/related?client_id={self.client_id}",
+ headers=self.headers,
+ )
+
+ async def get_stream_url(self, track_id):
+ """:param track_id: track id"""
+ full_json = await self.get_track_details(track_id)
+ media_url = full_json[0]["media"]["transcodings"][0]["url"]
+ track_auth = full_json[0]["track_authorization"]
+ stream_url = f"{media_url}?client_id={self.client_id}&track_authorization={track_auth}"
+ req = await self.get(
+ stream_url,
+ headers=self.headers,
+ )
+
+ return dict(req).get("url")
+
+ async def get_comments_track(self, track_id, limit=100):
+ """Get track comments.
+
+ :param track_id: track id for get the comments from
+ :param limit: limit of tracks to get
+ :add: with "next_href" in the return json you can keep getting more comments than limit
+ """
+ return await self.get(
+ f"{BASE_URL}/tracks/{track_id}/comments?threaded=0&filter_replies="
+ f"1&client_id={self.client_id}&limit={limit}&offset=0&linked_partitioning="
+ f"1&app_version={self.app_version}",
+ headers=self.headers,
+ )
+
+ async def get_mixed_selection(self, limit=5):
+ """:param limit: limit of recommended playlists make for you"""
+ return await self.get(
+ f"{BASE_URL}/mixed-selections?variant_ids=&client_id="
+ f"{self.client_id}&limit={limit}&app_version={self.app_version}",
+ headers=self.headers,
+ )
+
+ async def search(self, query_string, limit=25):
+ """Search.
+
+ :param query_string: string to search on soundcloud for tracks
+ :param limit: limit of recommended playlists make for you
+ """
+ return await self.get(
+ f"{BASE_URL}/search?q={query_string}&variant_ids=&facet=model&client_id="
+ f"{self.client_id}&limit={limit}&offset=0&app_version={self.app_version}",
+ headers=self.headers,
+ )
+
+ async def get_subscribe_feed(self, limit=10):
+ """Get subscribe feed."""
+ return await self.get(
+ f"{BASE_URL}/stream?offset=10&limit={limit}&promoted_playlist=true&client_id"
+ f"={self.client_id}&app_version={self.app_version}",
+ headers=self.headers,
+ )
+
+ async def get_album_from_user(self, user_id, limit=5):
+ """Get album from user.
+
+ :param user_id: user to get the albums from
+ :param limit: numbers of albums to get from the user
+ """
+ return await self.get(
+ f"{BASE_URL}/users/{user_id}/albums?client_id={self.client_id}"
+ f"&limit={limit}&offset=0&linked_partitioning=1&app_version={self.app_version}",
+ headers=self.headers,
+ )
+
+ async def get_all_feed_user(self, user_id, limit=20):
+ """Get all feed from user.
+
+ :param user_id: user to get the albums from
+ :param limit: numbers of items to get from the user's feed
+ """
+ return await self.get(
+ f"{BASE_URL}/stream/users/{user_id}?client_id={self.client_id}"
+ f"&limit={limit}&offset=0&linked_partitioning=1&app_version={self.app_version}",
+ headers=self.headers,
+ )
+
+ async def close(self) -> None:
+ """Close open client session."""
+ if self.session:
+ await self.session.close()
+
+ async def __aenter__(self) -> SoundcloudAsync:
+ """Async enter.
+
+ Returns:
+ The SoundcloudAsync object.
+ """
+ return self
+
+ async def __aexit__(self, *_exc_info) -> None:
+ """Async exit.
+
+ Args:
+ _exc_info: Exec type.
+ """
+ await self.close()