--- /dev/null
+"""KION Music provider support for Music Assistant."""
+
+from __future__ import annotations
+
+from typing import TYPE_CHECKING, cast
+
+from music_assistant_models.config_entries import ConfigEntry, ConfigValueOption, ConfigValueType
+from music_assistant_models.enums import ConfigEntryType, ProviderFeature
+
+from .constants import (
+ CONF_ACTION_CLEAR_AUTH,
+ CONF_QUALITY,
+ CONF_TOKEN,
+ QUALITY_HIGH,
+ QUALITY_LOSSLESS,
+)
+from .provider import KionMusicProvider
+
+if TYPE_CHECKING:
+ from music_assistant_models.config_entries import ProviderConfig
+ from music_assistant_models.provider import ProviderManifest
+
+ from music_assistant.mass import MusicAssistant
+ from music_assistant.models import ProviderInstanceType
+
+
+SUPPORTED_FEATURES = {
+ ProviderFeature.LIBRARY_ARTISTS,
+ ProviderFeature.LIBRARY_ALBUMS,
+ ProviderFeature.LIBRARY_TRACKS,
+ ProviderFeature.LIBRARY_PLAYLISTS,
+ ProviderFeature.ARTIST_ALBUMS,
+ ProviderFeature.ARTIST_TOPTRACKS,
+ ProviderFeature.SEARCH,
+ ProviderFeature.LIBRARY_ARTISTS_EDIT,
+ ProviderFeature.LIBRARY_ALBUMS_EDIT,
+ ProviderFeature.LIBRARY_TRACKS_EDIT,
+ ProviderFeature.BROWSE,
+}
+
+
+async def setup(
+ mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
+) -> ProviderInstanceType:
+ """Initialize provider(instance) with given configuration."""
+ return KionMusicProvider(mass, manifest, config, SUPPORTED_FEATURES)
+
+
+async def get_config_entries(
+ mass: MusicAssistant, # noqa: ARG001
+ instance_id: str | None = None, # noqa: ARG001
+ action: str | None = None,
+ values: dict[str, ConfigValueType] | None = None,
+) -> tuple[ConfigEntry, ...]:
+ """Return Config entries to setup this provider."""
+ if values is None:
+ values = {}
+
+ # Handle clear auth action
+ if action == CONF_ACTION_CLEAR_AUTH:
+ values[CONF_TOKEN] = None
+
+ # Check if user is authenticated
+ is_authenticated = bool(values.get(CONF_TOKEN))
+
+ return (
+ ConfigEntry(
+ key=CONF_TOKEN,
+ type=ConfigEntryType.SECURE_STRING,
+ label="KION Music Token",
+ description="Enter your KION Music OAuth token. "
+ "See the documentation for how to obtain it.",
+ required=True,
+ hidden=is_authenticated,
+ value=cast("str", values.get(CONF_TOKEN)) if values else None,
+ ),
+ ConfigEntry(
+ key=CONF_ACTION_CLEAR_AUTH,
+ type=ConfigEntryType.ACTION,
+ label="Reset authentication",
+ description="Clear the current authentication details.",
+ action=CONF_ACTION_CLEAR_AUTH,
+ hidden=not is_authenticated,
+ ),
+ ConfigEntry(
+ key=CONF_QUALITY,
+ type=ConfigEntryType.STRING,
+ label="Audio quality",
+ description="Select preferred audio quality.",
+ options=[
+ ConfigValueOption("High (320 kbps)", QUALITY_HIGH),
+ ConfigValueOption("Lossless (FLAC)", QUALITY_LOSSLESS),
+ ],
+ default_value=QUALITY_HIGH,
+ ),
+ )
--- /dev/null
+"""API client wrapper for KION Music (MTS Music)."""
+
+from __future__ import annotations
+
+import logging
+from typing import TYPE_CHECKING, Any, cast
+
+from music_assistant_models.errors import (
+ LoginFailed,
+ ProviderUnavailableError,
+ ResourceTemporarilyUnavailable,
+)
+from yandex_music import Album as YandexAlbum
+from yandex_music import Artist as YandexArtist
+from yandex_music import ClientAsync, Search, TrackShort
+from yandex_music import Playlist as YandexPlaylist
+from yandex_music import Track as YandexTrack
+from yandex_music.exceptions import BadRequestError, NetworkError, UnauthorizedError
+from yandex_music.utils.sign_request import get_sign_request
+
+if TYPE_CHECKING:
+ from yandex_music import DownloadInfo
+
+from .constants import DEFAULT_LIMIT
+
+# get-file-info with quality=lossless returns FLAC; default /tracks/.../download-info often does not
+# Prefer flac-mp4/aac-mp4
+GET_FILE_INFO_CODECS = "flac-mp4,flac,aac-mp4,aac,he-aac,mp3,he-aac-mp4"
+# get-file-info: same host as library (all requests go through one API)
+KION_BASE_URL = "https://music.mts.ru/ya_api"
+
+LOGGER = logging.getLogger(__name__)
+
+
+class KionMusicClient:
+ """Wrapper around yandex-music-api ClientAsync for KION Music."""
+
+ def __init__(self, token: str) -> None:
+ """Initialize the KION Music client.
+
+ :param token: KION Music OAuth token.
+ """
+ self._token = token
+ self._client: ClientAsync | None = None
+ self._user_id: int | None = None
+
+ @property
+ def user_id(self) -> int:
+ """Return the user ID."""
+ if self._user_id is None:
+ raise ProviderUnavailableError("Client not initialized, call connect() first")
+ return self._user_id
+
+ async def connect(self) -> bool:
+ """Initialize the client and verify token validity.
+
+ :return: True if connection was successful.
+ :raises LoginFailed: If the token is invalid.
+ """
+ try:
+ self._client = await ClientAsync(self._token, base_url=KION_BASE_URL).init()
+ if self._client.me is None or self._client.me.account is None:
+ raise LoginFailed("Failed to get account info")
+ self._user_id = self._client.me.account.uid
+ LOGGER.debug("Connected to KION Music as user %s", self._user_id)
+ return True
+ except UnauthorizedError as err:
+ raise LoginFailed("Invalid KION Music token") from err
+ except NetworkError as err:
+ msg = "Network error connecting to KION Music"
+ raise ResourceTemporarilyUnavailable(msg) from err
+
+ async def disconnect(self) -> None:
+ """Disconnect the client."""
+ self._client = None
+ self._user_id = None
+
+ def _ensure_connected(self) -> ClientAsync:
+ """Ensure the client is connected and return it."""
+ if self._client is None:
+ raise ProviderUnavailableError("Client not connected, call connect() first")
+ return self._client
+
+ # Library methods
+
+ async def get_liked_tracks(self) -> list[TrackShort]:
+ """Get user's liked tracks.
+
+ :return: List of liked track objects.
+ """
+ client = self._ensure_connected()
+ try:
+ result = await client.users_likes_tracks()
+ if result is None:
+ return []
+ return result.tracks or []
+ except (BadRequestError, NetworkError) as err:
+ LOGGER.error("Error fetching liked tracks: %s", err)
+ raise ResourceTemporarilyUnavailable("Failed to fetch liked tracks") from err
+
+ async def get_liked_albums(self) -> list[YandexAlbum]:
+ """Get user's liked albums with full details (including cover art).
+
+ The users_likes_albums endpoint returns minimal album data without
+ cover_uri, so we fetch full album details in batches afterwards.
+
+ :return: List of liked album objects with full details.
+ """
+ client = self._ensure_connected()
+ try:
+ result = await client.users_likes_albums()
+ if result is None:
+ return []
+ album_ids = [
+ str(like.album.id) for like in result if like.album is not None and like.album.id
+ ]
+ if not album_ids:
+ return []
+ # Fetch full album details in batches to get cover_uri and other metadata
+ batch_size = 50
+ full_albums: list[YandexAlbum] = []
+ for i in range(0, len(album_ids), batch_size):
+ batch = album_ids[i : i + batch_size]
+ try:
+ batch_result = await client.albums(batch)
+ if batch_result:
+ full_albums.extend(batch_result)
+ except (BadRequestError, NetworkError) as batch_err:
+ LOGGER.warning("Error fetching album details batch: %s", batch_err)
+ # Fall back to minimal data for this batch
+ batch_set = set(batch)
+ for like in result:
+ if (
+ like.album is not None
+ and like.album.id
+ and str(like.album.id) in batch_set
+ ):
+ full_albums.append(like.album)
+ return full_albums
+ except (BadRequestError, NetworkError) as err:
+ LOGGER.error("Error fetching liked albums: %s", err)
+ raise ResourceTemporarilyUnavailable("Failed to fetch liked albums") from err
+
+ async def get_liked_artists(self) -> list[YandexArtist]:
+ """Get user's liked artists.
+
+ :return: List of liked artist objects.
+ """
+ client = self._ensure_connected()
+ try:
+ result = await client.users_likes_artists()
+ if result is None:
+ return []
+ return [like.artist for like in result if like.artist is not None]
+ except (BadRequestError, NetworkError) as err:
+ LOGGER.error("Error fetching liked artists: %s", err)
+ raise ResourceTemporarilyUnavailable("Failed to fetch liked artists") from err
+
+ async def get_user_playlists(self) -> list[YandexPlaylist]:
+ """Get user's playlists.
+
+ :return: List of playlist objects.
+ """
+ client = self._ensure_connected()
+ try:
+ result = await client.users_playlists_list()
+ if result is None:
+ return []
+ return list(result)
+ except (BadRequestError, NetworkError) as err:
+ LOGGER.error("Error fetching playlists: %s", err)
+ raise ResourceTemporarilyUnavailable("Failed to fetch playlists") from err
+
+ # Search
+
+ async def search(
+ self,
+ query: str,
+ search_type: str = "all",
+ limit: int = DEFAULT_LIMIT,
+ ) -> Search | None:
+ """Search for tracks, albums, artists, or playlists.
+
+ :param query: Search query string.
+ :param search_type: Type of search ('all', 'track', 'album', 'artist', 'playlist').
+ :param limit: Maximum number of results per type.
+ :return: Search results object.
+ """
+ client = self._ensure_connected()
+ try:
+ return await client.search(query, type_=search_type, page=0, nocorrect=False)
+ except (BadRequestError, NetworkError) as err:
+ LOGGER.error("Search error: %s", err)
+ raise ResourceTemporarilyUnavailable("Search failed") from err
+
+ # Get single items
+
+ async def get_track(self, track_id: str) -> YandexTrack | None:
+ """Get a single track by ID.
+
+ :param track_id: Track ID.
+ :return: Track object or None if not found.
+ """
+ client = self._ensure_connected()
+ try:
+ tracks = await client.tracks([track_id])
+ return tracks[0] if tracks else None
+ except (BadRequestError, NetworkError) as err:
+ LOGGER.error("Error fetching track %s: %s", track_id, err)
+ return None
+
+ async def get_tracks(self, track_ids: list[str]) -> list[YandexTrack]:
+ """Get multiple tracks by IDs.
+
+ :param track_ids: List of track IDs.
+ :return: List of track objects.
+ :raises ResourceTemporarilyUnavailable: On network errors after retry.
+ """
+ client = self._ensure_connected()
+ try:
+ result = await client.tracks(track_ids)
+ return result or []
+ except NetworkError as err:
+ # Retry once on network errors (timeout, disconnect, etc.)
+ LOGGER.warning("Network error fetching tracks, retrying once: %s", err)
+ try:
+ result = await client.tracks(track_ids)
+ return result or []
+ except NetworkError as retry_err:
+ LOGGER.error("Error fetching tracks (retry failed): %s", retry_err)
+ raise ResourceTemporarilyUnavailable("Failed to fetch tracks") from retry_err
+ except BadRequestError as err:
+ LOGGER.error("Error fetching tracks: %s", err)
+ return []
+
+ async def get_album(self, album_id: str) -> YandexAlbum | None:
+ """Get a single album by ID.
+
+ :param album_id: Album ID.
+ :return: Album object or None if not found.
+ """
+ client = self._ensure_connected()
+ try:
+ albums = await client.albums([album_id])
+ return albums[0] if albums else None
+ except (BadRequestError, NetworkError) as err:
+ LOGGER.error("Error fetching album %s: %s", album_id, err)
+ return None
+
+ async def get_album_with_tracks(self, album_id: str) -> YandexAlbum | None:
+ """Get an album with its tracks.
+
+ Uses the same semantics as the web client: albums/{id}/with-tracks
+ with resumeStream, richTracks, withListeningFinished when the library
+ passes them through.
+
+ :param album_id: Album ID.
+ :return: Album object with tracks or None if not found.
+ """
+ client = self._ensure_connected()
+ try:
+ return await client.albums_with_tracks(
+ album_id,
+ resumeStream=True,
+ richTracks=True,
+ withListeningFinished=True,
+ )
+ except TypeError:
+ # Older yandex-music may not accept these kwargs
+ return await client.albums_with_tracks(album_id)
+ except (BadRequestError, NetworkError) as err:
+ LOGGER.error("Error fetching album with tracks %s: %s", album_id, err)
+ return None
+
+ async def get_artist(self, artist_id: str) -> YandexArtist | None:
+ """Get a single artist by ID.
+
+ :param artist_id: Artist ID.
+ :return: Artist object or None if not found.
+ """
+ client = self._ensure_connected()
+ try:
+ artists = await client.artists([artist_id])
+ return artists[0] if artists else None
+ except (BadRequestError, NetworkError) as err:
+ LOGGER.error("Error fetching artist %s: %s", artist_id, err)
+ return None
+
+ async def get_artist_albums(
+ self, artist_id: str, limit: int = DEFAULT_LIMIT
+ ) -> list[YandexAlbum]:
+ """Get artist's albums.
+
+ :param artist_id: Artist ID.
+ :param limit: Maximum number of albums.
+ :return: List of album objects.
+ """
+ client = self._ensure_connected()
+ try:
+ result = await client.artists_direct_albums(artist_id, page=0, page_size=limit)
+ if result is None:
+ return []
+ return result.albums or []
+ except (BadRequestError, NetworkError) as err:
+ LOGGER.error("Error fetching artist albums %s: %s", artist_id, err)
+ return []
+
+ async def get_artist_tracks(
+ self, artist_id: str, limit: int = DEFAULT_LIMIT
+ ) -> list[YandexTrack]:
+ """Get artist's top tracks.
+
+ :param artist_id: Artist ID.
+ :param limit: Maximum number of tracks.
+ :return: List of track objects.
+ """
+ client = self._ensure_connected()
+ try:
+ result = await client.artists_tracks(artist_id, page=0, page_size=limit)
+ if result is None:
+ return []
+ return result.tracks or []
+ except (BadRequestError, NetworkError) as err:
+ LOGGER.error("Error fetching artist tracks %s: %s", artist_id, err)
+ return []
+
+ async def get_playlist(self, user_id: str, playlist_id: str) -> YandexPlaylist | None:
+ """Get a playlist by ID.
+
+ :param user_id: User ID (owner of the playlist).
+ :param playlist_id: Playlist ID (kind).
+ :return: Playlist object or None if not found.
+ """
+ client = self._ensure_connected()
+ try:
+ result = await client.users_playlists(kind=int(playlist_id), user_id=user_id)
+ if isinstance(result, list):
+ return result[0] if result else None
+ return result
+ except (BadRequestError, NetworkError) as err:
+ LOGGER.error("Error fetching playlist %s/%s: %s", user_id, playlist_id, err)
+ return None
+
+ # Streaming
+
+ async def get_track_download_info(
+ self, track_id: str, get_direct_links: bool = True
+ ) -> list[DownloadInfo]:
+ """Get download info for a track.
+
+ :param track_id: Track ID.
+ :param get_direct_links: Whether to get direct download links.
+ :return: List of download info objects.
+ """
+ client = self._ensure_connected()
+ try:
+ result = await client.tracks_download_info(track_id, get_direct_links=get_direct_links)
+ return result or []
+ except (BadRequestError, NetworkError) as err:
+ LOGGER.error("Error fetching download info for track %s: %s", track_id, err)
+ return []
+
+ async def get_track_file_info_lossless(self, track_id: str) -> dict[str, Any] | None:
+ """Request lossless stream via get-file-info (quality=lossless).
+
+ The /tracks/{id}/download-info endpoint often returns only MP3; get-file-info
+ with quality=lossless and codecs=flac,... returns FLAC when available.
+
+ :param track_id: Track ID.
+ :return: Parsed downloadInfo dict (url, codec, urls, ...) or None on error.
+ """
+ client = self._ensure_connected()
+ sign = get_sign_request(track_id)
+ base_params = {
+ "ts": sign.timestamp,
+ "trackId": track_id,
+ "quality": "lossless",
+ "codecs": GET_FILE_INFO_CODECS,
+ "sign": sign.value,
+ }
+
+ def _parse_file_info_result(raw: dict[str, Any] | None) -> dict[str, Any] | None:
+ if not raw or not isinstance(raw, dict):
+ return None
+ download_info = raw.get("download_info")
+ if not download_info or not download_info.get("url"):
+ return None
+ return cast("dict[str, Any]", download_info)
+
+ url = f"{KION_BASE_URL}/get-file-info"
+ params_encraw = {**base_params, "transports": "encraw"}
+ try:
+ result = await client._request.get(url, params=params_encraw)
+ return _parse_file_info_result(result)
+ except (BadRequestError, NetworkError) as err:
+ LOGGER.debug(
+ "get-file-info lossless for track %s: %s %s",
+ track_id,
+ type(err).__name__,
+ getattr(err, "message", str(err)) or repr(err),
+ )
+ return None
+ except UnauthorizedError as err:
+ LOGGER.debug(
+ "get-file-info lossless for track %s (transports=encraw): %s %s",
+ track_id,
+ type(err).__name__,
+ getattr(err, "message", str(err)) or repr(err),
+ )
+ LOGGER.debug(
+ "If you have KION Music Plus and this track has lossless, "
+ "try a token from the web client (music.mts.ru)."
+ )
+ params_raw = {**base_params, "transports": "raw"}
+ try:
+ result = await client._request.get(url, params=params_raw)
+ return _parse_file_info_result(result)
+ except (BadRequestError, NetworkError, UnauthorizedError) as retry_err:
+ LOGGER.debug(
+ "get-file-info lossless for track %s (transports=raw): %s %s",
+ track_id,
+ type(retry_err).__name__,
+ getattr(retry_err, "message", str(retry_err)) or repr(retry_err),
+ )
+ return None
+
+ # Library modifications
+
+ async def like_track(self, track_id: str) -> bool:
+ """Add a track to liked tracks.
+
+ :param track_id: Track ID to like.
+ :return: True if successful.
+ """
+ client = self._ensure_connected()
+ try:
+ result = await client.users_likes_tracks_add(track_id)
+ return result is not None
+ except (BadRequestError, NetworkError) as err:
+ LOGGER.error("Error liking track %s: %s", track_id, err)
+ return False
+
+ async def unlike_track(self, track_id: str) -> bool:
+ """Remove a track from liked tracks.
+
+ :param track_id: Track ID to unlike.
+ :return: True if successful.
+ """
+ client = self._ensure_connected()
+ try:
+ result = await client.users_likes_tracks_remove(track_id)
+ return result is not None
+ except (BadRequestError, NetworkError) as err:
+ LOGGER.error("Error unliking track %s: %s", track_id, err)
+ return False
+
+ async def like_album(self, album_id: str) -> bool:
+ """Add an album to liked albums.
+
+ :param album_id: Album ID to like.
+ :return: True if successful.
+ """
+ client = self._ensure_connected()
+ try:
+ result = await client.users_likes_albums_add(album_id)
+ return result is not None
+ except (BadRequestError, NetworkError) as err:
+ LOGGER.error("Error liking album %s: %s", album_id, err)
+ return False
+
+ async def unlike_album(self, album_id: str) -> bool:
+ """Remove an album from liked albums.
+
+ :param album_id: Album ID to unlike.
+ :return: True if successful.
+ """
+ client = self._ensure_connected()
+ try:
+ result = await client.users_likes_albums_remove(album_id)
+ return result is not None
+ except (BadRequestError, NetworkError) as err:
+ LOGGER.error("Error unliking album %s: %s", album_id, err)
+ return False
+
+ async def like_artist(self, artist_id: str) -> bool:
+ """Add an artist to liked artists.
+
+ :param artist_id: Artist ID to like.
+ :return: True if successful.
+ """
+ client = self._ensure_connected()
+ try:
+ result = await client.users_likes_artists_add(artist_id)
+ return result is not None
+ except (BadRequestError, NetworkError) as err:
+ LOGGER.error("Error liking artist %s: %s", artist_id, err)
+ return False
+
+ async def unlike_artist(self, artist_id: str) -> bool:
+ """Remove an artist from liked artists.
+
+ :param artist_id: Artist ID to unlike.
+ :return: True if successful.
+ """
+ client = self._ensure_connected()
+ try:
+ result = await client.users_likes_artists_remove(artist_id)
+ return result is not None
+ except (BadRequestError, NetworkError) as err:
+ LOGGER.error("Error unliking artist %s: %s", artist_id, err)
+ return False
--- /dev/null
+"""Constants for the KION Music provider."""
+
+from __future__ import annotations
+
+from typing import Final
+
+# Configuration Keys
+CONF_TOKEN = "token"
+CONF_QUALITY = "quality"
+
+# Actions
+CONF_ACTION_AUTH = "auth"
+CONF_ACTION_CLEAR_AUTH = "clear_auth"
+
+# Labels
+LABEL_TOKEN = "token_label"
+LABEL_AUTH_INSTRUCTIONS = "auth_instructions_label"
+
+# API defaults
+DEFAULT_LIMIT: Final[int] = 50
+
+# Quality options
+QUALITY_HIGH = "high"
+QUALITY_LOSSLESS = "lossless"
+
+# Image sizes
+IMAGE_SIZE_SMALL = "200x200"
+IMAGE_SIZE_MEDIUM = "400x400"
+IMAGE_SIZE_LARGE = "1000x1000"
+
+# ID separators
+PLAYLIST_ID_SPLITTER: Final[str] = ":"
--- /dev/null
+<svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 144 144">
+ <defs>
+ <linearGradient id="bg" x1="0%" y1="0%" x2="100%" y2="100%">
+ <stop offset="0%" style="stop-color:#5B2D91"/>
+ <stop offset="100%" style="stop-color:#E31E52"/>
+ </linearGradient>
+ </defs>
+ <rect width="144" height="144" rx="24" fill="url(#bg)"/>
+ <circle cx="52" cy="100" r="22" fill="#FFFFFF"/>
+ <polygon points="82,32 118,22 110,46 74,56" fill="#FFFFFF"/>
+ <polygon points="82,52 118,42 110,66 74,76" fill="#FFFFFF"/>
+</svg>
--- /dev/null
+<svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 144 144">
+ <rect width="144" height="144" rx="24" fill="#FFFFFF"/>
+ <circle cx="52" cy="100" r="22" fill="#000000"/>
+ <polygon points="82,32 118,22 110,46 74,56" fill="#000000"/>
+ <polygon points="82,52 118,42 110,66 74,76" fill="#000000"/>
+</svg>
--- /dev/null
+{
+ "type": "music",
+ "domain": "kion_music",
+ "stage": "beta",
+ "name": "KION Music",
+ "description": "Stream music from KION Music (MTS) service.",
+ "codeowners": ["@TrudenBoy"],
+ "documentation": "https://music-assistant.io/music-providers/kion-music/",
+ "requirements": ["yandex-music==2.2.0"],
+ "multi_instance": true
+}
--- /dev/null
+"""Parsers for KION Music API responses."""
+
+from __future__ import annotations
+
+from contextlib import suppress
+from datetime import datetime
+from typing import TYPE_CHECKING
+
+from music_assistant_models.enums import (
+ AlbumType,
+ ContentType,
+ ImageType,
+)
+from music_assistant_models.media_items import (
+ Album,
+ Artist,
+ AudioFormat,
+ MediaItemImage,
+ Playlist,
+ ProviderMapping,
+ Track,
+ UniqueList,
+)
+
+from music_assistant.helpers.util import parse_title_and_version
+
+from .constants import IMAGE_SIZE_LARGE
+
+if TYPE_CHECKING:
+ from yandex_music import Album as YandexAlbum
+ from yandex_music import Artist as YandexArtist
+ from yandex_music import Playlist as YandexPlaylist
+ from yandex_music import Track as YandexTrack
+
+ from .provider import KionMusicProvider
+
+
+def _get_content_type(provider: KionMusicProvider) -> ContentType:
+ """Get content type based on provider quality setting.
+
+ :param provider: The KION Music provider instance.
+ :return: ContentType.UNKNOWN as actual codec is determined at stream time.
+ """
+ # Actual codec is determined when getting stream details
+ # Suppress unused argument warning
+ _ = provider
+ return ContentType.UNKNOWN
+
+
+def _get_image_url(cover_uri: str | None, size: str = IMAGE_SIZE_LARGE) -> str | None:
+ """Convert cover URI to full URL.
+
+ :param cover_uri: Cover URI template.
+ :param size: Image size (e.g., '1000x1000').
+ :return: Full image URL or None.
+ """
+ if not cover_uri:
+ return None
+ # Cover URIs come in format "avatars.yandex.net/get-music-content/xxx/yyy/%%"
+ # Replace %% with the desired size
+ return f"https://{cover_uri.replace('%%', size)}"
+
+
+def parse_artist(provider: KionMusicProvider, artist_obj: YandexArtist) -> Artist:
+ """Parse KION artist object to MA Artist model.
+
+ :param provider: The KION Music provider instance.
+ :param artist_obj: KION artist object.
+ :return: Music Assistant Artist model.
+ """
+ artist_id = str(artist_obj.id)
+ artist = Artist(
+ item_id=artist_id,
+ provider=provider.instance_id,
+ name=artist_obj.name or "Unknown Artist",
+ provider_mappings={
+ ProviderMapping(
+ item_id=artist_id,
+ provider_domain=provider.domain,
+ provider_instance=provider.instance_id,
+ url=f"https://music.mts.ru/artist/{artist_id}",
+ )
+ },
+ )
+
+ # Add image if available
+ if artist_obj.cover:
+ image_url = _get_image_url(artist_obj.cover.uri)
+ if image_url:
+ artist.metadata.images = UniqueList(
+ [
+ MediaItemImage(
+ type=ImageType.THUMB,
+ path=image_url,
+ provider=provider.instance_id,
+ remotely_accessible=True,
+ )
+ ]
+ )
+ elif artist_obj.og_image:
+ image_url = _get_image_url(artist_obj.og_image)
+ if image_url:
+ artist.metadata.images = UniqueList(
+ [
+ MediaItemImage(
+ type=ImageType.THUMB,
+ path=image_url,
+ provider=provider.instance_id,
+ remotely_accessible=True,
+ )
+ ]
+ )
+
+ return artist
+
+
+def parse_album(provider: KionMusicProvider, album_obj: YandexAlbum) -> Album:
+ """Parse KION album object to MA Album model.
+
+ :param provider: The KION Music provider instance.
+ :param album_obj: KION album object.
+ :return: Music Assistant Album model.
+ """
+ name, version = parse_title_and_version(
+ album_obj.title or "Unknown Album",
+ album_obj.version or None,
+ )
+ album_id = str(album_obj.id)
+
+ # Determine availability
+ available = album_obj.available or False
+
+ album = Album(
+ item_id=album_id,
+ provider=provider.instance_id,
+ name=name,
+ version=version,
+ provider_mappings={
+ ProviderMapping(
+ item_id=album_id,
+ provider_domain=provider.domain,
+ provider_instance=provider.instance_id,
+ audio_format=AudioFormat(
+ content_type=_get_content_type(provider),
+ ),
+ url=f"https://music.mts.ru/album/{album_id}",
+ available=available,
+ )
+ },
+ )
+
+ # Parse artists
+ various_artist_album = False
+ if album_obj.artists:
+ for artist in album_obj.artists:
+ if artist.name and artist.name.lower() in ("various artists", "сборник"):
+ various_artist_album = True
+ album.artists.append(parse_artist(provider, artist))
+
+ # Determine album type
+ album_type_str = album_obj.type or "album"
+ if album_type_str == "compilation" or various_artist_album:
+ album.album_type = AlbumType.COMPILATION
+ elif album_type_str == "single":
+ album.album_type = AlbumType.SINGLE
+ else:
+ album.album_type = AlbumType.ALBUM
+
+ # Parse year
+ if album_obj.year:
+ album.year = album_obj.year
+ if album_obj.release_date:
+ with suppress(ValueError):
+ album.metadata.release_date = datetime.fromisoformat(album_obj.release_date)
+
+ # Parse metadata
+ if album_obj.genre:
+ album.metadata.genres = {album_obj.genre}
+
+ # Add cover image
+ if album_obj.cover_uri:
+ image_url = _get_image_url(album_obj.cover_uri)
+ if image_url:
+ album.metadata.images = UniqueList(
+ [
+ MediaItemImage(
+ type=ImageType.THUMB,
+ path=image_url,
+ provider=provider.instance_id,
+ remotely_accessible=True,
+ )
+ ]
+ )
+ elif album_obj.og_image:
+ image_url = _get_image_url(album_obj.og_image)
+ if image_url:
+ album.metadata.images = UniqueList(
+ [
+ MediaItemImage(
+ type=ImageType.THUMB,
+ path=image_url,
+ provider=provider.instance_id,
+ remotely_accessible=True,
+ )
+ ]
+ )
+
+ return album
+
+
+def parse_track(provider: KionMusicProvider, track_obj: YandexTrack) -> Track:
+ """Parse KION track object to MA Track model.
+
+ :param provider: The KION Music provider instance.
+ :param track_obj: KION track object.
+ :return: Music Assistant Track model.
+ """
+ name, version = parse_title_and_version(
+ track_obj.title or "Unknown Track",
+ track_obj.version or None,
+ )
+ track_id = str(track_obj.id)
+
+ # Determine availability
+ available = track_obj.available or False
+
+ # Duration is in milliseconds
+ duration = (track_obj.duration_ms or 0) // 1000
+
+ track = Track(
+ item_id=track_id,
+ provider=provider.instance_id,
+ name=name,
+ version=version,
+ duration=duration,
+ provider_mappings={
+ ProviderMapping(
+ item_id=track_id,
+ provider_domain=provider.domain,
+ provider_instance=provider.instance_id,
+ audio_format=AudioFormat(
+ content_type=_get_content_type(provider),
+ ),
+ url=f"https://music.mts.ru/track/{track_id}",
+ available=available,
+ )
+ },
+ )
+
+ # Parse artists
+ if track_obj.artists:
+ track.artists = UniqueList()
+ for artist in track_obj.artists:
+ track.artists.append(parse_artist(provider, artist))
+
+ # Parse album (full data so album gets cover art in the library)
+ if track_obj.albums and len(track_obj.albums) > 0:
+ album_obj = track_obj.albums[0]
+ track.album = parse_album(provider, album_obj)
+ # Also set track image from album cover if available
+ if album_obj.cover_uri:
+ image_url = _get_image_url(album_obj.cover_uri)
+ if image_url:
+ track.metadata.images = UniqueList(
+ [
+ MediaItemImage(
+ type=ImageType.THUMB,
+ path=image_url,
+ provider=provider.instance_id,
+ remotely_accessible=True,
+ )
+ ]
+ )
+
+ # Parse external IDs
+ if track_obj.real_id:
+ # real_id can be used as an identifier
+ pass
+
+ # Metadata
+ if track_obj.content_warning:
+ track.metadata.explicit = track_obj.content_warning == "explicit"
+
+ return track
+
+
+def parse_playlist(
+ provider: KionMusicProvider, playlist_obj: YandexPlaylist, owner_name: str | None = None
+) -> Playlist:
+ """Parse KION playlist object to MA Playlist model.
+
+ :param provider: The KION Music provider instance.
+ :param playlist_obj: KION playlist object.
+ :param owner_name: Optional owner name override.
+ :return: Music Assistant Playlist model.
+ """
+ # Playlist ID is a combination of owner uid and playlist kind
+ owner_id = str(playlist_obj.owner.uid) if playlist_obj.owner else str(provider.client.user_id)
+ playlist_kind = str(playlist_obj.kind)
+ playlist_id = f"{owner_id}:{playlist_kind}"
+
+ # Determine if editable (user owns the playlist)
+ is_editable = owner_id == str(provider.client.user_id)
+
+ # Get owner name
+ if owner_name is None:
+ if playlist_obj.owner and playlist_obj.owner.name:
+ owner_name = playlist_obj.owner.name
+ elif is_editable:
+ owner_name = "Me"
+ else:
+ owner_name = "KION Music"
+
+ playlist = Playlist(
+ item_id=playlist_id,
+ provider=provider.instance_id,
+ name=playlist_obj.title or "Unknown Playlist",
+ owner=owner_name,
+ provider_mappings={
+ ProviderMapping(
+ item_id=playlist_id,
+ provider_domain=provider.domain,
+ provider_instance=provider.instance_id,
+ url=f"https://music.mts.ru/users/{owner_id}/playlists/{playlist_kind}",
+ is_unique=is_editable,
+ )
+ },
+ is_editable=is_editable,
+ )
+
+ # Metadata
+ if playlist_obj.description:
+ playlist.metadata.description = playlist_obj.description
+
+ # Add cover image
+ if playlist_obj.cover:
+ # Cover can be CoverImage or a string
+ cover = playlist_obj.cover
+ if hasattr(cover, "uri") and cover.uri:
+ image_url = _get_image_url(cover.uri)
+ if image_url:
+ playlist.metadata.images = UniqueList(
+ [
+ MediaItemImage(
+ type=ImageType.THUMB,
+ path=image_url,
+ provider=provider.instance_id,
+ remotely_accessible=True,
+ )
+ ]
+ )
+ elif playlist_obj.og_image:
+ image_url = _get_image_url(playlist_obj.og_image)
+ if image_url:
+ playlist.metadata.images = UniqueList(
+ [
+ MediaItemImage(
+ type=ImageType.THUMB,
+ path=image_url,
+ provider=provider.instance_id,
+ remotely_accessible=True,
+ )
+ ]
+ )
+
+ return playlist
--- /dev/null
+"""KION Music provider implementation."""
+
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
+from music_assistant_models.enums import MediaType
+from music_assistant_models.errors import (
+ InvalidDataError,
+ LoginFailed,
+ MediaNotFoundError,
+ ProviderUnavailableError,
+ ResourceTemporarilyUnavailable,
+)
+from music_assistant_models.media_items import (
+ Album,
+ Artist,
+ ItemMapping,
+ MediaItemType,
+ Playlist,
+ SearchResults,
+ Track,
+)
+
+from music_assistant.controllers.cache import use_cache
+from music_assistant.models.music_provider import MusicProvider
+
+from .api_client import KionMusicClient
+from .constants import CONF_TOKEN, PLAYLIST_ID_SPLITTER
+from .parsers import parse_album, parse_artist, parse_playlist, parse_track
+from .streaming import KionMusicStreamingManager
+
+if TYPE_CHECKING:
+ from collections.abc import AsyncGenerator
+
+ from music_assistant_models.streamdetails import StreamDetails
+
+
+class KionMusicProvider(MusicProvider):
+ """Implementation of a KION Music MusicProvider."""
+
+ _client: KionMusicClient | None = None
+ _streaming: KionMusicStreamingManager | None = None
+
+ @property
+ def client(self) -> KionMusicClient:
+ """Return the KION Music client."""
+ if self._client is None:
+ raise ProviderUnavailableError("Provider not initialized")
+ return self._client
+
+ @property
+ def streaming(self) -> KionMusicStreamingManager:
+ """Return the streaming manager."""
+ if self._streaming is None:
+ raise ProviderUnavailableError("Provider not initialized")
+ return self._streaming
+
+ async def handle_async_init(self) -> None:
+ """Handle async initialization of the provider."""
+ token = self.config.get_value(CONF_TOKEN)
+ if not token:
+ raise LoginFailed("No KION Music token provided")
+
+ self._client = KionMusicClient(str(token))
+ await self._client.connect()
+ self._streaming = KionMusicStreamingManager(self)
+ self.logger.info("Successfully connected to KION Music")
+
+ async def unload(self, is_removed: bool = False) -> None:
+ """Handle unload/close of the provider.
+
+ :param is_removed: Whether the provider is being removed.
+ """
+ if self._client:
+ await self._client.disconnect()
+ self._client = None
+ self._streaming = None
+ await super().unload(is_removed)
+
+ def get_item_mapping(self, media_type: MediaType | str, key: str, name: str) -> ItemMapping:
+ """Create a generic item mapping.
+
+ :param media_type: The media type.
+ :param key: The item ID.
+ :param name: The item name.
+ :return: An ItemMapping instance.
+ """
+ if isinstance(media_type, str):
+ media_type = MediaType(media_type)
+ return ItemMapping(
+ media_type=media_type,
+ item_id=key,
+ provider=self.instance_id,
+ name=name,
+ )
+
+ # Search
+
+ @use_cache(3600 * 24 * 14)
+ async def search(
+ self, search_query: str, media_types: list[MediaType], limit: int = 5
+ ) -> SearchResults:
+ """Perform search on KION Music.
+
+ :param search_query: The search query.
+ :param media_types: List of media types to search for.
+ :param limit: Maximum number of results per type.
+ :return: SearchResults with found items.
+ """
+ result = SearchResults()
+
+ # Determine search type based on requested media types
+ # Map MediaType to KION API search type
+ type_mapping = {
+ MediaType.TRACK: "track",
+ MediaType.ALBUM: "album",
+ MediaType.ARTIST: "artist",
+ MediaType.PLAYLIST: "playlist",
+ }
+ requested_types = [type_mapping[mt] for mt in media_types if mt in type_mapping]
+
+ # Use specific type if only one requested, otherwise search all
+ search_type = requested_types[0] if len(requested_types) == 1 else "all"
+
+ search_result = await self.client.search(search_query, search_type=search_type, limit=limit)
+ if not search_result:
+ return result
+
+ # Parse tracks
+ if MediaType.TRACK in media_types and search_result.tracks:
+ for track in search_result.tracks.results[:limit]:
+ try:
+ result.tracks = [*result.tracks, parse_track(self, track)]
+ except InvalidDataError as err:
+ self.logger.debug("Error parsing track: %s", err)
+
+ # Parse albums
+ if MediaType.ALBUM in media_types and search_result.albums:
+ for album in search_result.albums.results[:limit]:
+ try:
+ result.albums = [*result.albums, parse_album(self, album)]
+ except InvalidDataError as err:
+ self.logger.debug("Error parsing album: %s", err)
+
+ # Parse artists
+ if MediaType.ARTIST in media_types and search_result.artists:
+ for artist in search_result.artists.results[:limit]:
+ try:
+ result.artists = [*result.artists, parse_artist(self, artist)]
+ except InvalidDataError as err:
+ self.logger.debug("Error parsing artist: %s", err)
+
+ # Parse playlists
+ if MediaType.PLAYLIST in media_types and search_result.playlists:
+ for playlist in search_result.playlists.results[:limit]:
+ try:
+ result.playlists = [*result.playlists, parse_playlist(self, playlist)]
+ except InvalidDataError as err:
+ self.logger.debug("Error parsing playlist: %s", err)
+
+ return result
+
+ # Get single items
+
+ @use_cache(3600 * 24 * 30)
+ async def get_artist(self, prov_artist_id: str) -> Artist:
+ """Get artist details by ID.
+
+ :param prov_artist_id: The provider artist ID.
+ :return: Artist object.
+ :raises MediaNotFoundError: If artist not found.
+ """
+ artist = await self.client.get_artist(prov_artist_id)
+ if not artist:
+ raise MediaNotFoundError(f"Artist {prov_artist_id} not found")
+ return parse_artist(self, artist)
+
+ @use_cache(3600 * 24 * 30)
+ async def get_album(self, prov_album_id: str) -> Album:
+ """Get album details by ID.
+
+ :param prov_album_id: The provider album ID.
+ :return: Album object.
+ :raises MediaNotFoundError: If album not found.
+ """
+ album = await self.client.get_album(prov_album_id)
+ if not album:
+ raise MediaNotFoundError(f"Album {prov_album_id} not found")
+ return parse_album(self, album)
+
+ @use_cache(3600 * 24 * 30)
+ async def get_track(self, prov_track_id: str) -> Track:
+ """Get track details by ID.
+
+ :param prov_track_id: The provider track ID.
+ :return: Track object.
+ :raises MediaNotFoundError: If track not found.
+ """
+ yandex_track = await self.client.get_track(prov_track_id)
+ if not yandex_track:
+ raise MediaNotFoundError(f"Track {prov_track_id} not found")
+ return parse_track(self, yandex_track)
+
+ @use_cache(3600 * 24 * 30)
+ async def get_playlist(self, prov_playlist_id: str) -> Playlist:
+ """Get playlist details by ID.
+
+ :param prov_playlist_id: The provider playlist ID (format: "owner_id:kind").
+ :return: Playlist object.
+ :raises MediaNotFoundError: If playlist not found.
+ """
+ # Parse the playlist ID (format: owner_id:kind)
+ if PLAYLIST_ID_SPLITTER in prov_playlist_id:
+ owner_id, kind = prov_playlist_id.split(PLAYLIST_ID_SPLITTER, 1)
+ else:
+ owner_id = str(self.client.user_id)
+ kind = prov_playlist_id
+
+ playlist = await self.client.get_playlist(owner_id, kind)
+ if not playlist:
+ raise MediaNotFoundError(f"Playlist {prov_playlist_id} not found")
+ return parse_playlist(self, playlist)
+
+ # Get related items
+
+ @use_cache(3600 * 24 * 30)
+ async def get_album_tracks(self, prov_album_id: str) -> list[Track]:
+ """Get album tracks.
+
+ :param prov_album_id: The provider album ID.
+ :return: List of Track objects.
+ """
+ album = await self.client.get_album_with_tracks(prov_album_id)
+ if not album or not album.volumes:
+ return []
+
+ tracks = []
+ for volume_index, volume in enumerate(album.volumes):
+ for track_index, track in enumerate(volume):
+ try:
+ parsed_track = parse_track(self, track)
+ parsed_track.disc_number = volume_index + 1
+ parsed_track.track_number = track_index + 1
+ tracks.append(parsed_track)
+ except InvalidDataError as err:
+ self.logger.debug("Error parsing album track: %s", err)
+ return tracks
+
+ @use_cache(3600 * 3)
+ async def get_playlist_tracks(self, prov_playlist_id: str, page: int = 0) -> list[Track]:
+ """Get playlist tracks.
+
+ :param prov_playlist_id: The provider playlist ID (format: "owner_id:kind").
+ :param page: Page number for pagination.
+ :return: List of Track objects.
+ """
+ # KION Music API returns all playlist tracks in one call (no server-side pagination)
+ if page > 0:
+ return []
+
+ self.logger.debug("get_playlist_tracks called: %s", prov_playlist_id)
+ # Parse the playlist ID (format: owner_id:kind)
+ if PLAYLIST_ID_SPLITTER in prov_playlist_id:
+ owner_id, kind = prov_playlist_id.split(PLAYLIST_ID_SPLITTER, 1)
+ else:
+ owner_id = str(self.client.user_id)
+ kind = prov_playlist_id
+
+ self.logger.debug("Fetching playlist %s/%s from API...", owner_id, kind)
+ playlist = await self.client.get_playlist(owner_id, kind)
+ if not playlist:
+ self.logger.debug("Playlist %s/%s not found", owner_id, kind)
+ return []
+
+ # API sometimes returns playlist without tracks; fetch them explicitly if needed
+ tracks_list = playlist.tracks or []
+ track_count = getattr(playlist, "track_count", None) or 0
+ self.logger.debug(
+ "Playlist %s/%s: track_count=%s, tracks_in_response=%s",
+ owner_id,
+ kind,
+ track_count,
+ len(tracks_list),
+ )
+ if not tracks_list and track_count > 0:
+ self.logger.debug("No tracks in response, calling fetch_tracks_async...")
+ try:
+ tracks_list = await playlist.fetch_tracks_async()
+ self.logger.debug("fetch_tracks_async returned %s tracks", len(tracks_list or []))
+ except Exception as err:
+ self.logger.warning("fetch_tracks_async failed: %s", err)
+ if not tracks_list:
+ self.logger.warning(
+ "Playlist %s/%s: expected %s tracks but got none",
+ owner_id,
+ kind,
+ track_count,
+ )
+ raise ResourceTemporarilyUnavailable(
+ "Playlist tracks not available; try again later"
+ ) from None
+
+ if not tracks_list:
+ self.logger.debug("Playlist %s/%s has no tracks", owner_id, kind)
+ return []
+
+ # API returns TrackShort objects, we need to fetch full track info
+ track_ids = [
+ str(track.track_id) if hasattr(track, "track_id") else str(track.id)
+ for track in tracks_list
+ if track
+ ]
+ if not track_ids:
+ return []
+
+ self.logger.debug("Fetching full details for %s tracks...", len(track_ids))
+ # Fetch full track details in batches to avoid timeouts
+ batch_size = 50
+ full_tracks = []
+ for i in range(0, len(track_ids), batch_size):
+ batch = track_ids[i : i + batch_size]
+ self.logger.debug("Fetching batch %s-%s...", i, i + len(batch))
+ batch_result = await self.client.get_tracks(batch)
+ self.logger.debug("Batch returned %s tracks", len(batch_result or []))
+ full_tracks.extend(batch_result or [])
+
+ if track_ids and not full_tracks:
+ self.logger.warning("Got 0 full tracks for %s IDs", len(track_ids))
+ raise ResourceTemporarilyUnavailable(
+ "Failed to load track details; try again later"
+ ) from None
+
+ tracks = []
+ for track in full_tracks:
+ try:
+ tracks.append(parse_track(self, track))
+ except InvalidDataError as err:
+ self.logger.debug("Error parsing playlist track: %s", err)
+ self.logger.debug("Returning %s parsed tracks", len(tracks))
+ return tracks
+
+ @use_cache(3600 * 24 * 7)
+ async def get_artist_albums(self, prov_artist_id: str) -> list[Album]:
+ """Get artist's albums.
+
+ :param prov_artist_id: The provider artist ID.
+ :return: List of Album objects.
+ """
+ albums = await self.client.get_artist_albums(prov_artist_id)
+ result = []
+ for album in albums:
+ try:
+ result.append(parse_album(self, album))
+ except InvalidDataError as err:
+ self.logger.debug("Error parsing artist album: %s", err)
+ return result
+
+ @use_cache(3600 * 24 * 7)
+ async def get_artist_toptracks(self, prov_artist_id: str) -> list[Track]:
+ """Get artist's top tracks.
+
+ :param prov_artist_id: The provider artist ID.
+ :return: List of Track objects.
+ """
+ tracks = await self.client.get_artist_tracks(prov_artist_id)
+ result = []
+ for track in tracks:
+ try:
+ result.append(parse_track(self, track))
+ except InvalidDataError as err:
+ self.logger.debug("Error parsing artist track: %s", err)
+ return result
+
+ # Library methods
+
+ async def get_library_artists(self) -> AsyncGenerator[Artist, None]:
+ """Retrieve library artists from KION Music."""
+ artists = await self.client.get_liked_artists()
+ for artist in artists:
+ try:
+ yield parse_artist(self, artist)
+ except InvalidDataError as err:
+ self.logger.debug("Error parsing library artist: %s", err)
+
+ async def get_library_albums(self) -> AsyncGenerator[Album, None]:
+ """Retrieve library albums from KION Music."""
+ albums = await self.client.get_liked_albums()
+ for album in albums:
+ try:
+ yield parse_album(self, album)
+ except InvalidDataError as err:
+ self.logger.debug("Error parsing library album: %s", err)
+
+ async def get_library_tracks(self) -> AsyncGenerator[Track, None]:
+ """Retrieve library tracks from KION Music."""
+ track_shorts = await self.client.get_liked_tracks()
+ if not track_shorts:
+ return
+
+ # Fetch full track details in batches
+ track_ids = [str(ts.track_id) for ts in track_shorts if ts.track_id]
+ batch_size = 50
+ for i in range(0, len(track_ids), batch_size):
+ batch_ids = track_ids[i : i + batch_size]
+ full_tracks = await self.client.get_tracks(batch_ids)
+ for track in full_tracks:
+ try:
+ yield parse_track(self, track)
+ except InvalidDataError as err:
+ self.logger.debug("Error parsing library track: %s", err)
+
+ async def get_library_playlists(self) -> AsyncGenerator[Playlist, None]:
+ """Retrieve library playlists from KION Music."""
+ playlists = await self.client.get_user_playlists()
+ for playlist in playlists:
+ try:
+ yield parse_playlist(self, playlist)
+ except InvalidDataError as err:
+ self.logger.debug("Error parsing library playlist: %s", err)
+
+ # Library edit methods
+
+ async def library_add(self, item: MediaItemType) -> bool:
+ """Add item to library.
+
+ :param item: The media item to add.
+ :return: True if successful.
+ """
+ prov_item_id = self._get_provider_item_id(item)
+ if not prov_item_id:
+ return False
+
+ if item.media_type == MediaType.TRACK:
+ return await self.client.like_track(prov_item_id)
+ if item.media_type == MediaType.ALBUM:
+ return await self.client.like_album(prov_item_id)
+ if item.media_type == MediaType.ARTIST:
+ return await self.client.like_artist(prov_item_id)
+ return False
+
+ async def library_remove(self, prov_item_id: str, media_type: MediaType) -> bool:
+ """Remove item from library.
+
+ :param prov_item_id: The provider item ID.
+ :param media_type: The media type.
+ :return: True if successful.
+ """
+ if media_type == MediaType.TRACK:
+ return await self.client.unlike_track(prov_item_id)
+ if media_type == MediaType.ALBUM:
+ return await self.client.unlike_album(prov_item_id)
+ if media_type == MediaType.ARTIST:
+ return await self.client.unlike_artist(prov_item_id)
+ return False
+
+ def _get_provider_item_id(self, item: MediaItemType) -> str | None:
+ """Get provider item ID from media item."""
+ for mapping in item.provider_mappings:
+ if mapping.provider_instance == self.instance_id:
+ return mapping.item_id
+ return item.item_id if item.provider == self.instance_id else None
+
+ # Streaming
+
+ async def get_stream_details(
+ self, item_id: str, media_type: MediaType = MediaType.TRACK
+ ) -> StreamDetails:
+ """Get stream details for a track.
+
+ :param item_id: The track ID.
+ :param media_type: The media type (should be TRACK).
+ :return: StreamDetails for the track.
+ """
+ return await self.streaming.get_stream_details(item_id)
--- /dev/null
+"""Streaming operations for KION Music."""
+
+from __future__ import annotations
+
+from typing import TYPE_CHECKING, Any
+
+from music_assistant_models.enums import ContentType, StreamType
+from music_assistant_models.errors import MediaNotFoundError
+from music_assistant_models.media_items import AudioFormat
+from music_assistant_models.streamdetails import StreamDetails
+
+from .constants import CONF_QUALITY, QUALITY_LOSSLESS
+
+if TYPE_CHECKING:
+ from yandex_music import DownloadInfo
+
+ from .provider import KionMusicProvider
+
+
+class KionMusicStreamingManager:
+ """Manages KION Music streaming operations."""
+
+ def __init__(self, provider: KionMusicProvider) -> None:
+ """Initialize streaming manager.
+
+ :param provider: The KION Music provider instance.
+ """
+ self.provider = provider
+ self.client = provider.client
+ self.mass = provider.mass
+ self.logger = provider.logger
+
+ async def get_stream_details(self, item_id: str) -> StreamDetails:
+ """Get stream details for a track.
+
+ :param item_id: Track ID.
+ :return: StreamDetails for the track.
+ :raises MediaNotFoundError: If stream URL cannot be obtained.
+ """
+ # Get track info first
+ track = await self.provider.get_track(item_id)
+ if not track:
+ raise MediaNotFoundError(f"Track {item_id} not found")
+
+ quality = self.provider.config.get_value(CONF_QUALITY)
+ quality_str = str(quality) if quality is not None else None
+ preferred_normalized = (quality_str or "").strip().lower()
+ want_lossless = (
+ QUALITY_LOSSLESS in preferred_normalized or preferred_normalized == QUALITY_LOSSLESS
+ )
+
+ # When user wants lossless, try get-file-info first (FLAC; download-info often MP3 only)
+ if want_lossless:
+ self.logger.debug("Requesting lossless via get-file-info for track %s", item_id)
+ file_info = await self.client.get_track_file_info_lossless(item_id)
+ if file_info:
+ url = file_info.get("url")
+ codec = file_info.get("codec") or ""
+ if url and codec.lower() in ("flac", "flac-mp4"):
+ content_type = self._get_content_type(codec)
+ self.logger.debug(
+ "Stream selected for track %s via get-file-info: codec=%s",
+ item_id,
+ codec,
+ )
+ return StreamDetails(
+ item_id=item_id,
+ provider=self.provider.instance_id,
+ audio_format=AudioFormat(
+ content_type=content_type,
+ bit_rate=0,
+ ),
+ stream_type=StreamType.HTTP,
+ duration=track.duration,
+ path=url,
+ can_seek=True,
+ allow_seek=True,
+ )
+
+ # Default: use /tracks/.../download-info and select best quality
+ download_infos = await self.client.get_track_download_info(item_id, get_direct_links=True)
+ if not download_infos:
+ raise MediaNotFoundError(f"No stream info available for track {item_id}")
+
+ codecs_available = [
+ (getattr(i, "codec", None), getattr(i, "bitrate_in_kbps", None)) for i in download_infos
+ ]
+ self.logger.debug(
+ "Stream quality for track %s: config quality=%s, available codecs=%s",
+ item_id,
+ quality_str,
+ codecs_available,
+ )
+ selected_info = self._select_best_quality(download_infos, quality_str)
+
+ if not selected_info or not selected_info.direct_link:
+ raise MediaNotFoundError(f"No stream URL available for track {item_id}")
+
+ self.logger.debug(
+ "Stream selected for track %s: codec=%s, bitrate=%s",
+ item_id,
+ getattr(selected_info, "codec", None),
+ getattr(selected_info, "bitrate_in_kbps", None),
+ )
+
+ content_type = self._get_content_type(selected_info.codec)
+ bitrate = selected_info.bitrate_in_kbps or 0
+
+ return StreamDetails(
+ item_id=item_id,
+ provider=self.provider.instance_id,
+ audio_format=AudioFormat(
+ content_type=content_type,
+ bit_rate=bitrate,
+ ),
+ stream_type=StreamType.HTTP,
+ duration=track.duration,
+ path=selected_info.direct_link,
+ can_seek=True,
+ allow_seek=True,
+ )
+
+ def _select_best_quality(
+ self, download_infos: list[Any], preferred_quality: str | None
+ ) -> DownloadInfo | None:
+ """Select the best quality download info.
+
+ :param download_infos: List of DownloadInfo objects.
+ :param preferred_quality: User's preferred quality (e.g. "lossless" or "Lossless (FLAC)").
+ :return: Best matching DownloadInfo or None.
+ """
+ if not download_infos:
+ return None
+
+ # Normalize so we accept "lossless", "Lossless (FLAC)", etc.
+ preferred_normalized = (preferred_quality or "").strip().lower()
+ want_lossless = (
+ QUALITY_LOSSLESS in preferred_normalized or preferred_normalized == QUALITY_LOSSLESS
+ )
+
+ # Sort by bitrate descending
+ sorted_infos = sorted(
+ download_infos,
+ key=lambda x: x.bitrate_in_kbps or 0,
+ reverse=True,
+ )
+
+ # If user wants lossless, prefer flac-mp4 then flac (API formats ~2025)
+ if want_lossless:
+ for codec in ("flac-mp4", "flac"):
+ for info in sorted_infos:
+ if info.codec and info.codec.lower() == codec:
+ return info
+ self.logger.warning(
+ "Lossless (FLAC) requested but no FLAC in API response for this "
+ "track; using best available"
+ )
+
+ # Return highest bitrate
+ return sorted_infos[0] if sorted_infos else None
+
+ def _get_content_type(self, codec: str | None) -> ContentType:
+ """Determine content type from codec string.
+
+ :param codec: Codec string from API.
+ :return: ContentType enum value.
+ """
+ if not codec:
+ return ContentType.UNKNOWN
+
+ codec_lower = codec.lower()
+ if codec_lower in ("flac", "flac-mp4"):
+ return ContentType.FLAC
+ if codec_lower in ("mp3", "mpeg"):
+ return ContentType.MP3
+ if codec_lower == "aac":
+ return ContentType.AAC
+
+ return ContentType.UNKNOWN
--- /dev/null
+"""Tests for KION Music provider."""
--- /dev/null
+# serializer version: 1
+# name: test_parse_album_snapshot[minimal]
+ dict({
+ 'album_type': 'album',
+ 'artists': list([
+ ]),
+ 'date_added': None,
+ 'external_ids': list([
+ ]),
+ 'favorite': False,
+ 'is_playable': True,
+ 'item_id': '300',
+ 'media_type': 'album',
+ 'metadata': dict({
+ 'chapters': None,
+ 'copyright': None,
+ 'description': None,
+ 'explicit': None,
+ 'genres': None,
+ 'grouping': None,
+ 'images': None,
+ 'label': None,
+ 'languages': None,
+ 'last_refresh': None,
+ 'links': None,
+ 'lrc_lyrics': None,
+ 'lyrics': None,
+ 'mood': None,
+ 'performers': None,
+ 'popularity': None,
+ 'preview': None,
+ 'release_date': None,
+ 'review': None,
+ 'style': None,
+ }),
+ 'name': 'Test Album',
+ 'position': None,
+ 'provider': 'kion_music_instance',
+ 'provider_mappings': list([
+ dict({
+ 'audio_format': dict({
+ 'bit_depth': 16,
+ 'bit_rate': 0,
+ 'channels': 2,
+ 'codec_type': '?',
+ 'content_type': '?',
+ 'output_format_str': '?',
+ 'sample_rate': 44100,
+ }),
+ 'available': True,
+ 'details': None,
+ 'in_library': None,
+ 'is_unique': None,
+ 'item_id': '300',
+ 'provider_domain': 'kion_music',
+ 'provider_instance': 'kion_music_instance',
+ 'url': 'https://music.mts.ru/album/300',
+ }),
+ ]),
+ 'sort_name': 'test album',
+ 'translation_key': None,
+ 'uri': 'kion_music_instance://album/300',
+ 'version': '',
+ 'year': 2020,
+ })
+# ---
+# name: test_parse_artist_snapshot[minimal]
+ dict({
+ 'date_added': None,
+ 'external_ids': list([
+ ]),
+ 'favorite': False,
+ 'is_playable': True,
+ 'item_id': '100',
+ 'media_type': 'artist',
+ 'metadata': dict({
+ 'chapters': None,
+ 'copyright': None,
+ 'description': None,
+ 'explicit': None,
+ 'genres': None,
+ 'grouping': None,
+ 'images': None,
+ 'label': None,
+ 'languages': None,
+ 'last_refresh': None,
+ 'links': None,
+ 'lrc_lyrics': None,
+ 'lyrics': None,
+ 'mood': None,
+ 'performers': None,
+ 'popularity': None,
+ 'preview': None,
+ 'release_date': None,
+ 'review': None,
+ 'style': None,
+ }),
+ 'name': 'Test Artist',
+ 'position': None,
+ 'provider': 'kion_music_instance',
+ 'provider_mappings': list([
+ dict({
+ 'audio_format': dict({
+ 'bit_depth': 16,
+ 'bit_rate': 0,
+ 'channels': 2,
+ 'codec_type': '?',
+ 'content_type': '?',
+ 'output_format_str': '?',
+ 'sample_rate': 44100,
+ }),
+ 'available': True,
+ 'details': None,
+ 'in_library': None,
+ 'is_unique': None,
+ 'item_id': '100',
+ 'provider_domain': 'kion_music',
+ 'provider_instance': 'kion_music_instance',
+ 'url': 'https://music.mts.ru/artist/100',
+ }),
+ ]),
+ 'sort_name': 'test artist',
+ 'translation_key': None,
+ 'uri': 'kion_music_instance://artist/100',
+ 'version': '',
+ })
+# ---
+# name: test_parse_artist_snapshot[with_cover]
+ dict({
+ 'date_added': None,
+ 'external_ids': list([
+ ]),
+ 'favorite': False,
+ 'is_playable': True,
+ 'item_id': '200',
+ 'media_type': 'artist',
+ 'metadata': dict({
+ 'chapters': None,
+ 'copyright': None,
+ 'description': None,
+ 'explicit': None,
+ 'genres': None,
+ 'grouping': None,
+ 'images': list([
+ dict({
+ 'path': 'https://avatars.yandex.net/get-music-content/xxx/yyy/1000x1000',
+ 'provider': 'kion_music_instance',
+ 'remotely_accessible': True,
+ 'type': 'thumb',
+ }),
+ ]),
+ 'label': None,
+ 'languages': None,
+ 'last_refresh': None,
+ 'links': None,
+ 'lrc_lyrics': None,
+ 'lyrics': None,
+ 'mood': None,
+ 'performers': None,
+ 'popularity': None,
+ 'preview': None,
+ 'release_date': None,
+ 'review': None,
+ 'style': None,
+ }),
+ 'name': 'Artist With Cover',
+ 'position': None,
+ 'provider': 'kion_music_instance',
+ 'provider_mappings': list([
+ dict({
+ 'audio_format': dict({
+ 'bit_depth': 16,
+ 'bit_rate': 0,
+ 'channels': 2,
+ 'codec_type': '?',
+ 'content_type': '?',
+ 'output_format_str': '?',
+ 'sample_rate': 44100,
+ }),
+ 'available': True,
+ 'details': None,
+ 'in_library': None,
+ 'is_unique': None,
+ 'item_id': '200',
+ 'provider_domain': 'kion_music',
+ 'provider_instance': 'kion_music_instance',
+ 'url': 'https://music.mts.ru/artist/200',
+ }),
+ ]),
+ 'sort_name': 'artist with cover',
+ 'translation_key': None,
+ 'uri': 'kion_music_instance://artist/200',
+ 'version': '',
+ })
+# ---
+# name: test_parse_playlist_snapshot[minimal]
+ dict({
+ 'date_added': None,
+ 'external_ids': list([
+ ]),
+ 'favorite': False,
+ 'is_editable': True,
+ 'is_playable': True,
+ 'item_id': '12345:3',
+ 'media_type': 'playlist',
+ 'metadata': dict({
+ 'chapters': None,
+ 'copyright': None,
+ 'description': None,
+ 'explicit': None,
+ 'genres': None,
+ 'grouping': None,
+ 'images': None,
+ 'label': None,
+ 'languages': None,
+ 'last_refresh': None,
+ 'links': None,
+ 'lrc_lyrics': None,
+ 'lyrics': None,
+ 'mood': None,
+ 'performers': None,
+ 'popularity': None,
+ 'preview': None,
+ 'release_date': None,
+ 'review': None,
+ 'style': None,
+ }),
+ 'name': 'My Playlist',
+ 'owner': 'Me',
+ 'position': None,
+ 'provider': 'kion_music_instance',
+ 'provider_mappings': list([
+ dict({
+ 'audio_format': dict({
+ 'bit_depth': 16,
+ 'bit_rate': 0,
+ 'channels': 2,
+ 'codec_type': '?',
+ 'content_type': '?',
+ 'output_format_str': '?',
+ 'sample_rate': 44100,
+ }),
+ 'available': True,
+ 'details': None,
+ 'in_library': None,
+ 'is_unique': True,
+ 'item_id': '12345:3',
+ 'provider_domain': 'kion_music',
+ 'provider_instance': 'kion_music_instance',
+ 'url': 'https://music.mts.ru/users/12345/playlists/3',
+ }),
+ ]),
+ 'sort_name': 'my playlist',
+ 'translation_key': None,
+ 'uri': 'kion_music_instance://playlist/12345:3',
+ 'version': '',
+ })
+# ---
+# name: test_parse_playlist_snapshot[other_user]
+ dict({
+ 'date_added': None,
+ 'external_ids': list([
+ ]),
+ 'favorite': False,
+ 'is_editable': False,
+ 'is_playable': True,
+ 'item_id': '99999:1',
+ 'media_type': 'playlist',
+ 'metadata': dict({
+ 'chapters': None,
+ 'copyright': None,
+ 'description': 'A shared playlist',
+ 'explicit': None,
+ 'genres': None,
+ 'grouping': None,
+ 'images': None,
+ 'label': None,
+ 'languages': None,
+ 'last_refresh': None,
+ 'links': None,
+ 'lrc_lyrics': None,
+ 'lyrics': None,
+ 'mood': None,
+ 'performers': None,
+ 'popularity': None,
+ 'preview': None,
+ 'release_date': None,
+ 'review': None,
+ 'style': None,
+ }),
+ 'name': 'Shared Playlist',
+ 'owner': 'Other User',
+ 'position': None,
+ 'provider': 'kion_music_instance',
+ 'provider_mappings': list([
+ dict({
+ 'audio_format': dict({
+ 'bit_depth': 16,
+ 'bit_rate': 0,
+ 'channels': 2,
+ 'codec_type': '?',
+ 'content_type': '?',
+ 'output_format_str': '?',
+ 'sample_rate': 44100,
+ }),
+ 'available': True,
+ 'details': None,
+ 'in_library': None,
+ 'is_unique': False,
+ 'item_id': '99999:1',
+ 'provider_domain': 'kion_music',
+ 'provider_instance': 'kion_music_instance',
+ 'url': 'https://music.mts.ru/users/99999/playlists/1',
+ }),
+ ]),
+ 'sort_name': 'shared playlist',
+ 'translation_key': None,
+ 'uri': 'kion_music_instance://playlist/99999:1',
+ 'version': '',
+ })
+# ---
+# name: test_parse_track_snapshot[minimal]
+ dict({
+ 'album': None,
+ 'artists': list([
+ ]),
+ 'date_added': None,
+ 'disc_number': 0,
+ 'duration': 180,
+ 'external_ids': list([
+ ]),
+ 'favorite': False,
+ 'is_playable': True,
+ 'item_id': '400',
+ 'last_played': 0,
+ 'media_type': 'track',
+ 'metadata': dict({
+ 'chapters': None,
+ 'copyright': None,
+ 'description': None,
+ 'explicit': None,
+ 'genres': None,
+ 'grouping': None,
+ 'images': None,
+ 'label': None,
+ 'languages': None,
+ 'last_refresh': None,
+ 'links': None,
+ 'lrc_lyrics': None,
+ 'lyrics': None,
+ 'mood': None,
+ 'performers': None,
+ 'popularity': None,
+ 'preview': None,
+ 'release_date': None,
+ 'review': None,
+ 'style': None,
+ }),
+ 'name': 'Test Track',
+ 'position': None,
+ 'provider': 'kion_music_instance',
+ 'provider_mappings': list([
+ dict({
+ 'audio_format': dict({
+ 'bit_depth': 16,
+ 'bit_rate': 0,
+ 'channels': 2,
+ 'codec_type': '?',
+ 'content_type': '?',
+ 'output_format_str': '?',
+ 'sample_rate': 44100,
+ }),
+ 'available': True,
+ 'details': None,
+ 'in_library': None,
+ 'is_unique': None,
+ 'item_id': '400',
+ 'provider_domain': 'kion_music',
+ 'provider_instance': 'kion_music_instance',
+ 'url': 'https://music.mts.ru/track/400',
+ }),
+ ]),
+ 'sort_name': 'test track',
+ 'track_number': 0,
+ 'translation_key': None,
+ 'uri': 'kion_music_instance://track/400',
+ 'version': '',
+ })
+# ---
+# name: test_parse_track_snapshot[with_artist_and_album]
+ dict({
+ 'album': dict({
+ 'album_type': 'album',
+ 'artists': list([
+ ]),
+ 'date_added': None,
+ 'external_ids': list([
+ ]),
+ 'favorite': False,
+ 'is_playable': True,
+ 'item_id': '20',
+ 'media_type': 'album',
+ 'metadata': dict({
+ 'chapters': None,
+ 'copyright': None,
+ 'description': None,
+ 'explicit': None,
+ 'genres': None,
+ 'grouping': None,
+ 'images': list([
+ dict({
+ 'path': 'https://avatars.yandex.net/get-music-content/aaa/bbb/1000x1000',
+ 'provider': 'kion_music_instance',
+ 'remotely_accessible': True,
+ 'type': 'thumb',
+ }),
+ ]),
+ 'label': None,
+ 'languages': None,
+ 'last_refresh': None,
+ 'links': None,
+ 'lrc_lyrics': None,
+ 'lyrics': None,
+ 'mood': None,
+ 'performers': None,
+ 'popularity': None,
+ 'preview': None,
+ 'release_date': None,
+ 'review': None,
+ 'style': None,
+ }),
+ 'name': 'Track Album',
+ 'position': None,
+ 'provider': 'kion_music_instance',
+ 'provider_mappings': list([
+ dict({
+ 'audio_format': dict({
+ 'bit_depth': 16,
+ 'bit_rate': 0,
+ 'channels': 2,
+ 'codec_type': '?',
+ 'content_type': '?',
+ 'output_format_str': '?',
+ 'sample_rate': 44100,
+ }),
+ 'available': False,
+ 'details': None,
+ 'in_library': None,
+ 'is_unique': None,
+ 'item_id': '20',
+ 'provider_domain': 'kion_music',
+ 'provider_instance': 'kion_music_instance',
+ 'url': 'https://music.mts.ru/album/20',
+ }),
+ ]),
+ 'sort_name': 'track album',
+ 'translation_key': None,
+ 'uri': 'kion_music_instance://album/20',
+ 'version': '',
+ 'year': None,
+ }),
+ 'artists': list([
+ dict({
+ 'date_added': None,
+ 'external_ids': list([
+ ]),
+ 'favorite': False,
+ 'is_playable': True,
+ 'item_id': '10',
+ 'media_type': 'artist',
+ 'metadata': dict({
+ 'chapters': None,
+ 'copyright': None,
+ 'description': None,
+ 'explicit': None,
+ 'genres': None,
+ 'grouping': None,
+ 'images': None,
+ 'label': None,
+ 'languages': None,
+ 'last_refresh': None,
+ 'links': None,
+ 'lrc_lyrics': None,
+ 'lyrics': None,
+ 'mood': None,
+ 'performers': None,
+ 'popularity': None,
+ 'preview': None,
+ 'release_date': None,
+ 'review': None,
+ 'style': None,
+ }),
+ 'name': 'Track Artist',
+ 'position': None,
+ 'provider': 'kion_music_instance',
+ 'provider_mappings': list([
+ dict({
+ 'audio_format': dict({
+ 'bit_depth': 16,
+ 'bit_rate': 0,
+ 'channels': 2,
+ 'codec_type': '?',
+ 'content_type': '?',
+ 'output_format_str': '?',
+ 'sample_rate': 44100,
+ }),
+ 'available': True,
+ 'details': None,
+ 'in_library': None,
+ 'is_unique': None,
+ 'item_id': '10',
+ 'provider_domain': 'kion_music',
+ 'provider_instance': 'kion_music_instance',
+ 'url': 'https://music.mts.ru/artist/10',
+ }),
+ ]),
+ 'sort_name': 'track artist',
+ 'translation_key': None,
+ 'uri': 'kion_music_instance://artist/10',
+ 'version': '',
+ }),
+ ]),
+ 'date_added': None,
+ 'disc_number': 0,
+ 'duration': 240,
+ 'external_ids': list([
+ ]),
+ 'favorite': False,
+ 'is_playable': True,
+ 'item_id': '500',
+ 'last_played': 0,
+ 'media_type': 'track',
+ 'metadata': dict({
+ 'chapters': None,
+ 'copyright': None,
+ 'description': None,
+ 'explicit': None,
+ 'genres': None,
+ 'grouping': None,
+ 'images': list([
+ dict({
+ 'path': 'https://avatars.yandex.net/get-music-content/aaa/bbb/1000x1000',
+ 'provider': 'kion_music_instance',
+ 'remotely_accessible': True,
+ 'type': 'thumb',
+ }),
+ ]),
+ 'label': None,
+ 'languages': None,
+ 'last_refresh': None,
+ 'links': None,
+ 'lrc_lyrics': None,
+ 'lyrics': None,
+ 'mood': None,
+ 'performers': None,
+ 'popularity': None,
+ 'preview': None,
+ 'release_date': None,
+ 'review': None,
+ 'style': None,
+ }),
+ 'name': 'Track With Album',
+ 'position': None,
+ 'provider': 'kion_music_instance',
+ 'provider_mappings': list([
+ dict({
+ 'audio_format': dict({
+ 'bit_depth': 16,
+ 'bit_rate': 0,
+ 'channels': 2,
+ 'codec_type': '?',
+ 'content_type': '?',
+ 'output_format_str': '?',
+ 'sample_rate': 44100,
+ }),
+ 'available': True,
+ 'details': None,
+ 'in_library': None,
+ 'is_unique': None,
+ 'item_id': '500',
+ 'provider_domain': 'kion_music',
+ 'provider_instance': 'kion_music_instance',
+ 'url': 'https://music.mts.ru/track/500',
+ }),
+ ]),
+ 'sort_name': 'track with album',
+ 'track_number': 0,
+ 'translation_key': None,
+ 'uri': 'kion_music_instance://track/500',
+ 'version': '',
+ })
+# ---
--- /dev/null
+"""Shared fixtures and stubs for KION Music provider tests."""
+
+from __future__ import annotations
+
+import logging
+
+import pytest
+from music_assistant_models.enums import MediaType
+from music_assistant_models.media_items import ItemMapping
+
+
+class ProviderStub:
+ """Minimal provider-like object for parser tests (no Mock).
+
+ Provides the minimal interface needed by parse_* functions.
+ """
+
+ domain = "kion_music"
+ instance_id = "kion_music_instance"
+
+ def __init__(self) -> None:
+ """Initialize stub with minimal client."""
+ self.client = type("ClientStub", (), {"user_id": 12345})()
+
+ def get_item_mapping(self, media_type: MediaType | str, key: str, name: str) -> ItemMapping:
+ """Return ItemMapping for the given media type, key and name."""
+ return ItemMapping(
+ media_type=MediaType(media_type) if isinstance(media_type, str) else media_type,
+ item_id=key,
+ provider=self.instance_id,
+ name=name,
+ )
+
+
+class StreamingProviderStub:
+ """Minimal provider stub for streaming tests (no Mock).
+
+ Provides the minimal interface needed by KionMusicStreamingManager.
+ """
+
+ domain = "kion_music"
+ instance_id = "kion_music_instance"
+ logger = logging.getLogger("kion_music_test_streaming")
+
+ def __init__(self) -> None:
+ """Initialize stub with minimal client."""
+ self.client = type("ClientStub", (), {"user_id": 12345})()
+ self.mass = type("MassStub", (), {})()
+ self._warning_count = 0
+
+ def _count_warning(self, *args: object, **kwargs: object) -> None:
+ """Track warning calls for test assertions."""
+ self._warning_count += 1
+
+
+class TrackingLogger:
+ """Logger that tracks calls for test assertions without using Mock."""
+
+ def __init__(self) -> None:
+ """Initialize with empty call counters."""
+ self._debug_count = 0
+ self._info_count = 0
+ self._warning_count = 0
+ self._error_count = 0
+
+ def debug(self, *args: object, **kwargs: object) -> None:
+ """Track debug calls."""
+ self._debug_count += 1
+
+ def info(self, *args: object, **kwargs: object) -> None:
+ """Track info calls."""
+ self._info_count += 1
+
+ def warning(self, *args: object, **kwargs: object) -> None:
+ """Track warning calls."""
+ self._warning_count += 1
+
+ def error(self, *args: object, **kwargs: object) -> None:
+ """Track error calls."""
+ self._error_count += 1
+
+
+# Minimal client-like object for yandex_music de_json (library requires client, not None)
+DE_JSON_CLIENT = type("ClientStub", (), {"report_unknown_fields": False})()
+
+
+@pytest.fixture
+def provider_stub() -> ProviderStub:
+ """Return a real provider stub (no Mock)."""
+ return ProviderStub()
+
+
+@pytest.fixture
+def streaming_provider_stub() -> StreamingProviderStub:
+ """Return a streaming provider stub (no Mock)."""
+ return StreamingProviderStub()
--- /dev/null
+{
+ "id": 300,
+ "title": "Test Album",
+ "available": true,
+ "artists": [],
+ "type": "album",
+ "year": 2020
+}
--- /dev/null
+{
+ "id": 100,
+ "name": "Test Artist"
+}
--- /dev/null
+{
+ "id": 200,
+ "name": "Artist With Cover",
+ "cover": {
+ "type": "from-og-image",
+ "uri": "avatars.yandex.net/get-music-content/xxx/yyy/%%"
+ }
+}
--- /dev/null
+{
+ "owner": {
+ "uid": 12345,
+ "name": "Me",
+ "login": "me"
+ },
+ "kind": 3,
+ "title": "My Playlist"
+}
--- /dev/null
+{
+ "owner": {
+ "uid": 99999,
+ "name": "Other User",
+ "login": "other_user"
+ },
+ "kind": 1,
+ "title": "Shared Playlist",
+ "description": "A shared playlist"
+}
--- /dev/null
+{
+ "id": 400,
+ "title": "Test Track",
+ "available": true,
+ "duration_ms": 180000,
+ "artists": [],
+ "albums": []
+}
--- /dev/null
+{
+ "id": 500,
+ "title": "Track With Album",
+ "available": true,
+ "duration_ms": 240000,
+ "artists": [
+ {
+ "id": 10,
+ "name": "Track Artist"
+ }
+ ],
+ "albums": [
+ {
+ "id": 20,
+ "title": "Track Album",
+ "cover_uri": "avatars.yandex.net/get-music-content/aaa/bbb/%%"
+ }
+ ]
+}
--- /dev/null
+"""Unit tests for the KION Music API client."""
+
+from __future__ import annotations
+
+from unittest import mock
+
+import pytest
+from music_assistant_models.errors import ResourceTemporarilyUnavailable
+from yandex_music.exceptions import NetworkError
+
+from music_assistant.providers.kion_music.api_client import KION_BASE_URL, KionMusicClient
+
+
+@pytest.fixture
+def client() -> KionMusicClient:
+ """Return a KionMusicClient with a fake token."""
+ return KionMusicClient("fake_token")
+
+
+async def test_connect_sets_base_url(client: KionMusicClient) -> None:
+ """Verify connect() passes KION_BASE_URL to ClientAsync."""
+ with mock.patch("music_assistant.providers.kion_music.api_client.ClientAsync") as mock_cls:
+ mock_instance = mock.AsyncMock()
+ mock_instance.me = type("Me", (), {"account": type("Account", (), {"uid": 42})()})()
+ mock_instance.init = mock.AsyncMock(return_value=mock_instance)
+ mock_cls.return_value = mock_instance
+
+ result = await client.connect()
+
+ assert result is True
+ mock_cls.assert_called_once_with("fake_token", base_url=KION_BASE_URL)
+
+
+async def test_get_liked_albums_batching(client: KionMusicClient) -> None:
+ """Test that liked albums are fetched in batches of 50."""
+ mock_client = mock.AsyncMock()
+ client._client = mock_client
+ client._user_id = 1
+
+ # Create 60 likes so we get 2 batches
+ likes = []
+ for i in range(60):
+ like = type("Like", (), {"album": type("Album", (), {"id": i + 1})()})()
+ likes.append(like)
+
+ mock_client.users_likes_albums = mock.AsyncMock(return_value=likes)
+
+ batch1 = [type("Album", (), {"id": i + 1})() for i in range(50)]
+ batch2 = [type("Album", (), {"id": i + 51})() for i in range(10)]
+ mock_client.albums = mock.AsyncMock(side_effect=[batch1, batch2])
+
+ result = await client.get_liked_albums()
+
+ assert len(result) == 60
+ assert mock_client.albums.call_count == 2
+
+
+async def test_get_liked_albums_batch_fallback_on_network_error(
+ client: KionMusicClient,
+) -> None:
+ """Test fallback to minimal data when batch fetch fails."""
+ mock_client = mock.AsyncMock()
+ client._client = mock_client
+ client._user_id = 1
+
+ album_obj = type("Album", (), {"id": 1})()
+ likes = [type("Like", (), {"album": album_obj})()]
+
+ mock_client.users_likes_albums = mock.AsyncMock(return_value=likes)
+ mock_client.albums = mock.AsyncMock(side_effect=NetworkError("timeout"))
+
+ result = await client.get_liked_albums()
+
+ assert len(result) == 1
+ assert result[0].id == 1
+
+
+async def test_get_tracks_retry_on_network_error_then_success(
+ client: KionMusicClient,
+) -> None:
+ """Test that get_tracks retries once on NetworkError and succeeds."""
+ mock_client = mock.AsyncMock()
+ client._client = mock_client
+ client._user_id = 1
+
+ track = type("Track", (), {"id": 1})()
+ mock_client.tracks = mock.AsyncMock(side_effect=[NetworkError("timeout"), [track]])
+
+ result = await client.get_tracks(["1"])
+
+ assert len(result) == 1
+ assert mock_client.tracks.call_count == 2
+
+
+async def test_get_tracks_retry_on_network_error_both_fail(
+ client: KionMusicClient,
+) -> None:
+ """Test that get_tracks raises ResourceTemporarilyUnavailable when retry fails."""
+ mock_client = mock.AsyncMock()
+ client._client = mock_client
+ client._user_id = 1
+
+ mock_client.tracks = mock.AsyncMock(side_effect=NetworkError("timeout"))
+
+ with pytest.raises(ResourceTemporarilyUnavailable):
+ await client.get_tracks(["1"])
+
+ assert mock_client.tracks.call_count == 2
--- /dev/null
+"""Integration tests for the KION Music provider with in-process Music Assistant."""
+
+from __future__ import annotations
+
+import json
+import pathlib
+from collections.abc import AsyncGenerator
+from typing import TYPE_CHECKING, Any, cast
+from unittest import mock
+
+import pytest
+from music_assistant_models.enums import ContentType, MediaType, StreamType
+from yandex_music import Album as YandexAlbum
+from yandex_music import Artist as YandexArtist
+from yandex_music import Playlist as YandexPlaylist
+from yandex_music import Track as YandexTrack
+
+from music_assistant.mass import MusicAssistant
+from music_assistant.models.music_provider import MusicProvider
+from tests.common import wait_for_sync_completion
+
+if TYPE_CHECKING:
+ from music_assistant_models.config_entries import ProviderConfig
+
+FIXTURES_DIR = pathlib.Path(__file__).parent / "fixtures"
+_DE_JSON_CLIENT = type("ClientStub", (), {"report_unknown_fields": False})()
+
+
+def _load_json(path: pathlib.Path) -> dict[str, Any]:
+ """Load JSON fixture."""
+ with open(path) as f:
+ return cast("dict[str, Any]", json.load(f))
+
+
+def _load_kion_objects() -> tuple[Any, Any, Any, Any]:
+ """Load Artist, Album, Track, Playlist from fixtures for mock client."""
+ artist = YandexArtist.de_json(
+ _load_json(FIXTURES_DIR / "artists" / "minimal.json"), _DE_JSON_CLIENT
+ )
+ album = YandexAlbum.de_json(
+ _load_json(FIXTURES_DIR / "albums" / "minimal.json"), _DE_JSON_CLIENT
+ )
+ track = YandexTrack.de_json(
+ _load_json(FIXTURES_DIR / "tracks" / "minimal.json"), _DE_JSON_CLIENT
+ )
+ playlist = YandexPlaylist.de_json(
+ _load_json(FIXTURES_DIR / "playlists" / "minimal.json"), _DE_JSON_CLIENT
+ )
+ return artist, album, track, playlist
+
+
+def _make_search_result(track: Any, album: Any, artist: Any, playlist: Any) -> Any:
+ """Build a Search-like object with .tracks.results, .albums.results, etc."""
+ return type(
+ "Search",
+ (),
+ {
+ "tracks": type("TracksResult", (), {"results": [track]})(),
+ "albums": type("AlbumsResult", (), {"results": [album]})(),
+ "artists": type("ArtistsResult", (), {"results": [artist]})(),
+ "playlists": type("PlaylistsResult", (), {"results": [playlist]})(),
+ },
+ )()
+
+
+def _make_download_info(
+ codec: str = "mp3",
+ direct_link: str = "https://example.com/kion_track.mp3",
+ bitrate_in_kbps: int = 320,
+) -> Any:
+ """Build DownloadInfo-like object for streaming."""
+ return type(
+ "DownloadInfo",
+ (),
+ {
+ "direct_link": direct_link,
+ "codec": codec,
+ "bitrate_in_kbps": bitrate_in_kbps,
+ },
+ )()
+
+
+@pytest.fixture
+async def kion_music_provider(
+ mass: MusicAssistant,
+) -> AsyncGenerator[ProviderConfig, None]:
+ """Configure KION Music provider with mocked API client and add to mass."""
+ artist, album, track, playlist = _load_kion_objects()
+ search_result = _make_search_result(track, album, artist, playlist)
+ download_info = _make_download_info()
+
+ # Album with volumes for get_album_tracks
+ album_with_volumes = type(
+ "AlbumWithVolumes",
+ (),
+ {
+ "id": album.id,
+ "title": album.title,
+ "volumes": [[track]],
+ "artists": album.artists if hasattr(album, "artists") else [],
+ "year": getattr(album, "year", None),
+ "release_date": getattr(album, "release_date", None),
+ "genre": getattr(album, "genre", None),
+ "cover_uri": getattr(album, "cover_uri", None),
+ "og_image": getattr(album, "og_image", None),
+ "type": getattr(album, "type", "album"),
+ "available": getattr(album, "available", True),
+ },
+ )()
+
+ with mock.patch(
+ "music_assistant.providers.kion_music.provider.KionMusicClient"
+ ) as mock_client_class:
+ mock_client = mock.AsyncMock()
+ mock_client_class.return_value = mock_client
+
+ mock_client.connect = mock.AsyncMock(return_value=True)
+ mock_client.user_id = 12345
+
+ mock_client.get_liked_tracks = mock.AsyncMock(return_value=[])
+ mock_client.get_liked_albums = mock.AsyncMock(return_value=[])
+ mock_client.get_liked_artists = mock.AsyncMock(return_value=[])
+ mock_client.get_user_playlists = mock.AsyncMock(return_value=[playlist])
+
+ mock_client.search = mock.AsyncMock(return_value=search_result)
+ mock_client.get_track = mock.AsyncMock(return_value=track)
+ mock_client.get_tracks = mock.AsyncMock(return_value=[track])
+ mock_client.get_album = mock.AsyncMock(return_value=album)
+ mock_client.get_album_with_tracks = mock.AsyncMock(return_value=album_with_volumes)
+ mock_client.get_artist = mock.AsyncMock(return_value=artist)
+ mock_client.get_artist_albums = mock.AsyncMock(return_value=[album])
+ mock_client.get_artist_tracks = mock.AsyncMock(return_value=[track])
+ mock_client.get_playlist = mock.AsyncMock(return_value=playlist)
+ mock_client.get_track_download_info = mock.AsyncMock(return_value=[download_info])
+
+ async with wait_for_sync_completion(mass):
+ config = await mass.config.save_provider_config(
+ "kion_music",
+ {"token": "mock_kion_token", "quality": "high"},
+ )
+ await mass.music.start_sync()
+
+ yield config
+
+
+@pytest.fixture
+async def kion_music_provider_lossless(
+ mass: MusicAssistant,
+) -> AsyncGenerator[ProviderConfig, None]:
+ """Configure KION Music with quality=lossless and mock returning MP3 + FLAC."""
+ artist, album, track, playlist = _load_kion_objects()
+ search_result = _make_search_result(track, album, artist, playlist)
+ mp3_info = _make_download_info(
+ codec="mp3",
+ direct_link="https://example.com/kion_track.mp3",
+ bitrate_in_kbps=320,
+ )
+ flac_info = _make_download_info(
+ codec="flac",
+ direct_link="https://example.com/kion_track.flac",
+ bitrate_in_kbps=0,
+ )
+ download_infos = [mp3_info, flac_info]
+
+ album_with_volumes = type(
+ "AlbumWithVolumes",
+ (),
+ {
+ "id": album.id,
+ "title": album.title,
+ "volumes": [[track]],
+ "artists": album.artists if hasattr(album, "artists") else [],
+ "year": getattr(album, "year", None),
+ "release_date": getattr(album, "release_date", None),
+ "genre": getattr(album, "genre", None),
+ "cover_uri": getattr(album, "cover_uri", None),
+ "og_image": getattr(album, "og_image", None),
+ "type": getattr(album, "type", "album"),
+ "available": getattr(album, "available", True),
+ },
+ )()
+
+ with mock.patch(
+ "music_assistant.providers.kion_music.provider.KionMusicClient"
+ ) as mock_client_class:
+ mock_client = mock.AsyncMock()
+ mock_client_class.return_value = mock_client
+
+ mock_client.connect = mock.AsyncMock(return_value=True)
+ mock_client.user_id = 12345
+
+ mock_client.get_liked_tracks = mock.AsyncMock(return_value=[])
+ mock_client.get_liked_albums = mock.AsyncMock(return_value=[])
+ mock_client.get_liked_artists = mock.AsyncMock(return_value=[])
+ mock_client.get_user_playlists = mock.AsyncMock(return_value=[playlist])
+
+ mock_client.search = mock.AsyncMock(return_value=search_result)
+ mock_client.get_track = mock.AsyncMock(return_value=track)
+ mock_client.get_tracks = mock.AsyncMock(return_value=[track])
+ mock_client.get_album = mock.AsyncMock(return_value=album)
+ mock_client.get_album_with_tracks = mock.AsyncMock(return_value=album_with_volumes)
+ mock_client.get_artist = mock.AsyncMock(return_value=artist)
+ mock_client.get_artist_albums = mock.AsyncMock(return_value=[album])
+ mock_client.get_artist_tracks = mock.AsyncMock(return_value=[track])
+ mock_client.get_playlist = mock.AsyncMock(return_value=playlist)
+ mock_client.get_track_file_info_lossless = mock.AsyncMock(return_value=None)
+ mock_client.get_track_download_info = mock.AsyncMock(return_value=download_infos)
+
+ async with wait_for_sync_completion(mass):
+ config = await mass.config.save_provider_config(
+ "kion_music",
+ {"token": "mock_kion_token", "quality": "lossless"},
+ )
+ await mass.music.start_sync()
+
+ yield config
+
+
+def _get_kion_provider(mass: MusicAssistant) -> MusicProvider | None:
+ """Get KION Music provider instance from mass."""
+ for provider in mass.music.providers:
+ if provider.domain == "kion_music":
+ return provider
+ return None
+
+
+@pytest.mark.usefixtures("kion_music_provider")
+async def test_registration_and_sync(mass: MusicAssistant) -> None:
+ """Test that provider is registered and sync completes."""
+ prov = _get_kion_provider(mass)
+ assert prov is not None
+ assert prov.domain == "kion_music"
+ assert prov.instance_id
+
+
+@pytest.mark.usefixtures("kion_music_provider")
+async def test_search(mass: MusicAssistant) -> None:
+ """Test search returns results from kion_music."""
+ results = await mass.music.search("test query", [MediaType.TRACK], limit=5)
+ kion_tracks = [t for t in results.tracks if t.provider and "kion_music" in t.provider]
+ assert len(kion_tracks) >= 0
+
+
+@pytest.mark.usefixtures("kion_music_provider")
+async def test_get_artist(mass: MusicAssistant) -> None:
+ """Test getting artist by id."""
+ prov = _get_kion_provider(mass)
+ assert prov is not None
+ artist = await prov.get_artist("100")
+ assert artist is not None
+ assert artist.name
+ assert artist.provider == prov.instance_id
+ assert artist.item_id == "100"
+
+
+@pytest.mark.usefixtures("kion_music_provider")
+async def test_get_album(mass: MusicAssistant) -> None:
+ """Test getting album by id."""
+ prov = _get_kion_provider(mass)
+ assert prov is not None
+ album = await prov.get_album("300")
+ assert album is not None
+ assert album.name
+ assert album.provider == prov.instance_id
+ assert album.item_id == "300"
+
+
+@pytest.mark.usefixtures("kion_music_provider")
+async def test_get_track(mass: MusicAssistant) -> None:
+ """Test getting track by id."""
+ prov = _get_kion_provider(mass)
+ assert prov is not None
+ track = await prov.get_track("400")
+ assert track is not None
+ assert track.name
+ assert track.provider == prov.instance_id
+ assert track.item_id == "400"
+
+
+@pytest.mark.usefixtures("kion_music_provider")
+async def test_get_album_tracks(mass: MusicAssistant) -> None:
+ """Test getting album tracks."""
+ prov = _get_kion_provider(mass)
+ assert prov is not None
+ tracks = await prov.get_album_tracks("300")
+ assert isinstance(tracks, list)
+ assert len(tracks) >= 0
+
+
+@pytest.mark.usefixtures("kion_music_provider")
+async def test_get_playlist_tracks(mass: MusicAssistant) -> None:
+ """Test getting playlist tracks."""
+ prov = _get_kion_provider(mass)
+ assert prov is not None
+ tracks = await prov.get_playlist_tracks("12345:3", page=0)
+ assert isinstance(tracks, list)
+ assert len(tracks) >= 0
+
+
+@pytest.mark.usefixtures("kion_music_provider")
+async def test_get_playlist_tracks_page_gt_zero_returns_empty(mass: MusicAssistant) -> None:
+ """Test that page > 0 returns empty list (no server-side pagination)."""
+ prov = _get_kion_provider(mass)
+ assert prov is not None
+ tracks = await prov.get_playlist_tracks("12345:3", page=1)
+ assert tracks == []
+
+
+@pytest.mark.usefixtures("kion_music_provider")
+async def test_get_stream_details(mass: MusicAssistant) -> None:
+ """Test stream details retrieval."""
+ prov = _get_kion_provider(mass)
+ assert prov is not None
+ stream_details = await prov.get_stream_details("400", MediaType.TRACK)
+ assert stream_details is not None
+ assert stream_details.stream_type == StreamType.HTTP
+ assert stream_details.path == "https://example.com/kion_track.mp3"
+
+
+@pytest.mark.usefixtures("kion_music_provider_lossless")
+async def test_get_stream_details_returns_flac_when_lossless_selected(
+ mass: MusicAssistant,
+) -> None:
+ """When quality=lossless and API returns MP3+FLAC, stream details use FLAC."""
+ prov = _get_kion_provider(mass)
+ assert prov is not None
+ stream_details = await prov.get_stream_details("400", MediaType.TRACK)
+ assert stream_details is not None
+ assert stream_details.audio_format.content_type == ContentType.FLAC
+ assert stream_details.path == "https://example.com/kion_track.flac"
+
+
+@pytest.mark.usefixtures("kion_music_provider")
+async def test_library_items(mass: MusicAssistant) -> None:
+ """Test library artists, albums, tracks, playlists."""
+ prov = _get_kion_provider(mass)
+ assert prov is not None
+ instance_id = prov.instance_id
+
+ artists = await mass.music.artists.library_items()
+ kion_artists = [a for a in artists if a.provider == instance_id]
+ assert len(kion_artists) >= 0
+
+ albums = await mass.music.albums.library_items()
+ kion_albums = [a for a in albums if a.provider == instance_id]
+ assert len(kion_albums) >= 0
+
+ tracks = await mass.music.tracks.library_items()
+ kion_tracks = [t for t in tracks if t.provider == instance_id]
+ assert len(kion_tracks) >= 0
+
+ playlists = await mass.music.playlists.library_items()
+ kion_playlists = [p for p in playlists if p.provider == instance_id]
+ assert len(kion_playlists) >= 0
--- /dev/null
+"""Test we can parse KION Music API objects into Music Assistant models."""
+
+from __future__ import annotations
+
+import json
+import pathlib
+from typing import TYPE_CHECKING, Any, cast
+
+import pytest
+from yandex_music import Album as YandexAlbum
+from yandex_music import Artist as YandexArtist
+from yandex_music import Playlist as YandexPlaylist
+from yandex_music import Track as YandexTrack
+
+from music_assistant.providers.kion_music.parsers import (
+ parse_album,
+ parse_artist,
+ parse_playlist,
+ parse_track,
+)
+from music_assistant.providers.kion_music.provider import KionMusicProvider
+
+from .conftest import DE_JSON_CLIENT
+
+if TYPE_CHECKING:
+ from syrupy.assertion import SnapshotAssertion
+
+ from .conftest import ProviderStub
+
+FIXTURES_DIR = pathlib.Path(__file__).parent / "fixtures"
+ARTIST_FIXTURES = list(FIXTURES_DIR.glob("artists/*.json"))
+ALBUM_FIXTURES = list(FIXTURES_DIR.glob("albums/*.json"))
+TRACK_FIXTURES = list(FIXTURES_DIR.glob("tracks/*.json"))
+PLAYLIST_FIXTURES = list(FIXTURES_DIR.glob("playlists/*.json"))
+
+
+def _load_json(path: pathlib.Path) -> dict[str, Any]:
+ """Load JSON fixture."""
+ with open(path) as f:
+ return cast("dict[str, Any]", json.load(f))
+
+
+def _artist_from_fixture(path: pathlib.Path) -> YandexArtist | None:
+ """Deserialize Artist from fixture JSON."""
+ data = _load_json(path)
+ return YandexArtist.de_json(data, DE_JSON_CLIENT)
+
+
+def _album_from_fixture(path: pathlib.Path) -> YandexAlbum | None:
+ """Deserialize Album from fixture JSON."""
+ data = _load_json(path)
+ return YandexAlbum.de_json(data, DE_JSON_CLIENT)
+
+
+def _track_from_fixture(path: pathlib.Path) -> YandexTrack | None:
+ """Deserialize Track from fixture JSON."""
+ data = _load_json(path)
+ return YandexTrack.de_json(data, DE_JSON_CLIENT)
+
+
+def _playlist_from_fixture(path: pathlib.Path) -> YandexPlaylist | None:
+ """Deserialize Playlist from fixture JSON."""
+ data = _load_json(path)
+ return YandexPlaylist.de_json(data, DE_JSON_CLIENT)
+
+
+# provider_stub fixture is provided by conftest.py
+
+
+@pytest.mark.parametrize("example", ARTIST_FIXTURES, ids=lambda val: val.stem)
+def test_parse_artist(example: pathlib.Path, provider_stub: ProviderStub) -> None:
+ """Test we can parse artists from fixture JSON."""
+ artist_obj = _artist_from_fixture(example)
+ assert artist_obj is not None
+ result = parse_artist(cast("KionMusicProvider", provider_stub), artist_obj)
+ assert result.item_id == str(artist_obj.id)
+ assert result.name == (artist_obj.name or "Unknown Artist")
+ assert result.provider == provider_stub.instance_id
+ assert len(result.provider_mappings) == 1
+ mapping = next(iter(result.provider_mappings))
+ assert f"music.mts.ru/artist/{artist_obj.id}" in (mapping.url or "")
+
+
+def test_parse_artist_with_cover(provider_stub: ProviderStub) -> None:
+ """Test parsing artist with cover image."""
+ path = FIXTURES_DIR / "artists" / "with_cover.json"
+ artist_obj = _artist_from_fixture(path)
+ assert artist_obj is not None
+ result = parse_artist(cast("KionMusicProvider", provider_stub), artist_obj)
+ assert result.item_id == "200"
+ assert result.name == "Artist With Cover"
+ if artist_obj.cover and artist_obj.cover.uri:
+ assert result.metadata.images is not None
+ assert len(result.metadata.images) == 1
+ assert "avatars.yandex.net" in (result.metadata.images[0].path or "")
+
+
+@pytest.mark.parametrize("example", ALBUM_FIXTURES, ids=lambda val: val.stem)
+def test_parse_album(example: pathlib.Path, provider_stub: ProviderStub) -> None:
+ """Test we can parse albums from fixture JSON."""
+ album_obj = _album_from_fixture(example)
+ assert album_obj is not None
+ result = parse_album(cast("KionMusicProvider", provider_stub), album_obj)
+ assert result.item_id == str(album_obj.id)
+ assert result.name
+ assert result.provider == provider_stub.instance_id
+ mapping = next(iter(result.provider_mappings))
+ assert f"music.mts.ru/album/{album_obj.id}" in (mapping.url or "")
+ if album_obj.year:
+ assert result.year == album_obj.year
+
+
+@pytest.mark.parametrize("example", TRACK_FIXTURES, ids=lambda val: val.stem)
+def test_parse_track(example: pathlib.Path, provider_stub: ProviderStub) -> None:
+ """Test we can parse tracks from fixture JSON."""
+ track_obj = _track_from_fixture(example)
+ assert track_obj is not None
+ result = parse_track(cast("KionMusicProvider", provider_stub), track_obj)
+ assert result.item_id == str(track_obj.id)
+ assert result.name
+ assert result.duration == (track_obj.duration_ms or 0) // 1000
+ mapping = next(iter(result.provider_mappings))
+ assert f"music.mts.ru/track/{track_obj.id}" in (mapping.url or "")
+
+
+def test_parse_track_with_artist_and_album(provider_stub: ProviderStub) -> None:
+ """Test parsing track with artist and album."""
+ path = FIXTURES_DIR / "tracks" / "with_artist_and_album.json"
+ track_obj = _track_from_fixture(path)
+ assert track_obj is not None
+ result = parse_track(cast("KionMusicProvider", provider_stub), track_obj)
+ assert result.item_id == "500"
+ if track_obj.artists:
+ assert len(result.artists) >= 1
+ assert result.artists[0].name == "Track Artist"
+ if track_obj.albums:
+ assert result.album is not None
+ assert result.album.item_id == "20"
+ assert result.album.name == "Track Album"
+
+
+@pytest.mark.parametrize("example", PLAYLIST_FIXTURES, ids=lambda val: val.stem)
+def test_parse_playlist(example: pathlib.Path, provider_stub: ProviderStub) -> None:
+ """Test we can parse playlists from fixture JSON."""
+ playlist_obj = _playlist_from_fixture(example)
+ assert playlist_obj is not None
+ result = parse_playlist(cast("KionMusicProvider", provider_stub), playlist_obj)
+ owner_id = (
+ str(playlist_obj.owner.uid) if playlist_obj.owner else str(provider_stub.client.user_id)
+ )
+ kind = str(playlist_obj.kind)
+ assert result.item_id == f"{owner_id}:{kind}"
+ assert result.name == (playlist_obj.title or "Unknown Playlist")
+ mapping = next(iter(result.provider_mappings))
+ assert f"music.mts.ru/users/{owner_id}/playlists/{kind}" in (mapping.url or "")
+
+
+def test_parse_playlist_editable(provider_stub: ProviderStub) -> None:
+ """Test parsing own playlist (editable)."""
+ path = FIXTURES_DIR / "playlists" / "minimal.json"
+ playlist_obj = _playlist_from_fixture(path)
+ assert playlist_obj is not None
+ result = parse_playlist(cast("KionMusicProvider", provider_stub), playlist_obj)
+ assert result.owner == "Me"
+ assert result.is_editable is True
+
+
+def test_parse_playlist_other_user(provider_stub: ProviderStub) -> None:
+ """Test parsing playlist owned by another user."""
+ path = FIXTURES_DIR / "playlists" / "other_user.json"
+ playlist_obj = _playlist_from_fixture(path)
+ assert playlist_obj is not None
+ result = parse_playlist(cast("KionMusicProvider", provider_stub), playlist_obj)
+ assert result.item_id == "99999:1"
+ assert result.name == "Shared Playlist"
+ assert result.owner == "Other User"
+ assert result.is_editable is False
+ assert result.metadata.description == "A shared playlist"
+
+
+# --- Snapshot tests ---
+
+
+def _sort_for_snapshot(parsed: dict[str, Any]) -> dict[str, Any]:
+ """Sort lists in parsed dict for deterministic snapshot comparison."""
+ if parsed.get("external_ids"):
+ parsed["external_ids"] = sorted(parsed["external_ids"])
+ if "metadata" in parsed and isinstance(parsed["metadata"], dict):
+ if parsed["metadata"].get("genres"):
+ parsed["metadata"]["genres"] = sorted(parsed["metadata"]["genres"])
+ return parsed
+
+
+@pytest.mark.parametrize("example", ARTIST_FIXTURES, ids=lambda val: val.stem)
+def test_parse_artist_snapshot(
+ example: pathlib.Path,
+ provider_stub: ProviderStub,
+ snapshot: SnapshotAssertion,
+) -> None:
+ """Snapshot test for artist parsing."""
+ artist_obj = _artist_from_fixture(example)
+ assert artist_obj is not None
+ result = parse_artist(cast("KionMusicProvider", provider_stub), artist_obj)
+ parsed = _sort_for_snapshot(result.to_dict())
+ assert snapshot == parsed
+
+
+@pytest.mark.parametrize("example", ALBUM_FIXTURES, ids=lambda val: val.stem)
+def test_parse_album_snapshot(
+ example: pathlib.Path,
+ provider_stub: ProviderStub,
+ snapshot: SnapshotAssertion,
+) -> None:
+ """Snapshot test for album parsing."""
+ album_obj = _album_from_fixture(example)
+ assert album_obj is not None
+ result = parse_album(cast("KionMusicProvider", provider_stub), album_obj)
+ parsed = _sort_for_snapshot(result.to_dict())
+ assert snapshot == parsed
+
+
+@pytest.mark.parametrize("example", TRACK_FIXTURES, ids=lambda val: val.stem)
+def test_parse_track_snapshot(
+ example: pathlib.Path,
+ provider_stub: ProviderStub,
+ snapshot: SnapshotAssertion,
+) -> None:
+ """Snapshot test for track parsing."""
+ track_obj = _track_from_fixture(example)
+ assert track_obj is not None
+ result = parse_track(cast("KionMusicProvider", provider_stub), track_obj)
+ parsed = _sort_for_snapshot(result.to_dict())
+ assert snapshot == parsed
+
+
+@pytest.mark.parametrize("example", PLAYLIST_FIXTURES, ids=lambda val: val.stem)
+def test_parse_playlist_snapshot(
+ example: pathlib.Path,
+ provider_stub: ProviderStub,
+ snapshot: SnapshotAssertion,
+) -> None:
+ """Snapshot test for playlist parsing."""
+ playlist_obj = _playlist_from_fixture(example)
+ assert playlist_obj is not None
+ result = parse_playlist(cast("KionMusicProvider", provider_stub), playlist_obj)
+ parsed = _sort_for_snapshot(result.to_dict())
+ assert snapshot == parsed