from .constants import (
CONF_ACTION_CLEAR_AUTH,
+ CONF_BASE_URL,
CONF_QUALITY,
CONF_TOKEN,
+ DEFAULT_BASE_URL,
QUALITY_HIGH,
QUALITY_LOSSLESS,
)
ProviderFeature.LIBRARY_ALBUMS_EDIT,
ProviderFeature.LIBRARY_TRACKS_EDIT,
ProviderFeature.BROWSE,
+ ProviderFeature.SIMILAR_TRACKS,
+ ProviderFeature.RECOMMENDATIONS,
}
],
default_value=QUALITY_HIGH,
),
+ ConfigEntry(
+ key=CONF_BASE_URL,
+ type=ConfigEntryType.STRING,
+ label="API Base URL",
+ description="API endpoint base URL. "
+ "Only change if KION Music changes their API endpoint. "
+ "Default: https://music.mts.ru/ya_proxy_api",
+ default_value=DEFAULT_BASE_URL,
+ required=False,
+ advanced=True,
+ ),
)
-"""API client wrapper for KION Music (MTS Music)."""
+"""API client wrapper for KION Music."""
from __future__ import annotations
import logging
+from datetime import UTC, datetime
from typing import TYPE_CHECKING, Any, cast
from music_assistant_models.errors import (
if TYPE_CHECKING:
from yandex_music import DownloadInfo
-from .constants import DEFAULT_LIMIT
+from .constants import DEFAULT_BASE_URL, DEFAULT_LIMIT, ROTOR_FEEDBACK_FROM, ROTOR_STATION_MY_MIX
# get-file-info with quality=lossless returns FLAC; default /tracks/.../download-info often does not
-# Prefer flac-mp4/aac-mp4
+# Prefer flac-mp4/aac-mp4 (KION API moved to these formats around 2025)
GET_FILE_INFO_CODECS = "flac-mp4,flac,aac-mp4,aac,he-aac,mp3,he-aac-mp4"
-# get-file-info: same host as library (all requests go through one API)
-KION_BASE_URL = "https://music.mts.ru/ya_api"
LOGGER = logging.getLogger(__name__)
class KionMusicClient:
- """Wrapper around yandex-music-api ClientAsync for KION Music."""
+ """Wrapper around yandex-music-api ClientAsync."""
- def __init__(self, token: str) -> None:
+ def __init__(self, token: str, base_url: str | None = None) -> None:
"""Initialize the KION Music client.
:param token: KION Music OAuth token.
+ :param base_url: Optional API base URL (defaults to KION Music API).
"""
self._token = token
+ self._base_url = base_url or DEFAULT_BASE_URL
self._client: ClientAsync | None = None
self._user_id: int | None = None
:raises LoginFailed: If the token is invalid.
"""
try:
- self._client = await ClientAsync(self._token, base_url=KION_BASE_URL).init()
+ self._client = await ClientAsync(self._token, base_url=self._base_url).init()
if self._client.me is None or self._client.me.account is None:
raise LoginFailed("Failed to get account info")
self._user_id = self._client.me.account.uid
raise ProviderUnavailableError("Client not connected, call connect() first")
return self._client
+ def _is_connection_error(self, err: Exception) -> bool:
+ """Return True if the exception indicates a connection or server drop."""
+ if isinstance(err, NetworkError):
+ return True
+ msg = str(err).lower()
+ return "disconnect" in msg or "connection" in msg or "timeout" in msg
+
+ async def _reconnect(self) -> None:
+ """Disconnect and connect again to recover from Server disconnected / connection errors."""
+ await self.disconnect()
+ await self.connect()
+
+ # Rotor (radio station) methods
+
+ async def get_rotor_station_tracks(
+ self,
+ station_id: str,
+ queue: str | int | None = None,
+ ) -> tuple[list[YandexTrack], str | None]:
+ """Get tracks from a rotor station (e.g. user:onyourwave or track:1234).
+
+ :param station_id: Station ID (e.g. ROTOR_STATION_MY_MIX or "track:1234" for similar).
+ :param queue: Optional track ID for pagination (first track of previous batch).
+ :return: Tuple of (list of track objects, batch_id for feedback or None).
+ """
+ for attempt in range(2):
+ client = self._ensure_connected()
+ try:
+ result = await client.rotor_station_tracks(station_id, settings2=True, queue=queue)
+ if not result or not result.sequence:
+ return ([], result.batch_id if result else None)
+ track_ids = []
+ for seq in result.sequence:
+ if seq.track is None:
+ continue
+ tid = getattr(seq.track, "id", None) or getattr(seq.track, "track_id", None)
+ if tid is not None:
+ track_ids.append(str(tid))
+ if not track_ids:
+ return ([], result.batch_id if result else None)
+ full_tracks = await self.get_tracks(track_ids)
+ order_map = {str(t.id): t for t in full_tracks if hasattr(t, "id") and t.id}
+ ordered = [order_map[tid] for tid in track_ids if tid in order_map]
+ return (ordered, result.batch_id if result else None)
+ except BadRequestError as err:
+ LOGGER.warning("Error fetching rotor station %s tracks: %s", station_id, err)
+ return ([], None)
+ except (NetworkError, Exception) as err:
+ if attempt == 0 and self._is_connection_error(err):
+ LOGGER.warning(
+ "Connection error fetching rotor tracks, reconnecting: %s",
+ err,
+ )
+ try:
+ await self._reconnect()
+ except Exception as recon_err:
+ LOGGER.warning("Reconnect failed: %s", recon_err)
+ return ([], None)
+ else:
+ LOGGER.warning("Error fetching rotor station tracks: %s", err)
+ return ([], None)
+ return ([], None)
+
+ async def get_my_mix_tracks(
+ self, queue: str | int | None = None
+ ) -> tuple[list[YandexTrack], str | None]:
+ """Get tracks from the My Mix (Мой Микс) radio station.
+
+ :param queue: Optional track ID of the last track from the previous batch (API uses it for
+ pagination; do not pass batch_id).
+ :return: Tuple of (list of track objects, batch_id for feedback).
+ """
+ return await self.get_rotor_station_tracks(ROTOR_STATION_MY_MIX, queue=queue)
+
+ async def send_rotor_station_feedback(
+ self,
+ station_id: str,
+ feedback_type: str,
+ *,
+ batch_id: str | None = None,
+ track_id: str | None = None,
+ total_played_seconds: int | None = None,
+ ) -> bool:
+ """Send rotor station feedback for My Mix recommendations.
+
+ Used to report radioStarted, trackStarted, trackFinished, skip so that
+ the service can improve subsequent recommendations.
+
+ :param station_id: Station ID (e.g. ROTOR_STATION_MY_MIX).
+ :param feedback_type: One of 'radioStarted', 'trackStarted', 'trackFinished', 'skip'.
+ :param batch_id: Optional batch ID from the last get_my_mix_tracks response.
+ :param track_id: Track ID (required for trackStarted, trackFinished, skip).
+ :param total_played_seconds: Seconds played (for trackFinished, skip).
+ :return: True if the request succeeded.
+ """
+ client = self._ensure_connected()
+ payload: dict[str, Any] = {
+ "type": feedback_type,
+ "timestamp": datetime.now(UTC).isoformat().replace("+00:00", "Z"),
+ }
+ if feedback_type == "radioStarted":
+ payload["from"] = ROTOR_FEEDBACK_FROM
+ if track_id is not None:
+ payload["trackId"] = track_id
+ if total_played_seconds is not None:
+ payload["totalPlayedSeconds"] = total_played_seconds
+ if batch_id is not None:
+ payload["batchId"] = batch_id
+
+ url = f"{client.base_url}/rotor/station/{station_id}/feedback"
+ for attempt in range(2):
+ client = self._ensure_connected()
+ try:
+ await client.request.post(url, payload)
+ return True
+ except BadRequestError as err:
+ LOGGER.debug("Rotor feedback %s failed: %s", feedback_type, err)
+ return False
+ except (NetworkError, Exception) as err:
+ if attempt == 0 and self._is_connection_error(err):
+ LOGGER.warning(
+ "Connection error on rotor feedback %s, reconnecting: %s",
+ feedback_type,
+ err,
+ )
+ try:
+ await self._reconnect()
+ except Exception as recon_err:
+ LOGGER.debug("Reconnect failed: %s", recon_err)
+ return False
+ else:
+ LOGGER.debug("Rotor feedback %s failed: %s", feedback_type, err)
+ return False
+ return False
+
# Library methods
async def get_liked_tracks(self) -> list[TrackShort]:
LOGGER.error("Error fetching liked tracks: %s", err)
raise ResourceTemporarilyUnavailable("Failed to fetch liked tracks") from err
- async def get_liked_albums(self) -> list[YandexAlbum]:
+ async def get_liked_albums(self, batch_size: int = 50) -> list[YandexAlbum]:
"""Get user's liked albums with full details (including cover art).
The users_likes_albums endpoint returns minimal album data without
if not album_ids:
return []
# Fetch full album details in batches to get cover_uri and other metadata
- batch_size = 50
+ # batch_size is now a parameter with default 50
full_albums: list[YandexAlbum] = []
for i in range(0, len(album_ids), batch_size):
batch = album_ids[i : i + batch_size]
:param user_id: User ID (owner of the playlist).
:param playlist_id: Playlist ID (kind).
:return: Playlist object or None if not found.
+ :raises ResourceTemporarilyUnavailable: On network errors.
"""
client = self._ensure_connected()
try:
if isinstance(result, list):
return result[0] if result else None
return result
- except (BadRequestError, NetworkError) as err:
+ except NetworkError as err:
+ LOGGER.warning("Network error fetching playlist %s/%s: %s", user_id, playlist_id, err)
+ raise ResourceTemporarilyUnavailable("Failed to fetch playlist") from err
+ except BadRequestError as err:
LOGGER.error("Error fetching playlist %s/%s: %s", user_id, playlist_id, err)
return None
The /tracks/{id}/download-info endpoint often returns only MP3; get-file-info
with quality=lossless and codecs=flac,... returns FLAC when available.
+ Includes retry with reconnect on transient connection errors so that a
+ momentary disconnect does not silently fall back to lossy quality.
+
:param track_id: Track ID.
:return: Parsed downloadInfo dict (url, codec, urls, ...) or None on error.
"""
- client = self._ensure_connected()
- sign = get_sign_request(track_id)
- base_params = {
- "ts": sign.timestamp,
- "trackId": track_id,
- "quality": "lossless",
- "codecs": GET_FILE_INFO_CODECS,
- "sign": sign.value,
- }
def _parse_file_info_result(raw: dict[str, Any] | None) -> dict[str, Any] | None:
if not raw or not isinstance(raw, dict):
return None
return cast("dict[str, Any]", download_info)
- url = f"{KION_BASE_URL}/get-file-info"
- params_encraw = {**base_params, "transports": "encraw"}
- try:
- result = await client._request.get(url, params=params_encraw)
- return _parse_file_info_result(result)
- except (BadRequestError, NetworkError) as err:
- LOGGER.debug(
- "get-file-info lossless for track %s: %s %s",
- track_id,
- type(err).__name__,
- getattr(err, "message", str(err)) or repr(err),
- )
- return None
- except UnauthorizedError as err:
- LOGGER.debug(
- "get-file-info lossless for track %s (transports=encraw): %s %s",
- track_id,
- type(err).__name__,
- getattr(err, "message", str(err)) or repr(err),
- )
- LOGGER.debug(
- "If you have KION Music Plus and this track has lossless, "
- "try a token from the web client (music.mts.ru)."
- )
- params_raw = {**base_params, "transports": "raw"}
+ for attempt in range(2):
+ client = self._ensure_connected()
+ sign = get_sign_request(track_id)
+ base_params = {
+ "ts": sign.timestamp,
+ "trackId": track_id,
+ "quality": "lossless",
+ "codecs": GET_FILE_INFO_CODECS,
+ "sign": sign.value,
+ }
+
+ url = f"{client.base_url}/get-file-info"
+ params_encraw = {**base_params, "transports": "encraw"}
try:
- result = await client._request.get(url, params=params_raw)
+ result = await client.request.get(url, params=params_encraw)
return _parse_file_info_result(result)
- except (BadRequestError, NetworkError, UnauthorizedError) as retry_err:
+ except UnauthorizedError as err:
+ LOGGER.debug(
+ "get-file-info lossless for track %s (transports=encraw): %s %s",
+ track_id,
+ type(err).__name__,
+ getattr(err, "message", str(err)) or repr(err),
+ )
+ LOGGER.debug(
+ "If you have KION Music Plus and this track has lossless, "
+ "try a token from the web client (music.mts.ru)."
+ )
+ params_raw = {**base_params, "transports": "raw"}
+ try:
+ result = await client.request.get(url, params=params_raw)
+ return _parse_file_info_result(result)
+ except (BadRequestError, NetworkError, UnauthorizedError) as retry_err:
+ LOGGER.debug(
+ "get-file-info lossless for track %s (transports=raw): %s %s",
+ track_id,
+ type(retry_err).__name__,
+ getattr(retry_err, "message", str(retry_err)) or repr(retry_err),
+ )
+ return None
+ except BadRequestError as err:
LOGGER.debug(
- "get-file-info lossless for track %s (transports=raw): %s %s",
+ "get-file-info lossless for track %s: %s %s",
track_id,
- type(retry_err).__name__,
- getattr(retry_err, "message", str(retry_err)) or repr(retry_err),
+ type(err).__name__,
+ getattr(err, "message", str(err)) or repr(err),
)
return None
+ except (NetworkError, Exception) as err:
+ if attempt == 0 and self._is_connection_error(err):
+ LOGGER.warning(
+ "Connection error on get-file-info lossless for track %s, reconnecting: %s",
+ track_id,
+ err,
+ )
+ try:
+ await self._reconnect()
+ except Exception as recon_err:
+ LOGGER.debug("Reconnect failed: %s", recon_err)
+ return None
+ else:
+ LOGGER.debug(
+ "get-file-info lossless for track %s: %s %s",
+ track_id,
+ type(err).__name__,
+ getattr(err, "message", str(err)) or repr(err),
+ )
+ return None
+ return None
# Library modifications
# Configuration Keys
CONF_TOKEN = "token"
CONF_QUALITY = "quality"
+CONF_BASE_URL = "base_url"
# Actions
CONF_ACTION_AUTH = "auth"
# API defaults
DEFAULT_LIMIT: Final[int] = 50
+DEFAULT_BASE_URL: Final[str] = "https://music.mts.ru/ya_proxy_api"
# Quality options
QUALITY_HIGH = "high"
QUALITY_LOSSLESS = "lossless"
+# Default tuning values for My Mix / browse / discovery behaviour
+MY_MIX_MAX_TRACKS: Final[int] = 150
+MY_MIX_BATCH_SIZE: Final[int] = 3
+TRACK_BATCH_SIZE: Final[int] = 50
+DISCOVERY_INITIAL_TRACKS: Final[int] = 20
+BROWSE_INITIAL_TRACKS: Final[int] = 15
+
# Image sizes
IMAGE_SIZE_SMALL = "200x200"
IMAGE_SIZE_MEDIUM = "400x400"
# ID separators
PLAYLIST_ID_SPLITTER: Final[str] = ":"
+
+# Rotor (radio) station identifiers
+ROTOR_STATION_MY_MIX: Final[str] = "user:onyourwave"
+
+# Client identifier for rotor radioStarted feedback.
+# The API expects a "from" field identifying the client; the desktop app
+# identifier ensures the rotor API returns proper recommendations.
+ROTOR_FEEDBACK_FROM: Final[str] = "YandexMusicDesktopAppWindows"
+
+# Virtual playlist ID for My Mix (used in get_playlist / get_playlist_tracks; not owner_id:kind)
+MY_MIX_PLAYLIST_ID: Final[str] = "my_mix"
+
+# Composite item_id for My Mix tracks: track_id + separator + station_id (for rotor feedback)
+RADIO_TRACK_ID_SEP: Final[str] = "@"
+
+# Browse folder names by locale (item_id -> display name)
+BROWSE_NAMES_RU: Final[dict[str, str]] = {
+ "my_mix": "Мой Микс",
+ "artists": "Мои исполнители",
+ "albums": "Мои альбомы",
+ "tracks": "Мне нравится",
+ "playlists": "Мои плейлисты",
+}
+BROWSE_NAMES_EN: Final[dict[str, str]] = {
+ "my_mix": "My Mix",
+ "artists": "My Artists",
+ "albums": "My Albums",
+ "tracks": "My Favorites",
+ "playlists": "My Playlists",
+}
from .provider import KionMusicProvider
-def _get_content_type(provider: KionMusicProvider) -> ContentType:
- """Get content type based on provider quality setting.
-
- :param provider: The KION Music provider instance.
- :return: ContentType.UNKNOWN as actual codec is determined at stream time.
- """
- # Actual codec is determined when getting stream details
- # Suppress unused argument warning
- _ = provider
- return ContentType.UNKNOWN
-
-
def _get_image_url(cover_uri: str | None, size: str = IMAGE_SIZE_LARGE) -> str | None:
"""Convert cover URI to full URL.
def parse_artist(provider: KionMusicProvider, artist_obj: YandexArtist) -> Artist:
- """Parse KION artist object to MA Artist model.
+ """Parse a KION Music artist object to MA Artist model.
:param provider: The KION Music provider instance.
- :param artist_obj: KION artist object.
+ :param artist_obj: API artist object.
:return: Music Assistant Artist model.
"""
artist_id = str(artist_obj.id)
def parse_album(provider: KionMusicProvider, album_obj: YandexAlbum) -> Album:
- """Parse KION album object to MA Album model.
+ """Parse a KION Music album object to MA Album model.
:param provider: The KION Music provider instance.
- :param album_obj: KION album object.
+ :param album_obj: API album object.
:return: Music Assistant Album model.
"""
name, version = parse_title_and_version(
provider_domain=provider.domain,
provider_instance=provider.instance_id,
audio_format=AudioFormat(
- content_type=_get_content_type(provider),
+ content_type=ContentType.UNKNOWN,
),
url=f"https://music.mts.ru/album/{album_id}",
available=available,
def parse_track(provider: KionMusicProvider, track_obj: YandexTrack) -> Track:
- """Parse KION track object to MA Track model.
+ """Parse a KION Music track object to MA Track model.
:param provider: The KION Music provider instance.
- :param track_obj: KION track object.
+ :param track_obj: API track object.
:return: Music Assistant Track model.
"""
name, version = parse_title_and_version(
# Determine availability
available = track_obj.available or False
- # Duration is in milliseconds
+ # Duration is in milliseconds in KION API
duration = (track_obj.duration_ms or 0) // 1000
track = Track(
provider_domain=provider.domain,
provider_instance=provider.instance_id,
audio_format=AudioFormat(
- content_type=_get_content_type(provider),
+ content_type=ContentType.UNKNOWN,
),
url=f"https://music.mts.ru/track/{track_id}",
available=available,
def parse_playlist(
provider: KionMusicProvider, playlist_obj: YandexPlaylist, owner_name: str | None = None
) -> Playlist:
- """Parse KION playlist object to MA Playlist model.
+ """Parse a KION Music playlist object to MA Playlist model.
:param provider: The KION Music provider instance.
- :param playlist_obj: KION playlist object.
+ :param playlist_obj: API playlist object.
:param owner_name: Optional owner name override.
:return: Music Assistant Playlist model.
"""
from __future__ import annotations
+import logging
+from collections.abc import Sequence
from typing import TYPE_CHECKING
-from music_assistant_models.enums import MediaType
+from music_assistant_models.enums import MediaType, ProviderFeature
from music_assistant_models.errors import (
InvalidDataError,
LoginFailed,
from music_assistant_models.media_items import (
Album,
Artist,
+ BrowseFolder,
ItemMapping,
MediaItemType,
Playlist,
+ ProviderMapping,
+ RecommendationFolder,
SearchResults,
Track,
+ UniqueList,
)
from music_assistant.controllers.cache import use_cache
from music_assistant.models.music_provider import MusicProvider
from .api_client import KionMusicClient
-from .constants import CONF_TOKEN, PLAYLIST_ID_SPLITTER
+from .constants import (
+ BROWSE_INITIAL_TRACKS,
+ BROWSE_NAMES_EN,
+ BROWSE_NAMES_RU,
+ CONF_BASE_URL,
+ CONF_TOKEN,
+ DEFAULT_BASE_URL,
+ DISCOVERY_INITIAL_TRACKS,
+ MY_MIX_BATCH_SIZE,
+ MY_MIX_MAX_TRACKS,
+ MY_MIX_PLAYLIST_ID,
+ PLAYLIST_ID_SPLITTER,
+ RADIO_TRACK_ID_SEP,
+ ROTOR_STATION_MY_MIX,
+ TRACK_BATCH_SIZE,
+)
from .parsers import parse_album, parse_artist, parse_playlist, parse_track
from .streaming import KionMusicStreamingManager
from music_assistant_models.streamdetails import StreamDetails
+def _parse_radio_item_id(item_id: str) -> tuple[str, str | None]:
+ """Extract track_id and optional station_id from provider item_id.
+
+ My Mix tracks use item_id format 'track_id@station_id'. Other tracks use
+ plain track_id.
+
+ :param item_id: Provider item_id (may contain RADIO_TRACK_ID_SEP).
+ :return: (track_id, station_id or None).
+ """
+ if RADIO_TRACK_ID_SEP in item_id:
+ parts = item_id.split(RADIO_TRACK_ID_SEP, 1)
+ return (parts[0], parts[1] if len(parts) > 1 else None)
+ return (item_id, None)
+
+
class KionMusicProvider(MusicProvider):
"""Implementation of a KION Music MusicProvider."""
_client: KionMusicClient | None = None
_streaming: KionMusicStreamingManager | None = None
+ _my_mix_batch_id: str | None = None
+ _my_mix_last_track_id: str | None = None # last track id for "Load more" (API queue param)
+ _my_mix_playlist_next_cursor: str | None = None # first_track_id for next playlist page
+ _my_mix_radio_started_sent: bool = False
+ _my_mix_seen_track_ids: set[str] # Track IDs seen in current My Mix session
@property
def client(self) -> KionMusicClient:
raise ProviderUnavailableError("Provider not initialized")
return self._streaming
+ def _get_browse_names(self) -> dict[str, str]:
+ """Get locale-based browse folder names."""
+ try:
+ locale = (self.mass.metadata.locale or "en_US").lower()
+ use_russian = locale.startswith("ru")
+ except Exception:
+ use_russian = False
+ return BROWSE_NAMES_RU if use_russian else BROWSE_NAMES_EN
+
async def handle_async_init(self) -> None:
"""Handle async initialization of the provider."""
token = self.config.get_value(CONF_TOKEN)
if not token:
raise LoginFailed("No KION Music token provided")
- self._client = KionMusicClient(str(token))
+ base_url = self.config.get_value(CONF_BASE_URL, DEFAULT_BASE_URL)
+ self._client = KionMusicClient(str(token), base_url=str(base_url))
await self._client.connect()
+ # Suppress yandex_music library DEBUG dumps (full API request/response JSON)
+ logging.getLogger("yandex_music").setLevel(self.logger.level + 10)
self._streaming = KionMusicStreamingManager(self)
+ # Initialize My Mix duplicate tracking
+ self._my_mix_seen_track_ids = set()
self.logger.info("Successfully connected to KION Music")
async def unload(self, is_removed: bool = False) -> None:
name=name,
)
+ async def _fetch_my_mix_tracks(
+ self,
+ *,
+ max_tracks: int = MY_MIX_MAX_TRACKS,
+ max_batches: int = MY_MIX_BATCH_SIZE,
+ initial_queue: str | int | None = None,
+ seen_track_ids: set[str] | None = None,
+ ) -> tuple[list[Track], str | None, str | None, set[str]]:
+ """Fetch My Mix tracks with de-duplication and radio feedback.
+
+ :param max_tracks: Maximum number of tracks to return.
+ :param max_batches: Maximum number of API batch calls.
+ :param initial_queue: Optional track ID for API pagination.
+ :param seen_track_ids: Already-seen track IDs for de-duplication.
+ :return: (tracks, last_batch_id, last_first_track_id, updated_seen_ids).
+ """
+ if seen_track_ids is None:
+ seen_track_ids = set()
+
+ tracks: list[Track] = []
+ last_batch_id: str | None = None
+ last_first_track_id: str | None = None
+ queue: str | int | None = initial_queue
+
+ for _ in range(max_batches):
+ if len(tracks) >= max_tracks:
+ break
+
+ yandex_tracks, batch_id = await self.client.get_my_mix_tracks(queue=queue)
+ if batch_id:
+ self._my_mix_batch_id = batch_id
+ last_batch_id = batch_id
+ if not self._my_mix_radio_started_sent and yandex_tracks:
+ self._my_mix_radio_started_sent = True
+ await self.client.send_rotor_station_feedback(
+ ROTOR_STATION_MY_MIX,
+ "radioStarted",
+ batch_id=batch_id,
+ )
+ first_track_id_this_batch: str | None = None
+ for yt in yandex_tracks:
+ if len(tracks) >= max_tracks:
+ break
+ try:
+ t = parse_track(self, yt)
+ track_id = (
+ str(yt.id) if hasattr(yt, "id") and yt.id else getattr(yt, "track_id", None)
+ )
+ if track_id:
+ if track_id in seen_track_ids:
+ self.logger.debug("Skipping duplicate My Mix track: %s", track_id)
+ continue
+ seen_track_ids.add(track_id)
+ if first_track_id_this_batch is None:
+ first_track_id_this_batch = track_id
+ t.item_id = f"{track_id}{RADIO_TRACK_ID_SEP}{ROTOR_STATION_MY_MIX}"
+ for pm in t.provider_mappings:
+ if pm.provider_instance == self.instance_id:
+ pm.item_id = t.item_id
+ break
+ tracks.append(t)
+ except InvalidDataError as err:
+ self.logger.debug("Error parsing My Mix track: %s", err)
+ if first_track_id_this_batch is not None:
+ last_first_track_id = first_track_id_this_batch
+ if not batch_id or not yandex_tracks or len(tracks) >= max_tracks:
+ break
+ queue = first_track_id_this_batch
+
+ return (tracks, last_batch_id, last_first_track_id, seen_track_ids)
+
+ async def browse(self, path: str) -> Sequence[MediaItemType | ItemMapping | BrowseFolder]:
+ """Browse provider items with locale-based folder names and My Mix.
+
+ Root level shows My Mix, artists, albums, liked tracks, playlists. Names
+ are in Russian when MA locale is ru_*, otherwise in English. My Mix
+ tracks use item_id format track_id@station_id for rotor feedback.
+
+ :param path: The path to browse (e.g. provider_id:// or provider_id://artists).
+ """
+ if ProviderFeature.BROWSE not in self.supported_features:
+ raise NotImplementedError
+
+ path_parts = path.split("://")[1].split("/") if "://" in path else []
+ subpath = path_parts[0] if len(path_parts) > 0 else None
+ sub_subpath = path_parts[1] if len(path_parts) > 1 else None
+
+ if subpath == MY_MIX_PLAYLIST_ID:
+ max_batches = MY_MIX_BATCH_SIZE if sub_subpath != "next" else 1
+
+ if sub_subpath != "next":
+ self._my_mix_seen_track_ids = set()
+
+ queue: str | int | None = None
+ if sub_subpath == "next":
+ queue = self._my_mix_last_track_id
+ elif sub_subpath:
+ queue = sub_subpath
+
+ (
+ fetched,
+ last_batch_id,
+ last_first_track_id,
+ self._my_mix_seen_track_ids,
+ ) = await self._fetch_my_mix_tracks(
+ max_batches=max_batches,
+ initial_queue=queue,
+ seen_track_ids=self._my_mix_seen_track_ids,
+ )
+ if last_first_track_id is not None:
+ self._my_mix_last_track_id = last_first_track_id
+
+ all_tracks: list[Track | BrowseFolder] = list(fetched)
+
+ # Apply initial tracks limit if not in "load more" mode
+ if sub_subpath != "next":
+ if len(all_tracks) > BROWSE_INITIAL_TRACKS:
+ all_tracks = all_tracks[:BROWSE_INITIAL_TRACKS]
+
+ # Only show "Load more" if we haven't reached the limit and there's more data
+ if last_batch_id and len(fetched) < MY_MIX_MAX_TRACKS:
+ names = self._get_browse_names()
+ next_name = "Ещё" if names == BROWSE_NAMES_RU else "Load more"
+ all_tracks.append(
+ BrowseFolder(
+ item_id="next",
+ provider=self.instance_id,
+ path=f"{path.rstrip('/')}/next",
+ name=next_name,
+ is_playable=False,
+ )
+ )
+ return all_tracks
+
+ if subpath:
+ return await super().browse(path)
+
+ names = self._get_browse_names()
+
+ folders: list[BrowseFolder] = []
+ base = path if path.endswith("//") else path.rstrip("/") + "/"
+ folders.append(
+ BrowseFolder(
+ item_id=MY_MIX_PLAYLIST_ID,
+ provider=self.instance_id,
+ path=f"{base}{MY_MIX_PLAYLIST_ID}",
+ name=names[MY_MIX_PLAYLIST_ID],
+ is_playable=True,
+ )
+ )
+ if ProviderFeature.LIBRARY_ARTISTS in self.supported_features:
+ folders.append(
+ BrowseFolder(
+ item_id="artists",
+ provider=self.instance_id,
+ path=f"{base}artists",
+ name=names["artists"],
+ is_playable=True,
+ )
+ )
+ if ProviderFeature.LIBRARY_ALBUMS in self.supported_features:
+ folders.append(
+ BrowseFolder(
+ item_id="albums",
+ provider=self.instance_id,
+ path=f"{base}albums",
+ name=names["albums"],
+ is_playable=True,
+ )
+ )
+ if ProviderFeature.LIBRARY_TRACKS in self.supported_features:
+ folders.append(
+ BrowseFolder(
+ item_id="tracks",
+ provider=self.instance_id,
+ path=f"{base}tracks",
+ name=names["tracks"],
+ is_playable=True,
+ )
+ )
+ if ProviderFeature.LIBRARY_PLAYLISTS in self.supported_features:
+ folders.append(
+ BrowseFolder(
+ item_id="playlists",
+ provider=self.instance_id,
+ path=f"{base}playlists",
+ name=names["playlists"],
+ is_playable=True,
+ )
+ )
+ if len(folders) == 1:
+ return await self.browse(folders[0].path)
+ return folders
+
# Search
@use_cache(3600 * 24 * 14)
raise MediaNotFoundError(f"Album {prov_album_id} not found")
return parse_album(self, album)
- @use_cache(3600 * 24 * 30)
async def get_track(self, prov_track_id: str) -> Track:
"""Get track details by ID.
- :param prov_track_id: The provider track ID.
+ Supports composite item_id (track_id@station_id) for My Mix tracks;
+ only the track_id part is used for the API. Normalizes the ID before
+ caching so that "12345" and "12345@user:onyourwave" share one cache entry.
+
+ :param prov_track_id: The provider track ID (or track_id@station_id).
:return: Track object.
:raises MediaNotFoundError: If track not found.
"""
- yandex_track = await self.client.get_track(prov_track_id)
+ track_id, _ = _parse_radio_item_id(prov_track_id)
+ return await self._get_track_cached(track_id)
+
+ @use_cache(3600 * 24 * 30)
+ async def _get_track_cached(self, track_id: str) -> Track:
+ """Fetch and cache track details by normalized track ID.
+
+ :param track_id: Plain track ID (no station suffix).
+ :return: Track object.
+ :raises MediaNotFoundError: If track not found.
+ """
+ yandex_track = await self.client.get_track(track_id)
if not yandex_track:
- raise MediaNotFoundError(f"Track {prov_track_id} not found")
+ raise MediaNotFoundError(f"Track {track_id} not found")
return parse_track(self, yandex_track)
@use_cache(3600 * 24 * 30)
async def get_playlist(self, prov_playlist_id: str) -> Playlist:
"""Get playlist details by ID.
- :param prov_playlist_id: The provider playlist ID (format: "owner_id:kind").
+ Supports virtual playlist MY_MIX_PLAYLIST_ID (My Mix). Real playlists
+ use format "owner_id:kind".
+
+ :param prov_playlist_id: The provider playlist ID (format: "owner_id:kind" or my_mix).
:return: Playlist object.
:raises MediaNotFoundError: If playlist not found.
"""
+ if prov_playlist_id == MY_MIX_PLAYLIST_ID:
+ names = self._get_browse_names()
+ return Playlist(
+ item_id=MY_MIX_PLAYLIST_ID,
+ provider=self.instance_id,
+ name=names[MY_MIX_PLAYLIST_ID],
+ owner="KION Music",
+ provider_mappings={
+ ProviderMapping(
+ item_id=MY_MIX_PLAYLIST_ID,
+ provider_domain=self.domain,
+ provider_instance=self.instance_id,
+ is_unique=True,
+ )
+ },
+ is_editable=False,
+ )
+
# Parse the playlist ID (format: owner_id:kind)
if PLAYLIST_ID_SPLITTER in prov_playlist_id:
owner_id, kind = prov_playlist_id.split(PLAYLIST_ID_SPLITTER, 1)
raise MediaNotFoundError(f"Playlist {prov_playlist_id} not found")
return parse_playlist(self, playlist)
+ async def _get_my_mix_playlist_tracks(self, page: int) -> list[Track]:
+ """Get My Mix tracks for virtual playlist (uncached; uses cursor for page > 0).
+
+ :param page: Page number (0 = first batch, 1+ = next batches via queue cursor).
+ :return: List of Track objects for this page.
+ """
+ if page == 0:
+ self._my_mix_seen_track_ids = set()
+
+ queue: str | int | None = None
+ if page > 0:
+ queue = self._my_mix_playlist_next_cursor
+ if not queue:
+ return []
+
+ if len(self._my_mix_seen_track_ids) >= MY_MIX_MAX_TRACKS:
+ return []
+
+ (
+ tracks,
+ _,
+ last_first_track_id,
+ self._my_mix_seen_track_ids,
+ ) = await self._fetch_my_mix_tracks(
+ max_batches=1,
+ initial_queue=queue,
+ seen_track_ids=self._my_mix_seen_track_ids,
+ )
+ if last_first_track_id is not None:
+ self._my_mix_playlist_next_cursor = last_first_track_id
+ return tracks
+
# Get related items
@use_cache(3600 * 24 * 30)
self.logger.debug("Error parsing album track: %s", err)
return tracks
+ @use_cache(3600 * 3)
+ async def get_similar_tracks(self, prov_track_id: str, limit: int = 25) -> list[Track]:
+ """Get similar tracks using rotor station for this track.
+
+ Uses rotor station track:{id} so MA radio mode gets recommendations.
+
+ :param prov_track_id: Provider track ID (plain or track_id@station_id).
+ :param limit: Maximum number of tracks to return.
+ :return: List of similar Track objects.
+ """
+ track_id, _ = _parse_radio_item_id(prov_track_id)
+ station_id = f"track:{track_id}"
+ yandex_tracks, _ = await self.client.get_rotor_station_tracks(station_id, queue=None)
+ tracks = []
+ for yt in yandex_tracks[:limit]:
+ try:
+ tracks.append(parse_track(self, yt))
+ except InvalidDataError as err:
+ self.logger.debug("Error parsing similar track: %s", err)
+ return tracks
+
+ @use_cache(600) # Cache for 10 minutes
+ async def recommendations(self) -> list[RecommendationFolder]:
+ """Get recommendations; includes My Mix (Мой Микс) as first folder.
+
+ Fetches fresh tracks on each call for discovery experience.
+
+ :return: List of recommendation folders (My Mix with tracks).
+ """
+ items, _, _, _ = await self._fetch_my_mix_tracks(
+ max_tracks=DISCOVERY_INITIAL_TRACKS,
+ )
+
+ names = self._get_browse_names()
+ return [
+ RecommendationFolder(
+ item_id=MY_MIX_PLAYLIST_ID,
+ provider=self.instance_id,
+ name=names[MY_MIX_PLAYLIST_ID],
+ items=UniqueList(items),
+ icon="mdi-waveform",
+ )
+ ]
+
@use_cache(3600 * 3)
async def get_playlist_tracks(self, prov_playlist_id: str, page: int = 0) -> list[Track]:
"""Get playlist tracks.
- :param prov_playlist_id: The provider playlist ID (format: "owner_id:kind").
+ :param prov_playlist_id: The provider playlist ID (format: "owner_id:kind" or my_mix).
:param page: Page number for pagination.
:return: List of Track objects.
"""
- # KION Music API returns all playlist tracks in one call (no server-side pagination)
+ if prov_playlist_id == MY_MIX_PLAYLIST_ID:
+ return await self._get_my_mix_playlist_tracks(page)
+
+ # KION Music API returns all playlist tracks in one call (no server-side pagination).
+ # Return empty list for page > 0 so the controller pagination loop terminates.
if page > 0:
return []
- self.logger.debug("get_playlist_tracks called: %s", prov_playlist_id)
# Parse the playlist ID (format: owner_id:kind)
if PLAYLIST_ID_SPLITTER in prov_playlist_id:
owner_id, kind = prov_playlist_id.split(PLAYLIST_ID_SPLITTER, 1)
owner_id = str(self.client.user_id)
kind = prov_playlist_id
- self.logger.debug("Fetching playlist %s/%s from API...", owner_id, kind)
playlist = await self.client.get_playlist(owner_id, kind)
if not playlist:
- self.logger.debug("Playlist %s/%s not found", owner_id, kind)
return []
# API sometimes returns playlist without tracks; fetch them explicitly if needed
tracks_list = playlist.tracks or []
track_count = getattr(playlist, "track_count", None) or 0
- self.logger.debug(
- "Playlist %s/%s: track_count=%s, tracks_in_response=%s",
- owner_id,
- kind,
- track_count,
- len(tracks_list),
- )
if not tracks_list and track_count > 0:
- self.logger.debug("No tracks in response, calling fetch_tracks_async...")
+ self.logger.debug(
+ "Playlist %s/%s: track_count=%s but no tracks in response, "
+ "calling fetch_tracks_async",
+ owner_id,
+ kind,
+ track_count,
+ )
try:
tracks_list = await playlist.fetch_tracks_async()
- self.logger.debug("fetch_tracks_async returned %s tracks", len(tracks_list or []))
except Exception as err:
- self.logger.warning("fetch_tracks_async failed: %s", err)
+ self.logger.warning("fetch_tracks_async failed for %s/%s: %s", owner_id, kind, err)
if not tracks_list:
- self.logger.warning(
- "Playlist %s/%s: expected %s tracks but got none",
- owner_id,
- kind,
- track_count,
- )
raise ResourceTemporarilyUnavailable(
"Playlist tracks not available; try again later"
- ) from None
+ )
if not tracks_list:
- self.logger.debug("Playlist %s/%s has no tracks", owner_id, kind)
return []
# API returns TrackShort objects, we need to fetch full track info
if not track_ids:
return []
- self.logger.debug("Fetching full details for %s tracks...", len(track_ids))
# Fetch full track details in batches to avoid timeouts
- batch_size = 50
full_tracks = []
- for i in range(0, len(track_ids), batch_size):
- batch = track_ids[i : i + batch_size]
- self.logger.debug("Fetching batch %s-%s...", i, i + len(batch))
+ for i in range(0, len(track_ids), TRACK_BATCH_SIZE):
+ batch = track_ids[i : i + TRACK_BATCH_SIZE]
batch_result = await self.client.get_tracks(batch)
- self.logger.debug("Batch returned %s tracks", len(batch_result or []))
- full_tracks.extend(batch_result or [])
+ if not batch_result:
+ self.logger.warning(
+ "Received empty result for playlist %s tracks batch %s-%s",
+ prov_playlist_id,
+ i,
+ i + len(batch) - 1,
+ )
+ raise ResourceTemporarilyUnavailable(
+ "Playlist tracks not fully available; try again later"
+ )
+ full_tracks.extend(batch_result)
if track_ids and not full_tracks:
- self.logger.warning("Got 0 full tracks for %s IDs", len(track_ids))
- raise ResourceTemporarilyUnavailable(
- "Failed to load track details; try again later"
- ) from None
+ raise ResourceTemporarilyUnavailable("Failed to load track details; try again later")
tracks = []
for track in full_tracks:
tracks.append(parse_track(self, track))
except InvalidDataError as err:
self.logger.debug("Error parsing playlist track: %s", err)
- self.logger.debug("Returning %s parsed tracks", len(tracks))
return tracks
@use_cache(3600 * 24 * 7)
async def get_library_albums(self) -> AsyncGenerator[Album, None]:
"""Retrieve library albums from KION Music."""
- albums = await self.client.get_liked_albums()
+ albums = await self.client.get_liked_albums(batch_size=TRACK_BATCH_SIZE)
for album in albums:
try:
yield parse_album(self, album)
# Fetch full track details in batches
track_ids = [str(ts.track_id) for ts in track_shorts if ts.track_id]
- batch_size = 50
- for i in range(0, len(track_ids), batch_size):
- batch_ids = track_ids[i : i + batch_size]
+ for i in range(0, len(track_ids), TRACK_BATCH_SIZE):
+ batch_ids = track_ids[i : i + TRACK_BATCH_SIZE]
full_tracks = await self.client.get_tracks(batch_ids)
for track in full_tracks:
try:
self.logger.debug("Error parsing library track: %s", err)
async def get_library_playlists(self) -> AsyncGenerator[Playlist, None]:
- """Retrieve library playlists from KION Music."""
+ """Retrieve library playlists from KION Music.
+
+ Includes the virtual My Mix playlist first, then user playlists.
+ """
+ yield await self.get_playlist(MY_MIX_PLAYLIST_ID)
playlists = await self.client.get_user_playlists()
for playlist in playlists:
try:
prov_item_id = self._get_provider_item_id(item)
if not prov_item_id:
return False
+ track_id, _ = _parse_radio_item_id(prov_item_id)
if item.media_type == MediaType.TRACK:
- return await self.client.like_track(prov_item_id)
+ return await self.client.like_track(track_id)
if item.media_type == MediaType.ALBUM:
return await self.client.like_album(prov_item_id)
if item.media_type == MediaType.ARTIST:
async def library_remove(self, prov_item_id: str, media_type: MediaType) -> bool:
"""Remove item from library.
- :param prov_item_id: The provider item ID.
+ :param prov_item_id: The provider item ID (may be track_id@station_id for tracks).
:param media_type: The media type.
:return: True if successful.
"""
+ track_id, _ = _parse_radio_item_id(prov_item_id)
if media_type == MediaType.TRACK:
- return await self.client.unlike_track(prov_item_id)
+ return await self.client.unlike_track(track_id)
if media_type == MediaType.ALBUM:
return await self.client.unlike_album(prov_item_id)
if media_type == MediaType.ARTIST:
) -> StreamDetails:
"""Get stream details for a track.
- :param item_id: The track ID.
+ :param item_id: The track ID (or track_id@station_id for My Mix).
:param media_type: The media type (should be TRACK).
:return: StreamDetails for the track.
"""
return await self.streaming.get_stream_details(item_id)
+
+ async def on_played(
+ self,
+ media_type: MediaType,
+ prov_item_id: str,
+ fully_played: bool,
+ position: int,
+ media_item: MediaItemType,
+ is_playing: bool = False,
+ ) -> None:
+ """Report playback for rotor feedback when the track is from My Mix.
+
+ Sends trackStarted when the track is currently playing (is_playing=True).
+ trackFinished/skip are sent from on_streamed to use accurate seconds_streamed.
+ """
+ if media_type != MediaType.TRACK:
+ return
+ track_id, station_id = _parse_radio_item_id(prov_item_id)
+ if not station_id:
+ return
+ if is_playing:
+ await self.client.send_rotor_station_feedback(
+ station_id,
+ "trackStarted",
+ track_id=track_id,
+ batch_id=self._my_mix_batch_id,
+ )
+
+ async def on_streamed(self, streamdetails: StreamDetails) -> None:
+ """Report stream completion for My Mix rotor feedback.
+
+ Sends trackFinished or skip with actual seconds_streamed so the service
+ can improve recommendations.
+ """
+ track_id, station_id = _parse_radio_item_id(streamdetails.item_id)
+ if not station_id:
+ return
+ seconds = int(streamdetails.seconds_streamed or 0)
+ duration = streamdetails.duration or 0
+ feedback_type = "trackFinished" if duration and seconds >= max(0, duration - 10) else "skip"
+ await self.client.send_rotor_station_feedback(
+ station_id,
+ feedback_type,
+ track_id=track_id,
+ total_played_seconds=seconds,
+ batch_id=self._my_mix_batch_id,
+ )
from music_assistant_models.media_items import AudioFormat
from music_assistant_models.streamdetails import StreamDetails
-from .constants import CONF_QUALITY, QUALITY_LOSSLESS
+from .constants import CONF_QUALITY, QUALITY_LOSSLESS, RADIO_TRACK_ID_SEP
if TYPE_CHECKING:
from yandex_music import DownloadInfo
self.mass = provider.mass
self.logger = provider.logger
+ def _track_id_from_item_id(self, item_id: str) -> str:
+ """Extract API track ID from item_id (may be track_id@station_id for My Mix)."""
+ if RADIO_TRACK_ID_SEP in item_id:
+ return item_id.split(RADIO_TRACK_ID_SEP, 1)[0]
+ return item_id
+
async def get_stream_details(self, item_id: str) -> StreamDetails:
"""Get stream details for a track.
- :param item_id: Track ID.
- :return: StreamDetails for the track.
+ :param item_id: Track ID or composite track_id@station_id for My Mix.
+ :return: StreamDetails for the track (item_id preserved for on_streamed).
:raises MediaNotFoundError: If stream URL cannot be obtained.
"""
- # Get track info first
+ track_id = self._track_id_from_item_id(item_id)
track = await self.provider.get_track(item_id)
if not track:
raise MediaNotFoundError(f"Track {item_id} not found")
# When user wants lossless, try get-file-info first (FLAC; download-info often MP3 only)
if want_lossless:
- self.logger.debug("Requesting lossless via get-file-info for track %s", item_id)
- file_info = await self.client.get_track_file_info_lossless(item_id)
+ self.logger.debug("Requesting lossless via get-file-info for track %s", track_id)
+ file_info = await self.client.get_track_file_info_lossless(track_id)
if file_info:
url = file_info.get("url")
codec = file_info.get("codec") or ""
)
# Default: use /tracks/.../download-info and select best quality
- download_infos = await self.client.get_track_download_info(item_id, get_direct_links=True)
+ download_infos = await self.client.get_track_download_info(track_id, get_direct_links=True)
if not download_infos:
raise MediaNotFoundError(f"No stream info available for track {item_id}")
]
self.logger.debug(
"Stream quality for track %s: config quality=%s, available codecs=%s",
- item_id,
+ track_id,
quality_str,
codecs_available,
)
self.logger.debug(
"Stream selected for track %s: codec=%s, bitrate=%s",
- item_id,
+ track_id,
getattr(selected_info, "codec", None),
getattr(selected_info, "bitrate_in_kbps", None),
)
def _get_content_type(self, codec: str | None) -> ContentType:
"""Determine content type from codec string.
- :param codec: Codec string from API.
+ :param codec: Codec string from KION API.
:return: ContentType enum value.
"""
if not codec:
return ContentType.FLAC
if codec_lower in ("mp3", "mpeg"):
return ContentType.MP3
- if codec_lower == "aac":
+ if codec_lower in ("aac", "aac-mp4", "he-aac", "he-aac-mp4"):
return ContentType.AAC
return ContentType.UNKNOWN
self._error_count += 1
-# Minimal client-like object for yandex_music de_json (library requires client, not None)
+class StreamingProviderStubWithTracking:
+ """Provider stub with tracking logger for assertions.
+
+ Use this when you need to verify logging behavior.
+ """
+
+ domain = "kion_music"
+ instance_id = "kion_music_instance"
+
+ def __init__(self) -> None:
+ """Initialize stub with tracking logger."""
+ self.client = type("ClientStub", (), {"user_id": 12345})()
+ self.mass = type("MassStub", (), {})()
+ self.logger = TrackingLogger()
+
+
+# Minimal client-like object for kion_music de_json (library requires client, not None)
DE_JSON_CLIENT = type("ClientStub", (), {"report_unknown_fields": False})()
def streaming_provider_stub() -> StreamingProviderStub:
"""Return a streaming provider stub (no Mock)."""
return StreamingProviderStub()
+
+
+@pytest.fixture
+def streaming_provider_stub_with_tracking() -> StreamingProviderStubWithTracking:
+ """Return a streaming provider stub with tracking logger."""
+ return StreamingProviderStubWithTracking()
from music_assistant_models.errors import ResourceTemporarilyUnavailable
from yandex_music.exceptions import NetworkError
-from music_assistant.providers.kion_music.api_client import KION_BASE_URL, KionMusicClient
+from music_assistant.providers.kion_music.api_client import KionMusicClient
+from music_assistant.providers.kion_music.constants import DEFAULT_BASE_URL
@pytest.fixture
async def test_connect_sets_base_url(client: KionMusicClient) -> None:
- """Verify connect() passes KION_BASE_URL to ClientAsync."""
+ """Verify connect() passes DEFAULT_BASE_URL to ClientAsync."""
with mock.patch("music_assistant.providers.kion_music.api_client.ClientAsync") as mock_cls:
mock_instance = mock.AsyncMock()
mock_instance.me = type("Me", (), {"account": type("Account", (), {"uid": 42})()})()
result = await client.connect()
assert result is True
- mock_cls.assert_called_once_with("fake_token", base_url=KION_BASE_URL)
+ mock_cls.assert_called_once_with("fake_token", base_url=DEFAULT_BASE_URL)
async def test_get_liked_albums_batching(client: KionMusicClient) -> None:
--- /dev/null
+"""Tests for My Mix (Мой Микс) browse and rotor feedback helpers."""
+
+from __future__ import annotations
+
+from music_assistant.providers.kion_music.constants import (
+ RADIO_TRACK_ID_SEP,
+ ROTOR_STATION_MY_MIX,
+)
+from music_assistant.providers.kion_music.provider import _parse_radio_item_id
+
+
+def test_parse_radio_item_id_plain_track_id() -> None:
+ """Plain track_id returns (track_id, None)."""
+ assert _parse_radio_item_id("12345") == ("12345", None)
+ assert _parse_radio_item_id("0") == ("0", None)
+
+
+def test_parse_radio_item_id_composite() -> None:
+ """Composite track_id@station_id returns (track_id, station_id)."""
+ assert _parse_radio_item_id(f"12345{RADIO_TRACK_ID_SEP}{ROTOR_STATION_MY_MIX}") == (
+ "12345",
+ ROTOR_STATION_MY_MIX,
+ )
+ assert _parse_radio_item_id("99@user:custom") == ("99", "user:custom")
def _artist_from_fixture(path: pathlib.Path) -> YandexArtist | None:
- """Deserialize Artist from fixture JSON."""
+ """Deserialize KION Artist from fixture JSON."""
data = _load_json(path)
return YandexArtist.de_json(data, DE_JSON_CLIENT)
def _album_from_fixture(path: pathlib.Path) -> YandexAlbum | None:
- """Deserialize Album from fixture JSON."""
+ """Deserialize KION Album from fixture JSON."""
data = _load_json(path)
return YandexAlbum.de_json(data, DE_JSON_CLIENT)
def _track_from_fixture(path: pathlib.Path) -> YandexTrack | None:
- """Deserialize Track from fixture JSON."""
+ """Deserialize KION Track from fixture JSON."""
data = _load_json(path)
return YandexTrack.de_json(data, DE_JSON_CLIENT)
def _playlist_from_fixture(path: pathlib.Path) -> YandexPlaylist | None:
- """Deserialize Playlist from fixture JSON."""
+ """Deserialize KION Playlist from fixture JSON."""
data = _load_json(path)
return YandexPlaylist.de_json(data, DE_JSON_CLIENT)
--- /dev/null
+"""Unit tests for KION Music streaming quality selection."""
+
+from __future__ import annotations
+
+from typing import TYPE_CHECKING, Any
+
+import pytest
+from music_assistant_models.enums import ContentType
+
+from music_assistant.providers.kion_music.constants import QUALITY_HIGH, QUALITY_LOSSLESS
+from music_assistant.providers.kion_music.streaming import KionMusicStreamingManager
+
+if TYPE_CHECKING:
+ from tests.providers.kion_music.conftest import (
+ StreamingProviderStub,
+ StreamingProviderStubWithTracking,
+ )
+
+
+def _make_download_info(
+ codec: str,
+ bitrate_in_kbps: int,
+ direct_link: str = "https://example.com/track",
+) -> Any:
+ """Build DownloadInfo-like object."""
+ return type(
+ "DownloadInfo",
+ (),
+ {
+ "codec": codec,
+ "bitrate_in_kbps": bitrate_in_kbps,
+ "direct_link": direct_link,
+ },
+ )()
+
+
+@pytest.fixture
+def streaming_manager(
+ streaming_provider_stub: StreamingProviderStub,
+) -> KionMusicStreamingManager:
+ """Create streaming manager with real stub (no Mock)."""
+ return KionMusicStreamingManager(streaming_provider_stub) # type: ignore[arg-type]
+
+
+@pytest.fixture
+def streaming_manager_with_tracking(
+ streaming_provider_stub_with_tracking: StreamingProviderStubWithTracking,
+) -> KionMusicStreamingManager:
+ """Create streaming manager with tracking logger for assertions."""
+ return KionMusicStreamingManager(streaming_provider_stub_with_tracking) # type: ignore[arg-type]
+
+
+def test_select_best_quality_lossless_returns_flac(
+ streaming_manager: KionMusicStreamingManager,
+) -> None:
+ """When preferred_quality is 'lossless' and list has MP3 and FLAC, FLAC is selected."""
+ mp3 = _make_download_info("mp3", 320, "https://example.com/track.mp3")
+ flac = _make_download_info("flac", 0, "https://example.com/track.flac")
+ download_infos = [mp3, flac]
+
+ result = streaming_manager._select_best_quality(download_infos, QUALITY_LOSSLESS)
+
+ assert result is not None
+ assert result.codec == "flac"
+ assert result.direct_link == "https://example.com/track.flac"
+
+
+def test_select_best_quality_high_returns_highest_bitrate(
+ streaming_manager: KionMusicStreamingManager,
+) -> None:
+ """When preferred is 'high' and list has MP3 and FLAC, highest bitrate is selected."""
+ mp3 = _make_download_info("mp3", 320, "https://example.com/track.mp3")
+ flac = _make_download_info("flac", 0, "https://example.com/track.flac")
+ download_infos = [mp3, flac]
+
+ result = streaming_manager._select_best_quality(download_infos, QUALITY_HIGH)
+
+ assert result is not None
+ assert result.codec == "mp3"
+ assert result.bitrate_in_kbps == 320
+
+
+def test_select_best_quality_label_lossless_flac_returns_flac(
+ streaming_manager: KionMusicStreamingManager,
+) -> None:
+ """When preferred_quality is UI label 'Lossless (FLAC)', FLAC is selected."""
+ mp3 = _make_download_info("mp3", 320, "https://example.com/track.mp3")
+ flac = _make_download_info("flac", 0, "https://example.com/track.flac")
+ download_infos = [mp3, flac]
+
+ result = streaming_manager._select_best_quality(download_infos, "Lossless (FLAC)")
+
+ assert result is not None
+ assert result.codec == "flac"
+
+
+def test_select_best_quality_lossless_no_flac_returns_fallback(
+ streaming_manager_with_tracking: KionMusicStreamingManager,
+) -> None:
+ """When lossless requested but no FLAC in list, returns best available (fallback)."""
+ mp3 = _make_download_info("mp3", 320, "https://example.com/track.mp3")
+ download_infos = [mp3]
+
+ result = streaming_manager_with_tracking._select_best_quality(download_infos, QUALITY_LOSSLESS)
+
+ assert result is not None
+ assert result.codec == "mp3"
+ assert streaming_manager_with_tracking.provider.logger._warning_count == 1 # type: ignore[attr-defined]
+
+
+def test_select_best_quality_empty_list_returns_none(
+ streaming_manager: KionMusicStreamingManager,
+) -> None:
+ """Empty download_infos returns None."""
+ result = streaming_manager._select_best_quality([], QUALITY_LOSSLESS)
+ assert result is None
+
+
+def test_select_best_quality_none_preferred_returns_highest_bitrate(
+ streaming_manager: KionMusicStreamingManager,
+) -> None:
+ """When preferred_quality is None, returns highest bitrate."""
+ mp3 = _make_download_info("mp3", 320, "https://example.com/track.mp3")
+ flac = _make_download_info("flac", 0, "https://example.com/track.flac")
+ download_infos = [mp3, flac]
+
+ result = streaming_manager._select_best_quality(download_infos, None)
+
+ assert result is not None
+ assert result.codec == "mp3"
+ assert result.bitrate_in_kbps == 320
+
+
+def test_get_content_type_flac_mp4_returns_flac(
+ streaming_manager: KionMusicStreamingManager,
+) -> None:
+ """flac-mp4 codec from get-file-info is mapped to ContentType.FLAC."""
+ assert streaming_manager._get_content_type("flac-mp4") == ContentType.FLAC
+ assert streaming_manager._get_content_type("FLAC-MP4") == ContentType.FLAC