from __future__ import annotations
import logging
+from datetime import UTC, datetime
from typing import TYPE_CHECKING, Any, cast
from music_assistant_models.errors import (
if TYPE_CHECKING:
from yandex_music import DownloadInfo
-from .constants import DEFAULT_LIMIT
+from .constants import DEFAULT_LIMIT, ROTOR_STATION_MY_WAVE
# get-file-info with quality=lossless returns FLAC; default /tracks/.../download-info often does not
# Prefer flac-mp4/aac-mp4 (Yandex API moved to these formats around 2025)
raise ProviderUnavailableError("Client not connected, call connect() first")
return self._client
+ def _is_connection_error(self, err: Exception) -> bool:
+ """Return True if the exception indicates a connection or server drop."""
+ if isinstance(err, NetworkError):
+ return True
+ msg = str(err).lower()
+ return "disconnect" in msg or "connection" in msg or "timeout" in msg
+
+ async def _reconnect(self) -> None:
+ """Disconnect and connect again to recover from Server disconnected / connection errors."""
+ await self.disconnect()
+ await self.connect()
+
+ # Rotor (radio station) methods
+
+ async def get_rotor_station_tracks(
+ self,
+ station_id: str,
+ queue: str | int | None = None,
+ ) -> tuple[list[YandexTrack], str | None]:
+ """Get tracks from a rotor station (e.g. user:onyourwave or track:1234).
+
+ :param station_id: Station ID (e.g. ROTOR_STATION_MY_WAVE or "track:1234" for similar).
+ :param queue: Optional track ID for pagination (first track of previous batch).
+ :return: Tuple of (list of track objects, batch_id for feedback or None).
+ """
+ for attempt in range(2):
+ client = self._ensure_connected()
+ try:
+ result = await client.rotor_station_tracks(station_id, settings2=True, queue=queue)
+ if not result or not result.sequence:
+ return ([], result.batch_id if result else None)
+ track_ids = []
+ for seq in result.sequence:
+ if seq.track is None:
+ continue
+ tid = getattr(seq.track, "id", None) or getattr(seq.track, "track_id", None)
+ if tid is not None:
+ track_ids.append(str(tid))
+ if not track_ids:
+ return ([], result.batch_id if result else None)
+ full_tracks = await self.get_tracks(track_ids)
+ order_map = {str(t.id): t for t in full_tracks if hasattr(t, "id") and t.id}
+ ordered = [order_map[tid] for tid in track_ids if tid in order_map]
+ return (ordered, result.batch_id if result else None)
+ except BadRequestError as err:
+ LOGGER.warning("Error fetching rotor station %s tracks: %s", station_id, err)
+ return ([], None)
+ except (NetworkError, Exception) as err:
+ if attempt == 0 and self._is_connection_error(err):
+ LOGGER.warning(
+ "Connection error fetching rotor tracks, reconnecting: %s",
+ err,
+ )
+ try:
+ await self._reconnect()
+ except Exception as recon_err:
+ LOGGER.warning("Reconnect failed: %s", recon_err)
+ return ([], None)
+ else:
+ LOGGER.warning("Error fetching rotor station tracks: %s", err)
+ return ([], None)
+ return ([], None)
+
+ async def get_my_wave_tracks(
+ self, queue: str | int | None = None
+ ) -> tuple[list[YandexTrack], str | None]:
+ """Get tracks from the My Wave (Моя волна) radio station.
+
+ :param queue: Optional track ID of the last track from the previous batch (API uses it for
+ pagination; do not pass batch_id).
+ :return: Tuple of (list of track objects, batch_id for feedback).
+ """
+ return await self.get_rotor_station_tracks(ROTOR_STATION_MY_WAVE, queue=queue)
+
+ async def send_rotor_station_feedback(
+ self,
+ station_id: str,
+ feedback_type: str,
+ *,
+ batch_id: str | None = None,
+ track_id: str | None = None,
+ total_played_seconds: int | None = None,
+ ) -> bool:
+ """Send rotor station feedback for My Wave recommendations.
+
+ Used to report radioStarted, trackStarted, trackFinished, skip so that
+ Yandex can improve subsequent recommendations.
+
+ :param station_id: Station ID (e.g. ROTOR_STATION_MY_WAVE).
+ :param feedback_type: One of 'radioStarted', 'trackStarted', 'trackFinished', 'skip'.
+ :param batch_id: Optional batch ID from the last get_my_wave_tracks response.
+ :param track_id: Track ID (required for trackStarted, trackFinished, skip).
+ :param total_played_seconds: Seconds played (for trackFinished, skip).
+ :return: True if the request succeeded.
+ """
+ client = self._ensure_connected()
+ payload: dict[str, Any] = {
+ "type": feedback_type,
+ "timestamp": datetime.now(UTC).isoformat().replace("+00:00", "Z"),
+ }
+ if feedback_type == "radioStarted":
+ payload["from"] = "YandexMusicDesktopAppWindows"
+ if track_id is not None:
+ payload["trackId"] = track_id
+ if total_played_seconds is not None:
+ payload["totalPlayedSeconds"] = total_played_seconds
+ if batch_id is not None:
+ payload["batchId"] = batch_id
+
+ url = f"{client.base_url}/rotor/station/{station_id}/feedback"
+ for attempt in range(2):
+ client = self._ensure_connected()
+ try:
+ await client._request.post(url, payload)
+ return True
+ except BadRequestError as err:
+ LOGGER.debug("Rotor feedback %s failed: %s", feedback_type, err)
+ return False
+ except (NetworkError, Exception) as err:
+ if attempt == 0 and self._is_connection_error(err):
+ LOGGER.warning(
+ "Connection error on rotor feedback %s, reconnecting: %s",
+ feedback_type,
+ err,
+ )
+ try:
+ await self._reconnect()
+ except Exception as recon_err:
+ LOGGER.debug("Reconnect failed: %s", recon_err)
+ return False
+ else:
+ LOGGER.debug("Rotor feedback %s failed: %s", feedback_type, err)
+ return False
+ return False
+
# Library methods
async def get_liked_tracks(self) -> list[TrackShort]:
from __future__ import annotations
import logging
+from collections.abc import Sequence
from typing import TYPE_CHECKING
-from music_assistant_models.enums import MediaType
+from music_assistant_models.enums import MediaType, ProviderFeature
from music_assistant_models.errors import (
InvalidDataError,
LoginFailed,
from music_assistant_models.media_items import (
Album,
Artist,
+ BrowseFolder,
ItemMapping,
MediaItemType,
Playlist,
+ ProviderMapping,
+ RecommendationFolder,
SearchResults,
Track,
+ UniqueList,
)
from music_assistant.controllers.cache import use_cache
from music_assistant.models.music_provider import MusicProvider
from .api_client import YandexMusicClient
-from .constants import CONF_TOKEN, PLAYLIST_ID_SPLITTER
+from .constants import (
+ BROWSE_NAMES_EN,
+ BROWSE_NAMES_RU,
+ CONF_TOKEN,
+ MY_WAVE_PLAYLIST_ID,
+ PLAYLIST_ID_SPLITTER,
+ RADIO_TRACK_ID_SEP,
+ ROTOR_STATION_MY_WAVE,
+)
from .parsers import parse_album, parse_artist, parse_playlist, parse_track
from .streaming import YandexMusicStreamingManager
from music_assistant_models.streamdetails import StreamDetails
+def _parse_radio_item_id(item_id: str) -> tuple[str, str | None]:
+ """Extract track_id and optional station_id from provider item_id.
+
+ My Wave tracks use item_id format 'track_id@station_id'. Other tracks use
+ plain track_id.
+
+ :param item_id: Provider item_id (may contain RADIO_TRACK_ID_SEP).
+ :return: (track_id, station_id or None).
+ """
+ if RADIO_TRACK_ID_SEP in item_id:
+ parts = item_id.split(RADIO_TRACK_ID_SEP, 1)
+ return (parts[0], parts[1] if len(parts) > 1 else None)
+ return (item_id, None)
+
+
class YandexMusicProvider(MusicProvider):
"""Implementation of a Yandex Music MusicProvider."""
_client: YandexMusicClient | None = None
_streaming: YandexMusicStreamingManager | None = None
+ _my_wave_batch_id: str | None = None
+ _my_wave_last_track_id: str | None = None # last track id for "Load more" (API queue param)
+ _my_wave_playlist_next_cursor: str | None = None # first_track_id for next playlist page
+ _my_wave_radio_started_sent: bool = False
@property
def client(self) -> YandexMusicClient:
raise ProviderUnavailableError("Provider not initialized")
return self._streaming
+ def _get_browse_names(self) -> dict[str, str]:
+ """Get locale-based browse folder names."""
+ try:
+ locale = (self.mass.metadata.locale or "en_US").lower()
+ use_russian = locale.startswith("ru")
+ except Exception:
+ use_russian = False
+ return BROWSE_NAMES_RU if use_russian else BROWSE_NAMES_EN
+
async def handle_async_init(self) -> None:
"""Handle async initialization of the provider."""
token = self.config.get_value(CONF_TOKEN)
name=name,
)
+ async def browse( # noqa: PLR0915
+ self, path: str
+ ) -> Sequence[MediaItemType | ItemMapping | BrowseFolder]:
+ """Browse provider items with locale-based folder names and My Wave.
+
+ Root level shows My Wave, artists, albums, liked tracks, playlists. Names
+ are in Russian when MA locale is ru_*, otherwise in English. My Wave
+ tracks use item_id format track_id@station_id for rotor feedback.
+
+ :param path: The path to browse (e.g. provider_id:// or provider_id://artists).
+ """
+ if ProviderFeature.BROWSE not in self.supported_features:
+ raise NotImplementedError
+
+ path_parts = path.split("://")[1].split("/") if "://" in path else []
+ subpath = path_parts[0] if len(path_parts) > 0 else None
+ sub_subpath = path_parts[1] if len(path_parts) > 1 else None
+
+ if subpath == MY_WAVE_PLAYLIST_ID:
+ # Root my_wave: fetch up to 3 batches so Play adds more tracks.
+ # "Load more" uses single next batch.
+ max_batches = 3 if sub_subpath != "next" else 1
+ queue: str | int | None = None
+ if sub_subpath == "next":
+ queue = self._my_wave_last_track_id
+ elif sub_subpath:
+ queue = sub_subpath
+
+ all_tracks: list[Track | BrowseFolder] = []
+ last_batch_id: str | None = None
+ first_track_id_this_batch: str | None = None
+
+ for _ in range(max_batches):
+ yandex_tracks, batch_id = await self.client.get_my_wave_tracks(queue=queue)
+ if batch_id:
+ self._my_wave_batch_id = batch_id
+ last_batch_id = batch_id
+ if not self._my_wave_radio_started_sent and yandex_tracks:
+ self._my_wave_radio_started_sent = True
+ await self.client.send_rotor_station_feedback(
+ ROTOR_STATION_MY_WAVE,
+ "radioStarted",
+ batch_id=batch_id,
+ )
+ first_track_id_this_batch = None
+ for yt in yandex_tracks:
+ try:
+ t = parse_track(self, yt)
+ track_id = (
+ str(yt.id)
+ if hasattr(yt, "id") and yt.id
+ else getattr(yt, "track_id", None)
+ )
+ if track_id:
+ if first_track_id_this_batch is None:
+ first_track_id_this_batch = track_id
+ t.item_id = f"{track_id}{RADIO_TRACK_ID_SEP}{ROTOR_STATION_MY_WAVE}"
+ for pm in t.provider_mappings:
+ if pm.provider_instance == self.instance_id:
+ pm.item_id = t.item_id
+ break
+ all_tracks.append(t)
+ except InvalidDataError as err:
+ self.logger.debug("Error parsing My Wave track: %s", err)
+ if first_track_id_this_batch is not None:
+ self._my_wave_last_track_id = first_track_id_this_batch
+ if not batch_id or not yandex_tracks:
+ break
+ queue = first_track_id_this_batch
+
+ if last_batch_id:
+ names = self._get_browse_names()
+ next_name = "Ещё" if names is BROWSE_NAMES_RU else "Load more"
+ all_tracks.append(
+ BrowseFolder(
+ item_id="next",
+ provider=self.instance_id,
+ path=f"{path.rstrip('/')}/next",
+ name=next_name,
+ is_playable=False,
+ )
+ )
+ return all_tracks
+
+ if subpath:
+ return await super().browse(path)
+
+ names = self._get_browse_names()
+
+ folders: list[BrowseFolder] = []
+ base = path if path.endswith("//") else path.rstrip("/") + "/"
+ folders.append(
+ BrowseFolder(
+ item_id=MY_WAVE_PLAYLIST_ID,
+ provider=self.instance_id,
+ path=f"{base}{MY_WAVE_PLAYLIST_ID}",
+ name=names[MY_WAVE_PLAYLIST_ID],
+ is_playable=True,
+ )
+ )
+ if ProviderFeature.LIBRARY_ARTISTS in self.supported_features:
+ folders.append(
+ BrowseFolder(
+ item_id="artists",
+ provider=self.instance_id,
+ path=f"{base}artists",
+ name=names["artists"],
+ is_playable=True,
+ )
+ )
+ if ProviderFeature.LIBRARY_ALBUMS in self.supported_features:
+ folders.append(
+ BrowseFolder(
+ item_id="albums",
+ provider=self.instance_id,
+ path=f"{base}albums",
+ name=names["albums"],
+ is_playable=True,
+ )
+ )
+ if ProviderFeature.LIBRARY_TRACKS in self.supported_features:
+ folders.append(
+ BrowseFolder(
+ item_id="tracks",
+ provider=self.instance_id,
+ path=f"{base}tracks",
+ name=names["tracks"],
+ is_playable=True,
+ )
+ )
+ if ProviderFeature.LIBRARY_PLAYLISTS in self.supported_features:
+ folders.append(
+ BrowseFolder(
+ item_id="playlists",
+ provider=self.instance_id,
+ path=f"{base}playlists",
+ name=names["playlists"],
+ is_playable=True,
+ )
+ )
+ if len(folders) == 1:
+ return await self.browse(folders[0].path)
+ return folders
+
# Search
@use_cache(3600 * 24 * 14)
async def get_track(self, prov_track_id: str) -> Track:
"""Get track details by ID.
- :param prov_track_id: The provider track ID.
+ Supports composite item_id (track_id@station_id) for My Wave tracks;
+ only the track_id part is used for the API.
+
+ :param prov_track_id: The provider track ID (or track_id@station_id).
:return: Track object.
:raises MediaNotFoundError: If track not found.
"""
- yandex_track = await self.client.get_track(prov_track_id)
+ track_id, _ = _parse_radio_item_id(prov_track_id)
+ yandex_track = await self.client.get_track(track_id)
if not yandex_track:
raise MediaNotFoundError(f"Track {prov_track_id} not found")
return parse_track(self, yandex_track)
async def get_playlist(self, prov_playlist_id: str) -> Playlist:
"""Get playlist details by ID.
- :param prov_playlist_id: The provider playlist ID (format: "owner_id:kind").
+ Supports virtual playlist MY_WAVE_PLAYLIST_ID (My Wave). Real playlists
+ use format "owner_id:kind".
+
+ :param prov_playlist_id: The provider playlist ID (format: "owner_id:kind" or my_wave).
:return: Playlist object.
:raises MediaNotFoundError: If playlist not found.
"""
+ if prov_playlist_id == MY_WAVE_PLAYLIST_ID:
+ names = self._get_browse_names()
+ return Playlist(
+ item_id=MY_WAVE_PLAYLIST_ID,
+ provider=self.instance_id,
+ name=names[MY_WAVE_PLAYLIST_ID],
+ owner="Yandex Music",
+ provider_mappings={
+ ProviderMapping(
+ item_id=MY_WAVE_PLAYLIST_ID,
+ provider_domain=self.domain,
+ provider_instance=self.instance_id,
+ is_unique=True,
+ )
+ },
+ is_editable=False,
+ )
+
# Parse the playlist ID (format: owner_id:kind)
if PLAYLIST_ID_SPLITTER in prov_playlist_id:
owner_id, kind = prov_playlist_id.split(PLAYLIST_ID_SPLITTER, 1)
raise MediaNotFoundError(f"Playlist {prov_playlist_id} not found")
return parse_playlist(self, playlist)
+ async def _get_my_wave_playlist_tracks(self, page: int) -> list[Track]:
+ """Get My Wave tracks for virtual playlist (uncached; uses cursor for page > 0).
+
+ :param page: Page number (0 = first batch, 1+ = next batches via queue cursor).
+ :return: List of Track objects for this page.
+ """
+ queue: str | int | None = None
+ if page > 0:
+ queue = self._my_wave_playlist_next_cursor
+ if not queue:
+ return []
+ yandex_tracks, batch_id = await self.client.get_my_wave_tracks(queue=queue)
+ if batch_id:
+ self._my_wave_batch_id = batch_id
+ if not self._my_wave_radio_started_sent and yandex_tracks:
+ self._my_wave_radio_started_sent = True
+ await self.client.send_rotor_station_feedback(
+ ROTOR_STATION_MY_WAVE,
+ "radioStarted",
+ batch_id=batch_id,
+ )
+ first_track_id_this_batch = None
+ tracks = []
+ for yt in yandex_tracks:
+ try:
+ t = parse_track(self, yt)
+ track_id = (
+ str(yt.id) if hasattr(yt, "id") and yt.id else getattr(yt, "track_id", None)
+ )
+ if track_id:
+ if first_track_id_this_batch is None:
+ first_track_id_this_batch = track_id
+ t.item_id = f"{track_id}{RADIO_TRACK_ID_SEP}{ROTOR_STATION_MY_WAVE}"
+ for pm in t.provider_mappings:
+ if pm.provider_instance == self.instance_id:
+ pm.item_id = t.item_id
+ break
+ tracks.append(t)
+ except InvalidDataError as err:
+ self.logger.debug("Error parsing My Wave track: %s", err)
+ if first_track_id_this_batch is not None:
+ self._my_wave_playlist_next_cursor = first_track_id_this_batch
+ return tracks
+
# Get related items
@use_cache(3600 * 24 * 30)
self.logger.debug("Error parsing album track: %s", err)
return tracks
+ @use_cache(3600 * 3)
+ async def get_similar_tracks(self, prov_track_id: str, limit: int = 25) -> list[Track]:
+ """Get similar tracks using Yandex Rotor station for this track.
+
+ Uses rotor station track:{id} so MA radio mode gets Yandex recommendations.
+
+ :param prov_track_id: Provider track ID (plain or track_id@station_id).
+ :param limit: Maximum number of tracks to return.
+ :return: List of similar Track objects.
+ """
+ track_id, _ = _parse_radio_item_id(prov_track_id)
+ station_id = f"track:{track_id}"
+ yandex_tracks, _ = await self.client.get_rotor_station_tracks(station_id, queue=None)
+ tracks = []
+ for yt in yandex_tracks[:limit]:
+ try:
+ tracks.append(parse_track(self, yt))
+ except InvalidDataError as err:
+ self.logger.debug("Error parsing similar track: %s", err)
+ return tracks
+
+ @use_cache(3600 * 3)
+ async def recommendations(self) -> list[RecommendationFolder]:
+ """Get recommendations; includes My Wave (Моя волна) as first folder.
+
+ :return: List of recommendation folders (My Wave with first batch of tracks).
+ """
+ names = self._get_browse_names()
+ yandex_tracks, _ = await self.client.get_my_wave_tracks(queue=None)
+ items: list[Track] = []
+ for yt in yandex_tracks:
+ try:
+ t = parse_track(self, yt)
+ track_id = (
+ str(yt.id) if hasattr(yt, "id") and yt.id else getattr(yt, "track_id", None)
+ )
+ if track_id:
+ t.item_id = f"{track_id}{RADIO_TRACK_ID_SEP}{ROTOR_STATION_MY_WAVE}"
+ for pm in t.provider_mappings:
+ if pm.provider_instance == self.instance_id:
+ pm.item_id = t.item_id
+ break
+ items.append(t)
+ except InvalidDataError as err:
+ self.logger.debug("Error parsing My Wave track for recommendations: %s", err)
+ return [
+ RecommendationFolder(
+ item_id=MY_WAVE_PLAYLIST_ID,
+ provider=self.instance_id,
+ name=names[MY_WAVE_PLAYLIST_ID],
+ items=UniqueList(items),
+ icon="mdi-waveform",
+ )
+ ]
+
@use_cache(3600 * 3)
async def get_playlist_tracks(self, prov_playlist_id: str, page: int = 0) -> list[Track]:
"""Get playlist tracks.
- :param prov_playlist_id: The provider playlist ID (format: "owner_id:kind").
+ :param prov_playlist_id: The provider playlist ID (format: "owner_id:kind" or my_wave).
:param page: Page number for pagination.
:return: List of Track objects.
"""
+ if prov_playlist_id == MY_WAVE_PLAYLIST_ID:
+ return await self._get_my_wave_playlist_tracks(page)
+
# Yandex Music API returns all playlist tracks in one call (no server-side pagination).
# Return empty list for page > 0 so the controller pagination loop terminates.
if page > 0:
self.logger.debug("Error parsing library track: %s", err)
async def get_library_playlists(self) -> AsyncGenerator[Playlist, None]:
- """Retrieve library playlists from Yandex Music."""
+ """Retrieve library playlists from Yandex Music.
+
+ Includes the virtual My Wave playlist first, then user playlists.
+ """
+ yield await self.get_playlist(MY_WAVE_PLAYLIST_ID)
playlists = await self.client.get_user_playlists()
for playlist in playlists:
try:
prov_item_id = self._get_provider_item_id(item)
if not prov_item_id:
return False
+ track_id, _ = _parse_radio_item_id(prov_item_id)
if item.media_type == MediaType.TRACK:
- return await self.client.like_track(prov_item_id)
+ return await self.client.like_track(track_id)
if item.media_type == MediaType.ALBUM:
return await self.client.like_album(prov_item_id)
if item.media_type == MediaType.ARTIST:
async def library_remove(self, prov_item_id: str, media_type: MediaType) -> bool:
"""Remove item from library.
- :param prov_item_id: The provider item ID.
+ :param prov_item_id: The provider item ID (may be track_id@station_id for tracks).
:param media_type: The media type.
:return: True if successful.
"""
+ track_id, _ = _parse_radio_item_id(prov_item_id)
if media_type == MediaType.TRACK:
- return await self.client.unlike_track(prov_item_id)
+ return await self.client.unlike_track(track_id)
if media_type == MediaType.ALBUM:
return await self.client.unlike_album(prov_item_id)
if media_type == MediaType.ARTIST:
) -> StreamDetails:
"""Get stream details for a track.
- :param item_id: The track ID.
+ :param item_id: The track ID (or track_id@station_id for My Wave).
:param media_type: The media type (should be TRACK).
:return: StreamDetails for the track.
"""
return await self.streaming.get_stream_details(item_id)
+
+ async def on_played(
+ self,
+ media_type: MediaType,
+ prov_item_id: str,
+ fully_played: bool,
+ position: int,
+ media_item: MediaItemType,
+ is_playing: bool = False,
+ ) -> None:
+ """Report playback for rotor feedback when the track is from My Wave.
+
+ Sends trackStarted when the track is currently playing (is_playing=True).
+ trackFinished/skip are sent from on_streamed to use accurate seconds_streamed.
+ """
+ if media_type != MediaType.TRACK:
+ return
+ track_id, station_id = _parse_radio_item_id(prov_item_id)
+ if not station_id:
+ return
+ if is_playing:
+ await self.client.send_rotor_station_feedback(
+ station_id,
+ "trackStarted",
+ track_id=track_id,
+ batch_id=self._my_wave_batch_id,
+ )
+
+ async def on_streamed(self, streamdetails: StreamDetails) -> None:
+ """Report stream completion for My Wave rotor feedback.
+
+ Sends trackFinished or skip with actual seconds_streamed so Yandex
+ can improve recommendations.
+ """
+ track_id, station_id = _parse_radio_item_id(streamdetails.item_id)
+ if not station_id:
+ return
+ seconds = int(streamdetails.seconds_streamed or 0)
+ duration = streamdetails.duration or 0
+ feedback_type = "trackFinished" if duration and seconds >= max(0, duration - 10) else "skip"
+ await self.client.send_rotor_station_feedback(
+ station_id,
+ feedback_type,
+ track_id=track_id,
+ total_played_seconds=seconds,
+ batch_id=self._my_wave_batch_id,
+ )