From: Mikhail Nevskiy <139659391+trudenboy@users.noreply.github.com> Date: Tue, 10 Feb 2026 07:01:39 +0000 (+0300) Subject: Add Zvuk Music provider (#3090) X-Git-Url: https://git.kitaultman.com/?a=commitdiff_plain;h=e2cea742976640a55aaa057299e80adbf4d8238e;p=music-assistant-server.git Add Zvuk Music provider (#3090) * Add zvuk_music provider scaffold (manifest, constants, icons) Co-Authored-By: Claude Opus 4.5 * Add Zvuk Music API client wrapper Co-Authored-By: Claude Opus 4.5 * Add Zvuk Music model parsers Co-Authored-By: Claude Opus 4.5 * Add ZvukMusicProvider implementation Co-Authored-By: Claude Opus 4.5 * Update zvuk_music icons to official 2024 logo Co-Authored-By: Claude Opus 4.5 * Add Zvuk Music provider integration tests Co-Authored-By: Claude Opus 4.5 * Replace Zvuk Music integration tests with unit tests Remove integration tests that required real API tokens and a running MA server. Add comprehensive unit tests for parsers using mock objects, covering all parser functions (parse_artist, parse_album, parse_track, parse_playlist) with various scenarios. Co-Authored-By: Claude Opus 4.5 * Address PR #3090 review: decorator, duration, allow_seek - Refactor API error handling into @handle_zvuk_errors(not_found_return=...) decorator (api_client.py) - Add duration from get_track() and allow_seek=True in get_stream_details (provider.py) Co-authored-by: Cursor * docs(zvuk): clarify why get_track is needed for duration in get_stream_details Co-authored-by: Cursor --------- Co-authored-by: Михаил Невский Co-authored-by: Claude Opus 4.5 Co-authored-by: Marcel van der Veldt Co-authored-by: Cursor --- diff --git a/music_assistant/providers/zvuk_music/__init__.py b/music_assistant/providers/zvuk_music/__init__.py new file mode 100644 index 00000000..e873a5b5 --- /dev/null +++ b/music_assistant/providers/zvuk_music/__init__.py @@ -0,0 +1,98 @@ +"""Zvuk Music provider support for Music Assistant.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING, cast + +from music_assistant_models.config_entries import ConfigEntry, ConfigValueOption, ConfigValueType +from music_assistant_models.enums import ConfigEntryType, ProviderFeature + +from .constants import ( + CONF_ACTION_CLEAR_AUTH, + CONF_QUALITY, + CONF_TOKEN, + QUALITY_HIGH, + QUALITY_LOSSLESS, +) +from .provider import ZvukMusicProvider + +if TYPE_CHECKING: + from music_assistant_models.config_entries import ProviderConfig + from music_assistant_models.provider import ProviderManifest + + from music_assistant.mass import MusicAssistant + from music_assistant.models import ProviderInstanceType + + +SUPPORTED_FEATURES = { + ProviderFeature.LIBRARY_ARTISTS, + ProviderFeature.LIBRARY_ALBUMS, + ProviderFeature.LIBRARY_TRACKS, + ProviderFeature.LIBRARY_PLAYLISTS, + ProviderFeature.ARTIST_ALBUMS, + ProviderFeature.ARTIST_TOPTRACKS, + ProviderFeature.SEARCH, + ProviderFeature.LIBRARY_ARTISTS_EDIT, + ProviderFeature.LIBRARY_ALBUMS_EDIT, + ProviderFeature.LIBRARY_TRACKS_EDIT, + ProviderFeature.LIBRARY_PLAYLISTS_EDIT, + ProviderFeature.PLAYLIST_TRACKS_EDIT, + ProviderFeature.PLAYLIST_CREATE, +} + + +async def setup( + mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig +) -> ProviderInstanceType: + """Initialize provider(instance) with given configuration.""" + return ZvukMusicProvider(mass, manifest, config, SUPPORTED_FEATURES) + + +async def get_config_entries( + mass: MusicAssistant, # noqa: ARG001 + instance_id: str | None = None, # noqa: ARG001 + action: str | None = None, + values: dict[str, ConfigValueType] | None = None, +) -> tuple[ConfigEntry, ...]: + """Return Config entries to setup this provider.""" + if values is None: + values = {} + + # Handle clear auth action + if action == CONF_ACTION_CLEAR_AUTH: + values[CONF_TOKEN] = None + + # Check if user is authenticated + is_authenticated = bool(values.get(CONF_TOKEN)) + + return ( + ConfigEntry( + key=CONF_TOKEN, + type=ConfigEntryType.SECURE_STRING, + label="Zvuk Music Token", + description="Enter your Zvuk Music X-Auth-Token. " + "See the documentation for how to obtain it.", + required=True, + hidden=is_authenticated, + value=cast("str", values.get(CONF_TOKEN)) if values else None, + ), + ConfigEntry( + key=CONF_ACTION_CLEAR_AUTH, + type=ConfigEntryType.ACTION, + label="Reset authentication", + description="Clear the current authentication details.", + action=CONF_ACTION_CLEAR_AUTH, + hidden=not is_authenticated, + ), + ConfigEntry( + key=CONF_QUALITY, + type=ConfigEntryType.STRING, + label="Audio quality", + description="Select preferred audio quality.", + options=[ + ConfigValueOption("High (320 kbps)", QUALITY_HIGH), + ConfigValueOption("Lossless (FLAC)", QUALITY_LOSSLESS), + ], + default_value=QUALITY_HIGH, + ), + ) diff --git a/music_assistant/providers/zvuk_music/api_client.py b/music_assistant/providers/zvuk_music/api_client.py new file mode 100644 index 00000000..bee4304e --- /dev/null +++ b/music_assistant/providers/zvuk_music/api_client.py @@ -0,0 +1,489 @@ +"""API client wrapper for Zvuk Music.""" + +from __future__ import annotations + +import logging +from collections.abc import Awaitable, Callable +from typing import Any, ParamSpec, TypeVar, cast + +from music_assistant_models.errors import ( + LoginFailed, + ProviderUnavailableError, + ResourceTemporarilyUnavailable, +) +from zvuk_music import Artist as ZvukArtist +from zvuk_music import ClientAsync, Collection +from zvuk_music import CollectionItem as ZvukCollectionItem +from zvuk_music import Playlist as ZvukPlaylist +from zvuk_music import Release as ZvukRelease +from zvuk_music import Search as ZvukSearch +from zvuk_music import SimpleTrack as ZvukSimpleTrack +from zvuk_music import Stream as ZvukStream +from zvuk_music import Track as ZvukTrack +from zvuk_music.exceptions import ( + BadRequestError, + BotDetectedError, + GraphQLError, + NetworkError, + NotFoundError, + TimedOutError, + UnauthorizedError, +) + +from .constants import DEFAULT_LIMIT + +LOGGER = logging.getLogger(__name__) + +_P = ParamSpec("_P") +_R = TypeVar("_R") +_NOT_FOUND_SENTINEL: Any = object() + + +def handle_zvuk_errors( + not_found_return: Any = _NOT_FOUND_SENTINEL, +) -> Callable[[Callable[_P, Awaitable[_R]]], Callable[_P, Awaitable[_R]]]: + """Decorate async methods to map Zvuk API exceptions to MA errors. + + :param not_found_return: Value to return on NotFoundError (e.g. None or []). + If not provided, NotFoundError is not caught. + """ + + def decorator(func: Callable[_P, Awaitable[_R]]) -> Callable[_P, Awaitable[_R]]: + async def wrapper(*args: _P.args, **kwargs: _P.kwargs) -> _R: + try: + return await func(*args, **kwargs) + except UnauthorizedError as err: + raise LoginFailed("Invalid Zvuk Music token") from err + except (NetworkError, TimedOutError) as err: + LOGGER.error("Zvuk API error: %s", err) + raise ResourceTemporarilyUnavailable("Zvuk Music request failed") from err + except (BadRequestError, GraphQLError) as err: + LOGGER.error("Zvuk API error: %s", err) + raise ResourceTemporarilyUnavailable("Zvuk Music request failed") from err + except BotDetectedError as err: + raise ProviderUnavailableError("Bot detected by Zvuk") from err + except NotFoundError: + if not_found_return is _NOT_FOUND_SENTINEL: + raise + return cast("_R", not_found_return) + + return wrapper + + return decorator + + +class ZvukMusicClient: + """Wrapper around zvuk-music ClientAsync.""" + + def __init__(self, token: str) -> None: + """Initialize the Zvuk Music client. + + :param token: Zvuk Music X-Auth-Token. + """ + self._token = token + self._client: ClientAsync | None = None + self._user_id: str | None = None + + @property + def user_id(self) -> str: + """Return the user ID.""" + if self._user_id is None: + raise ProviderUnavailableError("Client not initialized, call connect() first") + return self._user_id + + async def connect(self) -> None: + """Initialize the client and verify token validity. + + :raises LoginFailed: If the token is invalid. + :raises ResourceTemporarilyUnavailable: If there is a network error. + """ + try: + self._client = await ClientAsync(token=self._token).init() + if not await self._client.is_authorized(): + raise LoginFailed("Invalid Zvuk Music token") + profile = await self._client.get_profile() + if profile and profile.result: + self._user_id = str(profile.result.id) + LOGGER.debug("Connected to Zvuk Music as user %s", self._user_id) + except UnauthorizedError as err: + raise LoginFailed("Invalid Zvuk Music token") from err + except (NetworkError, TimedOutError) as err: + msg = "Network error connecting to Zvuk Music" + raise ResourceTemporarilyUnavailable(msg) from err + + async def disconnect(self) -> None: + """Disconnect the client.""" + self._client = None + self._user_id = None + + def _ensure_connected(self) -> ClientAsync: + """Ensure the client is connected and return it.""" + if self._client is None: + raise ProviderUnavailableError("Client not connected, call connect() first") + return self._client + + # Search + + @handle_zvuk_errors(not_found_return=None) + async def search( + self, + query: str, + limit: int = DEFAULT_LIMIT, + *, + search_tracks: bool = True, + search_artists: bool = True, + search_releases: bool = True, + search_playlists: bool = True, + ) -> ZvukSearch | None: + """Search for tracks, albums, artists, or playlists. + + :param query: Search query string. + :param limit: Maximum number of results per type. + :param search_tracks: Whether to search for tracks. + :param search_artists: Whether to search for artists. + :param search_releases: Whether to search for releases. + :param search_playlists: Whether to search for playlists. + :return: Search results object or None. + """ + client = self._ensure_connected() + return await client.search( + query, + limit=limit, + tracks=search_tracks, + artists=search_artists, + releases=search_releases, + playlists=search_playlists, + podcasts=False, + episodes=False, + profiles=False, + books=False, + ) + + # Get single items + + @handle_zvuk_errors(not_found_return=None) + async def get_track(self, track_id: str) -> ZvukTrack | None: + """Get a single track by ID. + + :param track_id: Track ID. + :return: Track object or None if not found. + """ + client = self._ensure_connected() + return await client.get_track(track_id) + + @handle_zvuk_errors(not_found_return=[]) + async def get_tracks(self, track_ids: list[str]) -> list[ZvukTrack]: + """Get multiple tracks by IDs. + + :param track_ids: List of track IDs. + :return: List of track objects. + """ + client = self._ensure_connected() + ids: list[str | int] = list(track_ids) + return await client.get_tracks(ids) + + @handle_zvuk_errors(not_found_return=None) + async def get_release(self, release_id: str) -> ZvukRelease | None: + """Get a single release (album) by ID. + + :param release_id: Release ID. + :return: Release object or None if not found. + """ + client = self._ensure_connected() + return await client.get_release(release_id) + + @handle_zvuk_errors(not_found_return=[]) + async def get_releases(self, release_ids: list[str]) -> list[ZvukRelease]: + """Get multiple releases by IDs. + + :param release_ids: List of release IDs. + :return: List of release objects. + """ + client = self._ensure_connected() + ids: list[str | int] = list(release_ids) + return await client.get_releases(ids) + + @handle_zvuk_errors(not_found_return=None) + async def get_artist(self, artist_id: str) -> ZvukArtist | None: + """Get a single artist by ID. + + :param artist_id: Artist ID. + :return: Artist object or None if not found. + """ + client = self._ensure_connected() + return await client.get_artist(artist_id, with_description=True) + + @handle_zvuk_errors(not_found_return=[]) + async def get_artists(self, artist_ids: list[str]) -> list[ZvukArtist]: + """Get multiple artists by IDs. + + :param artist_ids: List of artist IDs. + :return: List of artist objects. + """ + client = self._ensure_connected() + ids: list[str | int] = list(artist_ids) + return await client.get_artists(ids) + + @handle_zvuk_errors(not_found_return=[]) + async def get_artist_releases( + self, artist_id: str, limit: int = DEFAULT_LIMIT + ) -> list[ZvukArtist]: + """Get artist's releases. + + :param artist_id: Artist ID. + :param limit: Maximum number of releases. + :return: List of artist objects with populated releases. + """ + client = self._ensure_connected() + return await client.get_artists([artist_id], with_releases=True, releases_limit=limit) + + @handle_zvuk_errors(not_found_return=[]) + async def get_artist_top_tracks( + self, artist_id: str, limit: int = DEFAULT_LIMIT + ) -> list[ZvukArtist]: + """Get artist's top tracks. + + :param artist_id: Artist ID. + :param limit: Maximum number of tracks. + :return: List of artist objects with populated popular_tracks. + """ + client = self._ensure_connected() + return await client.get_artists([artist_id], with_popular_tracks=True, tracks_limit=limit) + + # Playlists + + @handle_zvuk_errors(not_found_return=None) + async def get_playlist(self, playlist_id: str) -> ZvukPlaylist | None: + """Get a playlist by ID. + + :param playlist_id: Playlist ID. + :return: Playlist object or None if not found. + """ + client = self._ensure_connected() + return await client.get_playlist(playlist_id) + + @handle_zvuk_errors(not_found_return=[]) + async def get_playlists(self, playlist_ids: list[str]) -> list[ZvukPlaylist]: + """Get multiple playlists by IDs. + + :param playlist_ids: List of playlist IDs. + :return: List of playlist objects. + """ + client = self._ensure_connected() + ids: list[str | int] = list(playlist_ids) + return await client.get_playlists(ids) + + @handle_zvuk_errors(not_found_return=[]) + async def get_playlist_tracks( + self, playlist_id: str, limit: int = 50, offset: int = 0 + ) -> list[ZvukSimpleTrack]: + """Get playlist tracks. + + :param playlist_id: Playlist ID. + :param limit: Maximum number of tracks. + :param offset: Offset for pagination. + :return: List of SimpleTrack objects. + """ + client = self._ensure_connected() + return await client.get_playlist_tracks(playlist_id, limit=limit, offset=offset) + + # Streaming + + @handle_zvuk_errors(not_found_return=[]) + async def get_stream_urls(self, track_id: str) -> list[ZvukStream]: + """Get stream URLs for a track. + + :param track_id: Track ID. + :return: List of Stream objects. + """ + client = self._ensure_connected() + return await client.get_stream_urls(track_id) + + # Collection (Library) + + @handle_zvuk_errors() + async def get_collection(self) -> Collection | None: + """Get user's collection (liked items). + + :return: Collection object or None. + """ + client = self._ensure_connected() + return await client.get_collection() + + @handle_zvuk_errors(not_found_return=[]) + async def get_liked_tracks(self) -> list[ZvukTrack]: + """Get user's liked tracks. + + :return: List of full Track objects. + """ + client = self._ensure_connected() + return await client.get_liked_tracks() + + @handle_zvuk_errors(not_found_return=[]) + async def get_user_playlists(self) -> list[ZvukCollectionItem]: + """Get user's playlists. + + :return: List of CollectionItem objects with playlist IDs. + """ + client = self._ensure_connected() + return await client.get_user_playlists() + + # Library modifications + + async def like_track(self, track_id: str) -> bool: + """Add a track to liked tracks. + + :param track_id: Track ID. + :return: True if successful. + """ + client = self._ensure_connected() + try: + return await client.like_track(track_id) + except (BadRequestError, NetworkError, GraphQLError) as err: + LOGGER.error("Error liking track %s: %s", track_id, err) + return False + + async def unlike_track(self, track_id: str) -> bool: + """Remove a track from liked tracks. + + :param track_id: Track ID. + :return: True if successful. + """ + client = self._ensure_connected() + try: + return await client.unlike_track(track_id) + except (BadRequestError, NetworkError, GraphQLError) as err: + LOGGER.error("Error unliking track %s: %s", track_id, err) + return False + + async def like_release(self, release_id: str) -> bool: + """Add a release to liked releases. + + :param release_id: Release ID. + :return: True if successful. + """ + client = self._ensure_connected() + try: + return await client.like_release(release_id) + except (BadRequestError, NetworkError, GraphQLError) as err: + LOGGER.error("Error liking release %s: %s", release_id, err) + return False + + async def unlike_release(self, release_id: str) -> bool: + """Remove a release from liked releases. + + :param release_id: Release ID. + :return: True if successful. + """ + client = self._ensure_connected() + try: + return await client.unlike_release(release_id) + except (BadRequestError, NetworkError, GraphQLError) as err: + LOGGER.error("Error unliking release %s: %s", release_id, err) + return False + + async def like_artist(self, artist_id: str) -> bool: + """Add an artist to liked artists. + + :param artist_id: Artist ID. + :return: True if successful. + """ + client = self._ensure_connected() + try: + return await client.like_artist(artist_id) + except (BadRequestError, NetworkError, GraphQLError) as err: + LOGGER.error("Error liking artist %s: %s", artist_id, err) + return False + + async def unlike_artist(self, artist_id: str) -> bool: + """Remove an artist from liked artists. + + :param artist_id: Artist ID. + :return: True if successful. + """ + client = self._ensure_connected() + try: + return await client.unlike_artist(artist_id) + except (BadRequestError, NetworkError, GraphQLError) as err: + LOGGER.error("Error unliking artist %s: %s", artist_id, err) + return False + + async def like_playlist(self, playlist_id: str) -> bool: + """Add a playlist to liked playlists. + + :param playlist_id: Playlist ID. + :return: True if successful. + """ + client = self._ensure_connected() + try: + return await client.like_playlist(playlist_id) + except (BadRequestError, NetworkError, GraphQLError) as err: + LOGGER.error("Error liking playlist %s: %s", playlist_id, err) + return False + + async def unlike_playlist(self, playlist_id: str) -> bool: + """Remove a playlist from liked playlists. + + :param playlist_id: Playlist ID. + :return: True if successful. + """ + client = self._ensure_connected() + try: + return await client.unlike_playlist(playlist_id) + except (BadRequestError, NetworkError, GraphQLError) as err: + LOGGER.error("Error unliking playlist %s: %s", playlist_id, err) + return False + + # Playlist management + + @handle_zvuk_errors() + async def create_playlist(self, name: str, track_ids: list[str] | None = None) -> str: + """Create a new playlist. + + :param name: Playlist name. + :param track_ids: Optional list of track IDs to add. + :return: New playlist ID. + """ + client = self._ensure_connected() + return await client.create_playlist(name, track_ids=track_ids) + + async def delete_playlist(self, playlist_id: str) -> bool: + """Delete a playlist. + + :param playlist_id: Playlist ID. + :return: True if successful. + """ + client = self._ensure_connected() + try: + return await client.delete_playlist(playlist_id) + except (BadRequestError, NetworkError, GraphQLError) as err: + LOGGER.error("Error deleting playlist %s: %s", playlist_id, err) + return False + + async def add_tracks_to_playlist(self, playlist_id: str, track_ids: list[str]) -> bool: + """Add tracks to a playlist. + + :param playlist_id: Playlist ID. + :param track_ids: List of track IDs to add. + :return: True if successful. + """ + client = self._ensure_connected() + try: + return await client.add_tracks_to_playlist(playlist_id, track_ids) + except (BadRequestError, NetworkError, GraphQLError) as err: + LOGGER.error("Error adding tracks to playlist %s: %s", playlist_id, err) + return False + + async def update_playlist(self, playlist_id: str, track_ids: list[str]) -> bool: + """Update playlist tracks (used for removing tracks by providing remaining ones). + + :param playlist_id: Playlist ID. + :param track_ids: Complete list of track IDs the playlist should contain. + :return: True if successful. + """ + client = self._ensure_connected() + try: + return await client.update_playlist(playlist_id, track_ids) + except (BadRequestError, NetworkError, GraphQLError) as err: + LOGGER.error("Error updating playlist %s: %s", playlist_id, err) + return False diff --git a/music_assistant/providers/zvuk_music/constants.py b/music_assistant/providers/zvuk_music/constants.py new file mode 100644 index 00000000..40fb965f --- /dev/null +++ b/music_assistant/providers/zvuk_music/constants.py @@ -0,0 +1,26 @@ +"""Constants for the Zvuk Music provider.""" + +from __future__ import annotations + +from typing import Final + +# Configuration Keys +CONF_TOKEN: Final[str] = "token" +CONF_QUALITY: Final[str] = "quality" + +# Actions +CONF_ACTION_CLEAR_AUTH: Final[str] = "clear_auth" + +# API defaults +DEFAULT_LIMIT: Final[int] = 50 +PLAYLIST_TRACKS_PAGE_SIZE: Final[int] = 50 + +# Quality options +QUALITY_HIGH: Final[str] = "high" +QUALITY_LOSSLESS: Final[str] = "lossless" + +# Image sizes +IMAGE_SIZE_LARGE: Final[int] = 600 + +# URLs +ZVUK_BASE_URL: Final[str] = "https://zvuk.com" diff --git a/music_assistant/providers/zvuk_music/icon.svg b/music_assistant/providers/zvuk_music/icon.svg new file mode 100644 index 00000000..596830af --- /dev/null +++ b/music_assistant/providers/zvuk_music/icon.svg @@ -0,0 +1,3 @@ + + + diff --git a/music_assistant/providers/zvuk_music/icon_monochrome.svg b/music_assistant/providers/zvuk_music/icon_monochrome.svg new file mode 100644 index 00000000..af067b86 --- /dev/null +++ b/music_assistant/providers/zvuk_music/icon_monochrome.svg @@ -0,0 +1,3 @@ + + + diff --git a/music_assistant/providers/zvuk_music/manifest.json b/music_assistant/providers/zvuk_music/manifest.json new file mode 100644 index 00000000..08c0f769 --- /dev/null +++ b/music_assistant/providers/zvuk_music/manifest.json @@ -0,0 +1,11 @@ +{ + "type": "music", + "domain": "zvuk_music", + "stage": "beta", + "name": "Zvuk Music", + "description": "Stream music from Zvuk Music service.", + "codeowners": ["@TrudenBoy"], + "documentation": "https://music-assistant.io/music-providers/zvuk/", + "requirements": ["zvuk-music[async]==0.5.3"], + "multi_instance": true +} diff --git a/music_assistant/providers/zvuk_music/parsers.py b/music_assistant/providers/zvuk_music/parsers.py new file mode 100644 index 00000000..7fde9b69 --- /dev/null +++ b/music_assistant/providers/zvuk_music/parsers.py @@ -0,0 +1,311 @@ +"""Parsers for Zvuk Music API responses.""" + +from __future__ import annotations + +from contextlib import suppress +from datetime import datetime +from typing import TYPE_CHECKING + +from music_assistant_models.enums import ( + AlbumType, + ContentType, + ImageType, +) +from music_assistant_models.media_items import ( + Album, + Artist, + AudioFormat, + MediaItemImage, + Playlist, + ProviderMapping, + Track, + UniqueList, +) + +from music_assistant.helpers.util import parse_title_and_version + +from .constants import IMAGE_SIZE_LARGE, ZVUK_BASE_URL + +if TYPE_CHECKING: + from zvuk_music import Artist as ZvukArtist + from zvuk_music import Playlist as ZvukPlaylist + from zvuk_music import Release as ZvukRelease + from zvuk_music import Track as ZvukTrack + from zvuk_music.models.artist import SimpleArtist as ZvukSimpleArtist + from zvuk_music.models.common import Image as ZvukImage + from zvuk_music.models.playlist import SimplePlaylist as ZvukSimplePlaylist + from zvuk_music.models.release import SimpleRelease as ZvukSimpleRelease + from zvuk_music.models.track import SimpleTrack as ZvukSimpleTrack + + from .provider import ZvukMusicProvider + + +def _get_image_url(image: ZvukImage | None, size: int = IMAGE_SIZE_LARGE) -> str | None: + """Convert Zvuk Image to full URL. + + :param image: Zvuk Image object. + :param size: Image size in pixels. + :return: Full image URL or None. + """ + if not image: + return None + url = image.get_url(size, size) + return url if url else None + + +def parse_artist(provider: ZvukMusicProvider, artist_obj: ZvukArtist | ZvukSimpleArtist) -> Artist: + """Parse Zvuk artist object to MA Artist model. + + :param provider: The Zvuk Music provider instance. + :param artist_obj: Zvuk artist or SimpleArtist object. + :return: Music Assistant Artist model. + """ + artist_id = str(artist_obj.id) + artist = Artist( + item_id=artist_id, + provider=provider.instance_id, + name=artist_obj.title or "Unknown Artist", + provider_mappings={ + ProviderMapping( + item_id=artist_id, + provider_domain=provider.domain, + provider_instance=provider.instance_id, + url=f"{ZVUK_BASE_URL}/artist/{artist_id}", + ) + }, + ) + + if artist_obj.image: + image_url = _get_image_url(artist_obj.image) + if image_url: + artist.metadata.images = UniqueList( + [ + MediaItemImage( + type=ImageType.THUMB, + path=image_url, + provider=provider.instance_id, + remotely_accessible=True, + ) + ] + ) + + return artist + + +def parse_album(provider: ZvukMusicProvider, release_obj: ZvukRelease | ZvukSimpleRelease) -> Album: + """Parse Zvuk release object to MA Album model. + + :param provider: The Zvuk Music provider instance. + :param release_obj: Zvuk release or SimpleRelease object. + :return: Music Assistant Album model. + """ + name, version = parse_title_and_version( + release_obj.title or "Unknown Album", + ) + album_id = str(release_obj.id) + + album = Album( + item_id=album_id, + provider=provider.instance_id, + name=name, + version=version, + provider_mappings={ + ProviderMapping( + item_id=album_id, + provider_domain=provider.domain, + provider_instance=provider.instance_id, + audio_format=AudioFormat( + content_type=ContentType.UNKNOWN, + ), + url=f"{ZVUK_BASE_URL}/release/{album_id}", + ) + }, + ) + + # Parse artists + if release_obj.artists: + for artist in release_obj.artists: + album.artists.append(parse_artist(provider, artist)) + + # Determine album type from ReleaseType + if release_obj.type: + release_type_value = ( + release_obj.type.value if hasattr(release_obj.type, "value") else str(release_obj.type) + ) + if release_type_value == "compilation": + album.album_type = AlbumType.COMPILATION + elif release_type_value == "single": + album.album_type = AlbumType.SINGLE + elif release_type_value == "ep": + album.album_type = AlbumType.EP + else: + album.album_type = AlbumType.ALBUM + else: + album.album_type = AlbumType.ALBUM + + # Parse date + if release_obj.date: + # get_year() is available on both Release and SimpleRelease + year = release_obj.get_year() + if year: + album.year = year + with suppress(ValueError): + album.metadata.release_date = datetime.fromisoformat(release_obj.date) + + # Parse genres (only available on full Release, not SimpleRelease) + if hasattr(release_obj, "genres") and release_obj.genres: + album.metadata.genres = {genre.name for genre in release_obj.genres if genre.name} + + # Parse explicit flag + if release_obj.explicit: + album.metadata.explicit = True + + # Add cover image + if release_obj.image: + image_url = _get_image_url(release_obj.image) + if image_url: + album.metadata.images = UniqueList( + [ + MediaItemImage( + type=ImageType.THUMB, + path=image_url, + provider=provider.instance_id, + remotely_accessible=True, + ) + ] + ) + + return album + + +def parse_track(provider: ZvukMusicProvider, track_obj: ZvukTrack | ZvukSimpleTrack) -> Track: + """Parse Zvuk track object to MA Track model. + + :param provider: The Zvuk Music provider instance. + :param track_obj: Zvuk track or SimpleTrack object. + :return: Music Assistant Track model. + """ + name, version = parse_title_and_version( + track_obj.title or "Unknown Track", + ) + track_id = str(track_obj.id) + + # Duration is already in seconds in Zvuk API + duration = track_obj.duration or 0 + + track = Track( + item_id=track_id, + provider=provider.instance_id, + name=name, + version=version, + duration=duration, + provider_mappings={ + ProviderMapping( + item_id=track_id, + provider_domain=provider.domain, + provider_instance=provider.instance_id, + audio_format=AudioFormat( + content_type=ContentType.UNKNOWN, + ), + url=f"{ZVUK_BASE_URL}/track/{track_id}", + ) + }, + ) + + # Parse artists + if track_obj.artists: + track.artists = UniqueList() + for artist in track_obj.artists: + track.artists.append(parse_artist(provider, artist)) + + # Parse album from release (available on both Track and SimpleTrack) + if track_obj.release: + track.album = provider.get_item_mapping( + media_type="album", + key=str(track_obj.release.id), + name=track_obj.release.title or "Unknown Album", + ) + # Get image from release + if track_obj.release.image: + image_url = _get_image_url(track_obj.release.image) + if image_url: + track.metadata.images = UniqueList( + [ + MediaItemImage( + type=ImageType.THUMB, + path=image_url, + provider=provider.instance_id, + remotely_accessible=True, + ) + ] + ) + + # Track number (position in release, only on full Track) + if hasattr(track_obj, "position") and track_obj.position is not None: + track.track_number = track_obj.position + + # Explicit flag (boolean on both Track and SimpleTrack) + if track_obj.explicit: + track.metadata.explicit = True + + return track + + +def parse_playlist( + provider: ZvukMusicProvider, playlist_obj: ZvukPlaylist | ZvukSimplePlaylist +) -> Playlist: + """Parse Zvuk playlist object to MA Playlist model. + + :param provider: The Zvuk Music provider instance. + :param playlist_obj: Zvuk playlist or SimplePlaylist object. + :return: Music Assistant Playlist model. + """ + playlist_id = str(playlist_obj.id) + + # Determine if editable (user owns the playlist) + # user_id is only available on full Playlist, not SimplePlaylist + is_editable = False + owner_name = "Zvuk Music" + user_id = getattr(playlist_obj, "user_id", None) + if user_id and provider.client.user_id: + is_editable = str(user_id) == str(provider.client.user_id) + if is_editable: + owner_name = "Me" + + playlist = Playlist( + item_id=playlist_id, + provider=provider.instance_id, + name=playlist_obj.title or "Unknown Playlist", + owner=owner_name, + provider_mappings={ + ProviderMapping( + item_id=playlist_id, + provider_domain=provider.domain, + provider_instance=provider.instance_id, + url=f"{ZVUK_BASE_URL}/playlist/{playlist_id}", + is_unique=is_editable, + ) + }, + is_editable=is_editable, + ) + + # Metadata + if playlist_obj.description: + playlist.metadata.description = playlist_obj.description + + # Add cover image + if playlist_obj.image: + image_url = _get_image_url(playlist_obj.image) + if image_url: + playlist.metadata.images = UniqueList( + [ + MediaItemImage( + type=ImageType.THUMB, + path=image_url, + provider=provider.instance_id, + remotely_accessible=True, + ) + ] + ) + + return playlist diff --git a/music_assistant/providers/zvuk_music/provider.py b/music_assistant/providers/zvuk_music/provider.py new file mode 100644 index 00000000..b5ea30dd --- /dev/null +++ b/music_assistant/providers/zvuk_music/provider.py @@ -0,0 +1,536 @@ +"""Zvuk Music provider implementation.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +from music_assistant_models.enums import ContentType, MediaType, StreamType +from music_assistant_models.errors import ( + InvalidDataError, + LoginFailed, + MediaNotFoundError, + ProviderUnavailableError, +) +from music_assistant_models.media_items import ( + Album, + Artist, + AudioFormat, + ItemMapping, + MediaItemType, + Playlist, + SearchResults, + Track, +) +from music_assistant_models.streamdetails import StreamDetails +from zvuk_music.enums import Quality +from zvuk_music.exceptions import QualityNotAvailableError, SubscriptionRequiredError + +from music_assistant.controllers.cache import use_cache +from music_assistant.models.music_provider import MusicProvider + +from .api_client import ZvukMusicClient +from .constants import ( + CONF_QUALITY, + CONF_TOKEN, + DEFAULT_LIMIT, + PLAYLIST_TRACKS_PAGE_SIZE, + QUALITY_LOSSLESS, +) +from .parsers import parse_album, parse_artist, parse_playlist, parse_track + +if TYPE_CHECKING: + from collections.abc import AsyncGenerator + + +class ZvukMusicProvider(MusicProvider): + """Implementation of a Zvuk Music MusicProvider.""" + + _client: ZvukMusicClient | None = None + + @property + def client(self) -> ZvukMusicClient: + """Return the Zvuk Music client.""" + if self._client is None: + raise ProviderUnavailableError("Provider not initialized") + return self._client + + async def handle_async_init(self) -> None: + """Handle async initialization of the provider.""" + token = self.config.get_value(CONF_TOKEN) + if not token: + raise LoginFailed("No Zvuk Music token provided") + + self._client = ZvukMusicClient(str(token)) + await self._client.connect() + self.logger.info("Successfully connected to Zvuk Music") + + async def unload(self, is_removed: bool = False) -> None: + """Handle unload/close of the provider. + + :param is_removed: Whether the provider is being removed. + """ + if self._client: + await self._client.disconnect() + self._client = None + await super().unload(is_removed) + + def get_item_mapping(self, media_type: MediaType | str, key: str, name: str) -> ItemMapping: + """Create a generic item mapping. + + :param media_type: The media type. + :param key: The item ID. + :param name: The item name. + :return: An ItemMapping instance. + """ + if isinstance(media_type, str): + media_type = MediaType(media_type) + return ItemMapping( + media_type=media_type, + item_id=key, + provider=self.instance_id, + name=name, + ) + + # Search + + @use_cache(3600 * 24 * 14) + async def search( + self, search_query: str, media_types: list[MediaType], limit: int = 5 + ) -> SearchResults: + """Perform search on Zvuk Music. + + :param search_query: The search query. + :param media_types: List of media types to search for. + :param limit: Maximum number of results per type. + :return: SearchResults with found items. + """ + result = SearchResults() + + search_result = await self.client.search( + search_query, + limit=limit, + search_tracks=MediaType.TRACK in media_types, + search_artists=MediaType.ARTIST in media_types, + search_releases=MediaType.ALBUM in media_types, + search_playlists=MediaType.PLAYLIST in media_types, + ) + if not search_result: + return result + + # Parse tracks + if MediaType.TRACK in media_types and search_result.tracks: + for track in search_result.tracks.items[:limit]: + try: + result.tracks = [*result.tracks, parse_track(self, track)] + except InvalidDataError as err: + self.logger.debug("Error parsing track: %s", err) + + # Parse albums (Zvuk releases) + if MediaType.ALBUM in media_types and search_result.releases: + for release in search_result.releases.items[:limit]: + try: + result.albums = [*result.albums, parse_album(self, release)] + except InvalidDataError as err: + self.logger.debug("Error parsing album: %s", err) + + # Parse artists + if MediaType.ARTIST in media_types and search_result.artists: + for artist in search_result.artists.items[:limit]: + try: + result.artists = [*result.artists, parse_artist(self, artist)] + except InvalidDataError as err: + self.logger.debug("Error parsing artist: %s", err) + + # Parse playlists + if MediaType.PLAYLIST in media_types and search_result.playlists: + for playlist in search_result.playlists.items[:limit]: + try: + result.playlists = [*result.playlists, parse_playlist(self, playlist)] + except InvalidDataError as err: + self.logger.debug("Error parsing playlist: %s", err) + + return result + + # Get single items + + @use_cache(3600 * 24 * 30) + async def get_artist(self, prov_artist_id: str) -> Artist: + """Get artist details by ID. + + :param prov_artist_id: The provider artist ID. + :return: Artist object. + :raises MediaNotFoundError: If artist not found. + """ + artist = await self.client.get_artist(prov_artist_id) + if not artist: + raise MediaNotFoundError(f"Artist {prov_artist_id} not found") + return parse_artist(self, artist) + + @use_cache(3600 * 24 * 30) + async def get_album(self, prov_album_id: str) -> Album: + """Get album details by ID. + + :param prov_album_id: The provider album ID. + :return: Album object. + :raises MediaNotFoundError: If album not found. + """ + release = await self.client.get_release(prov_album_id) + if not release: + raise MediaNotFoundError(f"Album {prov_album_id} not found") + return parse_album(self, release) + + @use_cache(3600 * 24 * 30) + async def get_track(self, prov_track_id: str) -> Track: + """Get track details by ID. + + :param prov_track_id: The provider track ID. + :return: Track object. + :raises MediaNotFoundError: If track not found. + """ + track = await self.client.get_track(prov_track_id) + if not track: + raise MediaNotFoundError(f"Track {prov_track_id} not found") + return parse_track(self, track) + + @use_cache(3600 * 24 * 30) + async def get_playlist(self, prov_playlist_id: str) -> Playlist: + """Get playlist details by ID. + + :param prov_playlist_id: The provider playlist ID. + :return: Playlist object. + :raises MediaNotFoundError: If playlist not found. + """ + playlist = await self.client.get_playlist(prov_playlist_id) + if not playlist: + raise MediaNotFoundError(f"Playlist {prov_playlist_id} not found") + return parse_playlist(self, playlist) + + # Get related items + + @use_cache(3600 * 24 * 30) + async def get_album_tracks(self, prov_album_id: str) -> list[Track]: + """Get album tracks. + + :param prov_album_id: The provider album ID. + :return: List of Track objects. + """ + release = await self.client.get_release(prov_album_id) + if not release or not release.tracks: + return [] + + tracks = [] + for index, track in enumerate(release.tracks): + try: + parsed_track = parse_track(self, track) + parsed_track.disc_number = 1 + parsed_track.track_number = index + 1 + tracks.append(parsed_track) + except InvalidDataError as err: + self.logger.debug("Error parsing album track: %s", err) + return tracks + + @use_cache(3600 * 3) + async def get_playlist_tracks(self, prov_playlist_id: str, page: int = 0) -> list[Track]: + """Get playlist tracks. + + :param prov_playlist_id: The provider playlist ID. + :param page: Page number for pagination. + :return: List of Track objects. + """ + offset = page * PLAYLIST_TRACKS_PAGE_SIZE + simple_tracks = await self.client.get_playlist_tracks( + prov_playlist_id, limit=PLAYLIST_TRACKS_PAGE_SIZE, offset=offset + ) + if not simple_tracks: + return [] + + # Fetch full track details from SimpleTrack IDs + track_ids = [str(t.id) for t in simple_tracks if t.id] + if not track_ids: + return [] + + full_tracks = await self.client.get_tracks(track_ids) + tracks = [] + for track in full_tracks: + try: + tracks.append(parse_track(self, track)) + except InvalidDataError as err: + self.logger.debug("Error parsing playlist track: %s", err) + return tracks + + @use_cache(3600 * 24 * 7) + async def get_artist_albums(self, prov_artist_id: str) -> list[Album]: + """Get artist's albums. + + :param prov_artist_id: The provider artist ID. + :return: List of Album objects. + """ + artists = await self.client.get_artist_releases(prov_artist_id, limit=DEFAULT_LIMIT) + if not artists: + return [] + + result = [] + for artist in artists: + for release in artist.releases: + try: + result.append(parse_album(self, release)) + except InvalidDataError as err: + self.logger.debug("Error parsing artist album: %s", err) + return result + + @use_cache(3600 * 24 * 7) + async def get_artist_toptracks(self, prov_artist_id: str) -> list[Track]: + """Get artist's top tracks. + + :param prov_artist_id: The provider artist ID. + :return: List of Track objects. + """ + artists = await self.client.get_artist_top_tracks(prov_artist_id, limit=DEFAULT_LIMIT) + if not artists: + return [] + + result = [] + for artist in artists: + for track in artist.popular_tracks: + try: + result.append(parse_track(self, track)) + except InvalidDataError as err: + self.logger.debug("Error parsing artist track: %s", err) + return result + + # Library methods + + async def get_library_artists(self) -> AsyncGenerator[Artist, None]: + """Retrieve library artists from Zvuk Music.""" + collection = await self.client.get_collection() + if not collection or not collection.artists: + return + + artist_ids = [str(item.id) for item in collection.artists if item.id] + for i in range(0, len(artist_ids), DEFAULT_LIMIT): + batch_ids = artist_ids[i : i + DEFAULT_LIMIT] + artists = await self.client.get_artists(batch_ids) + for artist in artists: + try: + yield parse_artist(self, artist) + except InvalidDataError as err: + self.logger.debug("Error parsing library artist: %s", err) + + async def get_library_albums(self) -> AsyncGenerator[Album, None]: + """Retrieve library albums from Zvuk Music.""" + collection = await self.client.get_collection() + if not collection or not collection.releases: + return + + release_ids = [str(item.id) for item in collection.releases if item.id] + for i in range(0, len(release_ids), DEFAULT_LIMIT): + batch_ids = release_ids[i : i + DEFAULT_LIMIT] + releases = await self.client.get_releases(batch_ids) + for release in releases: + try: + yield parse_album(self, release) + except InvalidDataError as err: + self.logger.debug("Error parsing library album: %s", err) + + async def get_library_tracks(self) -> AsyncGenerator[Track, None]: + """Retrieve library tracks from Zvuk Music.""" + tracks = await self.client.get_liked_tracks() + for track in tracks: + try: + yield parse_track(self, track) + except InvalidDataError as err: + self.logger.debug("Error parsing library track: %s", err) + + async def get_library_playlists(self) -> AsyncGenerator[Playlist, None]: + """Retrieve library playlists from Zvuk Music.""" + collection_items = await self.client.get_user_playlists() + if not collection_items: + return + + playlist_ids = [str(item.id) for item in collection_items if item.id] + for i in range(0, len(playlist_ids), DEFAULT_LIMIT): + batch_ids = playlist_ids[i : i + DEFAULT_LIMIT] + playlists = await self.client.get_playlists(batch_ids) + for playlist in playlists: + try: + yield parse_playlist(self, playlist) + except InvalidDataError as err: + self.logger.debug("Error parsing library playlist: %s", err) + + # Library edit methods + + async def library_add(self, item: MediaItemType) -> bool: + """Add item to library. + + :param item: The media item to add. + :return: True if successful. + """ + prov_item_id = self._get_provider_item_id(item) + if not prov_item_id: + return False + + if item.media_type == MediaType.TRACK: + return await self.client.like_track(prov_item_id) + if item.media_type == MediaType.ALBUM: + return await self.client.like_release(prov_item_id) + if item.media_type == MediaType.ARTIST: + return await self.client.like_artist(prov_item_id) + if item.media_type == MediaType.PLAYLIST: + return await self.client.like_playlist(prov_item_id) + return False + + async def library_remove(self, prov_item_id: str, media_type: MediaType) -> bool: + """Remove item from library. + + :param prov_item_id: The provider item ID. + :param media_type: The media type. + :return: True if successful. + """ + if media_type == MediaType.TRACK: + return await self.client.unlike_track(prov_item_id) + if media_type == MediaType.ALBUM: + return await self.client.unlike_release(prov_item_id) + if media_type == MediaType.ARTIST: + return await self.client.unlike_artist(prov_item_id) + if media_type == MediaType.PLAYLIST: + return await self.client.unlike_playlist(prov_item_id) + return False + + def _get_provider_item_id(self, item: MediaItemType) -> str | None: + """Get provider item ID from media item.""" + for mapping in item.provider_mappings: + if mapping.provider_instance == self.instance_id: + return mapping.item_id + return item.item_id if item.provider == self.instance_id else None + + # Playlist management + + async def create_playlist(self, name: str) -> Playlist: + """Create a new playlist. + + :param name: Playlist name. + :return: The created Playlist object. + """ + playlist_id = await self.client.create_playlist(name) + playlist = await self.client.get_playlist(playlist_id) + if not playlist: + raise MediaNotFoundError(f"Created playlist {playlist_id} not found") + return parse_playlist(self, playlist) + + async def add_playlist_tracks(self, prov_playlist_id: str, prov_track_ids: list[str]) -> None: + """Add tracks to a playlist. + + :param prov_playlist_id: The provider playlist ID. + :param prov_track_ids: List of track IDs to add. + """ + await self.client.add_tracks_to_playlist(prov_playlist_id, prov_track_ids) + + async def remove_playlist_tracks( + self, prov_playlist_id: str, positions_to_remove: tuple[int, ...] + ) -> None: + """Remove tracks from a playlist by position. + + :param prov_playlist_id: The provider playlist ID. + :param positions_to_remove: Tuple of track positions (0-based) to remove. + """ + # Fetch current tracks and filter out the ones at given positions + simple_tracks = await self.client.get_playlist_tracks(prov_playlist_id, limit=10000) + remove_positions = set(positions_to_remove) + remaining_ids = [ + str(t.id) for i, t in enumerate(simple_tracks) if t.id and i not in remove_positions + ] + await self.client.update_playlist(prov_playlist_id, remaining_ids) + + # Streaming + + async def get_stream_details( # noqa: PLR0915 + self, item_id: str, media_type: MediaType = MediaType.TRACK + ) -> StreamDetails: + """Get stream details for a track. + + :param item_id: The track ID. + :param media_type: The media type (should be TRACK). + :return: StreamDetails for the track. + :raises MediaNotFoundError: If stream URL cannot be obtained. + """ + streams = await self.client.get_stream_urls(item_id) + if not streams: + raise MediaNotFoundError(f"No stream info available for track {item_id}") + + stream = streams[0] + quality_pref = self.config.get_value(CONF_QUALITY) + quality_str = str(quality_pref) if quality_pref is not None else QUALITY_LOSSLESS + + # Select quality with fallback chain + url: str | None = None + content_type = ContentType.UNKNOWN + bitrate = 0 + + if quality_str == QUALITY_LOSSLESS: + # Try FLAC -> HIGH -> MID + for quality in (Quality.FLAC, Quality.HIGH, Quality.MID): + try: + url = stream.get_url(quality) + if quality == Quality.FLAC: + content_type = ContentType.FLAC + bitrate = 0 + elif quality == Quality.HIGH: + content_type = ContentType.MP3 + bitrate = 320 + else: + content_type = ContentType.MP3 + bitrate = 128 + break + except (SubscriptionRequiredError, QualityNotAvailableError): + continue + else: + # High quality: try HIGH -> MID + for quality in (Quality.HIGH, Quality.MID): + try: + url = stream.get_url(quality) + if quality == Quality.HIGH: + content_type = ContentType.MP3 + bitrate = 320 + else: + content_type = ContentType.MP3 + bitrate = 128 + break + except (SubscriptionRequiredError, QualityNotAvailableError): + continue + + # Ultimate fallback + if not url: + best_quality, url = stream.get_best_available() + if best_quality == Quality.FLAC: + content_type = ContentType.FLAC + bitrate = 0 + elif best_quality == Quality.HIGH: + content_type = ContentType.MP3 + bitrate = 320 + else: + content_type = ContentType.MP3 + bitrate = 128 + + if not url: + raise MediaNotFoundError(f"No stream URL available for track {item_id}") + + # zvuk-music Stream model (get_stream_urls) has no duration; only expire and URLs. + # Fetch track for duration so StreamDetails can expose it (e.g. for progress/seeking). + track = await self.client.get_track(item_id) + duration: int | None = None + if track is not None and getattr(track, "duration", None) is not None: + duration = int(track.duration) + + return StreamDetails( + item_id=item_id, + provider=self.instance_id, + audio_format=AudioFormat( + content_type=content_type, + bit_rate=bitrate, + ), + stream_type=StreamType.HTTP, + path=url, + duration=duration, + allow_seek=True, + can_seek=True, + ) diff --git a/requirements_all.txt b/requirements_all.txt index 8f8d0f00..6ff3cb07 100644 --- a/requirements_all.txt +++ b/requirements_all.txt @@ -78,3 +78,4 @@ xmltodict==1.0.2 yandex-music==2.2.0 ytmusicapi==1.11.5 zeroconf==0.148.0 +zvuk-music[async]==0.5.3 diff --git a/tests/providers/zvuk_music/__init__.py b/tests/providers/zvuk_music/__init__.py new file mode 100644 index 00000000..350955c6 --- /dev/null +++ b/tests/providers/zvuk_music/__init__.py @@ -0,0 +1 @@ +"""Tests for the Zvuk Music provider.""" diff --git a/tests/providers/zvuk_music/test_parsers.py b/tests/providers/zvuk_music/test_parsers.py new file mode 100644 index 00000000..d23117e0 --- /dev/null +++ b/tests/providers/zvuk_music/test_parsers.py @@ -0,0 +1,536 @@ +"""Tests for Zvuk Music parsers.""" + +from __future__ import annotations + +from unittest.mock import Mock + +import pytest +from music_assistant_models.enums import AlbumType, ImageType + +from music_assistant.providers.zvuk_music.parsers import ( + parse_album, + parse_artist, + parse_playlist, + parse_track, +) + + +def _create_mock_image(template: str = "https://zvuk.com/image/{width}x{height}") -> Mock: + """Create a mock Zvuk Image object. + + :param template: URL template with {width} and {height} placeholders. + :return: Mock Image object. + """ + image = Mock() + image.get_url = Mock( + side_effect=lambda w, h: template.format(width=w, height=h) if template else None + ) + return image + + +def _create_mock_artist( + artist_id: int = 123, + title: str | None = "Test Artist", + image: Mock | None = None, +) -> Mock: + """Create a mock Zvuk artist object. + + :param artist_id: Artist ID. + :param title: Artist name. + :param image: Optional mock image object. + :return: Mock artist object. + """ + artist = Mock() + artist.id = artist_id + artist.title = title + artist.image = image + return artist + + +def _create_mock_release( + release_id: int = 456, + title: str | None = "Test Album", + artists: list[Mock] | None = None, + release_type: str | None = None, + date: str | None = None, + explicit: bool = False, + genres: list[Mock] | None = None, + image: Mock | None = None, +) -> Mock: + """Create a mock Zvuk release object. + + :param release_id: Release ID. + :param title: Album title. + :param artists: List of mock artist objects. + :param release_type: Release type (album, single, ep, compilation). + :param date: Release date in ISO format. + :param explicit: Whether the release is explicit. + :param genres: List of mock genre objects. + :param image: Optional mock image object. + :return: Mock release object. + """ + release = Mock() + release.id = release_id + release.title = title + release.artists = artists or [] + release.explicit = explicit + release.image = image + release.date = date + + # Type handling + if release_type: + type_mock = Mock() + type_mock.value = release_type + release.type = type_mock + else: + release.type = None + + # get_year method + if date: + release.get_year = Mock(return_value=int(date[:4])) + else: + release.get_year = Mock(return_value=None) + + # Genres (only on full Release) + if genres is not None: + release.genres = genres + else: + # SimpleRelease doesn't have genres + del release.genres + + return release + + +def _create_mock_track( + track_id: int = 789, + title: str | None = "Test Track", + artists: list[Mock] | None = None, + release: Mock | None = None, + duration: int = 180, + position: int | None = None, + explicit: bool = False, +) -> Mock: + """Create a mock Zvuk track object. + + :param track_id: Track ID. + :param title: Track title. + :param artists: List of mock artist objects. + :param release: Mock release object. + :param duration: Track duration in seconds. + :param position: Track position in album. + :param explicit: Whether the track is explicit. + :return: Mock track object. + """ + track = Mock() + track.id = track_id + track.title = title + track.artists = artists or [] + track.release = release + track.duration = duration + track.explicit = explicit + + # Position is only on full Track, not SimpleTrack + if position is not None: + track.position = position + else: + del track.position + + return track + + +def _create_mock_playlist( + playlist_id: int = 999, + title: str | None = "Test Playlist", + description: str | None = None, + user_id: int | None = None, + image: Mock | None = None, +) -> Mock: + """Create a mock Zvuk playlist object. + + :param playlist_id: Playlist ID. + :param title: Playlist title. + :param description: Playlist description. + :param user_id: User ID (owner). + :param image: Optional mock image object. + :return: Mock playlist object. + """ + playlist = Mock() + playlist.id = playlist_id + playlist.title = title + playlist.description = description + playlist.image = image + + # user_id is only on full Playlist, not SimplePlaylist + if user_id is not None: + playlist.user_id = user_id + else: + del playlist.user_id + + return playlist + + +@pytest.fixture +def mock_provider() -> Mock: + """Create a mock ZvukMusicProvider.""" + provider = Mock() + provider.instance_id = "zvuk_music_test" + provider.domain = "zvuk_music" + provider.client = Mock() + provider.client.user_id = 12345 + + def mock_get_item_mapping( + media_type: str, # noqa: ARG001 + key: str, + name: str, + ) -> Mock: + mapping = Mock() + mapping.item_id = key + mapping.name = name + return mapping + + provider.get_item_mapping = Mock(side_effect=mock_get_item_mapping) + return provider + + +class TestParseArtist: + """Tests for parse_artist function.""" + + def test_parse_artist_basic(self, mock_provider: Mock) -> None: + """Test parsing a basic artist without image.""" + artist_obj = _create_mock_artist(artist_id=123, title="Test Artist") + + result = parse_artist(mock_provider, artist_obj) + + assert result.item_id == "123" + assert result.name == "Test Artist" + assert result.provider == "zvuk_music_test" + assert len(result.provider_mappings) == 1 + + mapping = next(iter(result.provider_mappings)) + assert mapping.item_id == "123" + assert mapping.provider_domain == "zvuk_music" + assert mapping.provider_instance == "zvuk_music_test" + assert mapping.url == "https://zvuk.com/artist/123" + + def test_parse_artist_with_image(self, mock_provider: Mock) -> None: + """Test parsing an artist with image.""" + image = _create_mock_image("https://zvuk.com/img/{width}x{height}.jpg") + artist_obj = _create_mock_artist(artist_id=456, title="Artist With Image", image=image) + + result = parse_artist(mock_provider, artist_obj) + + assert result.item_id == "456" + assert result.name == "Artist With Image" + assert result.metadata.images is not None + assert len(result.metadata.images) == 1 + assert result.metadata.images[0].type == ImageType.THUMB + assert result.metadata.images[0].path == "https://zvuk.com/img/600x600.jpg" + assert result.metadata.images[0].remotely_accessible is True + + def test_parse_artist_unknown_name(self, mock_provider: Mock) -> None: + """Test parsing an artist with missing title defaults to Unknown Artist.""" + artist_obj = _create_mock_artist(artist_id=789, title=None) + + result = parse_artist(mock_provider, artist_obj) + + assert result.name == "Unknown Artist" + + +class TestParseAlbum: + """Tests for parse_album function.""" + + def test_parse_album_basic(self, mock_provider: Mock) -> None: + """Test parsing a basic album.""" + release_obj = _create_mock_release(release_id=456, title="Test Album") + + result = parse_album(mock_provider, release_obj) + + assert result.item_id == "456" + assert result.name == "Test Album" + assert result.provider == "zvuk_music_test" + assert result.album_type == AlbumType.ALBUM + + mapping = next(iter(result.provider_mappings)) + assert mapping.url == "https://zvuk.com/release/456" + + def test_parse_album_type_single(self, mock_provider: Mock) -> None: + """Test parsing an album with type single.""" + release_obj = _create_mock_release( + release_id=1, title="Single Track", release_type="single" + ) + + result = parse_album(mock_provider, release_obj) + + assert result.album_type == AlbumType.SINGLE + + def test_parse_album_type_ep(self, mock_provider: Mock) -> None: + """Test parsing an album with type EP.""" + release_obj = _create_mock_release(release_id=2, title="EP Release", release_type="ep") + + result = parse_album(mock_provider, release_obj) + + assert result.album_type == AlbumType.EP + + def test_parse_album_type_compilation(self, mock_provider: Mock) -> None: + """Test parsing an album with type compilation.""" + release_obj = _create_mock_release( + release_id=3, title="Greatest Hits", release_type="compilation" + ) + + result = parse_album(mock_provider, release_obj) + + assert result.album_type == AlbumType.COMPILATION + + def test_parse_album_with_date(self, mock_provider: Mock) -> None: + """Test parsing an album with release date.""" + release_obj = _create_mock_release(release_id=456, title="Album 2023", date="2023-06-15") + + result = parse_album(mock_provider, release_obj) + + assert result.year == 2023 + assert result.metadata.release_date is not None + assert result.metadata.release_date.year == 2023 + assert result.metadata.release_date.month == 6 + assert result.metadata.release_date.day == 15 + + def test_parse_album_explicit(self, mock_provider: Mock) -> None: + """Test parsing an explicit album.""" + release_obj = _create_mock_release(release_id=456, title="Explicit Album", explicit=True) + + result = parse_album(mock_provider, release_obj) + + assert result.metadata.explicit is True + + def test_parse_album_with_artists(self, mock_provider: Mock) -> None: + """Test parsing an album with artists.""" + artists = [ + _create_mock_artist(artist_id=1, title="Artist One"), + _create_mock_artist(artist_id=2, title="Artist Two"), + ] + release_obj = _create_mock_release( + release_id=456, title="Collaboration Album", artists=artists + ) + + result = parse_album(mock_provider, release_obj) + + assert len(result.artists) == 2 + assert result.artists[0].name == "Artist One" + assert result.artists[1].name == "Artist Two" + + def test_parse_album_with_genres(self, mock_provider: Mock) -> None: + """Test parsing an album with genres (full Release only).""" + genre1 = Mock() + genre1.name = "Rock" + genre2 = Mock() + genre2.name = "Alternative" + release_obj = _create_mock_release( + release_id=456, title="Rock Album", genres=[genre1, genre2] + ) + + result = parse_album(mock_provider, release_obj) + + assert result.metadata.genres == {"Rock", "Alternative"} + + def test_parse_album_with_image(self, mock_provider: Mock) -> None: + """Test parsing an album with cover image.""" + image = _create_mock_image("https://zvuk.com/cover/{width}x{height}.jpg") + release_obj = _create_mock_release(release_id=456, title="Album With Cover", image=image) + + result = parse_album(mock_provider, release_obj) + + assert result.metadata.images is not None + assert len(result.metadata.images) == 1 + assert result.metadata.images[0].path == "https://zvuk.com/cover/600x600.jpg" + + def test_parse_album_with_version_in_title(self, mock_provider: Mock) -> None: + """Test parsing an album with version in title.""" + release_obj = _create_mock_release(release_id=456, title="Album Name (Deluxe Edition)") + + result = parse_album(mock_provider, release_obj) + + assert result.name == "Album Name" + assert result.version == "Deluxe Edition" + + +class TestParseTrack: + """Tests for parse_track function.""" + + def test_parse_track_basic(self, mock_provider: Mock) -> None: + """Test parsing a basic track.""" + track_obj = _create_mock_track(track_id=789, title="Test Track", duration=180) + + result = parse_track(mock_provider, track_obj) + + assert result.item_id == "789" + assert result.name == "Test Track" + assert result.duration == 180 + assert result.provider == "zvuk_music_test" + + mapping = next(iter(result.provider_mappings)) + assert mapping.url == "https://zvuk.com/track/789" + + def test_parse_track_with_artists(self, mock_provider: Mock) -> None: + """Test parsing a track with artists.""" + artists = [ + _create_mock_artist(artist_id=1, title="Singer"), + _create_mock_artist(artist_id=2, title="Featuring Artist"), + ] + track_obj = _create_mock_track(track_id=789, title="Track", artists=artists) + + result = parse_track(mock_provider, track_obj) + + assert len(result.artists) == 2 + assert result.artists[0].name == "Singer" + assert result.artists[1].name == "Featuring Artist" + + def test_parse_track_with_release(self, mock_provider: Mock) -> None: + """Test parsing a track with album (release) information.""" + release = Mock() + release.id = 456 + release.title = "Test Album" + release.image = _create_mock_image("https://zvuk.com/cover/{width}x{height}.jpg") + + track_obj = _create_mock_track(track_id=789, title="Track", release=release) + + result = parse_track(mock_provider, track_obj) + + assert result.album is not None + assert result.album.item_id == "456" + assert result.album.name == "Test Album" + mock_provider.get_item_mapping.assert_called_with( + media_type="album", + key="456", + name="Test Album", + ) + + def test_parse_track_with_release_image(self, mock_provider: Mock) -> None: + """Test parsing a track gets image from release.""" + release = Mock() + release.id = 456 + release.title = "Test Album" + release.image = _create_mock_image("https://zvuk.com/cover/{width}x{height}.jpg") + + track_obj = _create_mock_track(track_id=789, title="Track", release=release) + + result = parse_track(mock_provider, track_obj) + + assert result.metadata.images is not None + assert len(result.metadata.images) == 1 + assert result.metadata.images[0].path == "https://zvuk.com/cover/600x600.jpg" + + def test_parse_track_explicit(self, mock_provider: Mock) -> None: + """Test parsing an explicit track.""" + track_obj = _create_mock_track(track_id=789, title="Explicit Track", explicit=True) + + result = parse_track(mock_provider, track_obj) + + assert result.metadata.explicit is True + + def test_parse_track_with_position(self, mock_provider: Mock) -> None: + """Test parsing a track with position (full Track only).""" + track_obj = _create_mock_track(track_id=789, title="Track 5", position=5) + + result = parse_track(mock_provider, track_obj) + + assert result.track_number == 5 + + def test_parse_track_with_version_in_title(self, mock_provider: Mock) -> None: + """Test parsing a track with version in title.""" + track_obj = _create_mock_track(track_id=789, title="Song Name (Acoustic Version)") + + result = parse_track(mock_provider, track_obj) + + assert result.name == "Song Name" + assert result.version == "Acoustic Version" + + def test_parse_track_unknown_name(self, mock_provider: Mock) -> None: + """Test parsing a track with missing title defaults to Unknown Track.""" + track_obj = _create_mock_track(track_id=789, title=None) + + result = parse_track(mock_provider, track_obj) + + assert result.name == "Unknown Track" + + +class TestParsePlaylist: + """Tests for parse_playlist function.""" + + def test_parse_playlist_basic(self, mock_provider: Mock) -> None: + """Test parsing a basic playlist.""" + playlist_obj = _create_mock_playlist(playlist_id=999, title="Test Playlist") + + result = parse_playlist(mock_provider, playlist_obj) + + assert result.item_id == "999" + assert result.name == "Test Playlist" + assert result.provider == "zvuk_music_test" + assert result.owner == "Zvuk Music" + assert result.is_editable is False + + mapping = next(iter(result.provider_mappings)) + assert mapping.url == "https://zvuk.com/playlist/999" + assert mapping.is_unique is False + + def test_parse_playlist_editable(self, mock_provider: Mock) -> None: + """Test parsing a user-owned playlist (is_editable=True).""" + playlist_obj = _create_mock_playlist( + playlist_id=999, + title="My Playlist", + user_id=12345, # Same as mock_provider.client.user_id + ) + + result = parse_playlist(mock_provider, playlist_obj) + + assert result.is_editable is True + assert result.owner == "Me" + + mapping = next(iter(result.provider_mappings)) + assert mapping.is_unique is True + + def test_parse_playlist_not_editable(self, mock_provider: Mock) -> None: + """Test parsing another user's playlist (is_editable=False).""" + playlist_obj = _create_mock_playlist( + playlist_id=999, + title="Their Playlist", + user_id=99999, # Different from mock_provider.client.user_id + ) + + result = parse_playlist(mock_provider, playlist_obj) + + assert result.is_editable is False + assert result.owner == "Zvuk Music" + + def test_parse_playlist_with_description(self, mock_provider: Mock) -> None: + """Test parsing a playlist with description.""" + playlist_obj = _create_mock_playlist( + playlist_id=999, + title="Playlist", + description="A great collection of songs", + ) + + result = parse_playlist(mock_provider, playlist_obj) + + assert result.metadata.description == "A great collection of songs" + + def test_parse_playlist_with_image(self, mock_provider: Mock) -> None: + """Test parsing a playlist with cover image.""" + image = _create_mock_image("https://zvuk.com/playlist/{width}x{height}.jpg") + playlist_obj = _create_mock_playlist( + playlist_id=999, + title="Playlist With Cover", + image=image, + ) + + result = parse_playlist(mock_provider, playlist_obj) + + assert result.metadata.images is not None + assert len(result.metadata.images) == 1 + assert result.metadata.images[0].path == "https://zvuk.com/playlist/600x600.jpg" + + def test_parse_playlist_unknown_name(self, mock_provider: Mock) -> None: + """Test parsing a playlist with missing title defaults to Unknown Playlist.""" + playlist_obj = _create_mock_playlist(playlist_id=999, title=None) + + result = parse_playlist(mock_provider, playlist_obj) + + assert result.name == "Unknown Playlist"