additional_dependencies:
- tomli
- - repo: local
- hooks:
- - id: pylint
- name: pylint
- entry: script/run-in-env.sh pylint -j 0
- language: script
- types: [python]
- files: ^hass_client/.+\.py$
+ # - repo: local
+ # hooks:
+ # - id: pylint
+ # name: pylint
+ # entry: script/run-in-env.sh pylint -j 0
+ # language: script
+ # types: [python]
+ # files: ^music_assistant/.+\.py$
- - id: mypy
- name: mypy
- entry: script/run-in-env.sh mypy
- language: script
- types: [python]
- files: ^hass_client/.+\.py$
+ # - id: mypy
+ # name: mypy
+ # entry: script/run-in-env.sh mypy
+ # language: script
+ # types: [python]
+ # files: ^music_assistant/.+\.py$
- repo: local
hooks:
-"""Music Assistant: The music library manager in python."""
+"""Music Assistant Client: Manage a Music Assistant server remotely."""
+from .client import MusicAssistantClient # noqa: F401
--- /dev/null
+"""Music Assistant Client: Manage a Music Assistant server remotely."""
+from __future__ import annotations
+
+import asyncio
+import logging
+import urllib.parse
+import uuid
+from collections.abc import Callable
+from types import TracebackType
+from typing import TYPE_CHECKING, Any
+
+from music_assistant.client.exceptions import ConnectionClosed, InvalidServerVersion, InvalidState
+from music_assistant.common.models.api import (
+ ChunkedResultMessage,
+ CommandMessage,
+ ErrorResultMessage,
+ EventMessage,
+ ResultMessageBase,
+ ServerInfoMessage,
+ SuccessResultMessage,
+ parse_message,
+)
+from music_assistant.common.models.enums import EventType
+from music_assistant.common.models.errors import ERROR_MAP
+from music_assistant.common.models.event import MassEvent
+from music_assistant.common.models.media_items import MediaItemImage
+from music_assistant.constants import SCHEMA_VERSION
+
+from .connection import WebsocketsConnection
+from .music import Music
+from .players import Players
+
+if TYPE_CHECKING:
+ from aiohttp import ClientSession
+
+EventCallBackType = Callable[[MassEvent], None]
+EventSubscriptionType = tuple[
+ EventCallBackType, tuple[EventType, ...] | None, tuple[str, ...] | None
+]
+
+
+class MusicAssistantClient:
+ """Manage a Music Assistant server remotely."""
+
+ def __init__(self, server_url: str, aiohttp_session: ClientSession | None) -> None:
+ """Initialize the Music Assistant client."""
+ self.server_url = server_url
+ self.connection = WebsocketsConnection(server_url, aiohttp_session)
+ self.logger = logging.getLogger(__package__)
+ self._result_futures: dict[str, asyncio.Future] = {}
+ self._subscribers: list[EventSubscriptionType] = list()
+ self._stop_called: bool = False
+ self._loop: asyncio.AbstractEventLoop | None = None
+ self._players = Players(self)
+ self._music = Music(self)
+ # below items are retrieved after connect
+ self._server_info: ServerInfoMessage | None = None
+
+ @property
+ def server_info(self) -> ServerInfoMessage | None:
+ """Return info of the server we're currently connected to."""
+ return self._server_info
+
+ @property
+ def players(self) -> Players:
+ """Return Players handler."""
+ return self._players
+
+ @property
+ def music(self) -> Music:
+ """Return Music handler."""
+ return self._music
+
+ def get_image_url(self, image: MediaItemImage) -> str:
+ """Get (proxied) URL for MediaItemImage."""
+ if image.provider != "url":
+ # return imageproxy url for images that need to be resolved
+ # the original path is double encoded
+ encoded_url = urllib.parse.quote(urllib.parse.quote(image.path))
+ return f"{self.server_info.base_url}/imageproxy?path={encoded_url}&provider={image.provider}" # noqa: E501
+ return image.path
+
+ def subscribe(
+ self,
+ cb_func: EventCallBackType,
+ event_filter: EventType | tuple[EventType] | None = None,
+ id_filter: str | tuple[str] | None = None,
+ ) -> Callable:
+ """Add callback to event listeners.
+
+ Returns function to remove the listener.
+ :param cb_func: callback function or coroutine
+ :param event_filter: Optionally only listen for these events
+ :param id_filter: Optionally only listen for these id's (player_id, queue_id, uri)
+ """
+ if isinstance(event_filter, EventType):
+ event_filter = (event_filter,)
+ if isinstance(id_filter, str):
+ id_filter = (id_filter,)
+ listener = (cb_func, event_filter, id_filter)
+ self._subscribers.append(listener)
+
+ def remove_listener():
+ self._subscribers.remove(listener)
+
+ return remove_listener
+
+ async def connect(self) -> None:
+ """Connect to the remote Music Assistant Server."""
+ self._loop = asyncio.get_running_loop()
+ if self.connection.connected:
+ # already connected
+ return
+ # NOTE: connect will raise when connecting failed
+ result = await self.connection.connect()
+ info = ServerInfoMessage.from_dict(result)
+
+ # basic check for server schema version compatibility
+ if info.min_supported_schema_version > SCHEMA_VERSION:
+ # our schema version is too low and can't be handled by the server anymore.
+ await self.connection.disconnect()
+ raise InvalidServerVersion(
+ f"Schema version is incompatible: {info.schema_version}, "
+ f"the server requires at least {info.min_supported_schema_version} "
+ " - update the Music Assistant client to a more "
+ "recent version or downgrade the server."
+ )
+
+ self._server_info = info
+
+ self.logger.info(
+ "Connected to Music Assistant Server %s using %s, Version %s, Schema Version %s",
+ info.server_id,
+ self.connection.__class__.__name__,
+ info.server_version,
+ info.schema_version,
+ )
+
+ async def send_command(
+ self,
+ command: str,
+ require_schema: int | None = None,
+ **kwargs: Any,
+ ) -> Any:
+ """Send a command and get a response."""
+ if not self.connection.connected or not self._loop:
+ raise InvalidState("Not connected")
+
+ if (
+ require_schema is not None
+ and self.server_info is not None
+ and require_schema > self.server_info.schema_version
+ ):
+ raise InvalidServerVersion(
+ "Command not available due to incompatible server version. Update the Music "
+ f"Assistant Server to a version that supports at least api schema {require_schema}."
+ )
+
+ command_message = CommandMessage(
+ message_id=uuid.uuid4().hex,
+ command=command,
+ args=kwargs,
+ )
+ future: asyncio.Future[Any] = self._loop.create_future()
+ self._result_futures[command_message.message_id] = future
+ await self.connection.send_message(command_message.to_dict())
+ try:
+ return await future
+ finally:
+ self._result_futures.pop(command_message.message_id)
+
+ async def send_command_no_wait(
+ self,
+ command: str,
+ require_schema: int | None = None,
+ **kwargs: Any,
+ ) -> None:
+ """Send a command without waiting for the response."""
+ if not self.server_info:
+ raise InvalidState("Not connected")
+
+ if require_schema is not None and require_schema > self.server_info.schema_version:
+ raise InvalidServerVersion(
+ "Command not available due to incompatible server version. Update the Music "
+ f"Assistant Server to a version that supports at least api schema {require_schema}."
+ )
+ command_message = CommandMessage(
+ message_id=uuid.uuid4().hex,
+ command=command,
+ args=kwargs,
+ )
+ await self.connection.send_message(command_message.to_dict())
+
+ async def start_listening(self, init_ready: asyncio.Event | None = None) -> None:
+ """Connect (if needed) and start listening to incoming messages from the server."""
+ await self.connect()
+
+ # fetch initial state
+ # we do this in a separate task to not block reading messages
+ async def fetch_initial_state():
+ await self._players.fetch_state()
+
+ if init_ready is not None:
+ init_ready.set()
+
+ asyncio.create_task(fetch_initial_state())
+
+ try:
+ # keep reading incoming messages
+ while not self._stop_called:
+ msg = await self.connection.receive_message()
+ self._handle_incoming_message(msg)
+ except ConnectionClosed:
+ pass
+ finally:
+ await self.disconnect()
+
+ async def disconnect(self) -> None:
+ """Disconnect the client and cleanup."""
+ self._stop_called = True
+ # cancel all command-tasks awaiting a result
+ for future in self._result_futures.values():
+ future.cancel()
+ await self.connection.disconnect()
+
+ def _handle_incoming_message(self, raw: dict[str, Any]) -> None:
+ """
+ Handle incoming message.
+
+ Run all async tasks in a wrapper to log appropriately.
+ """
+ msg = parse_message(raw)
+ # handle result message
+ if isinstance(msg, ResultMessageBase):
+ future = self._result_futures.get(msg.message_id)
+
+ if future is None:
+ # no listener for this result
+ return
+ if isinstance(msg, ChunkedResultMessage):
+ # handle chunked response (for very large objects)
+ if not hasattr(future, "intermediate_result"):
+ future.intermediate_result = []
+ future.intermediate_result += msg.result
+ if msg.is_last_chunk:
+ future.set_result(future.intermediate_result)
+ return
+ if isinstance(msg, SuccessResultMessage):
+ future.set_result(msg.result)
+ return
+ if isinstance(msg, ErrorResultMessage):
+ exc = ERROR_MAP[msg.error_code]
+ future.set_exception(exc(msg.details))
+ return
+
+ # handle EventMessage
+ if isinstance(msg, EventMessage):
+ self.logger.debug("Received event: %s", msg)
+ self._handle_event(msg)
+ return
+
+ # Log anything we can't handle here
+ self.logger.debug(
+ "Received message with unknown type '%s': %s",
+ type(msg),
+ msg,
+ )
+
+ def _handle_event(self, event: MassEvent) -> None:
+ """Forward event to subscribers."""
+ if self._stop_called:
+ return
+
+ for cb_func, event_filter, id_filter in self._subscribers:
+ if not (event_filter is None or event.event in event_filter):
+ continue
+ if not (id_filter is None or event.object_id in id_filter):
+ continue
+ if asyncio.iscoroutinefunction(cb_func):
+ asyncio.run_coroutine_threadsafe(cb_func(event), self._loop)
+ else:
+ self._loop.call_soon_threadsafe(cb_func, event)
+
+ async def __aenter__(self) -> MusicAssistantClient:
+ """Initialize and connect the connection to the Music Assistant Server."""
+ await self.connect()
+ return self
+
+ async def __aexit__(
+ self, exc_type: Exception, exc_value: str, traceback: TracebackType
+ ) -> None:
+ """Disconnect from the server and exit."""
+ await self.disconnect()
+
+ def __repr__(self) -> str:
+ """Return the representation."""
+ conn_type = self.connection.__class__.__name__
+ prefix = "" if self.connection.connected else "not "
+ return f"{type(self).__name__}(connection={conn_type}, {prefix}connected)"
--- /dev/null
+""""Connect o a remote Music Assistant Server using the default Websocket API."""
+from __future__ import annotations
+
+import logging
+import pprint
+from typing import Any
+
+from aiohttp import ClientSession, ClientWebSocketResponse, WSMsgType, client_exceptions
+
+from music_assistant.client.exceptions import (
+ CannotConnect,
+ ConnectionClosed,
+ ConnectionFailed,
+ InvalidMessage,
+ InvalidState,
+ NotConnected,
+)
+from music_assistant.common.helpers.json import json_dumps, json_loads
+
+LOGGER = logging.getLogger(f"{__package__}.connection")
+
+
+def get_websocket_url(url: str) -> str:
+ """Extract Websocket URL from (base) Music Assistant URL."""
+ if not url or "://" not in url:
+ raise RuntimeError(f"{url} is not a valid url")
+ ws_url = url.replace("http", "ws")
+ if not ws_url.endswith("/ws"):
+ ws_url += "/ws"
+ return ws_url.replace("//ws", "/ws")
+
+
+class WebsocketsConnection:
+ """Websockets connection to a Music Assistant Server."""
+
+ def __init__(self, server_url: str, aiohttp_session: ClientSession | None) -> None:
+ """Initialize."""
+ self.ws_server_url = get_websocket_url(server_url)
+ self._aiohttp_session_provided = aiohttp_session is not None
+ self._aiohttp_session = aiohttp_session or ClientSession()
+ self._ws_client: ClientWebSocketResponse | None = None
+
+ @property
+ def connected(self) -> bool:
+ """Return if we're currently connected."""
+ return self._ws_client is not None and not self._ws_client.closed
+
+ async def connect(self) -> dict[str, Any]:
+ """Connect to the websocket server and return the first message (server info)."""
+ if self._aiohttp_session is None:
+ self._aiohttp_session = ClientSession()
+ if self._ws_client is not None:
+ raise InvalidState("Already connected")
+
+ LOGGER.debug("Trying to connect")
+ try:
+ self._ws_client = await self._aiohttp_session.ws_connect(
+ self.ws_server_url,
+ heartbeat=55,
+ compress=15,
+ max_msg_size=0,
+ )
+ # receive first server info message
+ return await self.receive_message()
+ except (
+ client_exceptions.WSServerHandshakeError,
+ client_exceptions.ClientError,
+ ) as err:
+ raise CannotConnect(err) from err
+
+ async def disconnect(self) -> None:
+ """Disconnect the client."""
+ LOGGER.debug("Closing client connection")
+ if self._ws_client is not None and not self._ws_client.closed:
+ await self._ws_client.close()
+ self._ws_client = None
+ if self._aiohttp_session and not self._aiohttp_session_provided:
+ await self._aiohttp_session.close()
+ self._aiohttp_session = None
+
+ async def receive_message(self) -> dict[str, Any]:
+ """Receive the next message from the server (or raise on error)."""
+ assert self._ws_client
+ ws_msg = await self._ws_client.receive()
+
+ if ws_msg.type in (WSMsgType.CLOSE, WSMsgType.CLOSED, WSMsgType.CLOSING):
+ raise ConnectionClosed("Connection was closed.")
+
+ if ws_msg.type == WSMsgType.ERROR:
+ raise ConnectionFailed()
+
+ if ws_msg.type != WSMsgType.TEXT:
+ raise InvalidMessage(f"Received non-Text message: {ws_msg.type}")
+
+ try:
+ msg = json_loads(ws_msg.data)
+ except TypeError as err:
+ raise InvalidMessage(f"Received unsupported JSON: {err}") from err
+ except ValueError as err:
+ raise InvalidMessage("Received invalid JSON.") from err
+
+ if LOGGER.isEnabledFor(logging.DEBUG):
+ LOGGER.debug("Received message:\n%s\n", pprint.pformat(ws_msg))
+
+ return msg
+
+ async def send_message(self, message: dict[str, Any]) -> None:
+ """
+ Send a message to the server.
+
+ Raises NotConnected if client not connected.
+ """
+ if not self.connected:
+ raise NotConnected
+
+ if LOGGER.isEnabledFor(logging.DEBUG):
+ LOGGER.debug("Publishing message:\n%s\n", pprint.pformat(message))
+
+ assert self._ws_client
+ assert isinstance(message, dict)
+
+ await self._ws_client.send_json(message, dumps=json_dumps)
+
+ def __repr__(self) -> str:
+ """Return the representation."""
+ prefix = "" if self.connected else "not "
+ return f"{type(self).__name__}(ws_server_url={self.ws_server_url!r}, {prefix}connected)"
--- /dev/null
+"""Client-specific Exceptions for Music Assistant."""
+from __future__ import annotations
+
+
+class MusicAssistantClientException(Exception):
+ """Generic MusicAssistant exception."""
+
+
+class TransportError(MusicAssistantClientException):
+ """Exception raised to represent transport errors."""
+
+ def __init__(self, message: str, error: Exception | None = None) -> None:
+ """Initialize a transport error."""
+ super().__init__(message)
+ self.error = error
+
+
+class ConnectionClosed(TransportError):
+ """Exception raised when the connection is closed."""
+
+
+class CannotConnect(TransportError):
+ """Exception raised when failed to connect the client."""
+
+ def __init__(self, error: Exception) -> None:
+ """Initialize a cannot connect error."""
+ super().__init__(f"{error}", error)
+
+
+class ConnectionFailed(TransportError):
+ """Exception raised when an established connection fails."""
+
+ def __init__(self, error: Exception | None = None) -> None:
+ """Initialize a connection failed error."""
+ if error is None:
+ super().__init__("Connection failed.")
+ return
+ super().__init__(f"{error}", error)
+
+
+class NotConnected(MusicAssistantClientException):
+ """Exception raised when not connected to client."""
+
+
+class InvalidState(MusicAssistantClientException):
+ """Exception raised when data gets in invalid state."""
+
+
+class InvalidMessage(MusicAssistantClientException):
+ """Exception raised when an invalid message is received."""
+
+
+class InvalidServerVersion(MusicAssistantClientException):
+ """Exception raised when connected to server with incompatible version."""
--- /dev/null
+"""Handle Music/library related endpoints for Music Assistant."""
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
+from music_assistant.common.models.enums import MediaType
+from music_assistant.common.models.media_items import (
+ Album,
+ Artist,
+ BrowseFolder,
+ MediaItemType,
+ PagedItems,
+ Playlist,
+ Radio,
+ SearchResults,
+ Track,
+ media_from_dict,
+)
+from music_assistant.common.models.provider import SyncTask
+
+if TYPE_CHECKING:
+ from .client import MusicAssistantClient
+
+
+class Music:
+ """Music(library) related endpoints/data for Music Assistant."""
+
+ def __init__(self, client: MusicAssistantClient) -> None:
+ """Handle Initialization."""
+ self.client = client
+
+ # Tracks related endpoints/commands
+
+ async def get_tracks(
+ self,
+ in_library: bool | None = None,
+ search: str | None = None,
+ limit: int | None = None,
+ offset: int | None = None,
+ order_by: str | None = None,
+ ) -> PagedItems:
+ """Get Track listing from the server."""
+ return PagedItems.parse(
+ await self.client.send_command(
+ "music/tracks",
+ in_library=in_library,
+ search=search,
+ limit=limit,
+ offset=offset,
+ order_by=order_by,
+ ),
+ Track,
+ )
+
+ async def get_track(
+ self,
+ item_id: str,
+ provider_instance_id_or_domain: str,
+ force_refresh: bool | None = None,
+ lazy: bool | None = None,
+ album: str | None = None,
+ ) -> Track:
+ """Get single Track from the server."""
+ return Track.from_dict(
+ await self.client.send_command(
+ "music/track",
+ item_id=item_id,
+ provider_instance_id_or_domain=provider_instance_id_or_domain,
+ force_refresh=force_refresh,
+ lazy=lazy,
+ album=album,
+ ),
+ )
+
+ async def get_track_versions(
+ self,
+ item_id: str,
+ provider_instance_id_or_domain: str,
+ ) -> list[Track]:
+ """Get all other versions for given Track from the server."""
+ return [
+ Track.from_dict(item)
+ for item in await self.client.send_command(
+ "music/track/versions",
+ item_id=item_id,
+ provider_instance_id_or_domain=provider_instance_id_or_domain,
+ )
+ ]
+
+ async def get_track_albums(
+ self,
+ item_id: str,
+ provider_instance_id_or_domain: str,
+ ) -> list[Album]:
+ """Get all (known) albums this track is featured on."""
+ return [
+ Album.from_dict(item)
+ for item in await self.client.send_command(
+ "music/track/albums",
+ item_id=item_id,
+ provider_instance_id_or_domain=provider_instance_id_or_domain,
+ )
+ ]
+
+ async def get_track_preview_url(
+ self,
+ item_id: str,
+ provider_instance_id_or_domain: str,
+ ) -> str:
+ """Get URL to preview clip of given track."""
+ return await self.client.send_command(
+ "music/track/preview",
+ item_id=item_id,
+ provider_instance_id_or_domain=provider_instance_id_or_domain,
+ )
+
+ # Albums related endpoints/commands
+
+ async def get_albums(
+ self,
+ in_library: bool | None = None,
+ search: str | None = None,
+ limit: int | None = None,
+ offset: int | None = None,
+ order_by: str | None = None,
+ ) -> PagedItems:
+ """Get Albums listing from the server."""
+ return PagedItems.parse(
+ await self.client.send_command(
+ "music/albums",
+ in_library=in_library,
+ search=search,
+ limit=limit,
+ offset=offset,
+ order_by=order_by,
+ ),
+ Album,
+ )
+
+ async def get_album(
+ self,
+ item_id: str,
+ provider_instance_id_or_domain: str,
+ force_refresh: bool | None = None,
+ lazy: bool | None = None,
+ ) -> Album:
+ """Get single Album from the server."""
+ return Album.from_dict(
+ await self.client.send_command(
+ "music/album",
+ item_id=item_id,
+ provider_instance_id_or_domain=provider_instance_id_or_domain,
+ force_refresh=force_refresh,
+ lazy=lazy,
+ ),
+ )
+
+ async def get_album_tracks(
+ self,
+ item_id: str,
+ provider_instance_id_or_domain: str,
+ ) -> list[Track]:
+ """Get tracks for given album."""
+ return [
+ Track.from_dict(item)
+ for item in await self.client.send_command(
+ "music/album/tracks",
+ item_id=item_id,
+ provider_instance_id_or_domain=provider_instance_id_or_domain,
+ )
+ ]
+
+ async def get_album_versions(
+ self,
+ item_id: str,
+ provider_instance_id_or_domain: str,
+ ) -> list[Album]:
+ """Get all other versions for given Album from the server."""
+ return [
+ Album.from_dict(item)
+ for item in await self.client.send_command(
+ "music/album/versions",
+ item_id=item_id,
+ provider_instance_id_or_domain=provider_instance_id_or_domain,
+ )
+ ]
+
+ # Artist related endpoints/commands
+
+ async def get_artists(
+ self,
+ in_library: bool | None = None,
+ search: str | None = None,
+ limit: int | None = None,
+ offset: int | None = None,
+ order_by: str | None = None,
+ ) -> PagedItems:
+ """Get Artists listing from the server."""
+ return PagedItems.parse(
+ await self.client.send_command(
+ "music/artists",
+ in_library=in_library,
+ search=search,
+ limit=limit,
+ offset=offset,
+ order_by=order_by,
+ ),
+ Artist,
+ )
+
+ async def get_album_artists(
+ self,
+ in_library: bool | None = None,
+ search: str | None = None,
+ limit: int | None = None,
+ offset: int | None = None,
+ order_by: str | None = None,
+ ) -> PagedItems:
+ """Get AlbumArtists listing from the server."""
+ return PagedItems.parse(
+ await self.client.send_command(
+ "music/albumartists",
+ in_library=in_library,
+ search=search,
+ limit=limit,
+ offset=offset,
+ order_by=order_by,
+ ),
+ Artist,
+ )
+
+ async def get_artist(
+ self,
+ item_id: str,
+ provider_instance_id_or_domain: str,
+ force_refresh: bool | None = None,
+ lazy: bool | None = None,
+ ) -> Artist:
+ """Get single Artist from the server."""
+ return Artist.from_dict(
+ await self.client.send_command(
+ "music/artist",
+ item_id=item_id,
+ provider_instance_id_or_domain=provider_instance_id_or_domain,
+ force_refresh=force_refresh,
+ lazy=lazy,
+ ),
+ )
+
+ async def get_artist_tracks(
+ self,
+ item_id: str,
+ provider_instance_id_or_domain: str,
+ ) -> list[Track]:
+ """Get (top)tracks for given artist."""
+ return [
+ Artist.from_dict(item)
+ for item in await self.client.send_command(
+ "music/artist/tracks",
+ item_id=item_id,
+ provider_instance_id_or_domain=provider_instance_id_or_domain,
+ )
+ ]
+
+ async def get_artist_albums(
+ self,
+ item_id: str,
+ provider_instance_id_or_domain: str,
+ ) -> list[Album]:
+ """Get (top)albums for given artist."""
+ return [
+ Album.from_dict(item)
+ for item in await self.client.send_command(
+ "music/artist/albums",
+ item_id=item_id,
+ provider_instance_id_or_domain=provider_instance_id_or_domain,
+ )
+ ]
+
+ # Playlist related endpoints/commands
+
+ async def get_playlists(
+ self,
+ in_library: bool | None = None,
+ search: str | None = None,
+ limit: int | None = None,
+ offset: int | None = None,
+ order_by: str | None = None,
+ ) -> PagedItems:
+ """Get Playlists listing from the server."""
+ return PagedItems.parse(
+ await self.client.send_command(
+ "music/playlists",
+ in_library=in_library,
+ search=search,
+ limit=limit,
+ offset=offset,
+ order_by=order_by,
+ ),
+ Playlist,
+ )
+
+ async def get_playlist(
+ self,
+ item_id: str,
+ provider_instance_id_or_domain: str,
+ force_refresh: bool | None = None,
+ lazy: bool | None = None,
+ ) -> Playlist:
+ """Get single Playlist from the server."""
+ return Playlist.from_dict(
+ await self.client.send_command(
+ "music/playlist",
+ item_id=item_id,
+ provider_instance_id_or_domain=provider_instance_id_or_domain,
+ force_refresh=force_refresh,
+ lazy=lazy,
+ ),
+ )
+
+ async def get_playlist_tracks(
+ self,
+ item_id: str,
+ provider_instance_id_or_domain: str,
+ ) -> list[Track]:
+ """Get tracks for given playlist."""
+ return [
+ Track.from_dict(item)
+ for item in await self.client.send_command(
+ "music/playlist/tracks",
+ item_id=item_id,
+ provider_instance_id_or_domain=provider_instance_id_or_domain,
+ )
+ ]
+
+ async def add_playlist_tracks(self, db_playlist_id: str | int, uris: list[str]) -> None:
+ """Add multiple tracks to playlist. Creates background tasks to process the action."""
+ await self.client.send_command(
+ "music/playlist/tracks/add",
+ db_playlist_id=db_playlist_id,
+ uris=uris,
+ )
+
+ async def remove_playlist_tracks(
+ self, db_playlist_id: str | int, positions_to_remove: tuple[int, ...]
+ ) -> None:
+ """Remove multiple tracks from playlist."""
+ await self.client.send_command(
+ "music/playlist/tracks/add",
+ db_playlist_id=db_playlist_id,
+ positions_to_remove=positions_to_remove,
+ )
+
+ async def create_playlist(
+ self, name: str, provider_instance_or_domain: str | None = None
+ ) -> Playlist:
+ """Create new playlist."""
+ return Playlist.from_dict(
+ await self.client.send_command(
+ "music/playlist/create",
+ name=name,
+ provider_instance_or_domain=provider_instance_or_domain,
+ )
+ )
+
+ # Radio related endpoints/commands
+
+ async def get_radios(
+ self,
+ in_library: bool | None = None,
+ search: str | None = None,
+ limit: int | None = None,
+ offset: int | None = None,
+ order_by: str | None = None,
+ ) -> PagedItems:
+ """Get Radio listing from the server."""
+ return PagedItems.parse(
+ await self.client.send_command(
+ "music/radios",
+ in_library=in_library,
+ search=search,
+ limit=limit,
+ offset=offset,
+ order_by=order_by,
+ ),
+ Radio,
+ )
+
+ async def get_radio(
+ self,
+ item_id: str,
+ provider_instance_id_or_domain: str,
+ force_refresh: bool | None = None,
+ lazy: bool | None = None,
+ ) -> Radio:
+ """Get single Radio from the server."""
+ return Radio.from_dict(
+ await self.client.send_command(
+ "music/radio",
+ item_id=item_id,
+ provider_instance_id_or_domain=provider_instance_id_or_domain,
+ force_refresh=force_refresh,
+ lazy=lazy,
+ ),
+ )
+
+ async def get_radio_versions(
+ self,
+ item_id: str,
+ provider_instance_id_or_domain: str,
+ ) -> list[Radio]:
+ """Get all other versions for given Radio from the server."""
+ return [
+ Radio.from_dict(item)
+ for item in await self.client.send_command(
+ "music/radio/versions",
+ item_id=item_id,
+ provider_instance_id_or_domain=provider_instance_id_or_domain,
+ )
+ ]
+
+ # Other/generic endpoints/commands
+
+ async def get_item_by_uri(
+ self,
+ uri: str,
+ force_refresh: bool | None = None,
+ lazy: bool | None = None,
+ ) -> MediaItemType:
+ """Get single music item providing a mediaitem uri."""
+ return media_from_dict(
+ await self.client.send_command(
+ "music/item_by_uri", uri=uri, force_refresh=force_refresh, lazy=lazy
+ )
+ )
+
+ async def refresh_item(
+ self,
+ media_item: MediaItemType,
+ ) -> MediaItemType | None:
+ """Try to refresh a mediaitem by requesting it's full object or search for substitutes."""
+ if result := await self.client.send_command("music/refresh_item", media_item=media_item):
+ return media_from_dict(result)
+ return None
+
+ async def get_item(
+ self,
+ media_type: MediaType,
+ item_id: str,
+ provider_instance_id_or_domain: str,
+ force_refresh: bool | None = None,
+ lazy: bool | None = None,
+ ) -> MediaItemType:
+ """Get single music item by id and media type."""
+ return media_from_dict(
+ await self.client.send_command(
+ "music/item",
+ media_type=media_type,
+ item_id=item_id,
+ provider_instance_id_or_domain=provider_instance_id_or_domain,
+ force_refresh=force_refresh,
+ lazy=lazy,
+ )
+ )
+
+ async def add_to_library(
+ self,
+ media_type: MediaType,
+ item_id: str,
+ provider_instance_id_or_domain: str,
+ ) -> None:
+ """Add an item to the library."""
+ await self.client.send_command(
+ "music/library/add",
+ media_type=media_type,
+ item_id=item_id,
+ provider_instance_id_or_domain=provider_instance_id_or_domain,
+ )
+
+ async def remove_from_library(
+ self,
+ media_type: MediaType,
+ item_id: str,
+ provider_instance_id_or_domain: str,
+ ) -> None:
+ """Remove an item from the library."""
+ await self.client.send_command(
+ "music/library/remove",
+ media_type=media_type,
+ item_id=item_id,
+ provider_instance_id_or_domain=provider_instance_id_or_domain,
+ )
+
+ async def delete_db_item(
+ self, media_type: MediaType, db_item_id: str | int, recursive: bool = False
+ ) -> None:
+ """Remove item from the database."""
+ await self.client.send_command(
+ "music/delete", media_type=media_type, db_item_id=db_item_id, recursive=recursive
+ )
+
+ async def browse(
+ self,
+ path: str | None = None,
+ ) -> BrowseFolder:
+ """Browse Music providers."""
+ return BrowseFolder.from_dict(
+ await self.client.send_command("music/browse", path=path),
+ )
+
+ async def search(
+ self, search_query: str, media_types: tuple[MediaType] = MediaType.ALL, limit: int = 25
+ ) -> SearchResults:
+ """Perform global search for media items on all providers."""
+ return SearchResults.from_dict(
+ await self.client.send_command(
+ "music/search", search_query=search_query, media_types=media_types, limit=limit
+ ),
+ )
+
+ async def get_sync_tasks(self) -> list[SyncTask]:
+ """Return any/all sync tasks that are in progress on the server."""
+ return [
+ SyncTask.from_dict(item) for item in await self.client.send_command("music/synctasks")
+ ]
--- /dev/null
+"""Handle player related endpoints for Music Assistant."""
+from __future__ import annotations
+
+from collections.abc import Iterator
+from typing import TYPE_CHECKING
+
+from music_assistant.common.models.enums import EventType, QueueOption, RepeatMode
+from music_assistant.common.models.event import MassEvent
+from music_assistant.common.models.media_items import MediaItemType
+from music_assistant.common.models.player import Player
+from music_assistant.common.models.player_queue import PlayerQueue
+from music_assistant.common.models.queue_item import QueueItem
+
+if TYPE_CHECKING:
+ from .client import MusicAssistantClient
+
+
+class Players:
+ """Player related endpoints/data for Music Assistant."""
+
+ def __init__(self, client: MusicAssistantClient) -> None:
+ """Handle Initialization."""
+ self.client = client
+ # subscribe to player events
+ client.subscribe(
+ self._handle_event,
+ (
+ EventType.PLAYER_ADDED,
+ EventType.PLAYER_REMOVED,
+ EventType.PLAYER_UPDATED,
+ EventType.QUEUE_ADDED,
+ EventType.QUEUE_UPDATED,
+ ),
+ )
+ # below items are retrieved after connect
+ self._players: dict[str, Player] = {}
+ self._queues: dict[str, PlayerQueue] = {}
+
+ @property
+ def players(self) -> list[Player]:
+ """Return all players."""
+ return list(self._players.values())
+
+ @property
+ def player_queues(self) -> list[PlayerQueue]:
+ """Return all player queues."""
+ return list(self._queues.values())
+
+ def __iter__(self) -> Iterator[Player]:
+ """Iterate over (available) players."""
+ return iter(self._players.values())
+
+ def get_player(self, player_id: str) -> Player | None:
+ """Return Player by ID (or None if not found)."""
+ return self._players.get(player_id)
+
+ def get_player_queue(self, queue_id: str) -> PlayerQueue | None:
+ """Return PlayerQueue by ID (or None if not found)."""
+ return self._queues.get(queue_id)
+
+ # Player related endpoints/commands
+
+ async def get_players(self) -> list[Player]:
+ """Fetch all Players from the server."""
+ return [Player.from_dict(item) for item in await self.client.send_command("players/all")]
+
+ async def player_command_stop(self, player_id: str) -> None:
+ """Send STOP command to given player (directly)."""
+ await self.client.send_command("players/cmd/stop", player_id=player_id)
+
+ async def player_command_power(self, player_id: str, powered: bool) -> None:
+ """Send POWER command to given player."""
+ await self.client.send_command("players/cmd/power", player_id=player_id, powered=powered)
+
+ async def player_command_volume_set(self, player_id: str, volume_level: int) -> None:
+ """Send VOLUME SET command to given player."""
+ await self.client.send_command(
+ "players/cmd/volume_set", player_id=player_id, volume_level=volume_level
+ )
+
+ async def player_command_volume_up(self, player_id: str) -> None:
+ """Send VOLUME UP command to given player."""
+ await self.client.send_command("players/cmd/volume_up", player_id=player_id)
+
+ async def player_command_volume_down(self, player_id: str) -> None:
+ """Send VOLUME DOWN command to given player."""
+ await self.client.send_command("players/cmd/volume_down", player_id=player_id)
+
+ async def player_command_volume_mute(self, player_id: str, muted: bool) -> None:
+ """Send VOLUME MUTE command to given player."""
+ await self.client.send_command("players/cmd/volume_mute", player_id=player_id, muted=muted)
+
+ async def player_command_sync(self, player_id: str, target_player: str) -> None:
+ """
+ Handle SYNC command for given player.
+
+ Join/add the given player(id) to the given (master) player/sync group.
+ If the player is already synced to another player, it will be unsynced there first.
+ If the target player itself is already synced to another player, this will fail.
+ If the player can not be synced with the given target player, this will fail.
+
+ - player_id: player_id of the player to handle the command.
+ - target_player: player_id of the syncgroup master or group player.
+ """
+ await self.client.send_command(
+ "players/cmd/sync", player_id=player_id, target_player=target_player
+ )
+
+ async def player_command_unsync(self, player_id: str) -> None:
+ """
+ Handle UNSYNC command for given player.
+
+ Remove the given player from any syncgroups it currently is synced to.
+ If the player is not currently synced to any other player,
+ this will silently be ignored.
+
+ - player_id: player_id of the player to handle the command.
+ """
+ await self.client.send_command("players/cmd/unsync", player_id=player_id)
+
+ # PlayerGroup related endpoints/commands
+
+ async def set_player_group_volume(self, player_id: str, volume_level: int) -> None:
+ """
+ Send VOLUME_SET command to given playergroup.
+
+ Will send the new (average) volume level to group child's.
+ - player_id: player_id of the playergroup to handle the command.
+ - volume_level: volume level (0..100) to set on the player.
+ """
+ await self.client.send_command(
+ "players/cmd/group_volume", player_id=player_id, volume_level=volume_level
+ )
+
+ async def set_player_group_members(self, player_id: str, members: list[str]) -> None:
+ """
+ Update the memberlist of the given PlayerGroup.
+
+ - player_id: player_id of the groupplayer to handle the command.
+ - members: list of player ids to set as members.
+ """
+ await self.client.send_command(
+ "players/cmd/set_members", player_id=player_id, members=members
+ )
+
+ # PlayerQueue related endpoints/commands
+
+ async def get_player_queues(self) -> list[PlayerQueue]:
+ """Fetch all PlayerQueues from the server."""
+ return [
+ PlayerQueue.from_dict(item)
+ for item in await self.client.send_command("players/queue/all")
+ ]
+
+ async def get_player_queue_items(self, queue_id: str) -> list[QueueItem]:
+ """Get all QueueItems for given PlayerQueue."""
+ return [
+ QueueItem.from_dict(item)
+ for item in await self.client.send_command("players/queue/items", queue_id=queue_id)
+ ]
+
+ async def queue_command_play(self, queue_id: str) -> None:
+ """Send PLAY command to given queue."""
+ await self.client.send_command("players/queue/play", queue_id=queue_id)
+
+ async def queue_command_pause(self, queue_id: str) -> None:
+ """Send PAUSE command to given queue."""
+ await self.client.send_command("players/queue/pause", queue_id=queue_id)
+
+ async def queue_command_stop(self, queue_id: str) -> None:
+ """Send STOP command to given queue."""
+ await self.client.send_command("players/queue/stop", queue_id=queue_id)
+
+ async def queue_command_next(self, queue_id: str) -> None:
+ """Send NEXT TRACK command to given queue."""
+ await self.client.send_command("players/queue/next", queue_id=queue_id)
+
+ async def queue_command_previous(self, queue_id: str) -> None:
+ """Send PREVIOUS TRACK command to given queue."""
+ await self.client.send_command("players/queue/previous", queue_id=queue_id)
+
+ async def queue_command_clear(self, queue_id: str) -> None:
+ """Send CLEAR QUEUE command to given queue."""
+ await self.client.send_command("players/queue/clear", queue_id=queue_id)
+
+ async def queue_command_move_item(
+ self, queue_id: str, queue_item_id: str, pos_shift: int = 1
+ ) -> None:
+ """
+ Move queue item x up/down the queue.
+
+ Parameters:
+ - queue_id: id of the queue to process this request.
+ - queue_item_id: the item_id of the queueitem that needs to be moved.
+ - pos_shift: move item x positions down if positive value
+ - pos_shift: move item x positions up if negative value
+ - pos_shift: move item to top of queue as next item if 0
+
+ NOTE: Fails if the given QueueItem is already player or loaded in the buffer.
+ """
+ await self.client.send_command(
+ "players/queue/move_item",
+ queue_id=queue_id,
+ queue_item_id=queue_item_id,
+ pos_shift=pos_shift,
+ )
+
+ async def queue_command_move_up(self, queue_id: str, queue_item_id: str) -> None:
+ """Move given queue item one place up in the queue."""
+ await self.queue_command_move_item(
+ queue_id=queue_id, queue_item_id=queue_item_id, pos_shift=-1
+ )
+
+ async def queue_command_move_down(self, queue_id: str, queue_item_id: str) -> None:
+ """Move given queue item one place down in the queue."""
+ await self.queue_command_move_item(
+ queue_id=queue_id, queue_item_id=queue_item_id, pos_shift=1
+ )
+
+ async def queue_command_move_next(self, queue_id: str, queue_item_id: str) -> None:
+ """Move given queue item as next up in the queue."""
+ await self.queue_command_move_item(
+ queue_id=queue_id, queue_item_id=queue_item_id, pos_shift=0
+ )
+
+ async def queue_command_delete(self, queue_id: str, item_id_or_index: int | str) -> None:
+ """Delete item (by id or index) from the queue."""
+ await self.client.send_command(
+ "players/queue/delete_item", queue_id=queue_id, item_id_or_index=item_id_or_index
+ )
+
+ async def queue_command_seek(self, queue_id: str, position: int) -> None:
+ """
+ Handle SEEK command for given queue.
+
+ Parameters:
+ - position: position in seconds to seek to in the current playing item.
+ """
+ await self.client.send_command("players/queue/seek", queue_id=queue_id, position=position)
+
+ async def queue_command_skip(self, queue_id: str, seconds: int) -> None:
+ """
+ Handle SKIP command for given queue.
+
+ Parameters:
+ - seconds: number of seconds to skip in track. Use negative value to skip back.
+ """
+ await self.client.send_command("players/queue/skip", queue_id=queue_id, seconds=seconds)
+
+ async def queue_command_shuffle(self, queue_id: str, shuffle_enabled=bool) -> None:
+ """Configure shuffle mode on the the queue."""
+ await self.client.send_command(
+ "players/queue/shuffle", queue_id=queue_id, shuffle_enabled=shuffle_enabled
+ )
+
+ async def queue_command_repeat(self, queue_id: str, repeat_mode: RepeatMode) -> None:
+ """Configure repeat mode on the the queue."""
+ await self.client.send_command(
+ "players/queue/repeat", queue_id=queue_id, repeat_mode=repeat_mode
+ )
+
+ async def queue_command_crossfade(self, queue_id: str, crossfade_enabled: bool) -> None:
+ """Configure crossfade mode on the the queue."""
+ await self.client.send_command(
+ "players/queue/crossfade", queue_id=queue_id, crossfade_enabled=crossfade_enabled
+ )
+
+ async def play_media(
+ self,
+ queue_id: str,
+ media: MediaItemType | list[MediaItemType] | str | list[str],
+ option: QueueOption = QueueOption.PLAY,
+ radio_mode: bool = False,
+ ) -> None:
+ """
+ Play media item(s) on the given queue.
+
+ - media: Media that should be played (MediaItem(s) or uri's).
+ - queue_opt: Which enqueue mode to use.
+ - radio_mode: Enable radio mode for the given item(s).
+ """
+ await self.client.send_command(
+ "players/queue/play_media",
+ queue_id=queue_id,
+ media=media,
+ option=option,
+ radio_mode=radio_mode,
+ )
+
+ # Other endpoints/commands
+
+ async def fetch_state(self) -> None:
+ """Fetch initial state once the server is connected."""
+ for player in await self.get_players():
+ self._players[player.player_id] = player
+ for queue in await self.get_player_queues():
+ self._queues[queue.queue_id] = queue
+
+ def _handle_event(self, event: MassEvent) -> None:
+ """Handle incoming player(queue) event."""
+ if event.event in (EventType.PLAYER_ADDED, EventType.PLAYER_UPDATED):
+ self._players[event.object_id] = Player.from_dict(event.data)
+ return
+ if event.event == EventType.PLAYER_REMOVED:
+ self._players.pop(event.object_id, None)
+ if event.event in (EventType.QUEUE_ADDED, EventType.QUEUE_UPDATED):
+ self._queues[event.object_id] = PlayerQueue.from_dict(event.data)
import asyncio
import os
-import platform
-import re
import socket
-import tempfile
from collections.abc import Callable
from typing import Any, TypeVar
-import memory_tempfile
-import unidecode
-
# pylint: disable=invalid-name
T = TypeVar("T")
_UNDEF: dict = {}
return possible_bool in ["true", "True", "1", "on", "ON", 1]
-def create_safe_string(input_str: str) -> str:
- """Return clean lowered string for compare actions."""
- input_str = input_str.lower().strip()
- unaccented_string = unidecode.unidecode(input_str)
- return re.sub(r"[^a-zA-Z0-9]", "", unaccented_string)
-
-
def create_sort_name(input_str: str) -> str:
"""Create sort name/title from string."""
input_str = input_str.lower().strip()
return await asyncio.to_thread(_resolve)
-def get_ip_pton():
+def get_ip_pton(ip_string: str = get_ip()):
"""Return socket pton for local ip."""
# pylint:disable=no-member
try:
- return socket.inet_pton(socket.AF_INET, get_ip())
+ return socket.inet_pton(socket.AF_INET, ip_string)
except OSError:
- return socket.inet_pton(socket.AF_INET6, get_ip())
+ return socket.inet_pton(socket.AF_INET6, ip_string)
def get_folder_size(folderpath):
return list(x for x in base if x not in new) + list(new)
-def create_tempfile():
- """Return a (named) temporary file."""
- if platform.system() == "Linux":
- return memory_tempfile.MemoryTempfile(fallback=True).NamedTemporaryFile(buffering=0)
- return tempfile.NamedTemporaryFile(buffering=0)
-
-
def get_changed_keys(
dict1: dict[str, Any],
dict2: dict[str, Any],
class ServerInfoMessage(DataClassORJSONMixin):
"""Message sent by the server with it's info when a client connects."""
+ server_id: str
server_version: str
schema_version: int
+ min_supported_schema_version: int
+ base_url: str
MessageType = (
CommandMessage | EventMessage | SuccessResultMessage | ErrorResultMessage | ServerInfoMessage
)
+
+
+def parse_message(raw: dict) -> MessageType:
+ """Parse Message from raw dict object."""
+ if "event" in raw:
+ return EventMessage.from_dict(raw)
+ if "error_code" in raw:
+ return ErrorResultMessage.from_dict(raw)
+ if "result" in raw and "is_last_chunk" in raw:
+ return ChunkedResultMessage.from_dict(raw)
+ if "result" in raw:
+ return SuccessResultMessage.from_dict(raw)
+ if "sdk_version" in raw:
+ return ServerInfoMessage.from_dict(raw)
+ return CommandMessage.from_dict(raw)
"""Custom errors and exceptions."""
+# mapping from error_code to Exception class
+ERROR_MAP: dict[int, type] = {}
+
class MusicAssistantError(Exception):
"""Custom Exception for all errors."""
error_code = 0
+ def __init_subclass__(cls, *args, **kwargs) -> None: # type: ignore[no-untyped-def]
+ """Register a subclass."""
+ super().__init_subclass__(*args, **kwargs)
+ ERROR_MAP[cls.error_code] = cls
+
class ProviderUnavailableError(MusicAssistantError):
"""Error raised when trying to access mediaitem of unavailable provider."""
"""Error thrown when a MediaItem cannot be played properly."""
error_code = 13
-
-
-def error_code_to_exception(error_code: int) -> MusicAssistantError:
- """Return MusicAssistant Error (exception) from error_code."""
- match error_code:
- case 1:
- return ProviderUnavailableError
- case 2:
- return MediaNotFoundError
- case 3:
- return InvalidDataError
- case 4:
- return AlreadyRegisteredError
- case 5:
- return SetupFailedError
- case 6:
- return LoginFailed
- case 7:
- return AudioError
- case 8:
- return QueueEmpty
- case 9:
- return UnsupportedFeaturedException
- case 10:
- return PlayerUnavailableError
- case 11:
- return PlayerCommandFailed
- case 12:
- return InvalidCommand
- case 13:
- return UnplayableMediaError
- case _:
- return MusicAssistantError
from collections.abc import Mapping
from dataclasses import dataclass, field, fields
from time import time
-from typing import Any
+from typing import Any, Self
from mashumaro import DataClassDictMixin
offset: int
total: int | None = None
+ @classmethod
+ def parse(cls: Self, raw: dict[str, Any], item_type: type) -> PagedItems:
+ """Parse PagedItems object including correct item type."""
+ return PagedItems(
+ items=[item_type.from_dict(x) for x in raw["items"]],
+ count=raw["count"],
+ limit=raw["limit"],
+ offset=raw["offset"],
+ total=raw["total"],
+ )
+
@dataclass
class SearchResults(DataClassDictMixin):
__version__: Final[str] = "2.0.0b30"
+VERSION: Final[int] = __version__
SCHEMA_VERSION: Final[int] = 22
+MIN_SCHEMA_VERSION = 22
ROOT_LOGGER_NAME: Final[str] = "music_assistant"
async def refresh_item(
self,
media_item: MediaItemType,
- ):
+ ) -> MediaItemType | None:
"""Try to refresh a mediaitem by requesting it's full object or search for substitutes."""
try:
return await self.get_item(
result = searchresult.radio
for item in result:
if item.available:
- await self.get_item(
+ return await self.get_item(
item.media_type, item.item_id, item.provider, lazy=False, add_to_db=True
)
return None
player_provider = self.get_player_provider(player_id)
await player_provider.cmd_volume_set(player_id, volume_level)
+ @api_command("players/cmd/volume_up")
+ async def cmd_volume_up(self, player_id: str) -> None:
+ """Send VOLUME_UP command to given player.
+
+ - player_id: player_id of the player to handle the command.
+ """
+ new_volume = min(100, self._players[player_id].volume_level + 5)
+ await self.cmd_volume_set(player_id, new_volume)
+
+ @api_command("players/cmd/volume_down")
+ async def cmd_volume_down(self, player_id: str) -> None:
+ """Send VOLUME_DOWN command to given player.
+
+ - player_id: player_id of the player to handle the command.
+ """
+ new_volume = max(0, self._players[player_id].volume_level - 5)
+ await self.cmd_volume_set(player_id, new_volume)
+
@api_command("players/cmd/group_volume")
async def cmd_group_volume(self, player_id: str, volume_level: int) -> None:
"""Send VOLUME_SET command to given playergroup.
import aiofiles
from aiohttp import ClientTimeout
-from music_assistant.common.helpers.util import create_tempfile
from music_assistant.common.models.errors import AudioError, MediaNotFoundError, MusicAssistantError
from music_assistant.common.models.media_items import ContentType, MediaType, StreamDetails
from music_assistant.constants import (
CONF_VOLUME_NORMALIZATION_TARGET,
ROOT_LOGGER_NAME,
)
-from music_assistant.server.helpers.process import AsyncProcess, check_output
+
+from .process import AsyncProcess, check_output
+from .util import create_tempfile
if TYPE_CHECKING:
from music_assistant.common.models.player_queue import QueueItem
"""Several helper/utils to compare objects."""
from __future__ import annotations
-from music_assistant.common.helpers.util import create_safe_string, create_sort_name
+import re
+
+import unidecode
+
+from music_assistant.common.helpers.util import create_sort_name
from music_assistant.common.models.media_items import (
Album,
Artist,
)
+def create_safe_string(input_str: str) -> str:
+ """Return clean lowered string for compare actions."""
+ input_str = input_str.lower().strip()
+ unaccented_string = unidecode.unidecode(input_str)
+ return re.sub(r"[^a-zA-Z0-9]", "", unaccented_string)
+
+
def loose_compare_strings(base: str, alt: str) -> bool:
"""Compare strings and return True even on partial match."""
# this is used to display 'versions' of the same track/album
import asyncio
import importlib
import logging
+import platform
+import tempfile
from functools import lru_cache
from typing import TYPE_CHECKING
+import memory_tempfile
+
if TYPE_CHECKING:
from music_assistant.server.models import ProviderModuleType
return importlib.import_module(f".{domain}", "music_assistant.server.providers")
return await asyncio.to_thread(_get_provider_module, domain)
+
+
+def create_tempfile():
+ """Return a (named) temporary file."""
+ if platform.system() == "Linux":
+ return memory_tempfile.MemoryTempfile(fallback=True).NamedTemporaryFile(buffering=0)
+ return tempfile.NamedTemporaryFile(buffering=0)
CommandMessage,
ErrorResultMessage,
MessageType,
- ServerInfoMessage,
SuccessResultMessage,
)
from music_assistant.common.models.config_entries import ConfigEntry, ConfigValueType
from music_assistant.common.models.errors import InvalidCommand
from music_assistant.common.models.event import MassEvent
-from music_assistant.constants import ROOT_LOGGER_NAME, __version__
-from music_assistant.server.helpers.api import (
- API_SCHEMA_VERSION,
- APICommandHandler,
- parse_arguments,
-)
+from music_assistant.constants import ROOT_LOGGER_NAME
+from music_assistant.server.helpers.api import APICommandHandler, parse_arguments
from music_assistant.server.models.plugin import PluginProvider
if TYPE_CHECKING:
self._writer_task = asyncio.create_task(self._writer())
# send server(version) info when client connects
- self._send_message(
- ServerInfoMessage(server_version=__version__, schema_version=API_SCHEMA_VERSION)
- )
+ self._send_message(self.mass.get_server_info())
# forward all events to clients
def handle_event(event: MassEvent) -> None:
from zeroconf import InterfaceChoice, NonUniqueNameException, ServiceInfo, Zeroconf
from music_assistant.common.helpers.util import get_ip, get_ip_pton
+from music_assistant.common.models.api import ServerInfoMessage
from music_assistant.common.models.config_entries import ProviderConfig
from music_assistant.common.models.enums import EventType, ProviderType
from music_assistant.common.models.errors import SetupFailedError
from music_assistant.common.models.event import MassEvent
from music_assistant.common.models.provider import ProviderManifest
-from music_assistant.constants import CONF_PROVIDERS, CONF_SERVER_ID, CONF_WEB_IP, ROOT_LOGGER_NAME
+from music_assistant.constants import (
+ CONF_PROVIDERS,
+ CONF_SERVER_ID,
+ CONF_WEB_IP,
+ MIN_SCHEMA_VERSION,
+ ROOT_LOGGER_NAME,
+ SCHEMA_VERSION,
+ VERSION,
+)
from music_assistant.server.controllers.cache import CacheController
from music_assistant.server.controllers.config import ConfigController
from music_assistant.server.controllers.metadata import MetaDataController
return ""
return self.config.get(CONF_SERVER_ID) # type: ignore[no-any-return]
+ @api_command("info")
+ def get_server_info(self) -> ServerInfoMessage:
+ """Return Info of this server."""
+ return ServerInfoMessage(
+ server_id=self.server_id,
+ server_version=VERSION,
+ schema_version=SCHEMA_VERSION,
+ min_supported_schema_version=MIN_SCHEMA_VERSION,
+ base_url=self.webserver.base_url,
+ )
+
@api_command("providers/available")
def get_available_providers(self) -> list[ProviderManifest]:
"""Return all available Providers."""
def _setup_discovery(self) -> None:
"""Make this Music Assistant instance discoverable on the network."""
- zeroconf_type = "_music-assistant._tcp.local."
+ zeroconf_type = "_mass._tcp.local."
server_id = self.server_id
info = ServiceInfo(
zeroconf_type,
name=f"{server_id}.{zeroconf_type}",
- addresses=[get_ip_pton()],
+ addresses=[get_ip_pton(self.base_ip)],
port=self.webserver.port,
- properties={},
- server=f"mass_{server_id}.local.",
+ properties=self.get_server_info().to_dict(),
+ server="mass.local.",
)
LOGGER.debug("Starting Zeroconf broadcast...")
try:
license = {text = "Apache-2.0"}
description = "Music Assistant"
readme = "README.md"
-requires-python = ">=3.11"
+requires-python = ">=3.10"
authors = [
{name = "The Music Assistant Authors", email = "marcelveldt@users.noreply.github.com"}
]
--- /dev/null
+"""Example script to test the MusicAssistant server and client."""
+
+import argparse
+import asyncio
+import logging
+import os
+from os.path import abspath, dirname
+from pathlib import Path
+from sys import path
+
+import coloredlogs
+from aiorun import run
+
+path.insert(1, dirname(dirname(abspath(__file__))))
+
+from music_assistant.client.client import MusicAssistantClient # noqa: E402
+from music_assistant.server.server import MusicAssistant # noqa: E402
+
+logging.basicConfig(level=logging.DEBUG)
+
+DEFAULT_PORT = 8095
+DEFAULT_URL = f"http://127.0.0.1:{DEFAULT_PORT}"
+DEFAULT_STORAGE_PATH = os.path.join(Path.home(), ".musicassistant")
+
+
+# Get parsed passed in arguments.
+parser = argparse.ArgumentParser(description="MusicAssistant Server Example.")
+parser.add_argument(
+ "--config",
+ type=str,
+ default=DEFAULT_STORAGE_PATH,
+ help="Storage path to keep persistent (configuration) data, "
+ "defaults to {DEFAULT_STORAGE_PATH}",
+)
+parser.add_argument(
+ "--log-level",
+ type=str,
+ default="info",
+ help="Provide logging level. Example --log-level debug, default=info, "
+ "possible=(critical, error, warning, info, debug)",
+)
+
+args = parser.parse_args()
+
+
+if __name__ == "__main__":
+ # configure logging
+ logging.basicConfig(level=args.log_level.upper())
+ coloredlogs.install(level=args.log_level.upper())
+
+ # make sure storage path exists
+ if not os.path.isdir(args.config):
+ os.mkdir(args.config)
+
+ # Init server
+ server = MusicAssistant(args.config)
+
+ async def run_mass():
+ """Run the MusicAssistant server and client."""
+ # start MusicAssistant Server
+ await server.start()
+
+ # run the client
+ async with MusicAssistantClient(DEFAULT_URL) as client:
+ # start listening
+ await client.start_listening()
+
+ async def handle_stop(loop: asyncio.AbstractEventLoop): # noqa: ARG001
+ """Handle server stop."""
+ await server.stop()
+
+ # run the server
+ run(run_mass(), shutdown_callback=handle_stop)