CONF_EQ_MID,
CONF_EQ_TREBLE,
CONF_FLOW_MODE,
+ CONF_LOG_LEVEL,
CONF_OUTPUT_CHANNELS,
CONF_VOLUME_NORMALISATION,
CONF_VOLUME_NORMALISATION_TARGET,
if not isinstance(result.value, expected_type):
if result.value is None and allow_none:
# In some cases we allow this (e.g. create default config)
+ result.value = result.default_value
return result
# handle common conversions/mistakes
if expected_type == float and isinstance(result.value, int):
for key in ("enabled", "name"):
cur_val = getattr(self, key, None)
new_val = getattr(update, key, None)
+ if new_val is None:
+ continue
if new_val == cur_val:
continue
setattr(self, key, new_val)
cur_val = self.values[key].value
if cur_val == new_val:
continue
- self.values[key].value = new_val
+ if new_val is None:
+ self.values[key].value = self.values[key].default_value
+ else:
+ self.values[key].value = new_val
changed_keys.add(f"values/{key}")
return changed_keys
values: dict[str, ConfigValueType] | None = None
+DEFAULT_PROVIDER_CONFIG_ENTRIES = (
+ ConfigEntry(
+ key=CONF_LOG_LEVEL,
+ type=ConfigEntryType.STRING,
+ label="Log level",
+ options=[
+ ConfigValueOption("global", "GLOBAL"),
+ ConfigValueOption("info", "INFO"),
+ ConfigValueOption("warning", "WARNING"),
+ ConfigValueOption("error", "ERROR"),
+ ConfigValueOption("debug", "DEBIG"),
+ ],
+ default_value="GLOBAL",
+ description="Set the log verbosity for this provider",
+ advanced=True,
+ ),
+)
+
+
DEFAULT_PLAYER_CONFIG_ENTRIES = (
ConfigEntry(
key=CONF_VOLUME_NORMALISATION,
CONF_EQ_TREBLE: Final[str] = "eq_treble"
CONF_OUTPUT_CHANNELS: Final[str] = "output_channels"
CONF_FLOW_MODE: Final[str] = "flow_mode"
+CONF_LOG_LEVEL: Final[str] = "log_level"
# config default values
DEFAULT_HOST: Final[str] = "0.0.0.0"
from music_assistant.common.models import config_entries
from music_assistant.common.models.config_entries import (
DEFAULT_PLAYER_CONFIG_ENTRIES,
+ DEFAULT_PROVIDER_CONFIG_ENTRIES,
ConfigEntryValue,
ConfigUpdate,
PlayerConfig,
raw_values: dict[str, dict] = self.get(CONF_PROVIDERS, {})
prov_entries = {x.domain: x.config_entries for x in self.mass.get_available_providers()}
return [
- ProviderConfig.parse(
- prov_entries[prov_conf["domain"]],
- prov_conf,
- )
+ self.get_provider_config(prov_conf["instance_id"])
for prov_conf in raw_values.values()
if (provider_type is None or prov_conf["type"] == provider_type)
and (provider_domain is None or prov_conf["domain"] == provider_domain)
for prov in self.mass.get_available_providers():
if prov.domain != raw_conf["domain"]:
continue
- return ProviderConfig.parse(prov.config_entries, raw_conf)
+ config_entries = DEFAULT_PROVIDER_CONFIG_ENTRIES + tuple(prov.config_entries)
+ return ProviderConfig.parse(config_entries, raw_conf)
raise KeyError(f"No config found for provider id {instance_id}")
@api_command("config/providers/update")
)
return db_item
- async def create(self, name: str, provider: str | None = None) -> Playlist:
+ async def create(self, name: str, provider_instance_or_domain: str | None = None) -> Playlist:
"""Create new playlist."""
# if provider is omitted, just pick first provider
- if provider:
- provider = self.mass.get_provider(provider)
+ if provider_instance_or_domain:
+ provider = self.mass.get_provider(provider_instance_or_domain)
else:
provider = next(
(
self.mass.register_api_command("music/track/versions", self.versions)
self.mass.register_api_command("music/track/update", self.update_db_item)
self.mass.register_api_command("music/track/delete", self.delete_db_item)
+ self.mass.register_api_command("music/track/preview", self.get_preview_url)
async def get(
self,
url = f"{self.mass.base_url}/stream/{player_id}/{queue_item.queue_item_id}/{stream_job.stream_id}.{fmt}" # noqa: E501
return url
- async def get_preview_url(self, provider: str, track_id: str) -> str:
+ def get_preview_url(self, provider_domain_or_instance_id: str, track_id: str) -> str:
"""Return url to short preview sample."""
enc_track_id = urllib.parse.quote(track_id)
- return f"{self.mass.base_url}/preview?provider={provider}&item_id={enc_track_id}"
+ return (
+ f"{self.mass.base_url}/stream/preview?"
+ f"provider={provider_domain_or_instance_id}&item_id={enc_track_id}"
+ )
async def _serve_queue_stream(self, request: web.Request) -> web.Response:
"""Serve Queue Stream audio to player(s)."""
async def _serve_preview(self, request: web.Request):
"""Serve short preview sample."""
- provider_mapping = request.query["provider_mapping"]
+ provider_domain_or_instance_id = request.query["provider"]
item_id = urllib.parse.unquote(request.query["item_id"])
resp = web.StreamResponse(status=200, reason="OK", headers={"Content-Type": "audio/mp3"})
await resp.prepare(request)
- async for chunk in get_preview_stream(self.mass, provider_mapping, item_id):
+ async for chunk in get_preview_stream(self.mass, provider_domain_or_instance_id, item_id):
await resp.write(chunk)
return resp
async def get_preview_stream(
mass: MusicAssistant,
- provider_mapping: str,
+ provider_domain_or_instance_id: str,
track_id: str,
) -> AsyncGenerator[bytes, None]:
"""Create a 30 seconds preview audioclip for the given streamdetails."""
- music_prov = mass.get_provider(provider_mapping)
+ music_prov = mass.get_provider(provider_domain_or_instance_id)
streamdetails = await music_prov.get_stream_details(track_id)
from music_assistant.common.models.config_entries import ConfigEntryValue, ProviderConfig
from music_assistant.common.models.enums import ProviderFeature, ProviderType
from music_assistant.common.models.provider import ProviderInstance, ProviderManifest
-from music_assistant.constants import ROOT_LOGGER_NAME
+from music_assistant.constants import CONF_LOG_LEVEL, ROOT_LOGGER_NAME
if TYPE_CHECKING:
from music_assistant.server import MusicAssistant
self.mass = mass
self.manifest = manifest
self.config = config
- self.logger = logging.getLogger(f"{ROOT_LOGGER_NAME}.providers.{self.domain}")
+ self.logger = logging.getLogger(f"{ROOT_LOGGER_NAME}.providers.{self.instance_id}")
+ log_level = config.get_value(CONF_LOG_LEVEL)
+ if log_level != "GLOBAL":
+ self.logger.setLevel(log_level)
self.cache = mass.cache
self.available = False
from __future__ import annotations
import asyncio
+import contextlib
import logging
import threading
import time
"""Handle close/cleanup of the provider."""
if not self.browser:
return
+
# stop discovery
- await self.mass.loop.run_in_executor(None, self.browser.stop_discovery)
+ def stop_discovery():
+ """Stop the chromecast discovery threads."""
+ if self.browser._zc_browser:
+ with contextlib.suppress(RuntimeError):
+ self.browser._zc_browser.cancel()
+
+ self.browser.host_browser.stop.set()
+ self.browser.host_browser.join()
+
+ await self.mass.loop.run_in_executor(None, stop_discovery)
# stop all chromecasts
for castplayer in list(self.castplayers.values()):
await self._disconnect_chromecast(castplayer)
castplayer.mz_controller = mz_controller
castplayer.cc.start()
- self.mass.loop.call_soon_threadsafe(self.mass.players.register, castplayer.player)
+ self.mass.loop.call_soon_threadsafe(
+ self.mass.players.register_or_update, castplayer.player
+ )
# if player was already added, the player will take care of reconnects itself.
castplayer.cast_info.update(disc_info)
"""Create a new playlist on provider with given name."""
# creating a new playlist on the filesystem is as easy
# as creating a new (empty) file with the m3u extension...
- filename = await self.resolve(f"{name}.m3u")
+ # filename = await self.resolve(f"{name}.m3u")
+ filename = f"{name}.m3u"
await self.write_file_content(filename, b"")
playlist = await self.get_playlist(filename)
db_playlist = await self.mass.music.playlists.add_db_item(playlist)
"config_entries": [
],
- "requirements": ["music-assistant-frontend==20230317.0"],
+ "requirements": ["music-assistant-frontend==20230319.0"],
"documentation": "",
"multi_instance": false,
"builtin": true,
+++ /dev/null
-"""JSON-RPC API which is more or less compatible with Logitech Media Server."""
-from __future__ import annotations
-
-from typing import Any
-
-from aiohttp import web
-
-from music_assistant.common.helpers.json import json_dumps, json_loads
-from music_assistant.common.models.enums import PlayerState
-from music_assistant.server.models.plugin import PluginProvider
-
-from .models import (
- CommandErrorMessage,
- CommandMessage,
- CommandResultMessage,
- PlayerItem,
- PlayersResponse,
- PlayerStatusResponse,
- player_item_from_mass,
- player_status_from_mass,
-)
-
-# ruff: noqa: ARG002, E501
-
-ArgsType = list[int | str]
-KwargsType = dict[str, Any]
-
-
-def parse_value(raw_value: int | str) -> int | str | tuple[str, int | str]:
- """
- Transform API param into a usable value.
-
- Integer values are sometimes sent as string so we try to parse that.
- """
- if isinstance(raw_value, str):
- if ":" in raw_value:
- # this is a key:value value
- key, val = raw_value.split(":")
- return (key, val)
- if raw_value.isnumeric():
- # this is an integer sent as string
- return int(raw_value)
- return raw_value
-
-
-def parse_args(raw_values: list[int | str]) -> tuple[ArgsType, KwargsType]:
- """Pargse Args and Kwargs from raw CLI params."""
- args: ArgsType = []
- kwargs: KwargsType = {}
- for raw_value in raw_values:
- value = parse_value(raw_value)
- if isinstance(value, tuple):
- kwargs[value[0]] = value[1]
- else:
- args.append(value)
- return (args, kwargs)
-
-
-class JSONRPCApi(PluginProvider):
- """Basic JSON-RPC API implementation, (partly) compatible with Logitech Media Server."""
-
- async def setup(self) -> None:
- """Handle async initialization of the plugin."""
- self.mass.webapp.router.add_get("/jsonrpc.js", self._handle_jsonrpc)
- self.mass.webapp.router.add_post("/jsonrpc.js", self._handle_jsonrpc)
-
- async def _handle_jsonrpc(self, request: web.Request) -> web.Response:
- """Handle request for image proxy."""
- command_msg: CommandMessage = await request.json(loads=json_loads)
- self.logger.debug("Received request: %s", command_msg)
-
- if command_msg["method"] == "slim.request":
- # Slim request handler
- # {"method":"slim.request","id":1,"params":["aa:aa:ca:5a:94:4c",["status","-", 2, "tags:xcfldatgrKN"]]}
- player_id = command_msg["params"][0]
- command = str(command_msg["params"][1][0])
- args, kwargs = parse_args(command_msg["params"][1][1:])
-
- if handler := getattr(self, f"_handle_{command}", None):
- # run handler for command
- self.logger.debug(
- "Handling JSON-RPC-request (player: %s command: %s - args: %s - kwargs: %s)",
- player_id,
- command,
- str(args),
- str(kwargs),
- )
- cmd_result = handler(player_id, *args, **kwargs)
- if cmd_result is None:
- cmd_result = {}
- elif not isinstance(cmd_result, dict):
- # individual values are returned with underscore ?!
- cmd_result = {f"_{command}": cmd_result}
- result: CommandResultMessage = {
- **command_msg,
- "result": cmd_result,
- }
- else:
- # no handler found
- self.logger.warning("No handler for %s", command)
- result: CommandErrorMessage = {
- **command_msg,
- "error": {"code": -1, "message": "Invalid command"},
- }
- # return the response to the client
- return web.json_response(result, dumps=json_dumps)
-
- def _handle_players(
- self,
- player_id: str,
- start_index: int | str = 0,
- limit: int = 999,
- **kwargs,
- ) -> PlayersResponse:
- """Handle players command."""
- players: list[PlayerItem] = []
- for index, mass_player in enumerate(self.mass.players.all()):
- if isinstance(start_index, int) and index < start_index:
- continue
- if len(players) > limit:
- break
- players.append(player_item_from_mass(start_index + index, mass_player))
- return PlayersResponse(count=len(players), players_loop=players)
-
- def _handle_status(
- self,
- player_id: str,
- *args,
- start_index: int | str = "-",
- limit: int = 2,
- tags: str = "xcfldatgrKN",
- **kwargs,
- ) -> PlayerStatusResponse:
- """Handle player status command."""
- player = self.mass.players.get(player_id)
- assert player is not None
- queue = self.mass.players.queues.get_active_queue(player_id)
- assert queue is not None
- if start_index == "-":
- start_index = queue.current_index or 0
- queue_items = self.mass.players.queues.items(queue.queue_id)[
- start_index : start_index + limit
- ]
- # we ignore the tags, just always send all info
- return player_status_from_mass(player=player, queue=queue, queue_items=queue_items)
-
- def _handle_mixer(
- self,
- player_id: str,
- subcommand: str,
- *args,
- **kwargs,
- ) -> int | None:
- """Handle player mixer command."""
- arg = args[0] if args else "?"
- player = self.mass.players.get(player_id)
- assert player is not None
-
- # <playerid> mixer volume <0 .. 100|-100 .. +100|?>
- if subcommand == "volume" and isinstance(arg, int):
- self.mass.create_task(self.mass.players.cmd_volume_set, player_id, arg)
- return
- if subcommand == "volume" and arg == "?":
- return player.volume_level
- if subcommand == "volume" and "+" in arg:
- volume_level = min(100, player.volume_level + int(arg.split("+")[1]))
- self.mass.create_task(self.mass.players.cmd_volume_set, player_id, volume_level)
- return
- if subcommand == "volume" and "-" in arg:
- volume_level = max(0, player.volume_level - int(arg.split("-")[1]))
- self.mass.create_task(self.mass.players.cmd_volume_set, player_id, volume_level)
- return
-
- # <playerid> mixer muting <0|1|toggle|?|>
- if subcommand == "muting" and isinstance(arg, int):
- self.mass.create_task(self.mass.players.cmd_volume_mute, player_id, int(arg))
- return
- if subcommand == "muting" and arg == "toggle":
- self.mass.create_task(
- self.mass.players.cmd_volume_mute, player_id, not player.volume_muted
- )
- return
- if subcommand == "muting":
- return int(player.volume_muted)
-
- def _handle_time(self, player_id: str, number: str | int) -> int | None:
- """Handle player `time` command."""
- # <playerid> time <number|-number|+number|?>
- # The "time" command allows you to query the current number of seconds that the
- # current song has been playing by passing in a "?".
- # You may jump to a particular position in a song by specifying a number of seconds
- # to seek to. You may also jump to a relative position within a song by putting an
- # explicit "-" or "+" character before a number of seconds you would like to seek.
- player_queue = self.mass.players.queues.get_active_queue(player_id)
- assert player_queue is not None
-
- if number == "?":
- return int(player_queue.corrected_elapsed_time)
-
- if isinstance(number, str) and "+" in number or "-" in number:
- jump = int(number.split("+")[1])
- self.mass.create_task(self.mass.players.queues.skip, jump)
- else:
- self.mass.create_task(self.mass.players.queues.seek, number)
-
- def _handle_power(self, player_id: str, value: str | int) -> int | None:
- """Handle player `time` command."""
- # <playerid> power <0|1|?|>
- # The "power" command turns the player on or off.
- # Use 0 to turn off, 1 to turn on, ? to query and
- # no parameter to toggle the power state of the player.
- player = self.mass.players.get(player_id)
- assert player is not None
-
- if value == "?":
- return int(player.powered)
-
- self.mass.create_task(self.mass.players.cmd_power, player_id, bool(value))
-
- def _handle_playlist(
- self,
- player_id: str,
- subcommand: str,
- *args,
- **kwargs,
- ) -> int | None:
- """Handle player `playlist` command."""
- arg = args[0] if args else "?"
- queue = self.mass.players.queues.get_active_queue(player_id)
- assert queue is not None
-
- # <playerid> playlist index <index|+index|-index|?> <fadeInSecs>
- if subcommand == "index" and isinstance(arg, int):
- self.mass.create_task(self.mass.players.queues.play_index, player_id, arg)
- return
- if subcommand == "index" and arg == "?":
- return queue.current_index
- if subcommand == "index" and "+" in arg:
- next_index = (queue.current_index or 0) + int(arg.split("+")[1])
- self.mass.create_task(self.mass.players.queues.play_index, player_id, next_index)
- return
- if subcommand == "index" and "-" in arg:
- next_index = (queue.current_index or 0) - int(arg.split("-")[1])
- self.mass.create_task(self.mass.players.queues.play_index, player_id, next_index)
- return
-
- self.logger.warning("Unhandled command: playlist/%s", subcommand)
-
- def _handle_play(
- self,
- player_id: str,
- *args,
- **kwargs,
- ) -> int | None:
- """Handle player `play` command."""
- queue = self.mass.players.queues.get_active_queue(player_id)
- assert queue is not None
- self.mass.create_task(self.mass.players.queues.play, player_id)
-
- def _handle_stop(
- self,
- player_id: str,
- *args,
- **kwargs,
- ) -> int | None:
- """Handle player `stop` command."""
- queue = self.mass.players.queues.get_active_queue(player_id)
- assert queue is not None
- self.mass.create_task(self.mass.players.queues.stop, player_id)
-
- def _handle_pause(
- self,
- player_id: str,
- force: int = 0,
- *args,
- **kwargs,
- ) -> int | None:
- """Handle player `stop` command."""
- queue = self.mass.players.queues.get_active_queue(player_id)
- assert queue is not None
-
- if force or queue.state == PlayerState.PLAYING:
- self.mass.create_task(self.mass.players.queues.pause, player_id)
- else:
- self.mass.create_task(self.mass.players.queues.play, player_id)
+++ /dev/null
-{
- "type": "plugin",
- "domain": "json_rpc",
- "name": "JSON-RPC API",
- "description": "Basic JSON-RPC API implementation, (partly) compatible with Logitech Media Server.",
- "codeowners": ["@marcelveldt"],
- "config_entries": [
- ],
-
- "requirements": [],
- "documentation": "",
- "multi_instance": false,
- "builtin": true,
- "load_by_default": true
-}
+++ /dev/null
-"""Models used for the JSON-RPC API."""
-from __future__ import annotations
-
-from typing import TYPE_CHECKING, Any, TypedDict
-
-from music_assistant.common.models.enums import MediaType, PlayerState, RepeatMode
-
-if TYPE_CHECKING:
- from music_assistant.common.models.player import Player
- from music_assistant.common.models.player_queue import PlayerQueue
- from music_assistant.common.models.queue_item import QueueItem
-
-# ruff: noqa: UP013
-
-PLAYMODE_MAP = {
- PlayerState.IDLE: "stop",
- PlayerState.PLAYING: "play",
- PlayerState.OFF: "stop",
- PlayerState.PAUSED: "pause",
-}
-
-REPEATMODE_MAP = {RepeatMode.OFF: 0, RepeatMode.ONE: 1, RepeatMode.ALL: 2}
-
-
-class CommandMessage(TypedDict):
- """Representation of Base JSON RPC Command Message."""
-
- # https://www.jsonrpc.org/specification
-
- id: int | str
- method: str
- params: list[str | int | list[str | int]]
-
-
-class CommandResultMessage(CommandMessage):
- """Representation of JSON RPC Result Message."""
-
- result: Any
-
-
-class ErrorDetails(TypedDict):
- """Representation of JSON RPC ErrorDetails."""
-
- code: int
- message: str
-
-
-class CommandErrorMessage(CommandMessage, TypedDict):
- """Base Representation of JSON RPC Command Message."""
-
- id: int | str | None
- error: ErrorDetails
-
-
-PlayerItem = TypedDict(
- "PlayerItem",
- {
- "playerindex": int,
- "playerid": str,
- "name": str,
- "modelname": str,
- "connected": int,
- "isplaying": int,
- "power": int,
- "model": str,
- "canpoweroff": int,
- "firmware": int,
- "isplayer": int,
- "displaytype": str,
- "uuid": str | None,
- "seq_no": int,
- "ip": str,
- },
-)
-
-
-def player_item_from_mass(playerindex: int, player: Player) -> PlayerItem:
- """Parse PlayerItem for the Json RPC interface from MA QueueItem."""
- return {
- "playerindex": playerindex,
- "playerid": player.player_id,
- "name": player.display_name,
- "modelname": player.device_info.model,
- "connected": int(player.available),
- "isplaying": 1 if player.state == PlayerState.PLAYING else 0,
- "power": int(player.powered),
- "model": "squeezelite",
- "canpoweroff": 1,
- "firmware": 0,
- "isplayer": 1,
- "displaytype": None,
- "uuid": None,
- "seq_no": 0,
- "ip": player.device_info.address,
- }
-
-
-PlayersResponse = TypedDict(
- "PlayersResponse",
- {
- "count": int,
- "players_loop": list[PlayerItem],
- },
-)
-
-
-PlaylistItem = TypedDict(
- "PlaylistItem",
- {
- "playlist index": int,
- "id": str,
- "title": str,
- "artist": str,
- "remote": int,
- "remote_title": str,
- "artwork_url": str,
- "bitrate": str,
- "duration": str | int | None,
- "coverid": str,
- },
-)
-
-
-def playlist_item_from_mass(queue_item: QueueItem, index: int = 0) -> PlaylistItem:
- """Parse PlaylistItem for the Json RPC interface from MA QueueItem."""
- if queue_item.media_item and queue_item.media_type == MediaType.TRACK:
- artist = queue_item.media_item.artist.name
- album = queue_item.media_item.album.name
- title = queue_item.media_item.name
- elif queue_item.streamdetails and queue_item.streamdetails.stream_title:
- if " - " in queue_item.streamdetails.stream_title:
- artist, title = queue_item.streamdetails.stream_title.split(" - ")
- else:
- artist = ""
- title = queue_item.streamdetails.stream_title
- album = queue_item.name
- else:
- artist = ""
- album = ""
- title = queue_item.name
- return {
- "playlist index": index,
- "id": queue_item.queue_item_id,
- "title": title,
- "artist": artist,
- "album": album,
- "genre": "",
- "remote": 0,
- "remote_title": queue_item.streamdetails.stream_title if queue_item.streamdetails else "",
- "artwork_url": queue_item.image_url or "",
- "bitrate": "",
- "duration": queue_item.duration or 0,
- "coverid": "-94099753136392",
- }
-
-
-PlayerStatusResponse = TypedDict(
- "PlayerStatusResponse",
- {
- "time": int,
- "mode": str,
- "sync_slaves": str,
- "playlist_cur_index": int | None,
- "player_name": str,
- "sync_master": str,
- "player_connected": int,
- "power": int,
- "mixer volume": int,
- "playlist repeat": int,
- "playlist shuffle": int,
- "playlist mode": str,
- "player_ip": str,
- "remoteMeta": dict | None,
- "digital_volume_control": int,
- "playlist_timestamp": float,
- "current_title": str,
- "duration": int,
- "seq_no": int,
- "remote": int,
- "can_seek": int,
- "signalstrength": int,
- "rate": int,
- "playlist_tracks": int,
- "playlist_loop": list[PlaylistItem],
- },
-)
-
-
-def player_status_from_mass(
- player: Player, queue: PlayerQueue, queue_items: list[QueueItem]
-) -> PlayerStatusResponse:
- """Parse PlayerStatusResponse for the Json RPC interface from MA info."""
- return {
- "time": queue.corrected_elapsed_time,
- "mode": PLAYMODE_MAP[queue.state],
- "sync_slaves": ",".join(player.group_childs),
- "playlist_cur_index": queue.current_index,
- "player_name": player.display_name,
- "sync_master": player.synced_to or "",
- "player_connected": int(player.available),
- "mixer volume": player.volume_level,
- "power": int(player.powered),
- "digital_volume_control": 1,
- "playlist_timestamp": 0, # TODO !
- "current_title": queue.current_item.queue_item_id
- if queue.current_item
- else "Music Assistant",
- "duration": queue.current_item.duration if queue.current_item else 0,
- "playlist repeat": REPEATMODE_MAP[queue.repeat_mode],
- "playlist shuffle": int(queue.shuffle_enabled),
- "playlist mode": "off",
- "player_ip": player.device_info.address,
- "seq_no": 0,
- "remote": 0,
- "can_seek": 1,
- "signalstrength": 0,
- "rate": 1,
- "playlist_tracks": queue.items,
- "playlist_loop": [
- playlist_item_from_mass(item, queue.current_index + index)
- for index, item in enumerate(queue_items)
- ],
- }
--- /dev/null
+"""JSON-RPC API which is more or less compatible with Logitech Media Server."""
+from __future__ import annotations
+
+import asyncio
+import urllib.parse
+from typing import Any
+
+from aiohttp import web
+
+from music_assistant.common.helpers.json import json_dumps, json_loads
+from music_assistant.common.helpers.util import select_free_port
+from music_assistant.common.models.enums import PlayerState
+from music_assistant.server.models.plugin import PluginProvider
+
+from .models import (
+ CommandErrorMessage,
+ CommandMessage,
+ CommandResultMessage,
+ PlayerItem,
+ PlayersResponse,
+ PlayerStatusResponse,
+ player_item_from_mass,
+ player_status_from_mass,
+)
+
+# ruff: noqa: ARG002, E501
+
+ArgsType = list[int | str]
+KwargsType = dict[str, Any]
+
+
+def parse_value(raw_value: int | str) -> int | str | tuple[str, int | str]:
+ """
+ Transform API param into a usable value.
+
+ Integer values are sometimes sent as string so we try to parse that.
+ """
+ if isinstance(raw_value, str):
+ if ":" in raw_value:
+ # this is a key:value value
+ key, val = raw_value.split(":")
+ return (key, val)
+ if raw_value.isnumeric():
+ # this is an integer sent as string
+ return int(raw_value)
+ return raw_value
+
+
+def parse_args(raw_values: list[int | str]) -> tuple[ArgsType, KwargsType]:
+ """Pargse Args and Kwargs from raw CLI params."""
+ args: ArgsType = []
+ kwargs: KwargsType = {}
+ for raw_value in raw_values:
+ value = parse_value(raw_value)
+ if isinstance(value, tuple):
+ kwargs[value[0]] = value[1]
+ else:
+ args.append(value)
+ return (args, kwargs)
+
+
+class LmsCli(PluginProvider):
+ """Basic LMS CLI (json rpc and telnet) implementation, (partly) compatible with Logitech Media Server."""
+
+ cli_port: int = 9090
+
+ async def setup(self) -> None:
+ """Handle async initialization of the plugin."""
+ self.logger.info("Registering jsonrpc endpoints on the webserver")
+ self.mass.webapp.router.add_get("/jsonrpc.js", self._handle_jsonrpc)
+ self.mass.webapp.router.add_post("/jsonrpc.js", self._handle_jsonrpc)
+ # setup (telnet) cli for players requesting basic info on that port
+ self.cli_port = await select_free_port(9090, 9190)
+ self.logger.info("Starting (telnet) CLI on port %s", self.cli_port)
+ await asyncio.start_server(self._handle_cli_client, "0.0.0.0", self.cli_port),
+
+ async def _handle_cli_client(
+ self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter
+ ) -> None:
+ """Handle new connection on the legacy CLI."""
+ # https://raw.githubusercontent.com/Logitech/slimserver/public/7.8/HTML/EN/html/docs/cli-api.html
+ # https://github.com/elParaguayo/LMS-CLI-Documentation/blob/master/LMS-CLI.md
+ self.logger.info("Client connected on Telnet CLI")
+ try:
+ while True:
+ raw_request = await reader.readline()
+ raw_request = raw_request.strip().decode("utf-8")
+ # request comes in as url encoded strings, separated by space
+ raw_params = [urllib.parse.unquote(x) for x in raw_request.split(" ")]
+ # the first param is either a macaddress or a command
+ if ":" in raw_params[0]:
+ # assume this is a mac address (=player_id)
+ player_id = raw_params[0]
+ command = raw_params[1]
+ command_params = raw_params[2:]
+ else:
+ player_id = ""
+ command = raw_params[0]
+ command_params = raw_params[1:]
+
+ args, kwargs = parse_args(command_params)
+
+ response: str = raw_request
+
+ # check if we have a handler for this command
+ # note that we only have support for very limited commands
+ # just enough for compatibility with players but not to be used as api
+ # with 3rd party tools!
+ if handler := getattr(self, f"_handle_{command}", None):
+ self.logger.debug(
+ "Handling CLI-request (player: %s command: %s - args: %s - kwargs: %s)",
+ player_id,
+ command,
+ str(args),
+ str(kwargs),
+ )
+ cmd_result: list[str] = handler(player_id, *args, **kwargs)
+ if isinstance(cmd_result, dict):
+ result_parts = dict_to_strings(cmd_result)
+ result_str = " ".join(urllib.parse.quote(x) for x in result_parts)
+ elif not cmd_result:
+ result_str = ""
+ else:
+ result_str = str(cmd_result)
+ response += " " + result_str
+ else:
+ self.logger.warning(
+ "No handler for %s (player: %s - args: %s - kwargs: %s)",
+ command,
+ player_id,
+ str(args),
+ str(kwargs),
+ )
+ # echo back the request and the result (if any)
+ response += "\n"
+ writer.write(response.encode("utf-8"))
+ await writer.drain()
+ except Exception as err:
+ self.logger.debug("Error handling CLI command", exc_info=err)
+ finally:
+ self.logger.debug("Client disconnected from Telnet CLI")
+
+ async def _handle_jsonrpc(self, request: web.Request) -> web.Response:
+ """Handle request for image proxy."""
+ command_msg: CommandMessage = await request.json(loads=json_loads)
+ self.logger.debug("Received request: %s", command_msg)
+
+ if command_msg["method"] == "slim.request":
+ # Slim request handler
+ # {"method":"slim.request","id":1,"params":["aa:aa:ca:5a:94:4c",["status","-", 2, "tags:xcfldatgrKN"]]}
+ player_id = command_msg["params"][0]
+ command = str(command_msg["params"][1][0])
+ args, kwargs = parse_args(command_msg["params"][1][1:])
+
+ if handler := getattr(self, f"_handle_{command}", None):
+ # run handler for command
+ self.logger.debug(
+ "Handling JSON-RPC-request (player: %s command: %s - args: %s - kwargs: %s)",
+ player_id,
+ command,
+ str(args),
+ str(kwargs),
+ )
+ cmd_result = handler(player_id, *args, **kwargs)
+ if cmd_result is None:
+ cmd_result = {}
+ elif not isinstance(cmd_result, dict):
+ # individual values are returned with underscore ?!
+ cmd_result = {f"_{command}": cmd_result}
+ result: CommandResultMessage = {
+ **command_msg,
+ "result": cmd_result,
+ }
+ else:
+ # no handler found
+ self.logger.warning("No handler for %s", command)
+ result: CommandErrorMessage = {
+ **command_msg,
+ "error": {"code": -1, "message": "Invalid command"},
+ }
+ # return the response to the client
+ return web.json_response(result, dumps=json_dumps)
+
+ def _handle_players(
+ self,
+ player_id: str,
+ start_index: int | str = 0,
+ limit: int = 999,
+ **kwargs,
+ ) -> PlayersResponse:
+ """Handle players command."""
+ players: list[PlayerItem] = []
+ for index, mass_player in enumerate(self.mass.players.all()):
+ if isinstance(start_index, int) and index < start_index:
+ continue
+ if len(players) > limit:
+ break
+ players.append(player_item_from_mass(start_index + index, mass_player))
+ return PlayersResponse(count=len(players), players_loop=players)
+
+ def _handle_status(
+ self,
+ player_id: str,
+ *args,
+ start_index: int | str = "-",
+ limit: int = 2,
+ tags: str = "xcfldatgrKN",
+ **kwargs,
+ ) -> PlayerStatusResponse:
+ """Handle player status command."""
+ player = self.mass.players.get(player_id)
+ assert player is not None
+ queue = self.mass.players.queues.get_active_queue(player_id)
+ assert queue is not None
+ if start_index == "-":
+ start_index = queue.current_index or 0
+ queue_items = self.mass.players.queues.items(queue.queue_id)[
+ start_index : start_index + limit
+ ]
+ # we ignore the tags, just always send all info
+ return player_status_from_mass(player=player, queue=queue, queue_items=queue_items)
+
+ def _handle_mixer(
+ self,
+ player_id: str,
+ subcommand: str,
+ *args,
+ **kwargs,
+ ) -> int | None:
+ """Handle player mixer command."""
+ arg = args[0] if args else "?"
+ player = self.mass.players.get(player_id)
+ assert player is not None
+
+ # <playerid> mixer volume <0 .. 100|-100 .. +100|?>
+ if subcommand == "volume" and isinstance(arg, int):
+ self.mass.create_task(self.mass.players.cmd_volume_set, player_id, arg)
+ return
+ if subcommand == "volume" and arg == "?":
+ return player.volume_level
+ if subcommand == "volume" and "+" in arg:
+ volume_level = min(100, player.volume_level + int(arg.split("+")[1]))
+ self.mass.create_task(self.mass.players.cmd_volume_set, player_id, volume_level)
+ return
+ if subcommand == "volume" and "-" in arg:
+ volume_level = max(0, player.volume_level - int(arg.split("-")[1]))
+ self.mass.create_task(self.mass.players.cmd_volume_set, player_id, volume_level)
+ return
+
+ # <playerid> mixer muting <0|1|toggle|?|>
+ if subcommand == "muting" and isinstance(arg, int):
+ self.mass.create_task(self.mass.players.cmd_volume_mute, player_id, int(arg))
+ return
+ if subcommand == "muting" and arg == "toggle":
+ self.mass.create_task(
+ self.mass.players.cmd_volume_mute, player_id, not player.volume_muted
+ )
+ return
+ if subcommand == "muting":
+ return int(player.volume_muted)
+
+ def _handle_time(self, player_id: str, number: str | int) -> int | None:
+ """Handle player `time` command."""
+ # <playerid> time <number|-number|+number|?>
+ # The "time" command allows you to query the current number of seconds that the
+ # current song has been playing by passing in a "?".
+ # You may jump to a particular position in a song by specifying a number of seconds
+ # to seek to. You may also jump to a relative position within a song by putting an
+ # explicit "-" or "+" character before a number of seconds you would like to seek.
+ player_queue = self.mass.players.queues.get_active_queue(player_id)
+ assert player_queue is not None
+
+ if number == "?":
+ return int(player_queue.corrected_elapsed_time)
+
+ if isinstance(number, str) and "+" in number or "-" in number:
+ jump = int(number.split("+")[1])
+ self.mass.create_task(self.mass.players.queues.skip, jump)
+ else:
+ self.mass.create_task(self.mass.players.queues.seek, number)
+
+ def _handle_power(self, player_id: str, value: str | int) -> int | None:
+ """Handle player `time` command."""
+ # <playerid> power <0|1|?|>
+ # The "power" command turns the player on or off.
+ # Use 0 to turn off, 1 to turn on, ? to query and
+ # no parameter to toggle the power state of the player.
+ player = self.mass.players.get(player_id)
+ assert player is not None
+
+ if value == "?":
+ return int(player.powered)
+
+ self.mass.create_task(self.mass.players.cmd_power, player_id, bool(value))
+
+ def _handle_playlist(
+ self,
+ player_id: str,
+ subcommand: str,
+ *args,
+ **kwargs,
+ ) -> int | None:
+ """Handle player `playlist` command."""
+ arg = args[0] if args else "?"
+ queue = self.mass.players.queues.get_active_queue(player_id)
+ assert queue is not None
+
+ # <playerid> playlist index <index|+index|-index|?> <fadeInSecs>
+ if subcommand == "index" and isinstance(arg, int):
+ self.mass.create_task(self.mass.players.queues.play_index, player_id, arg)
+ return
+ if subcommand == "index" and arg == "?":
+ return queue.current_index
+ if subcommand == "index" and "+" in arg:
+ next_index = (queue.current_index or 0) + int(arg.split("+")[1])
+ self.mass.create_task(self.mass.players.queues.play_index, player_id, next_index)
+ return
+ if subcommand == "index" and "-" in arg:
+ next_index = (queue.current_index or 0) - int(arg.split("-")[1])
+ self.mass.create_task(self.mass.players.queues.play_index, player_id, next_index)
+ return
+
+ self.logger.warning("Unhandled command: playlist/%s", subcommand)
+
+ def _handle_play(
+ self,
+ player_id: str,
+ *args,
+ **kwargs,
+ ) -> int | None:
+ """Handle player `play` command."""
+ queue = self.mass.players.queues.get_active_queue(player_id)
+ assert queue is not None
+ self.mass.create_task(self.mass.players.queues.play, player_id)
+
+ def _handle_stop(
+ self,
+ player_id: str,
+ *args,
+ **kwargs,
+ ) -> int | None:
+ """Handle player `stop` command."""
+ queue = self.mass.players.queues.get_active_queue(player_id)
+ assert queue is not None
+ self.mass.create_task(self.mass.players.queues.stop, player_id)
+
+ def _handle_pause(
+ self,
+ player_id: str,
+ force: int = 0,
+ *args,
+ **kwargs,
+ ) -> int | None:
+ """Handle player `stop` command."""
+ queue = self.mass.players.queues.get_active_queue(player_id)
+ assert queue is not None
+
+ if force or queue.state == PlayerState.PLAYING:
+ self.mass.create_task(self.mass.players.queues.pause, player_id)
+ else:
+ self.mass.create_task(self.mass.players.queues.play, player_id)
+
+
+def dict_to_strings(source: dict) -> list[str]:
+ """Convert dict to key:value strings (used in slimproto cli)."""
+ result: list[str] = []
+
+ for key, value in source.items():
+ if value in (None, ""):
+ continue
+ if isinstance(value, list):
+ for subval in value:
+ if isinstance(subval, dict):
+ result += dict_to_strings(subval)
+ else:
+ result.append(str(subval))
+ elif isinstance(value, dict):
+ result += dict_to_strings(subval)
+ else:
+ result.append(f"{key}:{str(value)}")
+ return result
--- /dev/null
+{
+ "type": "plugin",
+ "domain": "lms_cli",
+ "name": "LMS CLI",
+ "description": "Basic CLI implementation (classic + JSON-RPC), which is (partly) compatible with Logitech Media Server to maximize compatibility with Squeezebox players.",
+ "codeowners": ["@marcelveldt"],
+ "config_entries": [
+ ],
+ "requirements": [],
+ "documentation": "",
+ "multi_instance": false,
+ "builtin": true,
+ "load_by_default": true
+}
--- /dev/null
+"""Models used for the JSON-RPC API."""
+from __future__ import annotations
+
+from typing import TYPE_CHECKING, Any, TypedDict
+
+from music_assistant.common.models.enums import MediaType, PlayerState, RepeatMode
+
+if TYPE_CHECKING:
+ from music_assistant.common.models.player import Player
+ from music_assistant.common.models.player_queue import PlayerQueue
+ from music_assistant.common.models.queue_item import QueueItem
+
+# ruff: noqa: UP013
+
+PLAYMODE_MAP = {
+ PlayerState.IDLE: "stop",
+ PlayerState.PLAYING: "play",
+ PlayerState.OFF: "stop",
+ PlayerState.PAUSED: "pause",
+}
+
+REPEATMODE_MAP = {RepeatMode.OFF: 0, RepeatMode.ONE: 1, RepeatMode.ALL: 2}
+
+
+class CommandMessage(TypedDict):
+ """Representation of Base JSON RPC Command Message."""
+
+ # https://www.jsonrpc.org/specification
+
+ id: int | str
+ method: str
+ params: list[str | int | list[str | int]]
+
+
+class CommandResultMessage(CommandMessage):
+ """Representation of JSON RPC Result Message."""
+
+ result: Any
+
+
+class ErrorDetails(TypedDict):
+ """Representation of JSON RPC ErrorDetails."""
+
+ code: int
+ message: str
+
+
+class CommandErrorMessage(CommandMessage, TypedDict):
+ """Base Representation of JSON RPC Command Message."""
+
+ id: int | str | None
+ error: ErrorDetails
+
+
+PlayerItem = TypedDict(
+ "PlayerItem",
+ {
+ "playerindex": int,
+ "playerid": str,
+ "name": str,
+ "modelname": str,
+ "connected": int,
+ "isplaying": int,
+ "power": int,
+ "model": str,
+ "canpoweroff": int,
+ "firmware": int,
+ "isplayer": int,
+ "displaytype": str,
+ "uuid": str | None,
+ "seq_no": int,
+ "ip": str,
+ },
+)
+
+
+def player_item_from_mass(playerindex: int, player: Player) -> PlayerItem:
+ """Parse PlayerItem for the Json RPC interface from MA QueueItem."""
+ return {
+ "playerindex": playerindex,
+ "playerid": player.player_id,
+ "name": player.display_name,
+ "modelname": player.device_info.model,
+ "connected": int(player.available),
+ "isplaying": 1 if player.state == PlayerState.PLAYING else 0,
+ "power": int(player.powered),
+ "model": "squeezelite",
+ "canpoweroff": 1,
+ "firmware": 0,
+ "isplayer": 1,
+ "displaytype": None,
+ "uuid": None,
+ "seq_no": 0,
+ "ip": player.device_info.address,
+ }
+
+
+PlayersResponse = TypedDict(
+ "PlayersResponse",
+ {
+ "count": int,
+ "players_loop": list[PlayerItem],
+ },
+)
+
+
+PlaylistItem = TypedDict(
+ "PlaylistItem",
+ {
+ "playlist index": int,
+ "id": str,
+ "title": str,
+ "artist": str,
+ "remote": int,
+ "remote_title": str,
+ "artwork_url": str,
+ "bitrate": str,
+ "duration": str | int | None,
+ "coverid": str,
+ },
+)
+
+
+def playlist_item_from_mass(queue_item: QueueItem, index: int = 0) -> PlaylistItem:
+ """Parse PlaylistItem for the Json RPC interface from MA QueueItem."""
+ if queue_item.media_item and queue_item.media_type == MediaType.TRACK:
+ artist = queue_item.media_item.artist.name
+ album = queue_item.media_item.album.name
+ title = queue_item.media_item.name
+ elif queue_item.streamdetails and queue_item.streamdetails.stream_title:
+ if " - " in queue_item.streamdetails.stream_title:
+ artist, title = queue_item.streamdetails.stream_title.split(" - ")
+ else:
+ artist = ""
+ title = queue_item.streamdetails.stream_title
+ album = queue_item.name
+ else:
+ artist = ""
+ album = ""
+ title = queue_item.name
+ return {
+ "playlist index": index,
+ "id": queue_item.queue_item_id,
+ "title": title,
+ "artist": artist,
+ "album": album,
+ "genre": "",
+ "remote": 0,
+ "remote_title": queue_item.streamdetails.stream_title if queue_item.streamdetails else "",
+ "artwork_url": queue_item.image_url or "",
+ "bitrate": "",
+ "duration": queue_item.duration or 0,
+ "coverid": "-94099753136392",
+ }
+
+
+PlayerStatusResponse = TypedDict(
+ "PlayerStatusResponse",
+ {
+ "time": int,
+ "mode": str,
+ "sync_slaves": str,
+ "playlist_cur_index": int | None,
+ "player_name": str,
+ "sync_master": str,
+ "player_connected": int,
+ "power": int,
+ "mixer volume": int,
+ "playlist repeat": int,
+ "playlist shuffle": int,
+ "playlist mode": str,
+ "player_ip": str,
+ "remoteMeta": dict | None,
+ "digital_volume_control": int,
+ "playlist_timestamp": float,
+ "current_title": str,
+ "duration": int,
+ "seq_no": int,
+ "remote": int,
+ "can_seek": int,
+ "signalstrength": int,
+ "rate": int,
+ "playlist_tracks": int,
+ "playlist_loop": list[PlaylistItem],
+ },
+)
+
+
+def player_status_from_mass(
+ player: Player, queue: PlayerQueue, queue_items: list[QueueItem]
+) -> PlayerStatusResponse:
+ """Parse PlayerStatusResponse for the Json RPC interface from MA info."""
+ return {
+ "time": queue.corrected_elapsed_time,
+ "mode": PLAYMODE_MAP[queue.state],
+ "sync_slaves": ",".join(player.group_childs),
+ "playlist_cur_index": queue.current_index,
+ "player_name": player.display_name,
+ "sync_master": player.synced_to or "",
+ "player_connected": int(player.available),
+ "mixer volume": player.volume_level,
+ "power": int(player.powered),
+ "digital_volume_control": 1,
+ "playlist_timestamp": 0, # TODO !
+ "current_title": queue.current_item.queue_item_id
+ if queue.current_item
+ else "Music Assistant",
+ "duration": queue.current_item.duration if queue.current_item else 0,
+ "playlist repeat": REPEATMODE_MAP[queue.repeat_mode],
+ "playlist shuffle": int(queue.shuffle_enabled),
+ "playlist mode": "off",
+ "player_ip": player.device_info.address,
+ "seq_no": 0,
+ "remote": 0,
+ "can_seek": 1,
+ "signalstrength": 0,
+ "rate": 1,
+ "playlist_tracks": queue.items,
+ "playlist_loop": [
+ playlist_item_from_mass(item, queue.current_index + index)
+ for index, item in enumerate(queue_items)
+ ],
+ }
import asyncio
import time
-import urllib.parse
from collections import deque
from collections.abc import Callable, Generator
from dataclasses import dataclass
from aioslimproto.const import EventType as SlimEventType
from aioslimproto.discovery import start_discovery
-from music_assistant.common.helpers.util import select_free_port
from music_assistant.common.models.config_entries import ConfigEntry
from music_assistant.common.models.enums import (
ConfigEntryType,
from music_assistant.common.models.queue_item import QueueItem
from music_assistant.constants import CONF_PLAYERS
from music_assistant.server.models.player_provider import PlayerProvider
-from music_assistant.server.providers.json_rpc import parse_args
if TYPE_CHECKING:
- from music_assistant.server.providers.json_rpc import JSONRPCApi
+ pass
# sync constants
MIN_DEVIATION_ADJUST = 10 # 10 milliseconds
# autodiscovery of the slimproto server does not work
# when the port is not the default (3483) so we hardcode it for now
slimproto_port = 3483
- cli_port = await select_free_port(9090, 9190)
+ cli_port = cli_prov.cli_port if (cli_prov := self.mass.get_provider("lms_cli")) else None
self.logger.info("Starting SLIMProto server on port %s", slimproto_port)
self._socket_servers = (
# start slimproto server
await asyncio.start_server(self._create_client, "0.0.0.0", slimproto_port),
# setup discovery
await start_discovery(slimproto_port, cli_port, self.mass.port),
- # setup (telnet) cli for players requesting basic info on that port
- await asyncio.start_server(self._handle_cli_client, "0.0.0.0", cli_port),
)
async def close(self) -> None:
if virtual_provider_info:
# if this player is part of a virtual provider run the callback
virtual_provider_info[0](player)
- self.mass.players.register(player)
+ self.mass.players.register_or_update(player)
# update player state on player events
player.available = True
if sync_delay != 0:
return client.elapsed_milliseconds - sync_delay
return client.elapsed_milliseconds
-
- async def _handle_cli_client(
- self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter
- ) -> None:
- """Handle new connection on the legacy CLI."""
- # https://raw.githubusercontent.com/Logitech/slimserver/public/7.8/HTML/EN/html/docs/cli-api.html
- self.logger.info("Client connected on Telnet CLI")
- try:
- while True:
- raw_request = await reader.readline()
- raw_request = raw_request.strip().decode("utf-8")
- # request comes in as url encoded strings, separated by space
- raw_params = [urllib.parse.unquote(x) for x in raw_request.split(" ")]
- # the first param is either a macaddress or a command
- if ":" in raw_params[0]:
- # assume this is a mac address (=player_id)
- player_id = raw_params[0]
- command = raw_params[1]
- command_params = raw_params[2:]
- else:
- player_id = ""
- command = raw_params[0]
- command_params = raw_params[1:]
-
- args, kwargs = parse_args(command_params)
-
- response: str = raw_request
-
- # check if we have a handler for this command
- # note that we only have support for very limited commands
- # just enough for compatibility with players but not to be used as api
- # with 3rd party tools!
- json_rpc: JSONRPCApi = self.mass.get_provider("json_rpc")
- assert json_rpc is not None
- if handler := getattr(json_rpc, f"_handle_{command}", None):
- self.logger.debug(
- "Handling CLI-request (player: %s command: %s - args: %s - kwargs: %s)",
- player_id,
- command,
- str(args),
- str(kwargs),
- )
- cmd_result: list[str] = handler(player_id, *args, **kwargs)
- if isinstance(cmd_result, dict):
- result_parts = dict_to_strings(cmd_result)
- result_str = " ".join(urllib.parse.quote(x) for x in result_parts)
- elif not cmd_result:
- result_str = ""
- else:
- result_str = str(cmd_result)
- response += " " + result_str
- else:
- self.logger.warning(
- "No handler for %s (player: %s - args: %s - kwargs: %s)",
- command,
- player_id,
- str(args),
- str(kwargs),
- )
- # echo back the request and the result (if any)
- response += "\n"
- writer.write(response.encode("utf-8"))
- await writer.drain()
- except Exception as err:
- self.logger.debug("Error handling CLI command", exc_info=err)
- finally:
- self.logger.debug("Client disconnected from Telnet CLI")
-
-
-def dict_to_strings(source: dict) -> list[str]:
- """Convert dict to key:value strings (used in slimproto cli)."""
- result: list[str] = []
-
- for key, value in source.items():
- if value in (None, ""):
- continue
- if isinstance(value, list):
- for subval in value:
- if isinstance(subval, dict):
- result += dict_to_strings(subval)
- else:
- result.append(str(subval))
- elif isinstance(value, dict):
- result += dict_to_strings(subval)
- else:
- result.append(f"{key}:{str(value)}")
- return result
"requirements": ["aioslimproto==2.2.0"],
"documentation": "https://github.com/music-assistant/hass-music-assistant/discussions/1123",
"multi_instance": false,
- "builtin": true,
- "load_by_default": true
+ "builtin": false,
+ "load_by_default": true,
+ "depends_on": "lms_cli"
}
self.sonosplayers[player_id] = sonos_player
- self.mass.players.register(sonos_player.player)
+ self.mass.players.register_or_update(sonos_player.player)
def _handle_av_transport_event(self, sonos_player: SonosPlayer, event: SonosEvent):
"""Handle a soco.SoCo AVTransport event."""
git+https://github.com/pytube/pytube.git@refs/pull/1501/head
mashumaro==3.5.0
memory-tempfile==2.2.3
-music-assistant-frontend==20230317.0
+music-assistant-frontend==20230319.0
orjson==3.8.7
pillow==9.4.0
PyChromecast==13.0.4