Add Initial support for Soundcloud (#566)
authorGiel Janssens <gieljnssns@me.com>
Thu, 23 Mar 2023 19:47:38 +0000 (20:47 +0100)
committerGitHub <noreply@github.com>
Thu, 23 Mar 2023 19:47:38 +0000 (20:47 +0100)
Initial support for Soundcloud as Musicprovider

.vscode/settings.json
music_assistant/server/providers/soundcloud/__init__.py [new file with mode: 0644]
music_assistant/server/providers/soundcloud/manifest.json [new file with mode: 0644]
music_assistant/server/providers/soundcloud/soundcloudpy/asyncsoundcloudpy.py [new file with mode: 0644]
pyproject.toml

index b3ec5701d475402bb5fa23dc5847cd9632bd89af..44b46c5fea7706833a3966bb89e6c3febf7be216 100644 (file)
@@ -1,5 +1,5 @@
 {
-    "[python]": {
+  "[python]": {
         "editor.formatOnSave": true,
         "editor.codeActionsOnSave": {
             "source.fixAll": true,
diff --git a/music_assistant/server/providers/soundcloud/__init__.py b/music_assistant/server/providers/soundcloud/__init__.py
new file mode 100644 (file)
index 0000000..292985a
--- /dev/null
@@ -0,0 +1,313 @@
+"""Soundcloud support for MusicAssistant."""
+import asyncio
+from collections.abc import AsyncGenerator, Callable
+
+from music_assistant.common.helpers.util import parse_title_and_version
+from music_assistant.common.models.enums import ProviderFeature
+from music_assistant.common.models.errors import InvalidDataError, LoginFailed
+from music_assistant.common.models.media_items import (
+    Artist,
+    ContentType,
+    ImageType,
+    MediaItemImage,
+    MediaType,
+    Playlist,
+    ProviderMapping,
+    SearchResults,
+    StreamDetails,
+    Track,
+)
+from music_assistant.server.models.music_provider import MusicProvider
+
+from .soundcloudpy.asyncsoundcloudpy import SoundcloudAsync
+
+CONF_CLIENT_ID = "client_id"
+CONF_AUTHORIZATION = "authorization"
+
+SUPPORTED_FEATURES = (
+    ProviderFeature.LIBRARY_ARTISTS,
+    ProviderFeature.LIBRARY_TRACKS,
+    ProviderFeature.LIBRARY_PLAYLISTS,
+    ProviderFeature.BROWSE,
+    ProviderFeature.SEARCH,
+    ProviderFeature.ARTIST_TOPTRACKS,
+    ProviderFeature.SIMILAR_TRACKS,
+)
+
+
+class SoundcloudMusicProvider(MusicProvider):
+    """Provider for Soundcloud."""
+
+    _headers = None
+    _context = None
+    _cookies = None
+    _signature_timestamp = 0
+    _cipher = None
+    _user_id = None
+    _soundcloud = None
+    _me = None
+
+    async def setup(self) -> None:
+        """Set up the Soundcloud provider."""
+        if not self.config.get_value(CONF_CLIENT_ID) or not self.config.get_value(
+            CONF_AUTHORIZATION
+        ):
+            raise LoginFailed("Invalid login credentials")
+
+        client_id = self.config.get_value(CONF_CLIENT_ID)
+        auth_token = self.config.get_value(CONF_AUTHORIZATION)
+
+        async with SoundcloudAsync(auth_token, client_id) as account:
+            await account.login()
+
+        self._soundcloud = account
+        self._me = await account.get_account_details()
+        self._user_id = self._me["id"]
+
+    @property
+    def supported_features(self) -> tuple[ProviderFeature, ...]:
+        """Return the features supported by this Provider."""
+        return SUPPORTED_FEATURES
+
+    @classmethod
+    async def _run_async(cls, call: Callable, *args, **kwargs):
+        return await asyncio.to_thread(call, *args, **kwargs)
+
+    async def search(
+        self, search_query: str, media_types=list[MediaType] | None, limit: int = 10
+    ) -> SearchResults:
+        """Perform search on musicprovider.
+
+        :param search_query: Search query.
+        :param media_types: A list of media_types to include. All types if None.
+        :param limit: Number of items to return in the search (per type).
+        """
+        result = SearchResults()
+        searchtypes = []
+        if MediaType.ARTIST in media_types:
+            searchtypes.append("artist")
+        if MediaType.TRACK in media_types:
+            searchtypes.append("track")
+        if MediaType.PLAYLIST in media_types:
+            searchtypes.append("playlist")
+
+        searchresult = await self._soundcloud.search(search_query, limit)
+
+        for item in searchresult["collection"]:
+            media_type = item["kind"]
+            if media_type == "user":
+                result.artists.append(await self._parse_artist(item))
+            elif media_type == "track":
+                result.tracks.append(await self._parse_track(item))
+            elif media_type == "playlist":
+                result.playlists.append(await self._parse_playlist(item))
+        return result
+
+    async def get_library_artists(self) -> AsyncGenerator[Artist, None]:
+        """Retrieve all library artists from Soundcloud."""
+        following = await self._soundcloud.get_following(self._user_id)
+        for artist in following["collection"]:
+            try:
+                yield await self._parse_artist(artist)
+            except (KeyError, TypeError, InvalidDataError, IndexError) as error:
+                self.logger.debug("Parse artist failed: %s", artist, exc_info=error)
+                continue
+
+    async def get_library_playlists(self) -> AsyncGenerator[Playlist, None]:
+        """Retrieve all library playlists from Soundcloud."""
+        playlists = await self._soundcloud.get_account_playlists()
+        for item in playlists["collection"]:
+            playlist_id = item["playlist"]["id"]
+            playlist_obj = await self._soundcloud.get_playlist_details(playlist_id=playlist_id)
+            try:
+                yield await self._parse_playlist(playlist_obj)
+            except (KeyError, TypeError, InvalidDataError, IndexError) as error:
+                self.logger.debug("Parse playlist failed: %s", playlist_obj, exc_info=error)
+                continue
+
+    async def get_library_tracks(self) -> AsyncGenerator[Track, None]:
+        """Retrieve library tracks from Soundcloud."""
+        tracks = await self._soundcloud.get_tracks_liked()
+        for item in tracks["collection"]:
+            track = await self._soundcloud.get_track_details(item)
+            try:
+                yield await self._parse_track(track[0])
+            except IndexError:
+                continue
+            except (KeyError, TypeError, InvalidDataError) as error:
+                self.logger.debug("Parse track failed: %s", track, exc_info=error)
+                continue
+
+    async def get_artist(self, prov_artist_id) -> Artist:
+        """Get full artist details by id."""
+        artist_obj = await self._soundcloud.get_user_details(user_id=prov_artist_id)
+        try:
+            artist = await self._parse_artist(artist_obj=artist_obj) if artist_obj else None
+        except (KeyError, TypeError, InvalidDataError, IndexError) as error:
+            self.logger.debug("Parse artist failed: %s", artist_obj, exc_info=error)
+        return artist
+
+    async def get_track(self, prov_track_id) -> Track:
+        """Get full track details by id."""
+        track_obj = await self._soundcloud.get_track_details(track_id=prov_track_id)
+        try:
+            track = await self._parse_track(track_obj[0])
+        except (KeyError, TypeError, InvalidDataError, IndexError) as error:
+            self.logger.debug("Parse track failed: %s", track_obj, exc_info=error)
+        return track
+
+    async def get_playlist(self, prov_playlist_id) -> Playlist:
+        """Get full playlist details by id."""
+        playlist_obj = await self._soundcloud.get_playlist_details(playlist_id=prov_playlist_id)
+        try:
+            playlist = await self._parse_playlist(playlist_obj)
+        except (KeyError, TypeError, InvalidDataError, IndexError) as error:
+            self.logger.debug("Parse playlist failed: %s", playlist_obj, exc_info=error)
+        return playlist
+
+    async def get_playlist_tracks(self, prov_playlist_id) -> list[Track]:
+        """Get all playlist tracks for given playlist id."""
+        playlist_obj = await self._soundcloud.get_playlist_details(playlist_id=prov_playlist_id)
+        if "tracks" not in playlist_obj:
+            return []
+        tracks = []
+        for index, item in enumerate(playlist_obj["tracks"]):
+            song = await self._soundcloud.get_track_details(item["id"])
+            try:
+                track = await self._parse_track(song[0])
+                if track:
+                    track.position = index
+                    tracks.append(track)
+            except (KeyError, TypeError, InvalidDataError, IndexError) as error:
+                self.logger.debug("Parse track failed: %s", song, exc_info=error)
+                continue
+        return tracks
+
+    async def get_artist_toptracks(self, prov_artist_id) -> list[Track]:
+        """Get a list of 25 most popular tracks for the given artist."""
+        tracks_obj = await self._soundcloud.get_popular_tracks_user(
+            user_id=prov_artist_id, limit=25
+        )
+        tracks = []
+        for item in tracks_obj["collection"]:
+            song = await self._soundcloud.get_track_details(item["id"])
+            try:
+                track = await self._parse_track(song[0])
+                tracks.append(track)
+            except (KeyError, TypeError, InvalidDataError, IndexError) as error:
+                self.logger.debug("Parse track failed: %s", song, exc_info=error)
+                continue
+        return tracks
+
+    async def get_similar_tracks(self, prov_track_id, limit=25) -> list[Track]:
+        """Retrieve a dynamic list of tracks based on the provided item."""
+        tracks_obj = await self._soundcloud.get_recommended(track_id=prov_track_id, limit=limit)
+        tracks = []
+        for item in tracks_obj["collection"]:
+            song = await self._soundcloud.get_track_details(item["id"])
+            try:
+                track = await self._parse_track(song[0])
+                tracks.append(track)
+            except (KeyError, TypeError, InvalidDataError, IndexError) as error:
+                self.logger.debug("Parse track failed: %s", song, exc_info=error)
+                continue
+
+        return tracks
+
+    async def get_stream_details(self, item_id: str) -> StreamDetails:
+        """Return the content details for the given track when it will be streamed."""
+        track_details = await self._soundcloud.get_track_details(track_id=item_id)
+        stream_format = track_details[0]["media"]["transcodings"][0]["format"]["mime_type"]
+        url = await self._soundcloud.get_stream_url(track_id=item_id)
+        return StreamDetails(
+            provider=self.domain,
+            item_id=item_id,
+            content_type=ContentType.try_parse(stream_format),
+            direct=url,
+        )
+
+    async def _parse_artist(self, artist_obj: dict) -> Artist:
+        """Parse a Soundcloud user response to Artist model object."""
+        artist_id = None
+        permalink = artist_obj["permalink"]
+        if "id" in artist_obj and artist_obj["id"]:
+            artist_id = artist_obj["id"]
+        if not artist_id:
+            raise InvalidDataError("Artist does not have a valid ID")
+        artist = Artist(item_id=artist_id, name=artist_obj["username"], provider=self.domain)
+        if artist_obj.get("description"):
+            artist.metadata.description = artist_obj["description"]
+        if artist_obj.get("avatar_url"):
+            img_url = artist_obj["avatar_url"]
+            artist.metadata.images = [MediaItemImage(ImageType.THUMB, img_url)]
+        artist.add_provider_mapping(
+            ProviderMapping(
+                item_id=str(artist_id),
+                provider_domain=self.domain,
+                provider_instance=self.instance_id,
+                url=f"https://soundcloud.com/{permalink}",
+            )
+        )
+        return artist
+
+    async def _parse_playlist(self, playlist_obj: dict) -> Playlist:
+        """Parse a Soundcloud Playlist response to a Playlist object."""
+        playlist = Playlist(
+            item_id=playlist_obj["id"],
+            provider=self.domain,
+            name=playlist_obj["title"],
+        )
+        playlist.add_provider_mapping(
+            ProviderMapping(
+                item_id=playlist_obj["id"],
+                provider_domain=self.domain,
+                provider_instance=self.instance_id,
+            )
+        )
+        playlist.is_editable = False
+        if playlist_obj.get("description"):
+            playlist.metadata.description = playlist_obj["description"]
+        if playlist_obj.get("artwork_url"):
+            playlist.metadata.images = [
+                MediaItemImage(ImageType.THUMB, playlist_obj["artwork_url"])
+            ]
+        if playlist_obj.get("genre"):
+            playlist.metadata.genres = playlist_obj["genre"]
+        if playlist_obj.get("tag_list"):
+            playlist.metadata.style = playlist_obj["tag_list"]
+        return playlist
+
+    async def _parse_track(self, track_obj: dict) -> Track:
+        """Parse a Soundcloud Track response to a Track model object."""
+        name, version = parse_title_and_version(track_obj["title"])
+        track = Track(
+            item_id=track_obj["id"],
+            provider=self.domain,
+            name=name,
+            version=version,
+            duration=track_obj["duration"] / 1000,
+        )
+        user_id = track_obj["user"]["id"]
+        user = await self._soundcloud.get_user_details(user_id)
+        artist = await self._parse_artist(user)
+        if artist and artist.item_id not in {x.item_id for x in track.artists}:
+            track.artists.append(artist)
+
+        if track_obj.get("artwork_url"):
+            track.metadata.images = [MediaItemImage(ImageType.THUMB, track_obj["artwork_url"])]
+        if track_obj.get("description"):
+            track.metadata.description = track_obj["description"]
+        if track_obj.get("genre"):
+            track.metadata.genres = track_obj["genre"]
+        if track_obj.get("tag_list"):
+            track.metadata.style = track_obj["tag_list"]
+        track.add_provider_mapping(
+            ProviderMapping(
+                item_id=track_obj["id"],
+                provider_domain=self.domain,
+                provider_instance=self.instance_id,
+                content_type=ContentType.MP3,
+                url=track_obj["permalink_url"],
+            )
+        )
+        return track
diff --git a/music_assistant/server/providers/soundcloud/manifest.json b/music_assistant/server/providers/soundcloud/manifest.json
new file mode 100644 (file)
index 0000000..7e930a1
--- /dev/null
@@ -0,0 +1,23 @@
+{
+  "type": "music",
+  "domain": "soundcloud",
+  "name": "Soundcloud",
+  "description": "Support for the Soundcloud streaming provider in Music Assistant.",
+  "codeowners": ["@gieljnssns"],
+  "config_entries": [
+    {
+      "key": "client_id",
+      "type": "secure_string",
+      "label": "Client id"
+    },
+    {
+      "key": "authorization",
+      "type": "secure_string",
+      "label": "Authorization"
+    }
+  ],
+
+  "requirements": [],
+  "documentation": "https://github.com/orgs/music-assistant/discussions/1160",
+  "multi_instance": true
+}
diff --git a/music_assistant/server/providers/soundcloud/soundcloudpy/asyncsoundcloudpy.py b/music_assistant/server/providers/soundcloud/soundcloudpy/asyncsoundcloudpy.py
new file mode 100644 (file)
index 0000000..da78063
--- /dev/null
@@ -0,0 +1,322 @@
+"""Async helpers for connecting to the Soundcloud API.
+
+This file is based on soundcloudpy from Naím Rodríguez https://github.com/naim-prog
+Original package https://github.com/naim-prog/soundcloud-py
+"""
+from __future__ import annotations
+
+import aiohttp
+from aiohttp.client import ClientSession
+from attr import dataclass
+
+BASE_URL = "https://api-v2.soundcloud.com"
+
+
+@dataclass
+class SoundcloudAsync:
+    """Soundcloud."""
+
+    o_auth: str
+    client_id: str
+    headers = None
+    app_version = None
+    firefox_version = None
+    request_timeout: float = 8.0
+
+    session: ClientSession | None = None
+
+    async def get(self, url, headers=None, params=None):
+        """Async get."""
+        async with aiohttp.ClientSession(headers=headers) as session:
+            async with session.get(url=url, params=params) as response:
+                return await response.json()
+
+    async def login(self):
+        """Login to soundcloud."""
+        if len(self.client_id) != 32:
+            raise ValueError("Not valid client id")
+
+        # To get the last version of Firefox to prevent some type of deprecated version
+        json_versions = await self.get(
+            # self,
+            url="https://product-details.mozilla.org/1.0/firefox_versions.json",
+        )
+
+        self.firefox_version = dict(json_versions).get("LATEST_FIREFOX_VERSION")
+
+        #: Default headers that work properly for the API
+        #: User-Agent as if it was requested through Firefox Browser
+        self.headers = {
+            "Authorization": self.o_auth,
+            "Accept": "application/json",
+            "User-Agent": (
+                f"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:{self.firefox_version})"
+                f" Gecko/20100101 Firefox/{self.firefox_version}"
+            ),
+        }
+
+        # Version of soundcloud app
+        app_json = await self.get(url="https://soundcloud.com/versions.json")
+        self.app_version = dict(app_json).get("app")
+
+    # ---------------- USER ----------------
+
+    async def get_account_details(self):
+        """Get account details."""
+        return await self.get(url=f"{BASE_URL}/me", headers=self.headers)
+
+    async def get_user_details(self, user_id):
+        """:param user_id: id of the user requested"""
+        return await self.get(
+            url=f"{BASE_URL}/users/{user_id}?client_id={self.client_id}",
+            headers=self.headers,
+        )
+
+    async def get_following(self, own_user_id, limit=120):
+        """:param limit: max numbers of following accounts"""
+        return await self.get(
+            f"{BASE_URL}/users/{own_user_id}/followings?client_id={self.client_id}"
+            f"&limit={limit}&offset=0&linked_partitioning=1&app_version={self.app_version}",
+            headers=self.headers,
+        )
+
+    async def get_recommended_users(self, limit=5):
+        """:param limit: max numbers of follower accounts to get"""
+        return await self.get(
+            f"{BASE_URL}/me/suggested/users/who_to_follow?view=recommended-first&client_id="
+            f"{self.client_id}&limit={limit}&offset=0&linked_partitioning=1&app_version="
+            f"{self.app_version}",
+            headers=self.headers,
+        )
+
+    # ---------------- TRACKS ----------------
+
+    async def get_last_track_info(self, limit=1):
+        """:param limit: number of last tracks reproduced"""
+        return await self.get(
+            f"{BASE_URL}/me/play-history/tracks?client_id={self.client_id}&limit={limit}",
+            headers=self.headers,
+        )
+
+    async def get_track_likes_users(self, track_id):
+        """:param track_id: track id"""
+        return await self.get(
+            f"{BASE_URL}/tracks/{track_id}/likers?client_id={self.client_id}",
+            headers=self.headers,
+        )
+
+    async def get_track_details(self, track_id):
+        """:param track_id: track id"""
+        return await self.get(
+            f"{BASE_URL}/tracks?ids={track_id}&client_id={self.client_id}",
+            headers=self.headers,
+        )
+
+    async def get_tracks_liked(self, limit=50):
+        """:param limit: number of tracks to get"""
+        return await self.get(
+            f"{BASE_URL}/me/track_likes/ids?client_id={self.client_id}&"
+            f"limit={limit}&app_version={self.app_version}",
+            headers=self.headers,
+        )
+
+    async def get_track_by_genre_recent(self, genre, limit=10):
+        """Get track by genre recent.
+
+        :param genre: string of the genre to get tracks
+        :param limit: limit of playlists to get
+        """
+        return await self.get(
+            f"{BASE_URL}/recent-tracks/{genre}?client_id={self.client_id}"
+            f"&limit={limit}&offset=0&linked_partitioning=1&app_version={self.app_version}",
+            headers=self.headers,
+        )
+
+    async def get_track_by_genre_popular(self, genre, limit=10):
+        """Get track by genre popular.
+
+        :param genre: string of the genre to get tracks
+        :param limit: limit of playlists to get
+        """
+        return await self.get(
+            f"{BASE_URL}/search/tracks?q=&filter.genre_or_tag={genre}&sort=popular&client_id="
+            f"{self.client_id}&limit={limit}&offset=0&linked_partitioning=1&app_version="
+            f"{self.app_version}",
+            headers=self.headers,
+        )
+
+    async def get_tracks_from_user(self, user_id, limit=10):
+        """Get tracks from user.
+
+        :param user_id: id of the user to get his tracks
+        :param limit:   number of tracks to get from user
+        """
+        return await self.get(
+            f"{BASE_URL}/users/{user_id}/tracks?representation=&client_id="
+            f"{self.client_id}&limit={limit}&offset=0&linked_partitioning=1&app_version="
+            f"{self.app_version}",
+            headers=self.headers,
+        )
+
+    async def get_popular_tracks_user(self, user_id, limit=12):
+        """Get popular track from user.
+
+        :param user_id: id of the user to get his tracks
+        :param limit:   number of tracks to get from user
+        """
+        return await self.get(
+            f"{BASE_URL}/users/{user_id}/toptracks?client_id={self.client_id}"
+            f"&limit={limit}&offset=0&linked_partitioning=1&app_version={self.app_version}",
+            headers=self.headers,
+        )
+
+    # ---------------- PLAYLISTS ----------------
+
+    async def get_account_playlists(self):
+        """Get account playlists."""
+        return await self.get(
+            f"{BASE_URL}/me/library/all?client_id{self.client_id}",
+            headers=self.headers,
+        )
+
+    async def get_playlist_details(self, playlist_id):
+        """:param playlist_id: playlist id"""
+        return await self.get(
+            f"{BASE_URL}/playlists/{playlist_id}?representation=full&client_id={self.client_id}",
+            headers=self.headers,
+        )
+
+    async def get_playlists_by_genre(self, genre, limit=10):
+        """Get playlists by genre.
+
+        :param genre: string of the genre to get tracks
+        :param limit: limit of playlists to get
+        """
+        return await self.get(
+            f"{BASE_URL}/playlists/discovery?tag={genre}&client_id="
+            f"{self.client_id}&limit={limit}&offset=0&linked_partitioning=1&"
+            f"app_version={self.app_version}",
+            headers=self.headers,
+        )
+
+    async def get_playlists_from_user(self, user_id, limit=10):
+        """Get playlists from user.
+
+        :param user_id: user id to get his playlists
+        :param limit: limit of playlists to get
+        """
+        return await self.get(
+            f"{BASE_URL}/users/{user_id}/playlists_without_albums?client_id="
+            f"{self.client_id}&limit={limit}&offset=0&linked_partitioning=1&"
+            f"app_version={self.app_version}",
+            headers=self.headers,
+        )
+
+    # ---------------- MISCELLANEOUS ----------------
+
+    async def get_recommended(self, track_id):
+        """:param track_id: track id to get recommended tracks from this"""
+        return await self.get(
+            f"{BASE_URL}/tracks/{track_id}/related?client_id={self.client_id}",
+            headers=self.headers,
+        )
+
+    async def get_stream_url(self, track_id):
+        """:param track_id: track id"""
+        full_json = await self.get_track_details(track_id)
+        media_url = full_json[0]["media"]["transcodings"][0]["url"]
+        track_auth = full_json[0]["track_authorization"]
+        stream_url = f"{media_url}?client_id={self.client_id}&track_authorization={track_auth}"
+        req = await self.get(
+            stream_url,
+            headers=self.headers,
+        )
+
+        return dict(req).get("url")
+
+    async def get_comments_track(self, track_id, limit=100):
+        """Get track comments.
+
+        :param track_id: track id for get the comments from
+        :param limit:    limit of tracks to get
+        :add: with "next_href" in the return json you can keep getting more comments than limit
+        """
+        return await self.get(
+            f"{BASE_URL}/tracks/{track_id}/comments?threaded=0&filter_replies="
+            f"1&client_id={self.client_id}&limit={limit}&offset=0&linked_partitioning="
+            f"1&app_version={self.app_version}",
+            headers=self.headers,
+        )
+
+    async def get_mixed_selection(self, limit=5):
+        """:param limit: limit of recommended playlists make for you"""
+        return await self.get(
+            f"{BASE_URL}/mixed-selections?variant_ids=&client_id="
+            f"{self.client_id}&limit={limit}&app_version={self.app_version}",
+            headers=self.headers,
+        )
+
+    async def search(self, query_string, limit=25):
+        """Search.
+
+        :param query_string: string to search on soundcloud for tracks
+        :param limit: limit of recommended playlists make for you
+        """
+        return await self.get(
+            f"{BASE_URL}/search?q={query_string}&variant_ids=&facet=model&client_id="
+            f"{self.client_id}&limit={limit}&offset=0&app_version={self.app_version}",
+            headers=self.headers,
+        )
+
+    async def get_subscribe_feed(self, limit=10):
+        """Get subscribe feed."""
+        return await self.get(
+            f"{BASE_URL}/stream?offset=10&limit={limit}&promoted_playlist=true&client_id"
+            f"={self.client_id}&app_version={self.app_version}",
+            headers=self.headers,
+        )
+
+    async def get_album_from_user(self, user_id, limit=5):
+        """Get album from user.
+
+        :param user_id: user to get the albums from
+        :param limit:   numbers of albums to get from the user
+        """
+        return await self.get(
+            f"{BASE_URL}/users/{user_id}/albums?client_id={self.client_id}"
+            f"&limit={limit}&offset=0&linked_partitioning=1&app_version={self.app_version}",
+            headers=self.headers,
+        )
+
+    async def get_all_feed_user(self, user_id, limit=20):
+        """Get all feed from user.
+
+        :param user_id: user to get the albums from
+        :param limit:   numbers of items to get from the user's feed
+        """
+        return await self.get(
+            f"{BASE_URL}/stream/users/{user_id}?client_id={self.client_id}"
+            f"&limit={limit}&offset=0&linked_partitioning=1&app_version={self.app_version}",
+            headers=self.headers,
+        )
+
+    async def close(self) -> None:
+        """Close open client session."""
+        if self.session:
+            await self.session.close()
+
+    async def __aenter__(self) -> SoundcloudAsync:
+        """Async enter.
+
+        Returns:
+            The SoundcloudAsync object.
+        """
+        return self
+
+    async def __aexit__(self, *_exc_info) -> None:
+        """Async exit.
+
+        Args:
+            _exc_info: Exec type.
+        """
+        await self.close()
index 621c0374fffccdbff00ebb6217029ea6b57c3f4b..2b37664404031d06296eac8739ece4e11f053919 100644 (file)
@@ -65,7 +65,7 @@ target-version = ['py311']
 line-length = 100
 
 [tool.codespell]
-ignore-words-list = "provid,hass"
+ignore-words-list = "provid,hass,followings"
 
 [tool.mypy]
 python_version = "3.11"