self.server_url = server_url
self.connection = WebsocketsConnection(server_url, aiohttp_session)
self.logger = logging.getLogger(__package__)
- self._result_futures: dict[str, asyncio.Future] = {}
+ self._result_futures: dict[str | int, asyncio.Future[Any]] = {}
self._subscribers: list[EventSubscriptionType] = []
self._stop_called: bool = False
self._loop: asyncio.AbstractEventLoop | None = None
def get_image_url(self, image: MediaItemImage, size: int = 0) -> str:
"""Get (proxied) URL for MediaItemImage."""
+ assert self.server_info
if image.remotely_accessible and not size:
return image.path
if image.remotely_accessible and size:
def subscribe(
self,
cb_func: EventCallBackType,
- event_filter: EventType | tuple[EventType] | None = None,
- id_filter: str | tuple[str] | None = None,
- ) -> Callable:
+ event_filter: EventType | tuple[EventType, ...] | None = None,
+ id_filter: str | tuple[str, ...] | None = None,
+ ) -> Callable[[], None]:
"""Add callback to event listeners.
Returns function to remove the listener.
if self._stop_called:
return
+ assert self._loop
+
if event.event == EventType.PROVIDERS_UPDATED:
self._providers = {x["instance_id"]: ProviderInstance.from_dict(x) for x in event.data}
) -> bool | None:
"""Exit context manager."""
await self.disconnect()
+ return None
def __repr__(self) -> str:
"""Return the representation."""
from __future__ import annotations
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, cast
from music_assistant.common.models.config_entries import (
ConfigEntry,
async def get_provider_config_value(self, instance_id: str, key: str) -> ConfigValueType:
"""Return single configentry value for a provider."""
- return await self.client.send_command(
- "config/providers/get_value", instance_id=instance_id, key=key
+ return cast(
+ ConfigValueType,
+ await self.client.send_command(
+ "config/providers/get_value", instance_id=instance_id, key=key
+ ),
)
async def get_provider_config_entries(
action: [optional] action key called from config entries UI.
values: the (intermediate) raw values for config entries sent with the action.
"""
- return (
+ return tuple(
ConfigEntry.from_dict(x)
for x in await self.client.send_command(
"config/providers/get_entries",
key: str,
) -> ConfigValueType:
"""Return single configentry value for a player."""
- return await self.client.send_command(
- "config/players/get_value", player_id=player_id, key=key
+ return cast(
+ ConfigValueType,
+ await self.client.send_command(
+ "config/players/get_value", player_id=player_id, key=key
+ ),
)
async def save_player_config(
async def get_core_config_value(self, domain: str, key: str) -> ConfigValueType:
"""Return single configentry value for a core controller."""
- return await self.client.send_command("config/core/get_value", domain=domain, key=key)
+ return cast(
+ ConfigValueType,
+ await self.client.send_command("config/core/get_value", domain=domain, key=key),
+ )
async def get_core_config_entries(
self,
action: [optional] action key called from config entries UI.
values: the (intermediate) raw values for config entries sent with the action.
"""
- return (
+ return tuple(
ConfigEntry.from_dict(x)
for x in await self.client.send_command(
"config/core/get_entries",
import logging
import pprint
-from typing import Any
+from typing import Any, cast
from aiohttp import ClientSession, ClientWebSocketResponse, WSMsgType, client_exceptions
"""Initialize."""
self.ws_server_url = get_websocket_url(server_url)
self._aiohttp_session_provided = aiohttp_session is not None
- self._aiohttp_session = aiohttp_session or ClientSession()
+ self._aiohttp_session: ClientSession | None = aiohttp_session or ClientSession()
self._ws_client: ClientWebSocketResponse | None = None
@property
ws_msg = await self._ws_client.receive()
if ws_msg.type in (WSMsgType.CLOSE, WSMsgType.CLOSED, WSMsgType.CLOSING):
- msg = "Connection was closed."
- raise ConnectionClosed(msg)
+ raise ConnectionClosed("Connection was closed.")
if ws_msg.type == WSMsgType.ERROR:
raise ConnectionFailed
if ws_msg.type != WSMsgType.TEXT:
- msg = f"Received non-Text message: {ws_msg.type}"
- raise InvalidMessage(msg)
+ raise InvalidMessage(f"Received non-Text message: {ws_msg.type}")
try:
- msg = json_loads(ws_msg.data)
+ msg = cast(dict[str, Any], json_loads(ws_msg.data))
except TypeError as err:
- msg = f"Received unsupported JSON: {err}"
- raise InvalidMessage(msg) from err
+ raise InvalidMessage(f"Received unsupported JSON: {err}") from err
except ValueError as err:
- msg = "Received invalid JSON."
- raise InvalidMessage(msg) from err
+ raise InvalidMessage("Received invalid JSON.") from err
if LOGGER.isEnabledFor(logging.DEBUG):
LOGGER.debug("Received message:\n%s\n", pprint.pformat(ws_msg))
from __future__ import annotations
import urllib.parse
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, cast
from music_assistant.common.models.enums import ImageType, MediaType
from music_assistant.common.models.media_items import (
media_from_dict,
)
from music_assistant.common.models.provider import SyncTask
+from music_assistant.common.models.queue_item import QueueItem
if TYPE_CHECKING:
from .client import MusicAssistantClient
provider_instance_id_or_domain: str,
) -> str:
"""Get URL to preview clip of given track."""
+ assert self.client.server_info
encoded_url = urllib.parse.quote(urllib.parse.quote(item_id))
return f"{self.client.server_info.base_url}/preview?path={encoded_url}&provider={provider_instance_id_or_domain}" # noqa: E501
) -> list[Track]:
"""Get (top)tracks for given artist."""
return [
- Artist.from_dict(item)
+ Track.from_dict(item)
for item in await self.client.send_command(
"music/artists/artist_tracks",
item_id=item_id,
async def add_item_to_library(self, item: str | MediaItemType) -> MediaItemType:
"""Add item (uri or mediaitem) to the library."""
- return await self.client.send_command("music/library/add_item", item=item)
+ return cast(
+ MediaItemType, await self.client.send_command("music/library/add_item", item=item)
+ )
async def refresh_item(
self,
def get_media_item_image(
self,
- item: MediaItemType | ItemMapping,
+ item: MediaItemType | ItemMapping | QueueItem,
type: ImageType = ImageType.THUMB, # noqa: A002
) -> MediaItemImage | None:
"""Get MediaItemImage for MediaItem, ItemMapping."""
if album_image := self.get_media_item_image(album, type):
return album_image
# handle regular image within mediaitem
- metadata: MediaItemMetadata
+ metadata: MediaItemMetadata | None
if metadata := getattr(item, "metadata", None):
for img in metadata.images or []:
if img.type == type:
- return img
+ return cast(MediaItemImage, img)
# retry with album/track artist(s)
artists: list[Artist | ItemMapping] | None
if artists := getattr(item, "artists", None):
"""
await self.client.send_command("player_queues/skip", queue_id=queue_id, seconds=seconds)
- async def queue_command_shuffle(self, queue_id: str, shuffle_enabled=bool) -> None:
+ async def queue_command_shuffle(self, queue_id: str, shuffle_enabled: bool) -> None:
"""Configure shuffle mode on the the queue."""
await self.client.send_command(
"player_queues/shuffle", queue_id=queue_id, shuffle_enabled=shuffle_enabled
def _handle_event(self, event: MassEvent) -> None:
"""Handle incoming player(queue) event."""
if event.event in (EventType.QUEUE_ADDED, EventType.QUEUE_UPDATED):
+ # Queue events always have an object_id
+ assert event.object_id
self._queues[event.object_id] = PlayerQueue.from_dict(event.data)
async def cmd_unsync_many(self, player_ids: list[str]) -> None:
"""Create temporary sync group by joining given players to target player."""
- await self.client.send_command("players/cmd/unsync_many", player_ids)
+ await self.client.send_command("players/cmd/unsync_many", player_ids=player_ids)
async def play_announcement(
self,
def _handle_event(self, event: MassEvent) -> None:
"""Handle incoming player event."""
if event.event in (EventType.PLAYER_ADDED, EventType.PLAYER_UPDATED):
+ # Player events always have an object id
+ assert event.object_id
self._players[event.object_id] = Player.from_dict(event.data)
return
if event.event == EventType.PLAYER_REMOVED:
+ # Player events always have an object id
+ assert event.object_id
self._players.pop(event.object_id, None)
class ErrorResultMessage(ResultMessageBase):
"""Message sent when a command did not execute successfully."""
- error_code: str
+ error_code: int
details: str | None = None