"""Check if port is in use."""
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as _sock:
try:
- return _sock.connect_ex(("localhost", port)) == 0
- except socket.gaierror:
+ _sock.bind(("127.0.0.1", port))
+ except OSError:
return True
if key not in dict1:
changed_keys.add(key)
elif isinstance(value, dict):
- changed_keys.update(get_changed_keys(dict1[key], value))
+ changed_keys.update(get_changed_keys(dict1[key], value, ignore_keys))
elif dict1[key] != value:
changed_keys.add(key)
return changed_keys
from music_assistant.common.models.enums import ProviderType
from music_assistant.constants import (
+ CONF_CROSSFADE_DURATION,
CONF_EQ_BASS,
CONF_EQ_MID,
CONF_EQ_TREBLE,
for key, new_val in update.items():
if key in root_values:
continue
- cur_val = self.values[key].value
+ cur_val = self.values[key].value if key in self.values else None
# parse entry to do type validation
parsed_val = self.values[key].parse_value(new_val)
if cur_val != parsed_val:
ConfigValueOption("AAC (lossy, superior quality)", "aac"),
ConfigValueOption("MP3 (lossy, average quality)", "mp3"),
ConfigValueOption("WAV (lossless, huge file size)", "wav"),
+ ConfigValueOption("PCM (lossless, huge file size)", "pcm"),
],
default_value="flac",
description="Define the codec that is sent to the player when streaming audio. "
advanced=False,
)
+CONF_ENTRY_CROSSFADE_DURATION = ConfigEntry(
+ key=CONF_CROSSFADE_DURATION,
+ type=ConfigEntryType.INTEGER,
+ range=(0, 12),
+ default_value=8,
+ label="Crossfade duration",
+ description="Duration in seconds of the crossfade between tracks (if enabled)",
+ advanced=True,
+)
+
CONF_ENTRY_GROUPED_POWER_ON = ConfigEntry(
key=CONF_GROUPED_POWER_ON,
type=ConfigEntryType.BOOLEAN,
DEFAULT_PLAYER_CONFIG_ENTRIES = (
CONF_ENTRY_VOLUME_NORMALIZATION,
CONF_ENTRY_FLOW_MODE,
+ CONF_ENTRY_OUTPUT_CODEC,
CONF_ENTRY_VOLUME_NORMALIZATION_TARGET,
CONF_ENTRY_EQ_BASS,
CONF_ENTRY_EQ_MID,
import time
from dataclasses import dataclass, field
+from typing import Any
from mashumaro import DataClassDictMixin
# always prefers any overridden name from settings
display_name: str = ""
+ # extra_data: any additional data to store on the player object
+ # and pass along freely
+ extra_data: dict[str, Any] = field(default_factory=dict)
+
@property
def corrected_elapsed_time(self) -> float:
"""Return the corrected/realtime elapsed time."""
CONF_HIDE_GROUP_CHILDS: Final[str] = "hide_group_childs"
CONF_OUTPUT_CODEC: Final[str] = "output_codec"
CONF_GROUPED_POWER_ON: Final[str] = "grouped_power_on"
+CONF_CROSSFADE_DURATION: Final[str] = "crossfade_duration"
# config default values
DEFAULT_HOST: Final[str] = "0.0.0.0"
await self.mass.load_provider(config)
@api_command("config/players")
- def get_player_configs(self, provider: str | None = None) -> list[PlayerConfig]:
+ async def get_player_configs(self, provider: str | None = None) -> list[PlayerConfig]:
"""Return all known player configurations, optionally filtered by provider domain."""
available_providers = {x.domain for x in self.mass.providers}
return [
- self.get_player_config(player_id)
+ await self.get_player_config(player_id)
for player_id, raw_conf in self.get(CONF_PLAYERS).items()
# filter out unavailable providers
if raw_conf["provider"] in available_providers
]
@api_command("config/players/get")
- def get_player_config(self, player_id: str) -> PlayerConfig:
+ async def get_player_config(self, player_id: str) -> PlayerConfig:
"""Return configuration for a single player."""
if raw_conf := self.get(f"{CONF_PLAYERS}/{player_id}"):
if prov := self.mass.get_provider(raw_conf["provider"]):
- prov_entries = prov.get_player_config_entries(player_id)
+ prov_entries = await prov.get_player_config_entries(player_id)
if player := self.mass.players.get(player_id, False):
raw_conf["default_name"] = player.display_name
else:
raise KeyError(f"No config found for player id {player_id}")
@api_command("config/players/get_value")
- def get_player_config_value(self, player_id: str, key: str) -> ConfigValueType:
+ async def get_player_config_value(self, player_id: str, key: str) -> ConfigValueType:
"""Return single configentry value for a player."""
cache_key = f"player_conf_value_{player_id}.{key}"
if (cached_value := self._value_cache.get(cache_key)) and cached_value is not None:
return cached_value
- conf = self.get_player_config(player_id)
+ conf = await self.get_player_config(player_id)
val = (
conf.values[key].value
if conf.values[key].value is not None
self._value_cache[cache_key] = val
return val
+ def get_raw_player_config_value(
+ self, player_id: str, key: str, default: ConfigValueType = None
+ ) -> ConfigValueType:
+ """
+ Return (raw) single configentry value for a player.
+
+ Note that this only returns the stored value without any validation or default.
+ """
+ return self.get(f"{CONF_PLAYERS}/{player_id}/values/{key}", default)
+
@api_command("config/players/save")
- def save_player_config(
+ async def save_player_config(
self, player_id: str, values: dict[str, ConfigValueType]
) -> PlayerConfig:
"""Save/update PlayerConfig."""
- config = self.get_player_config(player_id)
+ config = await self.get_player_config(player_id)
changed_keys = config.update(values)
if not changed_keys:
if provider := self.mass.get_provider(config.provider):
provider.on_player_config_changed(config, changed_keys)
# return full player config (just in case)
- return self.get_player_config(player_id)
+ return await self.get_player_config(player_id)
@api_command("config/players/remove")
async def remove_player_config(self, player_id: str) -> None:
if config.enabled:
await self.mass.load_provider(config)
else:
+ # disable provider
+ # check if there are no other providers dependent of this provider
+ for prov in self.mass.get_available_providers():
+ if prov.depends_on == config.domain and self.mass.get_provider(prov.domain):
+ raise RuntimeError(f"Provider {prov.name} depends on {config.domain}.")
await self.mass.unload_provider(config.instance_id)
# load succeeded, save new config
config.last_error = None
return None
- def get_image_url(self, image: MediaItemImage) -> str:
+ def get_image_url(self, image: MediaItemImage, size: int = 0) -> str:
"""Get (proxied) URL for MediaItemImage."""
if image.provider != "url":
# return imageproxy url for images that need to be resolved
# the original path is double encoded
encoded_url = urllib.parse.quote(urllib.parse.quote(image.path))
- return f"{self.mass.webserver.base_url}/imageproxy?path={encoded_url}&provider={image.provider}" # noqa: E501
+ return f"{self.mass.webserver.base_url}/imageproxy?path={encoded_url}&provider={image.provider}&size={size}" # noqa: E501
return image.path
async def get_thumbnail(
@api_command("players/queue/get_active_queue")
def get_active_queue(self, player_id: str) -> PlayerQueue:
"""Return the current active/synced queue for a player."""
- player = self.mass.players.get(player_id)
- if queue := self.get(player.active_source):
- return queue
+ if player := self.mass.players.get(player_id): # noqa: SIM102
+ if queue := self.get(player.active_source):
+ return queue
return self.get(player_id)
# Queue commands
await self.mass.players.cmd_power(queue_id, True)
# execute the play_media command on the player(s)
player_prov = self.mass.players.get_player_provider(queue_id)
- flow_mode = self.mass.config.get_player_config_value(queue.queue_id, CONF_FLOW_MODE)
+ flow_mode = await self.mass.config.get_player_config_value(queue.queue_id, CONF_FLOW_MODE)
queue.flow_mode = flow_mode
await player_prov.cmd_play_media(
queue_id,
# handle automatic hiding of group child's feature
for group_player in self._get_player_groups(player_id):
try:
- hide_group_childs = self.mass.config.get_player_config_value(
- group_player.player_id, CONF_HIDE_GROUP_CHILDS
+ hide_group_childs = self.mass.config.get_raw_player_config_value(
+ group_player.player_id, CONF_HIDE_GROUP_CHILDS, "active"
)
except KeyError:
continue
changed_keys = get_changed_keys(
prev_state,
new_state,
- ignore_keys=["elapsed_time", "elapsed_time_last_updated"],
+ ignore_keys=["elapsed_time", "elapsed_time_last_updated", "seq_no"],
)
self._prev_states[player_id] = new_state
CONF_EQ_MID,
CONF_EQ_TREBLE,
CONF_OUTPUT_CHANNELS,
+ CONF_OUTPUT_CODEC,
ROOT_LOGGER_NAME,
)
from music_assistant.server.helpers.audio import (
player_id: str,
seek_position: int = 0,
fade_in: bool = False,
- content_type: ContentType = ContentType.WAV,
auto_start_runner: bool = True,
flow_mode: bool = False,
+ output_codec: ContentType | None = None,
) -> str:
"""Resolve the stream URL for the given QueueItem.
call resolve for every child player.
- seek_position: start playing from this specific position.
- fade_in: fade in the music at start (e.g. at resume).
- - content_type: Encode the stream in the given format.
- auto_start_runner: Start the audio stream in advance (stream track now).
- flow_mode: enable flow mode where the queue tracks are streamed as continuous stream.
+ - output_codec: Encode the stream in the given format (None for auto select).
"""
# check if there is already a pending job
for stream_job in self.stream_jobs.values():
stream_job.start()
# generate player-specific URL for the stream job
- fmt = content_type.value
+ if output_codec is None:
+ output_codec = ContentType(
+ await self.mass.config.get_player_config_value(player_id, CONF_OUTPUT_CODEC)
+ )
+ fmt = output_codec.value
url = f"{self.mass.webserver.base_url}/stream/{player_id}/{queue_item.queue_item_id}/{stream_job.stream_id}.{fmt}" # noqa: E501
+ # handle pcm
+ if output_codec.is_pcm():
+ player = self.mass.players.get(player_id)
+ output_sample_rate = min(stream_job.pcm_sample_rate, player.max_sample_rate)
+ player_max_bit_depth = 32 if player.supports_24bit else 16
+ output_bit_depth = min(stream_job.pcm_bit_depth, player_max_bit_depth)
+ output_channels = await self.mass.config.get_player_config_value(
+ player_id, CONF_OUTPUT_CHANNELS
+ )
+ channels = 1 if output_channels != "stereo" else 2
+ url += (
+ f";codec=pcm;rate={output_sample_rate};"
+ f"bitrate={output_bit_depth};channels={channels}"
+ )
return url
def get_preview_url(self, provider_instance_id_or_domain: str, track_id: str) -> str:
# resolve generic pcm type
output_format = ContentType.from_bit_depth(output_bit_depth)
if output_format.is_pcm() or output_format == ContentType.WAV:
- output_channels = self.mass.config.get_player_config_value(
+ output_channels = await self.mass.config.get_player_config_value(
player_id, CONF_OUTPUT_CHANNELS
)
channels = 1 if output_channels != "stereo" else 2
LOGGER.debug("Start serving audio stream %s to %s", stream_id, player.name)
# collect player specific ffmpeg args to re-encode the source PCM stream
- ffmpeg_args = self._get_player_ffmpeg_args(
+ ffmpeg_args = await self._get_player_ffmpeg_args(
player,
input_sample_rate=stream_job.pcm_sample_rate,
input_bit_depth=stream_job.pcm_bit_depth,
await resp.write(chunk)
return resp
- def _get_player_ffmpeg_args(
+ async def _get_player_ffmpeg_args(
self,
player: Player,
input_sample_rate: int,
output_sample_rate: int,
) -> list[str]:
"""Get player specific arguments for the given (pcm) input and output details."""
- player_conf = self.mass.config.get_player_config(player.player_id)
+ player_conf = await self.mass.config.get_player_config(player.player_id)
conf_channels = player_conf.get_value(CONF_OUTPUT_CHANNELS)
# generic args
generic_args = [
mass: MusicAssistant, streamdetails: StreamDetails
) -> tuple[float | None, float | None]:
"""Get gain correction for given queue / track combination."""
- player_settings = mass.config.get_player_config(streamdetails.queue_id)
+ player_settings = await mass.config.get_player_config(streamdetails.queue_id)
if not player_settings or not player_settings.get_value(CONF_VOLUME_NORMALIZATION):
return (None, None)
if streamdetails.gain_correct is not None:
Player Provider implementations should inherit from this base model.
"""
- def get_player_config_entries(self, player_id: str) -> tuple[ConfigEntry, ...]:
+ async def get_player_config_entries(self, player_id: str) -> tuple[ConfigEntry, ...]:
"""Return all (provider/player specific) Config Entries for the given player (if any)."""
return tuple()
import aiofiles
-from music_assistant.common.models.config_entries import ConfigEntry, ConfigValueType
+from music_assistant.common.models.config_entries import (
+ CONF_ENTRY_OUTPUT_CODEC,
+ ConfigEntry,
+ ConfigValueType,
+)
from music_assistant.common.models.enums import ConfigEntryType
from music_assistant.common.models.player import DeviceInfo, Player
from music_assistant.common.models.queue_item import QueueItem
key="read_ahead",
type=ConfigEntryType.INTEGER,
range=(0, 2000),
- default_value=500,
+ default_value=1000,
label="Read ahead buffer",
description="Sets the number of milliseconds of audio buffer in the player. "
"This is important to absorb network throughput jitter. "
"(lossless) ALAC at the cost of a bit CPU.",
advanced=True,
),
+ ConfigEntry.from_dict(
+ {**CONF_ENTRY_OUTPUT_CODEC.to_dict(), "default_value": "pcm", "hidden": True}
+ ),
)
NEED_BRIDGE_RESTART = {"values/read_ahead", "values/encryption", "values/alac_encode"}
self._closing = True
await self._stop_bridge()
- def get_player_config_entries(self, player_id: str) -> tuple[ConfigEntry, ...]:
+ async def get_player_config_entries(self, player_id: str) -> tuple[ConfigEntry, ...]:
"""Return all (provider/player specific) Config Entries for the given player (if any)."""
slimproto_prov = self.mass.get_provider("slimproto")
- base_entries = slimproto_prov.get_player_config_entries(player_id)
+ base_entries = await slimproto_prov.get_player_config_entries(player_id)
return tuple(base_entries + PLAYER_CONFIG_ENTRIES)
def on_player_config_changed(self, config: PlayerConfig, changed_keys: set[str]) -> None:
from music_assistant.common.models.config_entries import (
CONF_ENTRY_HIDE_GROUP_MEMBERS,
- CONF_ENTRY_OUTPUT_CODEC,
ConfigEntry,
ConfigValueType,
)
from music_assistant.common.models.enums import (
ConfigEntryType,
- ContentType,
MediaType,
PlayerFeature,
PlayerState,
from music_assistant.common.models.errors import PlayerUnavailableError, QueueEmpty
from music_assistant.common.models.player import DeviceInfo, Player
from music_assistant.common.models.queue_item import QueueItem
-from music_assistant.constants import CONF_OUTPUT_CODEC, CONF_PLAYERS, MASS_LOGO_ONLINE
+from music_assistant.constants import CONF_PLAYERS, MASS_LOGO_ONLINE
from music_assistant.server.models.player_provider import PlayerProvider
from .helpers import CastStatusListener, ChromecastInfo
"the playback experience but may not work on non-Google hardware.",
advanced=True,
),
- CONF_ENTRY_OUTPUT_CODEC,
)
for castplayer in list(self.castplayers.values()):
await self._disconnect_chromecast(castplayer)
- def get_player_config_entries(self, player_id: str) -> tuple[ConfigEntry, ...]:
+ async def get_player_config_entries(self, player_id: str) -> tuple[ConfigEntry, ...]:
"""Return all (provider/player specific) Config Entries for the given player (if any)."""
cast_player = self.castplayers.get(player_id)
entries = BASE_PLAYER_CONFIG_ENTRIES
) -> None:
"""Send PLAY MEDIA command to given player."""
castplayer = self.castplayers[player_id]
- output_codec = self.mass.config.get_player_config_value(player_id, CONF_OUTPUT_CODEC)
url = await self.mass.streams.resolve_stream_url(
queue_item=queue_item,
player_id=player_id,
seek_position=seek_position,
fade_in=fade_in,
- content_type=ContentType(output_codec),
flow_mode=flow_mode,
)
castplayer.flow_mode_active = flow_mode
await asyncio.to_thread(
castplayer.cc.play_media,
url,
- content_type="audio/flac",
+ content_type=f"audio/{url.split('.')[-1]}",
title="Music Assistant",
thumb=MASS_LOGO_ONLINE,
media_info={
url = await self.mass.streams.resolve_stream_url(
queue_item=next_item,
player_id=castplayer.player_id,
- content_type=ContentType.FLAC,
auto_start_runner=False,
)
cc_queue_items = [self._create_queue_item(next_item, url)]
async def _launch_app(self, castplayer: CastPlayer) -> None:
"""Launch the default Media Receiver App on a Chromecast."""
event = asyncio.Event()
- if use_alt_app := self.mass.config.get_player_config_value(
+ if use_alt_app := await self.mass.config.get_player_config_value(
castplayer.player_id, CONF_ALT_APP
):
app_id = pychromecast.config.APP_BUBBLEUPNP
from async_upnp_client.search import async_search
from async_upnp_client.utils import CaseInsensitiveDict
-from music_assistant.common.models.config_entries import (
- CONF_ENTRY_OUTPUT_CODEC,
- ConfigEntry,
- ConfigValueType,
-)
-from music_assistant.common.models.enums import ContentType, PlayerFeature, PlayerState, PlayerType
+from music_assistant.common.models.config_entries import ConfigEntry, ConfigValueType
+from music_assistant.common.models.enums import PlayerFeature, PlayerState, PlayerType
from music_assistant.common.models.errors import PlayerUnavailableError, QueueEmpty
from music_assistant.common.models.player import DeviceInfo, Player
from music_assistant.common.models.queue_item import QueueItem
-from music_assistant.constants import CONF_OUTPUT_CODEC, CONF_PLAYERS
+from music_assistant.constants import CONF_PLAYERS
from music_assistant.server.helpers.didl_lite import create_didl_metadata
from music_assistant.server.models.player_provider import PlayerProvider
PlayerFeature.VOLUME_MUTE,
PlayerFeature.VOLUME_SET,
)
-PLAYER_CONFIG_ENTRIES = (CONF_ENTRY_OUTPUT_CODEC,)
_DLNAPlayerProviderT = TypeVar("_DLNAPlayerProviderT", bound="DLNAPlayerProvider")
_R = TypeVar("_R")
for dlna_player in self.dlnaplayers.values():
tg.create_task(self._device_disconnect(dlna_player))
- def get_player_config_entries(self, player_id: str) -> tuple[ConfigEntry, ...]: # noqa: ARG002
- """Return all (provider/player specific) Config Entries for the given player (if any)."""
- return PLAYER_CONFIG_ENTRIES
-
def on_player_config_changed(
self, config: PlayerConfig, changed_keys: set[str] # noqa: ARG002
) -> None:
# always clear queue (by sending stop) first
if dlna_player.device.can_stop:
await self.cmd_stop(player_id)
- output_codec = self.mass.config.get_player_config_value(player_id, CONF_OUTPUT_CODEC)
url = await self.mass.streams.resolve_stream_url(
queue_item=queue_item,
player_id=dlna_player.udn,
seek_position=seek_position,
fade_in=fade_in,
- content_type=ContentType(output_codec),
flow_mode=flow_mode,
)
return
# send queue item to dlna queue
- output_codec = self.mass.config.get_player_config_value(
- dlna_player.player.player_id, CONF_OUTPUT_CODEC
- )
url = await self.mass.streams.resolve_stream_url(
queue_item=next_item,
player_id=dlna_player.udn,
- content_type=ContentType(output_codec),
# DLNA pre-caches pretty aggressively so do not yet start the runner
auto_start_runner=False,
)
+++ /dev/null
-"""JSON-RPC API which is more or less compatible with Logitech Media Server."""
-from __future__ import annotations
-
-import asyncio
-import urllib.parse
-from typing import TYPE_CHECKING, Any
-
-from aiohttp import web
-
-from music_assistant.common.helpers.json import json_dumps, json_loads
-from music_assistant.common.helpers.util import select_free_port
-from music_assistant.common.models.config_entries import ConfigEntry, ConfigValueType
-from music_assistant.common.models.enums import PlayerState
-from music_assistant.server.models.plugin import PluginProvider
-
-from .models import (
- CommandErrorMessage,
- CommandMessage,
- CommandResultMessage,
- PlayerItem,
- PlayersResponse,
- PlayerStatusResponse,
- player_item_from_mass,
- player_status_from_mass,
-)
-
-if TYPE_CHECKING:
- from music_assistant.common.models.config_entries import ProviderConfig
- from music_assistant.common.models.provider import ProviderManifest
- from music_assistant.server import MusicAssistant
- from music_assistant.server.models import ProviderInstanceType
-
-
-# ruff: noqa: ARG002, E501
-
-ArgsType = list[int | str]
-KwargsType = dict[str, Any]
-
-
-async def setup(
- mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
-) -> ProviderInstanceType:
- """Initialize provider(instance) with given configuration."""
- prov = LmsCli(mass, manifest, config)
- await prov.handle_setup()
- return prov
-
-
-async def get_config_entries(
- mass: MusicAssistant,
- instance_id: str | None = None,
- action: str | None = None,
- values: dict[str, ConfigValueType] | None = None,
-) -> tuple[ConfigEntry, ...]:
- """
- Return Config entries to setup this provider.
-
- instance_id: id of an existing provider instance (None if new instance setup).
- action: [optional] action key called from config entries UI.
- values: the (intermediate) raw values for config entries sent with the action.
- """
- # ruff: noqa: ARG001
- return tuple() # we do not have any config entries (yet)
-
-
-def parse_value(raw_value: int | str) -> int | str | tuple[str, int | str]:
- """
- Transform API param into a usable value.
-
- Integer values are sometimes sent as string so we try to parse that.
- """
- if isinstance(raw_value, str):
- if ":" in raw_value:
- # this is a key:value value
- key, val = raw_value.split(":")
- return (key, val)
- if raw_value.isnumeric():
- # this is an integer sent as string
- return int(raw_value)
- return raw_value
-
-
-def parse_args(raw_values: list[int | str]) -> tuple[ArgsType, KwargsType]:
- """Pargse Args and Kwargs from raw CLI params."""
- args: ArgsType = []
- kwargs: KwargsType = {}
- for raw_value in raw_values:
- value = parse_value(raw_value)
- if isinstance(value, tuple):
- kwargs[value[0]] = value[1]
- else:
- args.append(value)
- return (args, kwargs)
-
-
-class LmsCli(PluginProvider):
- """Basic LMS CLI (json rpc and telnet) implementation, (partly) compatible with Logitech Media Server."""
-
- cli_port: int = 9090
-
- async def handle_setup(self) -> None:
- """Handle async initialization of the plugin."""
- self.logger.info("Registering jsonrpc endpoints on the webserver")
- self.mass.webserver.register_route("/jsonrpc.js", self._handle_jsonrpc)
- self.mass.webserver.register_route("/cometd", self._handle_cometd)
- # setup (telnet) cli for players requesting basic info on that port
- self.cli_port = await select_free_port(9090, 9190)
- self.logger.info("Starting (telnet) CLI on port %s", self.cli_port)
- await asyncio.start_server(self._handle_cli_client, "0.0.0.0", self.cli_port)
-
- async def unload(self) -> None:
- """
- Handle unload/close of the provider.
-
- Called when provider is deregistered (e.g. MA exiting or config reloading).
- """
- self.mass.webserver.unregister_route("/jsonrpc.js")
-
- async def _handle_cli_client(
- self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter
- ) -> None:
- """Handle new connection on the legacy CLI."""
- # https://raw.githubusercontent.com/Logitech/slimserver/public/7.8/HTML/EN/html/docs/cli-api.html
- # https://github.com/elParaguayo/LMS-CLI-Documentation/blob/master/LMS-CLI.md
- self.logger.info("Client connected on Telnet CLI")
- try:
- while True:
- raw_request = await reader.readline()
- raw_request = raw_request.strip().decode("utf-8")
- # request comes in as url encoded strings, separated by space
- raw_params = [urllib.parse.unquote(x) for x in raw_request.split(" ")]
- # the first param is either a macaddress or a command
- if ":" in raw_params[0]:
- # assume this is a mac address (=player_id)
- player_id = raw_params[0]
- command = raw_params[1]
- command_params = raw_params[2:]
- else:
- player_id = ""
- command = raw_params[0]
- command_params = raw_params[1:]
-
- args, kwargs = parse_args(command_params)
-
- response: str = raw_request
-
- # check if we have a handler for this command
- # note that we only have support for very limited commands
- # just enough for compatibility with players but not to be used as api
- # with 3rd party tools!
- if handler := getattr(self, f"_handle_{command}", None):
- self.logger.debug(
- "Handling CLI-request (player: %s command: %s - args: %s - kwargs: %s)",
- player_id,
- command,
- str(args),
- str(kwargs),
- )
- cmd_result: list[str] = handler(player_id, *args, **kwargs)
- if asyncio.iscoroutine(cmd_result):
- cmd_result = await cmd_result
-
- if isinstance(cmd_result, dict):
- result_parts = dict_to_strings(cmd_result)
- result_str = " ".join(urllib.parse.quote(x) for x in result_parts)
- elif not cmd_result:
- result_str = ""
- else:
- result_str = str(cmd_result)
- response += " " + result_str
- else:
- self.logger.warning(
- "No handler for %s (player: %s - args: %s - kwargs: %s)",
- command,
- player_id,
- str(args),
- str(kwargs),
- )
- # echo back the request and the result (if any)
- response += "\n"
- writer.write(response.encode("utf-8"))
- await writer.drain()
- except ConnectionResetError:
- pass
- except Exception as err:
- self.logger.debug("Error handling CLI command", exc_info=err)
- finally:
- self.logger.debug("Client disconnected from Telnet CLI")
-
- async def _handle_jsonrpc(self, request: web.Request) -> web.Response:
- """Handle request for image proxy."""
- command_msg: CommandMessage = await request.json(loads=json_loads)
- self.logger.debug("Received request: %s", command_msg)
-
- if command_msg["method"] == "slim.request":
- # Slim request handler
- # {"method":"slim.request","id":1,"params":["aa:aa:ca:5a:94:4c",["status","-", 2, "tags:xcfldatgrKN"]]}
- player_id = command_msg["params"][0]
- command = str(command_msg["params"][1][0])
- args, kwargs = parse_args(command_msg["params"][1][1:])
-
- if handler := getattr(self, f"_handle_{command}", None):
- # run handler for command
- self.logger.debug(
- "Handling JSON-RPC-request (player: %s command: %s - args: %s - kwargs: %s)",
- player_id,
- command,
- str(args),
- str(kwargs),
- )
- cmd_result = handler(player_id, *args, **kwargs)
- if asyncio.iscoroutine(cmd_result):
- cmd_result = await cmd_result
-
- if cmd_result is None:
- cmd_result = {}
- elif not isinstance(cmd_result, dict):
- # individual values are returned with underscore ?!
- cmd_result = {f"_{command}": cmd_result}
- result: CommandResultMessage = {
- **command_msg,
- "result": cmd_result,
- }
- else:
- # no handler found
- self.logger.warning("No handler for %s", command)
- result: CommandErrorMessage = {
- **command_msg,
- "error": {"code": -1, "message": "Invalid command"},
- }
- # return the response to the client
- return web.json_response(result, dumps=json_dumps)
-
- async def _handle_cometd(self, request: web.Request) -> web.Response:
- """Handle request for image proxy."""
- return web.Response(status=404)
-
- def _handle_players(
- self,
- player_id: str,
- start_index: int | str = 0,
- limit: int = 999,
- **kwargs,
- ) -> PlayersResponse:
- """Handle players command."""
- players: list[PlayerItem] = []
- for index, mass_player in enumerate(self.mass.players.all()):
- if isinstance(start_index, int) and index < start_index:
- continue
- if len(players) > limit:
- break
- players.append(player_item_from_mass(start_index + index, mass_player))
- return PlayersResponse(count=len(players), players_loop=players)
-
- async def _handle_status(
- self,
- player_id: str,
- *args,
- start_index: int | str = "-",
- limit: int = 2,
- tags: str = "xcfldatgrKN",
- **kwargs,
- ) -> PlayerStatusResponse:
- """Handle player status command."""
- player = self.mass.players.get(player_id)
- assert player is not None
- queue = self.mass.players.queues.get_active_queue(player_id)
- assert queue is not None
- if start_index == "-":
- start_index = queue.current_index or 0
- queue_items = []
- index = 0
- async for item in self.mass.players.queues.items(queue.queue_id):
- if index >= start_index:
- queue_items.append(item)
- if len(queue_items) == limit:
- break
- index += 1
- # we ignore the tags, just always send all info
- return player_status_from_mass(
- self.mass, player=player, queue=queue, queue_items=queue_items
- )
-
- def _handle_mixer(
- self,
- player_id: str,
- subcommand: str,
- *args,
- **kwargs,
- ) -> int | None:
- """Handle player mixer command."""
- arg = args[0] if args else "?"
- player = self.mass.players.get(player_id)
- assert player is not None
-
- # <playerid> mixer volume <0 .. 100|-100 .. +100|?>
- if subcommand == "volume" and isinstance(arg, int):
- self.mass.create_task(self.mass.players.cmd_volume_set, player_id, arg)
- return
- if subcommand == "volume" and arg == "?":
- return player.volume_level
- if subcommand == "volume" and "+" in arg:
- volume_level = min(100, player.volume_level + int(arg.split("+")[1]))
- self.mass.create_task(self.mass.players.cmd_volume_set, player_id, volume_level)
- return
- if subcommand == "volume" and "-" in arg:
- volume_level = max(0, player.volume_level - int(arg.split("-")[1]))
- self.mass.create_task(self.mass.players.cmd_volume_set, player_id, volume_level)
- return
-
- # <playerid> mixer muting <0|1|toggle|?|>
- if subcommand == "muting" and isinstance(arg, int):
- self.mass.create_task(self.mass.players.cmd_volume_mute, player_id, int(arg))
- return
- if subcommand == "muting" and arg == "toggle":
- self.mass.create_task(
- self.mass.players.cmd_volume_mute, player_id, not player.volume_muted
- )
- return
- if subcommand == "muting":
- return int(player.volume_muted)
-
- def _handle_time(self, player_id: str, number: str | int) -> int | None:
- """Handle player `time` command."""
- # <playerid> time <number|-number|+number|?>
- # The "time" command allows you to query the current number of seconds that the
- # current song has been playing by passing in a "?".
- # You may jump to a particular position in a song by specifying a number of seconds
- # to seek to. You may also jump to a relative position within a song by putting an
- # explicit "-" or "+" character before a number of seconds you would like to seek.
- player_queue = self.mass.players.queues.get_active_queue(player_id)
- assert player_queue is not None
-
- if number == "?":
- return int(player_queue.corrected_elapsed_time)
-
- if isinstance(number, str) and "+" in number or "-" in number:
- jump = int(number.split("+")[1])
- self.mass.create_task(self.mass.players.queues.skip, jump)
- else:
- self.mass.create_task(self.mass.players.queues.seek, number)
-
- def _handle_power(self, player_id: str, value: str | int) -> int | None:
- """Handle player `time` command."""
- # <playerid> power <0|1|?|>
- # The "power" command turns the player on or off.
- # Use 0 to turn off, 1 to turn on, ? to query and
- # no parameter to toggle the power state of the player.
- player = self.mass.players.get(player_id)
- assert player is not None
-
- if value == "?":
- return int(player.powered)
-
- self.mass.create_task(self.mass.players.cmd_power, player_id, bool(value))
-
- def _handle_playlist(
- self,
- player_id: str,
- subcommand: str,
- *args,
- **kwargs,
- ) -> int | None:
- """Handle player `playlist` command."""
- arg = args[0] if args else "?"
- queue = self.mass.players.queues.get_active_queue(player_id)
- assert queue is not None
-
- # <playerid> playlist index <index|+index|-index|?> <fadeInSecs>
- if subcommand == "index" and isinstance(arg, int):
- self.mass.create_task(self.mass.players.queues.play_index, player_id, arg)
- return
- if subcommand == "index" and arg == "?":
- return queue.current_index
- if subcommand == "index" and "+" in arg:
- next_index = (queue.current_index or 0) + int(arg.split("+")[1])
- self.mass.create_task(self.mass.players.queues.play_index, player_id, next_index)
- return
- if subcommand == "index" and "-" in arg:
- next_index = (queue.current_index or 0) - int(arg.split("-")[1])
- self.mass.create_task(self.mass.players.queues.play_index, player_id, next_index)
- return
-
- self.logger.warning("Unhandled command: playlist/%s", subcommand)
-
- def _handle_play(
- self,
- player_id: str,
- *args,
- **kwargs,
- ) -> int | None:
- """Handle player `play` command."""
- queue = self.mass.players.queues.get_active_queue(player_id)
- assert queue is not None
- self.mass.create_task(self.mass.players.queues.play, player_id)
-
- def _handle_stop(
- self,
- player_id: str,
- *args,
- **kwargs,
- ) -> int | None:
- """Handle player `stop` command."""
- queue = self.mass.players.queues.get_active_queue(player_id)
- assert queue is not None
- self.mass.create_task(self.mass.players.queues.stop, player_id)
-
- def _handle_pause(
- self,
- player_id: str,
- force: int = 0,
- *args,
- **kwargs,
- ) -> int | None:
- """Handle player `stop` command."""
- queue = self.mass.players.queues.get_active_queue(player_id)
- assert queue is not None
-
- if force or queue.state == PlayerState.PLAYING:
- self.mass.create_task(self.mass.players.queues.pause, player_id)
- else:
- self.mass.create_task(self.mass.players.queues.play, player_id)
-
-
-def dict_to_strings(source: dict) -> list[str]:
- """Convert dict to key:value strings (used in slimproto cli)."""
- result: list[str] = []
-
- for key, value in source.items():
- if value in (None, ""):
- continue
- if isinstance(value, list):
- for subval in value:
- if isinstance(subval, dict):
- result += dict_to_strings(subval)
- else:
- result.append(str(subval))
- elif isinstance(value, dict):
- result += dict_to_strings(subval)
- else:
- result.append(f"{key}:{str(value)}")
- return result
+++ /dev/null
-{
- "type": "plugin",
- "domain": "lms_cli",
- "name": "LMS CLI",
- "description": "Basic CLI implementation (classic + JSON-RPC), which is (partly) compatible with Logitech Media Server to maximize compatibility with Squeezebox players.",
- "codeowners": ["@music-assistant"],
- "requirements": [],
- "documentation": "",
- "multi_instance": false,
- "builtin": true,
- "load_by_default": true,
- "icon": "md:api"
-}
+++ /dev/null
-"""Models used for the JSON-RPC API."""
-from __future__ import annotations
-
-from typing import TYPE_CHECKING, Any, TypedDict
-
-from music_assistant.common.models.enums import MediaType, PlayerState, RepeatMode
-
-if TYPE_CHECKING:
- from music_assistant.common.models.player import Player
- from music_assistant.common.models.player_queue import PlayerQueue
- from music_assistant.common.models.queue_item import QueueItem
- from music_assistant.server import MusicAssistant
-
-# ruff: noqa: UP013
-
-PLAYMODE_MAP = {
- PlayerState.IDLE: "stop",
- PlayerState.PLAYING: "play",
- PlayerState.OFF: "stop",
- PlayerState.PAUSED: "pause",
-}
-
-REPEATMODE_MAP = {RepeatMode.OFF: 0, RepeatMode.ONE: 1, RepeatMode.ALL: 2}
-
-
-class CommandMessage(TypedDict):
- """Representation of Base JSON RPC Command Message."""
-
- # https://www.jsonrpc.org/specification
-
- id: int | str
- method: str
- params: list[str | int | list[str | int]]
-
-
-class CommandResultMessage(CommandMessage):
- """Representation of JSON RPC Result Message."""
-
- result: Any
-
-
-class ErrorDetails(TypedDict):
- """Representation of JSON RPC ErrorDetails."""
-
- code: int
- message: str
-
-
-class CommandErrorMessage(CommandMessage, TypedDict):
- """Base Representation of JSON RPC Command Message."""
-
- id: int | str | None
- error: ErrorDetails
-
-
-PlayerItem = TypedDict(
- "PlayerItem",
- {
- "playerindex": int,
- "playerid": str,
- "name": str,
- "modelname": str,
- "connected": int,
- "isplaying": int,
- "power": int,
- "model": str,
- "canpoweroff": int,
- "firmware": int,
- "isplayer": int,
- "displaytype": str,
- "uuid": str | None,
- "seq_no": int,
- "ip": str,
- },
-)
-
-
-def player_item_from_mass(playerindex: int, player: Player) -> PlayerItem:
- """Parse PlayerItem for the Json RPC interface from MA QueueItem."""
- return {
- "playerindex": playerindex,
- "playerid": player.player_id,
- "name": player.display_name,
- "modelname": player.device_info.model,
- "connected": int(player.available),
- "isplaying": 1 if player.state == PlayerState.PLAYING else 0,
- "power": int(player.powered),
- "model": "squeezelite",
- "canpoweroff": 1,
- "firmware": 0,
- "isplayer": 1,
- "displaytype": None,
- "uuid": None,
- "seq_no": 0,
- "ip": player.device_info.address,
- }
-
-
-PlayersResponse = TypedDict(
- "PlayersResponse",
- {
- "count": int,
- "players_loop": list[PlayerItem],
- },
-)
-
-
-PlaylistItem = TypedDict(
- "PlaylistItem",
- {
- "playlist index": int,
- "id": str,
- "title": str,
- "artist": str,
- "remote": int,
- "remote_title": str,
- "artwork_url": str,
- "bitrate": str,
- "duration": str | int | None,
- "coverid": str,
- },
-)
-
-
-def playlist_item_from_mass(
- mass: MusicAssistant, queue_item: QueueItem, index: int = 0
-) -> PlaylistItem:
- """Parse PlaylistItem for the Json RPC interface from MA QueueItem."""
- if queue_item.media_item and queue_item.media_type == MediaType.TRACK:
- artist = queue_item.media_item.artists[0].name if queue_item.media_item.artists else ""
- album = queue_item.media_item.album.name if queue_item.media_item.album else ""
- title = queue_item.media_item.name
- elif queue_item.streamdetails and queue_item.streamdetails.stream_title:
- if " - " in queue_item.streamdetails.stream_title:
- artist, title = queue_item.streamdetails.stream_title.split(" - ")
- else:
- artist = ""
- title = queue_item.streamdetails.stream_title
- album = queue_item.name
- else:
- artist = ""
- album = ""
- title = queue_item.name
- image_url = mass.metadata.get_image_url(queue_item.image) if queue_item.image else ""
- return {
- "playlist index": index,
- "id": queue_item.queue_item_id,
- "title": title,
- "artist": artist,
- "album": album,
- "genre": "",
- "remote": 0,
- "remote_title": queue_item.streamdetails.stream_title if queue_item.streamdetails else "",
- "artwork_url": image_url,
- "bitrate": "",
- "duration": queue_item.duration or 0,
- "coverid": "-94099753136392",
- }
-
-
-PlayerStatusResponse = TypedDict(
- "PlayerStatusResponse",
- {
- "time": int,
- "mode": str,
- "sync_slaves": str,
- "playlist_cur_index": int | None,
- "player_name": str,
- "sync_master": str,
- "player_connected": int,
- "power": int,
- "mixer volume": int,
- "playlist repeat": int,
- "playlist shuffle": int,
- "playlist mode": str,
- "player_ip": str,
- "remoteMeta": dict | None,
- "digital_volume_control": int,
- "playlist_timestamp": float,
- "current_title": str,
- "duration": int,
- "seq_no": int,
- "remote": int,
- "can_seek": int,
- "signalstrength": int,
- "rate": int,
- "playlist_tracks": int,
- "playlist_loop": list[PlaylistItem],
- },
-)
-
-
-def player_status_from_mass(
- mass: MusicAssistant, player: Player, queue: PlayerQueue, queue_items: list[QueueItem]
-) -> PlayerStatusResponse:
- """Parse PlayerStatusResponse for the Json RPC interface from MA info."""
- return {
- "time": queue.corrected_elapsed_time,
- "mode": PLAYMODE_MAP[queue.state],
- "sync_slaves": ",".join(player.group_childs),
- "playlist_cur_index": queue.current_index,
- "player_name": player.display_name,
- "sync_master": player.synced_to or "",
- "player_connected": int(player.available),
- "mixer volume": player.volume_level,
- "power": int(player.powered),
- "digital_volume_control": 1,
- "playlist_timestamp": 0, # TODO !
- "current_title": queue.current_item.queue_item_id
- if queue.current_item
- else "Music Assistant",
- "duration": queue.current_item.duration if queue.current_item else 0,
- "playlist repeat": REPEATMODE_MAP[queue.repeat_mode],
- "playlist shuffle": int(queue.shuffle_enabled),
- "playlist mode": "off",
- "player_ip": player.device_info.address,
- "seq_no": 0,
- "remote": 0,
- "can_seek": 1,
- "signalstrength": 0,
- "rate": 1,
- "playlist_tracks": queue.items,
- "playlist_loop": [
- playlist_item_from_mass(mass, item, queue.current_index + index)
- for index, item in enumerate(queue_items)
- ],
- }
from __future__ import annotations
import asyncio
+import statistics
import time
from collections import deque
from collections.abc import Callable, Generator
from aioslimproto.const import EventType as SlimEventType
from aioslimproto.discovery import start_discovery
-from music_assistant.common.models.config_entries import ConfigEntry, ConfigValueType
+from music_assistant.common.models.config_entries import (
+ CONF_ENTRY_CROSSFADE_DURATION,
+ CONF_ENTRY_OUTPUT_CODEC,
+ ConfigEntry,
+ ConfigValueOption,
+ ConfigValueType,
+)
from music_assistant.common.models.enums import (
ConfigEntryType,
ContentType,
from music_assistant.common.models.errors import QueueEmpty
from music_assistant.common.models.player import DeviceInfo, Player
from music_assistant.common.models.queue_item import QueueItem
-from music_assistant.constants import CONF_PLAYERS
+from music_assistant.constants import CONF_CROSSFADE_DURATION
from music_assistant.server.models.player_provider import PlayerProvider
+from .cli import LmsCli
+
if TYPE_CHECKING:
from music_assistant.common.models.config_entries import ProviderConfig
from music_assistant.common.models.provider import ProviderManifest
from music_assistant.server import MusicAssistant
from music_assistant.server.models import ProviderInstanceType
+CACHE_KEY_PREV_STATE = "slimproto_prev_state"
+
# sync constants
MIN_DEVIATION_ADJUST = 10 # 10 milliseconds
MAX_DEVIATION_ADJUST = 20000 # 10 seconds
-MIN_REQ_PLAYPOINTS = 8 # we need at least 8 measurements
+MIN_REQ_PLAYPOINTS = 2 # we need at least 8 measurements
+MIN_REQ_MILLISECONDS = 500
# TODO: Implement display support
STATE_MAP = {
SlimPlayerState.BUFFERING: PlayerState.PLAYING,
+ SlimPlayerState.BUFFER_READY: PlayerState.PLAYING,
SlimPlayerState.PAUSED: PlayerState.PAUSED,
SlimPlayerState.PLAYING: PlayerState.PLAYING,
SlimPlayerState.STOPPED: PlayerState.IDLE,
CONF_SYNC_ADJUST = "sync_adjust"
-CONF_PLAYER_VOLUME = "player_volume"
DEFAULT_PLAYER_VOLUME = 20
-SLIM_PLAYER_CONFIG_ENTRIES = (
- ConfigEntry(
- key=CONF_SYNC_ADJUST,
- type=ConfigEntryType.INTEGER,
- range=(0, 1500),
- default_value=0,
- label="Correct synchronization delay",
- description="If this player is playing audio synced with other players "
- "and you always hear the audio too late on this player, you can shift the audio a bit.",
- advanced=True,
- ),
- ConfigEntry(
- key=CONF_PLAYER_VOLUME,
- type=ConfigEntryType.INTEGER,
- default_value=DEFAULT_PLAYER_VOLUME,
- label="Default startup volume",
- description="Default volume level to set/use when initializing the player.",
- advanced=True,
- ),
-)
-
async def setup(
mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
_socket_clients: dict[str, SlimClient]
_sync_playpoints: dict[str, deque[SyncPlayPoint]]
_virtual_providers: dict[str, tuple[Callable, Callable]]
+ _cli: LmsCli
async def handle_setup(self) -> None:
"""Handle async initialization of the provider."""
self._socket_clients = {}
self._sync_playpoints = {}
self._virtual_providers = {}
+ self._cli = LmsCli(self)
+ await self._cli.setup()
# autodiscovery of the slimproto server does not work
# when the port is not the default (3483) so we hardcode it for now
slimproto_port = 3483
- cli_port = cli_prov.cli_port if (cli_prov := self.mass.get_provider("lms_cli")) else None
self.logger.info("Starting SLIMProto server on port %s", slimproto_port)
self._socket_servers = (
# start slimproto server
await asyncio.start_server(self._create_client, "0.0.0.0", slimproto_port),
# setup discovery
- await start_discovery(slimproto_port, cli_port, self.mass.webserver.port),
+ await start_discovery(
+ self.mass.base_ip,
+ slimproto_port,
+ self._cli.cli_port,
+ self.mass.webserver.port,
+ "Music Assistant",
+ self.mass.server_id,
+ ),
)
async def unload(self) -> None:
if hasattr(self, "_socket_servers"):
for _server in self._socket_servers:
_server.close()
+ if hasattr(self, "_cli"):
+ await self._cli.unload()
self._socket_servers = None
async def _create_client(
event_type: SlimEventType, client: SlimClient, data: Any = None # noqa: ARG001
):
if event_type == SlimEventType.PLAYER_DISCONNECTED:
- self._handle_disconnected(client)
+ self.mass.create_task(self._handle_disconnected(client))
return
if event_type == SlimEventType.PLAYER_CONNECTED:
- self._handle_connected(client)
+ self.mass.create_task(self._handle_connected(client))
+ return
if event_type == SlimEventType.PLAYER_DECODER_READY:
self.mass.create_task(self._handle_decoder_ready(client))
return
+ if event_type == SlimEventType.PLAYER_BUFFER_READY:
+ self.mass.create_task(self._handle_buffer_ready(client))
+ return
+
if event_type == SlimEventType.PLAYER_HEARTBEAT:
self._handle_player_heartbeat(client)
return
# construct SlimClient from socket client
SlimClient(reader, writer, client_callback)
- def get_player_config_entries(self, player_id: str) -> tuple[ConfigEntry]: # noqa: ARG002
+ async def get_player_config_entries(self, player_id: str) -> tuple[ConfigEntry]:
"""Return all (provider/player specific) Config Entries for the given player (if any)."""
- return SLIM_PLAYER_CONFIG_ENTRIES
+ # pick default codec based on capabilities
+ default_codec = ContentType.PCM
+ if client := self._socket_clients.get(player_id):
+ for fmt, fmt_type in (
+ ("flc", ContentType.FLAC),
+ ("pcm", ContentType.PCM),
+ ("mp3", ContentType.MP3),
+ ):
+ if fmt in client.supported_codecs:
+ default_codec = fmt_type
+ break
+
+ # create preset entries (for players that support it)
+ preset_entries = tuple()
+ if not (client and client.device_model in self._virtual_providers):
+ presets = []
+ async for playlist in self.mass.music.playlists.iter_db_items(True):
+ presets.append(ConfigValueOption(playlist.name, playlist.uri))
+ async for radio in self.mass.music.radio.iter_db_items(True):
+ presets.append(ConfigValueOption(radio.name, radio.uri))
+ # dynamically extend the amount of presets when needed
+ if self.mass.config.get_raw_player_config_value(player_id, "preset_15"):
+ preset_count = 20
+ elif self.mass.config.get_raw_player_config_value(player_id, "preset_10"):
+ preset_count = 15
+ elif self.mass.config.get_raw_player_config_value(player_id, "preset_5"):
+ preset_count = 10
+ else:
+ preset_count = 5
+ preset_entries = tuple(
+ ConfigEntry(
+ key=f"preset_{index}",
+ type=ConfigEntryType.STRING,
+ options=presets,
+ label=f"Preset {index}",
+ description="Assign a playable item to the player's preset. "
+ "Only supported on real squeezebox hardware or jive(lite) based emulators.",
+ advanced=False,
+ required=False,
+ )
+ for index in range(1, preset_count + 1)
+ )
+
+ return preset_entries + (
+ ConfigEntry(
+ key=CONF_SYNC_ADJUST,
+ type=ConfigEntryType.INTEGER,
+ range=(0, 1500),
+ default_value=0,
+ label="Correct synchronization delay",
+ description="If this player is playing audio synced with other players "
+ "and you always hear the audio too late on this player, "
+ "you can shift the audio a bit.",
+ advanced=True,
+ ),
+ CONF_ENTRY_CROSSFADE_DURATION,
+ ConfigEntry.from_dict(
+ {**CONF_ENTRY_OUTPUT_CODEC.to_dict(), "default_value": default_codec}
+ ),
+ )
async def cmd_stop(self, player_id: str) -> None:
"""Send STOP command to given player."""
async def cmd_play(self, player_id: str) -> None:
"""Send PLAY command to given player."""
# forward command to player and any connected sync child's
- for client in self._get_sync_clients(player_id):
- if client.state not in (
- SlimPlayerState.PAUSED,
- SlimPlayerState.BUFFERING,
- ):
- continue
- await client.play()
+ async with asyncio.TaskGroup() as tg:
+ for client in self._get_sync_clients(player_id):
+ if client.state not in (
+ SlimPlayerState.PAUSED,
+ SlimPlayerState.BUFFERING,
+ SlimPlayerState.BUFFER_READY,
+ ):
+ continue
+ tg.create_task(client.play())
async def cmd_play_media(
self,
) -> None:
"""Handle PlayMedia on slimproto player(s)."""
player_id = client.player_id
- # pick codec based on capabilities
- codec_map = (
- ("flc", ContentType.FLAC),
- ("pcm", ContentType.PCM),
- ("mp3", ContentType.MP3),
- )
- for fmt, fmt_type in codec_map:
- if fmt in client.supported_codecs:
- content_type = fmt_type
- break
- else:
- self.logger.debug("Could not auto determine supported codec, fallback to PCM")
- content_type = ContentType.PCM
+
url = await self.mass.streams.resolve_stream_url(
queue_item=queue_item,
player_id=player_id,
seek_position=seek_position,
fade_in=fade_in,
- content_type=content_type,
flow_mode=flow_mode,
)
+ if crossfade:
+ transition_duration = await self.mass.config.get_player_config_value(
+ player_id, CONF_CROSSFADE_DURATION
+ )
+ else:
+ transition_duration = 0
+
await client.play_url(
url=url,
- mime_type=f"audio/{content_type.value}",
- metadata={"item_id": queue_item.queue_item_id},
+ mime_type=f"audio/{url.split('.')[-1]}",
+ metadata={"item_id": queue_item.queue_item_id, "title": queue_item.name},
send_flush=send_flush,
transition=SlimTransition.CROSSFADE if crossfade else SlimTransition.NONE,
- transition_duration=10 if crossfade else 0,
+ transition_duration=transition_duration,
+ autostart=False,
)
async def cmd_pause(self, player_id: str) -> None:
"""Send PAUSE command to given player."""
# forward command to player and any connected sync child's
- for client in self._get_sync_clients(player_id):
- if client.state not in (
- SlimPlayerState.PLAYING,
- SlimPlayerState.BUFFERING,
- ):
- continue
- await client.pause()
+ async with asyncio.TaskGroup() as tg:
+ for client in self._get_sync_clients(player_id):
+ if client.state not in (
+ SlimPlayerState.PLAYING,
+ SlimPlayerState.BUFFERING,
+ SlimPlayerState.BUFFER_READY,
+ ):
+ continue
+ tg.create_task(client.pause())
async def cmd_power(self, player_id: str, powered: bool) -> None:
"""Send POWER command to given player."""
if client := self._socket_clients.get(player_id):
await client.power(powered)
- # TODO: unsync client at poweroff if synced
+ # if player := self.mass.players.get(player_id, raise_unavailable=False):
+ # player.powered = powered
+ # self.mass.players.update(player_id)
+ # store last state in cache
+ await self.mass.cache.set(
+ f"{CACHE_KEY_PREV_STATE}.{player_id}", (powered, client.volume_level)
+ )
async def cmd_volume_set(self, player_id: str, volume_level: int) -> None:
"""Send VOLUME_SET command to given player."""
if client := self._socket_clients.get(player_id):
await client.volume_set(volume_level)
+ # store last state in cache
+ await self.mass.cache.set(
+ f"{CACHE_KEY_PREV_STATE}.{player_id}", (client.powered, volume_level)
+ )
async def cmd_volume_mute(self, player_id: str, muted: bool) -> None:
"""Send VOLUME MUTE command to given player."""
child_player.synced_to = parent_player.player_id
self.mass.players.update(child_player.player_id)
self.mass.players.update(parent_player.player_id)
- if parent_player.state == PlayerState.PLAYING:
+ if parent_player.state in (PlayerState.PLAYING, PlayerState.PAUSED):
# playback needs to be restarted to get all players in sync
# TODO: If there is any need, we could make this smarter where the new
# sync child waits for the next track.
player.powered = client.powered
player.state = STATE_MAP[client.state]
player.volume_level = client.volume_level
- player.volume_muted = client.muted
+ # player.volume_muted = client.muted
+ player.volume_muted = client.powered and client.muted
# set all existing player ids in `can_sync_with` field
player.can_sync_with = tuple(
x.player_id for x in self._socket_clients.values() if x.player_id != player_id
def _handle_player_heartbeat(self, client: SlimClient) -> None:
"""Process SlimClient elapsed_time update."""
- if client.state != SlimPlayerState.PLAYING:
- # ignore server heartbeats
+ if client.state == SlimPlayerState.STOPPED:
+ # ignore server heartbeats when stopped
return
player = self.mass.players.get(client.player_id)
sync_master = self._socket_clients[sync_master_id]
+ if sync_master.state != SlimPlayerState.PLAYING:
+ return
+ if client.state != SlimPlayerState.PLAYING:
+ return
+
# we collect a few playpoints of the player to determine
# average lag/drift so we can adjust accordingly
- sync_playpoints = self._sync_playpoints.setdefault(
- client.player_id, deque(maxlen=MIN_REQ_PLAYPOINTS)
- )
+ sync_playpoints = self._sync_playpoints.setdefault(client.player_id, deque(maxlen=5))
# make sure client has loaded the same track as sync master
client_item_id = client.current_metadata["item_id"] if client.current_metadata else None
+ prev_item_id = client._next_metadata["item_id"] if client._next_metadata else None
master_item_id = (
sync_master.current_metadata["item_id"] if sync_master.current_metadata else None
)
if client_item_id != master_item_id:
+ return
+ if client_item_id and prev_item_id and client_item_id != prev_item_id:
+ # transitioning
sync_playpoints.clear()
return
if len(sync_playpoints) < MIN_REQ_PLAYPOINTS:
return
- # if we have enough playpoints, get the average value
- prev_diffs = [x.diff for x in sync_playpoints]
- avg_diff = sum(prev_diffs) / len(prev_diffs)
+ # get the average diff
+ avg_diff = statistics.fmean(sync_playpoints)
delta = abs(avg_diff)
if delta < MIN_DEVIATION_ADJUST:
"""Handle decoder ready event, player is ready for the next track."""
if not client.current_metadata:
return
+ if client.state == SlimPlayerState.STOPPED:
+ return
try:
next_item, crossfade = await self.mass.players.queues.player_ready_for_next_track(
client.player_id, client.current_metadata["item_id"]
except QueueEmpty:
pass
- def _handle_connected(self, client: SlimClient) -> None:
+ async def _handle_buffer_ready(self, client: SlimClient) -> None:
+ """Handle buffer ready event, player has buffered a (new) track."""
+ player = self.mass.players.get(client.player_id)
+ if player.synced_to:
+ # unpause of sync child is handled by sync master
+ return
+ if not player.group_childs:
+ # not a sync group, continue
+ await client.play()
+ count = 0
+ while count < 20:
+ childs_total = 0
+ childs_ready = 0
+ for sync_child in self._get_sync_clients(player.player_id):
+ childs_total += 1
+ if sync_child.state == SlimPlayerState.BUFFER_READY:
+ childs_ready += 1
+ if childs_total == childs_ready:
+ break
+ await asyncio.sleep(0.2)
+ # all child's ready (or timeout) - start play
+ await self.cmd_play(player.player_id)
+
+ async def _handle_connected(self, client: SlimClient) -> None:
"""Handle a client connected event."""
player_id = client.player_id
- prev = self._socket_clients.pop(player_id, None)
- if prev is not None:
- # player reconnected while we did not yet cleanup the old socket
- prev.disconnect()
+ if existing := self._socket_clients.pop(player_id, None):
+ # race condition: new socket client connected while
+ # the old one has not yet been cleaned up
+ existing.disconnect()
+
self._socket_clients[player_id] = client
- if prev is None:
- # update existing players so they can update their `can_sync_with` field
- for client in self._socket_clients.values():
- self._handle_player_update(client)
- # handle init/startup volume
- init_volume = self.mass.config.get(
- f"{CONF_PLAYERS}/{player_id}/{CONF_PLAYER_VOLUME}", DEFAULT_PLAYER_VOLUME
- )
- self.mass.create_task(client.volume_set(init_volume))
+ # update all attributes
+ self._handle_player_update(client)
+ # update existing players so they can update their `can_sync_with` field
+ for item in self._socket_clients.values():
+ if item.player_id == player_id:
+ continue
+ self._handle_player_update(item)
+ # restore volume and power state
+ if last_state := await self.mass.cache.get(f"{CACHE_KEY_PREV_STATE}.{player_id}"):
+ init_power = last_state[0]
+ init_volume = last_state[1]
+ else:
+ init_volume = DEFAULT_PLAYER_VOLUME
+ init_power = False
+ await client.power(init_power)
+ await client.volume_set(init_volume)
- def _handle_disconnected(self, client: SlimClient) -> None:
+ async def _handle_disconnected(self, client: SlimClient) -> None:
"""Handle a client disconnected event."""
player_id = client.player_id
- prev = self._socket_clients.pop(player_id, None)
- if prev is None:
- # already cleaned up
- return
+ if client := self._socket_clients.pop(player_id, None):
+ # store last state in cache
+ await self.mass.cache.set(
+ f"{CACHE_KEY_PREV_STATE}.{player_id}", (client.powered, client.volume_level)
+ )
if player := self.mass.players.get(player_id):
player.available = False
self.mass.players.update(player_id)
def _get_corrected_elapsed_milliseconds(self, client: SlimClient) -> int:
"""Return corrected elapsed milliseconds."""
- sync_delay = self.mass.config.get_player_config_value(client.player_id, CONF_SYNC_ADJUST)
+ sync_delay = self.mass.config.get_raw_player_config_value(
+ client.player_id, CONF_SYNC_ADJUST, 0
+ )
if sync_delay != 0:
return client.elapsed_milliseconds - sync_delay
return client.elapsed_milliseconds
--- /dev/null
+"""
+CLI interface which is more or less compatible with Logitech Media Server.
+
+Implemented protocols: CometD, Telnet and JSON-RPC.
+
+NOTE: This only implements the bare minimum to have functional players.
+Output is adjusted to conform to Music Assistant logic or just for simplification.
+Goal is player compatibility, not API compatibility.
+Users that need more, should just stay with a full blown LMS server.
+"""
+from __future__ import annotations
+
+import asyncio
+import contextlib
+import time
+import urllib.parse
+from collections.abc import Callable
+from dataclasses import dataclass, field
+from typing import TYPE_CHECKING, Any
+
+import shortuuid
+from aiohttp import web
+
+from music_assistant.common.helpers.json import json_dumps, json_loads
+from music_assistant.common.helpers.util import empty_queue, select_free_port
+from music_assistant.common.models.config_entries import ConfigEntry, ConfigValueType
+from music_assistant.common.models.enums import EventType, PlayerState, QueueOption, RepeatMode
+from music_assistant.common.models.errors import MusicAssistantError
+from music_assistant.common.models.event import MassEvent
+from music_assistant.common.models.media_items import MediaItemType
+
+from .models import (
+ CometDResponse,
+ CommandErrorMessage,
+ CommandMessage,
+ CommandResultMessage,
+ PlayerItem,
+ PlayersResponse,
+ PlayerStatusResponse,
+ ServerStatusResponse,
+ SlimMediaItem,
+ SlimSubscribeMessage,
+ get_media_details_from_mass,
+ player_item_from_mass,
+ player_status_from_mass,
+)
+
+if TYPE_CHECKING:
+ from music_assistant.common.models.config_entries import ProviderConfig
+ from music_assistant.common.models.provider import ProviderManifest
+ from music_assistant.server import MusicAssistant
+ from music_assistant.server.models import ProviderInstanceType
+
+ from . import SlimprotoProvider
+
+
+# ruff: noqa: ARG002, E501
+
+ArgsType = list[int | str]
+KwargsType = dict[str, Any]
+
+
+@dataclass
+class CometDClient:
+ """Representation of a connected CometD client."""
+
+ client_id: str
+ player_id: str = ""
+ queue: asyncio.Queue[CometDResponse] = field(default_factory=asyncio.Queue)
+ last_seen: int = int(time.time())
+ first_event: CometDResponse | None = None
+ meta_subscriptions: set[str] = field(default_factory=set)
+ slim_subscriptions: dict[str, SlimSubscribeMessage] = field(default_factory=dict)
+
+
+async def setup(
+ mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
+) -> ProviderInstanceType:
+ """Initialize provider(instance) with given configuration."""
+ prov = LmsCli(mass, manifest, config)
+ await prov.handle_setup()
+ return prov
+
+
+async def get_config_entries(
+ mass: MusicAssistant,
+ instance_id: str | None = None,
+ action: str | None = None,
+ values: dict[str, ConfigValueType] | None = None,
+) -> tuple[ConfigEntry, ...]:
+ """
+ Return Config entries to setup this provider.
+
+ instance_id: id of an existing provider instance (None if new instance setup).
+ action: [optional] action key called from config entries UI.
+ values: the (intermediate) raw values for config entries sent with the action.
+ """
+ # ruff: noqa: ARG001
+ return tuple() # we do not have any config entries (yet)
+
+
+def parse_value(raw_value: int | str) -> int | str | tuple[str, int | str]:
+ """
+ Transform API param into a usable value.
+
+ Integer values are sometimes sent as string so we try to parse that.
+ """
+ if isinstance(raw_value, str):
+ if ":" in raw_value:
+ # this is a key:value value
+ key, val = raw_value.split(":", 1)
+ if val.isnumeric():
+ val = int(val)
+ return (key, val)
+ if raw_value.isnumeric():
+ # this is an integer sent as string
+ return int(raw_value)
+ return raw_value
+
+
+def parse_args(raw_values: list[int | str]) -> tuple[ArgsType, KwargsType]:
+ """Pargse Args and Kwargs from raw CLI params."""
+ args: ArgsType = []
+ kwargs: KwargsType = {}
+ for raw_value in raw_values:
+ value = parse_value(raw_value)
+ if isinstance(value, tuple):
+ kwargs[value[0]] = value[1]
+ else:
+ args.append(value)
+ return (args, kwargs)
+
+
+class LmsCli:
+ """Basic LMS CLI (json rpc and telnet) implementation, (partly) compatible with Logitech Media Server."""
+
+ cli_port: int = 9090
+ _unsub_callback: Callable | None = None
+ _periodic_task: asyncio.Task | None = None
+
+ def __init__(self, slimproto: SlimprotoProvider) -> None:
+ """Initialize."""
+ self.slimproto = slimproto
+ self.logger = self.slimproto.logger.getChild("cli")
+ self.mass = self.slimproto.mass
+ self._cometd_clients: dict[str, CometDClient] = {}
+
+ async def setup(self) -> None:
+ """Handle async initialization of the plugin."""
+ self.logger.info("Registering jsonrpc endpoints on the webserver")
+ self.mass.webserver.register_route("/jsonrpc.js", self._handle_jsonrpc)
+ self.mass.webserver.register_route("/cometd", self._handle_cometd)
+ # setup (telnet) cli for players requesting basic info on that port
+ self.cli_port = await select_free_port(9090, 9190)
+ self.logger.info("Starting (telnet) CLI on port %s", self.cli_port)
+ await asyncio.start_server(self._handle_cli_client, "0.0.0.0", self.cli_port)
+ self._unsub_callback = self.mass.subscribe(
+ self._on_mass_event,
+ (EventType.PLAYER_UPDATED, EventType.QUEUE_UPDATED),
+ )
+ self._periodic_task = self.mass.create_task(self._do_periodic())
+
+ async def unload(self) -> None:
+ """
+ Handle unload/close of the provider.
+
+ Called when provider is deregistered (e.g. MA exiting or config reloading).
+ """
+ self.mass.webserver.unregister_route("/jsonrpc.js")
+ if self._unsub_callback:
+ self._unsub_callback()
+ self._unsub_callback = None
+ if self._periodic_task:
+ self._periodic_task.cancel()
+ self._periodic_task = None
+
+ async def _handle_cli_client(
+ self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter
+ ) -> None:
+ """Handle new connection on the legacy CLI."""
+ # https://raw.githubusercontent.com/Logitech/slimserver/public/7.8/HTML/EN/html/docs/cli-api.html
+ # https://github.com/elParaguayo/LMS-CLI-Documentation/blob/master/LMS-CLI.md
+ self.logger.info("Client connected on Telnet CLI")
+ try:
+ while True:
+ raw_request = await reader.readline()
+ raw_request = raw_request.strip().decode("utf-8")
+ # request comes in as url encoded strings, separated by space
+ raw_params = [urllib.parse.unquote(x) for x in raw_request.split(" ")]
+ # the first param is either a macaddress or a command
+ if ":" in raw_params[0]:
+ # assume this is a mac address (=player_id)
+ player_id = raw_params[0]
+ command = raw_params[1]
+ command_params = raw_params[2:]
+ else:
+ player_id = ""
+ command = raw_params[0]
+ command_params = raw_params[1:]
+
+ args, kwargs = parse_args(command_params)
+
+ response: str = raw_request
+
+ # check if we have a handler for this command
+ # note that we only have support for very limited commands
+ # just enough for compatibility with players but not to be used as api
+ # with 3rd party tools!
+ if handler := getattr(self, f"_handle_{command}", None):
+ self.logger.debug(
+ "Handling CLI-request (player: %s command: %s - args: %s - kwargs: %s)",
+ player_id,
+ command,
+ str(args),
+ str(kwargs),
+ )
+ cmd_result: list[str] = handler(player_id, *args, **kwargs)
+ if asyncio.iscoroutine(cmd_result):
+ cmd_result = await cmd_result
+
+ if isinstance(cmd_result, dict):
+ result_parts = dict_to_strings(cmd_result)
+ result_str = " ".join(urllib.parse.quote(x) for x in result_parts)
+ elif not cmd_result:
+ result_str = ""
+ else:
+ result_str = str(cmd_result)
+ response += " " + result_str
+ else:
+ self.logger.warning(
+ "No handler for %s (player: %s - args: %s - kwargs: %s)",
+ command,
+ player_id,
+ str(args),
+ str(kwargs),
+ )
+ # echo back the request and the result (if any)
+ response += "\n"
+ writer.write(response.encode("utf-8"))
+ await writer.drain()
+ except ConnectionResetError:
+ pass
+ except Exception as err:
+ self.logger.debug("Error handling CLI command", exc_info=err)
+ finally:
+ self.logger.debug("Client disconnected from Telnet CLI")
+
+ async def _handle_jsonrpc(self, request: web.Request) -> web.Response:
+ """Handle request on JSON-RPC endpoint."""
+ command_msg: CommandMessage = await request.json(loads=json_loads)
+ self.logger.debug("Received request: %s", command_msg)
+ cmd_result = await self._handle_request(command_msg["params"])
+ if cmd_result is None:
+ result: CommandErrorMessage = {
+ **command_msg,
+ "error": {"code": -1, "message": "Invalid command"},
+ }
+ else:
+ result: CommandResultMessage = {
+ **command_msg,
+ "result": cmd_result,
+ }
+ # return the response to the client
+ return web.json_response(result, dumps=json_dumps)
+
+ async def _handle_cometd(self, request: web.Request) -> web.Response: # noqa: PLR0912
+ """
+ Handle CometD request on the json CLI.
+
+ https://github.com/Logitech/slimserver/blob/public/8.4/Slim/Web/Cometd.pm
+ """
+ logger = self.logger.getChild("cometd")
+ # ruff: noqa: PLR0915
+ clientid: str = ""
+ response = []
+ streaming = False
+ json_msg: list[dict[str, Any]] = await request.json()
+ # cometd message is an array of commands/messages
+ for cometd_msg in json_msg:
+ channel = cometd_msg.get("channel")
+ # try to figure out clientid
+ if not clientid:
+ clientid = cometd_msg.get("clientId")
+ if not clientid and channel == "/meta/handshake":
+ # generate new clientid
+ clientid = shortuuid.uuid()
+ self._cometd_clients[clientid] = CometDClient(
+ client_id=clientid,
+ )
+ elif not clientid and channel in ("/slim/subscribe", "/slim/request"):
+ # pull clientId out of response channel
+ clientid = cometd_msg["data"]["response"].split("/")[1]
+ elif not clientid and channel == "/slim/unsubscribe":
+ # pull clientId out of unsubscribe
+ clientid = cometd_msg["data"]["unsubscribe"].split("/")[1]
+ assert clientid, "No clientID provided"
+ logger.debug("Incoming message for channel '%s' - clientid: %s", channel, clientid)
+
+ # messageid is optional but if provided we must pass it along
+ msgid = cometd_msg.get("id", "")
+
+ if clientid not in self._cometd_clients:
+ # If a client sends any request and we do not have a valid clid record
+ # because the streaming connection has been lost for example, re-handshake them
+ return web.json_response(
+ [
+ {
+ "id": msgid,
+ "channel": channel,
+ "clientId": None,
+ "successful": False,
+ "timestamp": time.strftime("%a, %d %b %Y %H:%M:%S %Z", time.gmtime()),
+ "error": "invalid clientId",
+ "advice": {
+ "reconnect": "handshake",
+ "interval": 0,
+ },
+ }
+ ]
+ )
+
+ # get the cometd_client object for the clientid
+ cometd_client = self._cometd_clients[clientid]
+ cometd_client.last_seen = int(time.time())
+
+ if channel == "/meta/handshake":
+ # handshake message
+ response.append(
+ {
+ "id": msgid,
+ "channel": channel,
+ "version": "1.0",
+ "supportedConnectionTypes": ["long-polling", "streaming"],
+ "clientId": clientid,
+ "successful": True,
+ "advice": {
+ "reconnect": "retry", # one of "none", "retry", "handshake"
+ "interval": 0, # initial interval is 0 to support long-polling's connect request
+ "timeout": 60000,
+ },
+ }
+ )
+ # playerid (mac) and uuid belonging to the client is sent in the ext field
+ if player_id := cometd_msg.get("ext", {}).get("mac"):
+ cometd_client.player_id = player_id
+ if (uuid := cometd_msg.get("ext", {}).get("uuid")) and (
+ player := self.mass.players.get(player_id)
+ ):
+ player.extra_data["uuid"] = uuid
+
+ elif channel in ("/meta/connect", "/meta/reconnect"):
+ # (re)connect message
+ logger.debug("Client (re-)connected: %s", clientid)
+ streaming = cometd_msg["connectionType"] == "streaming"
+ # confirm the connection
+ response.append(
+ {
+ "id": msgid,
+ "channel": channel,
+ "clientId": clientid,
+ "successful": True,
+ "timestamp": time.strftime("%a, %d %b %Y %H:%M:%S %Z", time.gmtime()),
+ "advice": {
+ # update interval for streaming mode
+ "interval": 5000
+ if streaming
+ else 0
+ },
+ }
+ )
+ # TODO: do we want to implement long-polling support too ?
+ # https://github.com/Logitech/slimserver/blob/d9ebda7ebac41e82f1809dd85b0e4446e0c9be36/Slim/Web/Cometd.pm#L292
+
+ elif channel == "/meta/disconnect":
+ # disconnect message
+ logger.debug("CometD Client disconnected: %s", clientid)
+ self._cometd_clients.pop(clientid)
+ return web.json_response(
+ [
+ {
+ "id": msgid,
+ "channel": channel,
+ "clientId": clientid,
+ "successful": True,
+ "timestamp": time.strftime("%a, %d %b %Y %H:%M:%S %Z", time.gmtime()),
+ }
+ ]
+ )
+
+ elif channel == "/meta/subscribe":
+ cometd_client.meta_subscriptions.add(cometd_msg["subscription"])
+ response.append(
+ {
+ "id": msgid,
+ "channel": channel,
+ "clientId": clientid,
+ "successful": True,
+ "subscription": cometd_msg["subscription"],
+ }
+ )
+
+ elif channel == "/meta/unsubscribe":
+ if cometd_msg["subscription"] in cometd_client.meta_subscriptions:
+ cometd_client.meta_subscriptions.remove(cometd_msg["subscription"])
+ response.append(
+ {
+ "id": msgid,
+ "channel": channel,
+ "clientId": clientid,
+ "successful": True,
+ "subscription": cometd_msg["subscription"],
+ }
+ )
+ elif channel == "/slim/subscribe": # noqa: SIM114
+ # A request to execute & subscribe to some Logitech Media Server event
+ # A valid /slim/subscribe message looks like this:
+ # {
+ # channel => '/slim/subscribe',
+ # id => <unique id>,
+ # data => {
+ # response => '/slim/serverstatus', # the channel all messages should be sent back on
+ # request => [ '', [ 'serverstatus', 0, 50, 'subscribe:60' ],
+ # priority => <value>, # optional priority value, is passed-through with the response
+ # }
+ response.append(
+ {
+ "id": msgid,
+ "channel": channel,
+ "clientId": clientid,
+ "successful": True,
+ }
+ )
+ cometd_client.slim_subscriptions[cometd_msg["data"]["response"]] = cometd_msg
+ # Return one-off result now, rest is handled by the subscription logic
+ self._handle_cometd_request(cometd_client, cometd_msg)
+
+ elif channel == "/slim/unsubscribe":
+ # A request to unsubscribe from a Logitech Media Server event, this is not the same as /meta/unsubscribe
+ # A valid /slim/unsubscribe message looks like this:
+ # {
+ # channel => '/slim/unsubscribe',
+ # data => {
+ # unsubscribe => '/slim/serverstatus',
+ # }
+ response.append(
+ {
+ "id": msgid,
+ "channel": channel,
+ "clientId": clientid,
+ "successful": True,
+ }
+ )
+ cometd_client.slim_subscriptions.pop(cometd_msg["data"]["unsubscribe"], None)
+
+ elif channel == "/slim/request":
+ # A request to execute a one-time Logitech Media Server event
+ # A valid /slim/request message looks like this:
+ # {
+ # channel => '/slim/request',
+ # id => <unique id>, (optional)
+ # data => {
+ # response => '/slim/<clientId>/request',
+ # request => [ '', [ 'menu', 0, 100, ],
+ # priority => <value>, # optional priority value, is passed-through with the response
+ # }
+ if not msgid:
+ # If the caller does not want the response, id will be undef
+ logger.debug("Not sending response to request, caller does not want it")
+ else:
+ # This response is optional, but we do it anyway
+ response.append(
+ {
+ "id": msgid,
+ "channel": channel,
+ "clientId": clientid,
+ "successful": True,
+ }
+ )
+ self._handle_cometd_request(cometd_client, cometd_msg)
+ else:
+ logger.warning("Unhandled channel %s", channel)
+ # always reply with the (default) response to every message
+ response.append(
+ {
+ "channel": channel,
+ "id": msgid,
+ "clientId": clientid,
+ "successful": True,
+ }
+ )
+ # append any remaining messages from the queue
+ while True:
+ try:
+ msg = cometd_client.queue.get_nowait()
+ response.append(msg)
+ except asyncio.QueueEmpty:
+ break
+ # send response
+ headers = {
+ "Server": "Logitech Media Server (7.9.9 - 1667251155)",
+ "Cache-Control": "no-cache",
+ "Pragma": "no-cache",
+ "Expires": "-1",
+ "Connection": "keep-alive",
+ }
+ # regular command/handshake messages are just replied and connection closed
+ if not streaming:
+ return web.json_response(response, headers=headers)
+
+ # streaming mode: send messages from the queue to the client
+ # the subscription connection is kept open and events are streamed to the client
+ headers.update(
+ {
+ "Content-Type": "application/json",
+ }
+ )
+ resp = web.StreamResponse(
+ status=200,
+ reason="OK",
+ headers=headers,
+ )
+ resp.enable_chunked_encoding()
+ await resp.prepare(request)
+ chunk = json_dumps(response).encode("utf8")
+ await resp.write(chunk)
+
+ # keep delivering messages to the client until it disconnects
+ # keep sending messages/events from the client's queue
+ while True:
+ # make sure we always send an array of messages
+ msg = [await cometd_client.queue.get()]
+ try:
+ chunk = json_dumps(msg).encode("utf8")
+ await resp.write(chunk)
+ cometd_client.last_seen = int(time.time())
+ except ConnectionResetError:
+ break
+ return resp
+
+ def _handle_cometd_request(self, client: CometDClient, cometd_request: dict[str, Any]) -> None:
+ """Handle request for CometD client (and put result on client queue)."""
+
+ async def _handle():
+ result = await self._handle_request(cometd_request["data"]["request"])
+ await client.queue.put(
+ {
+ "channel": cometd_request["data"]["response"],
+ "id": cometd_request["id"],
+ "data": result,
+ "ext": {"priority": cometd_request["data"].get("priority")},
+ }
+ )
+
+ self.mass.create_task(_handle())
+
+ async def _handle_request(self, params: tuple[str, list[str | int]]) -> Any:
+ """Handle command for either JSON or CometD request."""
+ # Slim request handler
+ # {"method":"slim.request","id":1,"params":["aa:aa:ca:5a:94:4c",["status","-", 2, "tags:xcfldatgrKN"]]}
+ self.logger.debug(
+ "Handling request: %s",
+ str(params),
+ )
+ player_id = params[0]
+ command = str(params[1][0])
+ args, kwargs = parse_args(params[1][1:])
+ if player_id and "seq_no" in kwargs and (player := self.mass.players.get(player_id)):
+ player.extra_data["seq_no"] = int(kwargs["seq_no"])
+ if handler := getattr(self, f"_handle_{command}", None):
+ # run handler for command
+ cmd_result = handler(player_id, *args, **kwargs)
+ if asyncio.iscoroutine(cmd_result):
+ cmd_result = await cmd_result
+ if cmd_result is None:
+ cmd_result = {}
+ elif not isinstance(cmd_result, dict):
+ # individual values are returned with underscore ?!
+ cmd_result = {f"_{command}": cmd_result}
+ return cmd_result
+ # no handler found
+ self.logger.warning("No handler for %s", command)
+ return None
+
+ def _handle_players(
+ self,
+ player_id: str,
+ start_index: int | str = 0,
+ limit: int = 999,
+ **kwargs,
+ ) -> PlayersResponse:
+ """Handle players command."""
+ players: list[PlayerItem] = []
+ for index, mass_player in enumerate(self.mass.players.all()):
+ if isinstance(start_index, int) and index < start_index:
+ continue
+ if len(players) > limit:
+ break
+ players.append(player_item_from_mass(start_index + index, mass_player))
+ return PlayersResponse(count=len(players), players_loop=players)
+
+ async def _handle_status(
+ self,
+ player_id: str,
+ offset: int | str = "-",
+ limit: int = 2,
+ tags: str = "xcfldatgrKN",
+ **kwargs,
+ ) -> PlayerStatusResponse:
+ """Handle player status command."""
+ player = self.mass.players.get(player_id)
+ if player is None:
+ return None
+ queue = self.mass.players.queues.get_active_queue(player_id)
+ assert queue is not None
+ start_index = queue.current_index or 0 if offset == "-" else offset
+ queue_items = []
+ index = 0
+ async for item in self.mass.players.queues.items(queue.queue_id):
+ if index >= start_index:
+ queue_items.append(item)
+ if len(queue_items) == limit:
+ break
+ index += 1
+ # we ignore the tags, just always send all info
+ presets = await self._get_preset_items(player_id)
+ return player_status_from_mass(
+ self.mass,
+ player=player,
+ queue=queue,
+ queue_items=queue_items,
+ offset=offset,
+ presets=presets,
+ )
+
+ async def _handle_serverstatus(
+ self,
+ player_id: str,
+ start_index: int = 0,
+ limit: int = 2,
+ **kwargs,
+ ) -> ServerStatusResponse:
+ """Handle server status command."""
+ players: list[PlayerItem] = []
+ for index, mass_player in enumerate(self.mass.players.all()):
+ if isinstance(start_index, int) and index < start_index:
+ continue
+ if len(players) > limit:
+ break
+ players.append(player_item_from_mass(start_index + index, mass_player))
+ return ServerStatusResponse(
+ {
+ "httpport": self.mass.webserver.port,
+ "ip": self.mass.base_ip,
+ "version": "7.999.999",
+ # "uuid": self.mass.server_id,
+ "uuid": "aioslimproto",
+ # TODO: set these vars ?
+ "info total duration": 0,
+ "info total genres": 0,
+ "sn player count": 0,
+ "lastscan": 1685548099,
+ "info total albums": 0,
+ "info total songs": 0,
+ "info total artists": 0,
+ "players_loop": players,
+ "player count": len(players),
+ "other player count": 0,
+ "other_players_loop": [],
+ }
+ )
+
+ async def _handle_firmwareupgrade(
+ self,
+ player_id: str,
+ *args,
+ **kwargs,
+ ) -> ServerStatusResponse:
+ """Handle firmwareupgrade command."""
+ return {"firmwareUpgrade": 0, "relativeFirmwareUrl": "/firmware/baby_7.7.3_r16676.bin"}
+
+ async def _handle_artworkspec(
+ self,
+ player_id: str,
+ *args,
+ **kwargs,
+ ) -> ServerStatusResponse:
+ """Handle firmwareupgrade command."""
+ # https://github.com/Logitech/slimserver/blob/e9c2f88e7ca60b3648b66116240f3f5fe6ca3188/Slim/Control/Commands.pm#L224
+ return None
+
+ def _handle_mixer(
+ self,
+ player_id: str,
+ subcommand: str,
+ *args,
+ **kwargs,
+ ) -> int | None:
+ """Handle player mixer command."""
+ arg = args[0] if args else "?"
+ player = self.mass.players.get(player_id)
+ assert player is not None
+
+ # <playerid> mixer volume <0 .. 100|-100 .. +100|?>
+ if subcommand == "volume" and isinstance(arg, int):
+ if "seq_no" in kwargs:
+ # handle a (jive based) squeezebox that already executed the command
+ # itself and just reports the new state
+ player.volume_level = arg
+ # self.mass.players.update(player_id)
+ else:
+ self.mass.create_task(self.mass.players.cmd_volume_set, player_id, arg)
+ return
+ if subcommand == "volume" and arg == "?":
+ return player.volume_level
+ if subcommand == "volume" and "+" in arg:
+ volume_level = min(100, player.volume_level + int(arg.split("+")[1]))
+ self.mass.create_task(self.mass.players.cmd_volume_set, player_id, volume_level)
+ return
+ if subcommand == "volume" and "-" in arg:
+ volume_level = max(0, player.volume_level - int(arg.split("-")[1]))
+ self.mass.create_task(self.mass.players.cmd_volume_set, player_id, volume_level)
+ return
+
+ # <playerid> mixer muting <0|1|toggle|?|>
+ if subcommand == "muting" and isinstance(arg, int):
+ self.mass.create_task(self.mass.players.cmd_volume_mute, player_id, int(arg))
+ return
+ if subcommand == "muting" and arg == "toggle":
+ self.mass.create_task(
+ self.mass.players.cmd_volume_mute, player_id, not player.volume_muted
+ )
+ return
+ if subcommand == "muting":
+ return int(player.volume_muted)
+ self.logger.warning(
+ "No handler for mixer/%s (player: %s - args: %s - kwargs: %s)",
+ subcommand,
+ player_id,
+ str(args),
+ str(kwargs),
+ )
+
+ def _handle_time(self, player_id: str, number: str | int) -> int | None:
+ """Handle player `time` command."""
+ # <playerid> time <number|-number|+number|?>
+ # The "time" command allows you to query the current number of seconds that the
+ # current song has been playing by passing in a "?".
+ # You may jump to a particular position in a song by specifying a number of seconds
+ # to seek to. You may also jump to a relative position within a song by putting an
+ # explicit "-" or "+" character before a number of seconds you would like to seek.
+ player_queue = self.mass.players.queues.get_active_queue(player_id)
+ assert player_queue is not None
+
+ if number == "?":
+ return int(player_queue.corrected_elapsed_time)
+
+ if isinstance(number, str) and ("+" in number or "-" in number):
+ jump = int(number.split("+")[1])
+ self.mass.create_task(self.mass.players.queues.skip, player_queue.queue_id, jump)
+ else:
+ self.mass.create_task(self.mass.players.queues.seek, player_queue.queue_id, number)
+
+ def _handle_power(self, player_id: str, value: str | int, *args, **kwargs) -> int | None:
+ """Handle player `time` command."""
+ # <playerid> power <0|1|?|>
+ # The "power" command turns the player on or off.
+ # Use 0 to turn off, 1 to turn on, ? to query and
+ # no parameter to toggle the power state of the player.
+ player = self.mass.players.get(player_id)
+ assert player is not None
+
+ if value == "?":
+ return int(player.powered)
+ if "seq_no" in kwargs:
+ # handle a (jive based) squeezebox that already executed the command
+ # itself and just reports the new state
+ player.powered = bool(value)
+ # self.mass.players.update(player_id)
+ return
+
+ self.mass.create_task(self.mass.players.cmd_power, player_id, bool(value))
+
+ def _handle_playlist(
+ self,
+ player_id: str,
+ subcommand: str,
+ *args,
+ **kwargs,
+ ) -> int | None:
+ """Handle player `playlist` command."""
+ arg = args[0] if args else "?"
+ queue = self.mass.players.queues.get_active_queue(player_id)
+ assert queue is not None
+
+ # <playerid> playlist index <index|+index|-index|?> <fadeInSecs>
+ if subcommand == "index" and isinstance(arg, int):
+ self.mass.create_task(self.mass.players.queues.play_index, player_id, arg)
+ return
+ if subcommand == "index" and arg == "?":
+ return queue.current_index
+ if subcommand == "index" and "+" in arg:
+ next_index = (queue.current_index or 0) + int(arg.split("+")[1])
+ self.mass.create_task(self.mass.players.queues.play_index, player_id, next_index)
+ return
+ if subcommand == "index" and "-" in arg:
+ next_index = (queue.current_index or 0) - int(arg.split("-")[1])
+ self.mass.create_task(self.mass.players.queues.play_index, player_id, next_index)
+ return
+ if subcommand == "shuffle":
+ self.mass.players.queues.set_shuffle(queue.queue_id, not queue.shuffle_enabled)
+ return
+ if subcommand == "repeat":
+ if queue.repeat_mode == RepeatMode.ALL:
+ new_repeat_mode = RepeatMode.OFF
+ elif queue.repeat_mode == RepeatMode.OFF:
+ new_repeat_mode = RepeatMode.ONE
+ else:
+ new_repeat_mode = RepeatMode.ALL
+ self.mass.players.queues.set_repeat(queue.queue_id, new_repeat_mode)
+ return
+ if subcommand == "crossfade":
+ self.mass.players.queues.set_crossfade(queue.queue_id, not queue.crossfade_enabled)
+ return
+
+ self.logger.warning("Unhandled command: playlist/%s", subcommand)
+
+ def _handle_playlistcontrol(
+ self,
+ player_id: str,
+ *args,
+ cmd: str,
+ uri: str,
+ **kwargs,
+ ) -> int | None:
+ """Handle player `playlistcontrol` command."""
+ queue = self.mass.players.queues.get_active_queue(player_id)
+ if cmd == "play":
+ self.mass.create_task(
+ self.mass.players.queues.play_media(queue.queue_id, uri, QueueOption.PLAY)
+ )
+ return
+ if cmd == "load":
+ self.mass.create_task(
+ self.mass.players.queues.play_media(queue.queue_id, uri, QueueOption.REPLACE)
+ )
+ return
+ if cmd == "add":
+ self.mass.create_task(
+ self.mass.players.queues.play_media(queue.queue_id, uri, QueueOption.ADD)
+ )
+ return
+ if cmd == "insert":
+ self.mass.create_task(
+ self.mass.players.queues.play_media(queue.queue_id, uri, QueueOption.IN)
+ )
+ return
+ self.logger.warning("Unhandled command: playlistcontrol/%s", cmd)
+
+ def _handle_play(
+ self,
+ player_id: str,
+ *args,
+ **kwargs,
+ ) -> int | None:
+ """Handle player `play` command."""
+ queue = self.mass.players.queues.get_active_queue(player_id)
+ assert queue is not None
+ self.mass.create_task(self.mass.players.queues.play, player_id)
+
+ def _handle_stop(
+ self,
+ player_id: str,
+ *args,
+ **kwargs,
+ ) -> int | None:
+ """Handle player `stop` command."""
+ queue = self.mass.players.queues.get_active_queue(player_id)
+ assert queue is not None
+ self.mass.create_task(self.mass.players.queues.stop, player_id)
+
+ def _handle_pause(
+ self,
+ player_id: str,
+ force: int = 0,
+ *args,
+ **kwargs,
+ ) -> int | None:
+ """Handle player `stop` command."""
+ queue = self.mass.players.queues.get_active_queue(player_id)
+ assert queue is not None
+
+ if force or queue.state == PlayerState.PLAYING:
+ self.mass.create_task(self.mass.players.queues.pause, player_id)
+ else:
+ self.mass.create_task(self.mass.players.queues.play, player_id)
+
+ def _handle_mode(
+ self,
+ player_id: str,
+ subcommand: str,
+ *args,
+ **kwargs,
+ ) -> int | None:
+ """Handle player 'mode' command."""
+ if subcommand == "play":
+ return self._handle_play(player_id, *args, **kwargs)
+ if subcommand == "pause":
+ return self._handle_pause(player_id, *args, **kwargs)
+ if subcommand == "stop":
+ return self._handle_stop(player_id, *args, **kwargs)
+
+ self.logger.warning(
+ "No handler for mode/%s (player: %s - args: %s - kwargs: %s)",
+ subcommand,
+ player_id,
+ str(args),
+ str(kwargs),
+ )
+
+ def _handle_button(
+ self,
+ player_id: str,
+ subcommand: str,
+ *args,
+ **kwargs,
+ ) -> int | None:
+ """Handle player 'button' command."""
+ player = self.mass.players.get(player_id)
+ assert player is not None
+
+ if subcommand == "volup":
+ self.mass.create_task(self.mass.players.cmd_volume_up, player_id)
+ return
+ if subcommand == "voldown":
+ self.mass.create_task(self.mass.players.cmd_volume_down, player_id)
+ return
+ if subcommand == "power":
+ self.mass.create_task(self.mass.players.cmd_power, player_id, not player.powered)
+ return
+ # queue related button commands
+ queue = self.mass.players.queues.get_active_queue(player_id)
+ if subcommand == "jump_fwd":
+ self.mass.create_task(self.mass.players.queues.next, queue.queue_id)
+ return
+ if subcommand == "jump_rew":
+ self.mass.create_task(self.mass.players.queues.previous, queue.queue_id)
+ return
+ if subcommand == "fwd":
+ self.mass.create_task(self.mass.players.queues.skip, queue.queue_id, 10)
+ return
+ if subcommand == "rew":
+ self.mass.create_task(self.mass.players.queues.skip, queue.queue_id, -10)
+ return
+ if subcommand == "shuffle":
+ self.mass.players.queues.set_shuffle(queue.queue_id, not queue.shuffle_enabled)
+ return
+ if subcommand == "repeat":
+ if queue.repeat_mode == RepeatMode.ALL:
+ new_repeat_mode = RepeatMode.OFF
+ elif queue.repeat_mode == RepeatMode.OFF:
+ new_repeat_mode = RepeatMode.ONE
+ else:
+ new_repeat_mode = RepeatMode.ALL
+ self.mass.players.queues.set_repeat(queue.queue_id, new_repeat_mode)
+ return
+ if subcommand.startswith("preset_"):
+ preset_index = subcommand.split("preset_")[1].split(".")[0]
+ if preset_uri := self.mass.config.get_raw_player_config_value(
+ player_id, f"preset_{preset_index}"
+ ):
+ option = QueueOption.REPLACE if "playlist" in preset_uri else QueueOption.PLAY
+ self.mass.create_task(
+ self.mass.players.queues.play_media, queue.queue_id, preset_uri, option
+ )
+ return
+
+ self.logger.warning(
+ "No handler for button/%s (player: %s - args: %s - kwargs: %s)",
+ subcommand,
+ player_id,
+ str(args),
+ str(kwargs),
+ )
+
+ async def _handle_menu(
+ self,
+ player_id: str,
+ offset: int = 0,
+ limit: int = 10,
+ **kwargs,
+ ) -> dict[str, Any]:
+ """Handle menu request from CLI."""
+ menu_items = []
+ # we keep it simple for now and only add the presets to the 'My Music' menu
+ for preset_id, media_item in await self._get_preset_items(player_id):
+ menu_items.append(
+ {
+ **media_item,
+ "id": f"preset_{preset_id}",
+ "node": "myMusic",
+ # prefer short title in menu structure
+ "text": media_item["track"],
+ "homeMenuText": media_item["text"],
+ "weight": 80,
+ }
+ )
+ # add basic queue settings such as shuffle and repeat
+ menu_items += [
+ {
+ "node": "settings",
+ "isANode": 1,
+ "id": "settingsAudio",
+ "text": "Audio",
+ "weight": 35,
+ },
+ {
+ "selectedIndex": 1,
+ "actions": {
+ "do": {
+ "choices": [
+ {"player": 0, "cmd": ["playlist", "repeat", "0"]},
+ {"player": 0, "cmd": ["playlist", "repeat", "1"]},
+ {"player": 0, "cmd": ["playlist", "repeat", "2"]},
+ ]
+ }
+ },
+ "choiceStrings": ["Off", "Song", "Playlist"],
+ "id": "settingsRepeat",
+ "node": "settings",
+ "text": "Repeat",
+ "weight": 20,
+ },
+ {
+ "actions": {
+ "do": {
+ "choices": [
+ {"cmd": ["playlist", "shuffle", "0"], "player": 0},
+ {"cmd": ["playlist", "shuffle", "1"], "player": 0},
+ ]
+ }
+ },
+ "choiceStrings": ["Off", "On"],
+ "selectedIndex": 1,
+ "id": "settingsShuffle",
+ "node": "settings",
+ "weight": 10,
+ "text": "Shuffle",
+ },
+ {
+ "actions": {
+ "do": {
+ "choices": [
+ {"cmd": ["playlist", "crossfade", "0"], "player": 0},
+ {"cmd": ["playlist", "crossfade", "1"], "player": 0},
+ ]
+ }
+ },
+ "choiceStrings": ["Off", "On"],
+ "selectedIndex": 1,
+ "iconStyle": "hm_settingsAudio",
+ "id": "settingsXfade",
+ "node": "settings",
+ "weight": 10,
+ "text": "Crossfade",
+ },
+ ]
+ return {
+ "item_loop": menu_items[offset:limit],
+ "offset": offset,
+ "count": len(menu_items[offset:limit]),
+ }
+
+ async def _handle_browselibrary(
+ self,
+ player_id: str,
+ subcommand: str,
+ offset: int = 0,
+ limit: int = 10,
+ mode: str = "playlists",
+ *args,
+ **kwargs,
+ ) -> dict[str, Any]:
+ """Handle menustatus request from CLI."""
+ if mode == "albumartists":
+ items = (
+ await self.mass.music.artists.album_artists(True, limit=limit, offset=offset)
+ ).items
+ elif mode == "artists":
+ items = (await self.mass.music.artists.db_items(True, limit=limit, offset=offset)).items
+ elif mode == "artist" and "uri" in kwargs:
+ artist = await self.mass.music.get_item_by_uri(kwargs["uri"])
+ items = await self.mass.music.artists.tracks(artist.item_id, artist.provider)
+ elif mode == "albums":
+ items = (await self.mass.music.albums.db_items(True, limit=limit, offset=offset)).items
+ elif mode == "album" and "uri" in kwargs:
+ album = await self.mass.music.get_item_by_uri(kwargs["uri"])
+ items = await self.mass.music.albums.tracks(album.item_id, album.provider)
+ elif mode == "playlists":
+ items = (
+ await self.mass.music.playlists.db_items(True, limit=limit, offset=offset)
+ ).items
+ elif mode == "radios":
+ items = (await self.mass.music.radio.db_items(True, limit=limit, offset=offset)).items
+ elif mode == "playlist" and "uri" in kwargs:
+ playlist = await self.mass.music.get_item_by_uri(kwargs["uri"])
+ items = [
+ x
+ async for x in self.mass.music.playlists.tracks(playlist.item_id, playlist.provider)
+ ]
+ else:
+ items = []
+ return {
+ "base": {
+ "actions": {
+ "go": {
+ "params": {"menu": 1, "mode": "playlisttracks"},
+ "itemsParams": "commonParams",
+ "player": 0,
+ "cmd": ["browselibrary", "items"],
+ },
+ "add": {
+ "player": 0,
+ "itemsParams": "commonParams",
+ "params": {"menu": 1, "cmd": "add"},
+ "cmd": ["playlistcontrol"],
+ },
+ "more": {
+ "player": 0,
+ "itemsParams": "commonParams",
+ "params": {"menu": 1, "cmd": "add"},
+ "cmd": ["playlistcontrol"],
+ },
+ "play": {
+ "cmd": ["playlistcontrol"],
+ "itemsParams": "commonParams",
+ "params": {"menu": 1, "cmd": "play"},
+ "player": 0,
+ "nextWindow": "nowPlaying",
+ },
+ "play-hold": {
+ "cmd": ["playlistcontrol"],
+ "itemsParams": "commonParams",
+ "params": {"menu": 1, "cmd": "load"},
+ "player": 0,
+ "nextWindow": "nowPlaying",
+ },
+ "add-hold": {
+ "itemsParams": "commonParams",
+ "params": {"menu": 1, "cmd": "insert"},
+ "player": 0,
+ "cmd": ["playlistcontrol"],
+ },
+ }
+ },
+ "window": {"windowStyle": "icon_list"},
+ "item_loop": [
+ {
+ **get_media_details_from_mass(self.mass, item),
+ "presetParams": {
+ "favorites_title": item.name,
+ "favorites_url": item.uri,
+ "favorites_type": item.media_type.value,
+ "icon": self.mass.metadata.get_image_url(item.image, 256)
+ if item.image
+ else "",
+ },
+ "textkey": item.name[0].upper(),
+ "commonParams": {
+ "uri": item.uri,
+ "noEdit": 1,
+ f"{item.media_type.value}_id": item.item_id,
+ },
+ }
+ for item in items
+ ],
+ "offset": offset,
+ "count": len(items),
+ }
+
+ def _handle_menustatus(
+ self,
+ player_id: str,
+ *args,
+ **kwargs,
+ ) -> dict[str, Any]:
+ """Handle menustatus request from CLI."""
+ return None
+
+ def _handle_displaystatus(
+ self,
+ player_id: str,
+ *args,
+ **kwargs,
+ ) -> dict[str, Any]:
+ """Handle displaystatus request from CLI."""
+ return None
+
+ def _handle_date(
+ self,
+ player_id: str,
+ *args,
+ **kwargs,
+ ) -> dict[str, Any]:
+ """Handle date request from CLI."""
+ return {"date_epoch": int(time.time()), "date": "0000-00-00T00:00:00+00:00"}
+
+ async def _on_mass_event(self, event: MassEvent) -> None:
+ """Handle incoming Mass Event."""
+ player_id = event.object_id
+ if not player_id:
+ return
+ for client in self._cometd_clients.values():
+ if sub := client.slim_subscriptions.get(f"/{client.client_id}/slim/serverstatus"):
+ await client.queue.put(
+ {
+ "channel": sub["data"]["response"],
+ "id": sub["id"],
+ "data": await self._handle_serverstatus(player_id),
+ }
+ )
+ if sub := client.slim_subscriptions.get(
+ f"/{client.client_id}/slim/playerstatus/{player_id}"
+ ):
+ await client.queue.put(
+ {
+ "channel": sub["data"]["response"],
+ "id": sub["id"],
+ "data": await self._handle_status(player_id),
+ }
+ )
+
+ async def _do_periodic(self) -> None:
+ """Execute periodic sending of state and cleanup."""
+ while True:
+ # cleanup orphaned clients
+ disconnected_clients = set()
+ for cometd_client in self._cometd_clients.values():
+ if (time.time() - cometd_client.last_seen) > 80:
+ disconnected_clients.add(cometd_client.client_id)
+ continue
+ for clientid in disconnected_clients:
+ client = self._cometd_clients.pop(clientid)
+ empty_queue(client.queue)
+ self.logger.debug("Cleaned up disconnected CometD Client: %s", clientid)
+ # handle client subscriptions
+ for cometd_client in self._cometd_clients.values():
+ for sub in cometd_client.slim_subscriptions.values():
+ self._handle_cometd_request(cometd_client, sub)
+
+ await asyncio.sleep(60)
+
+ async def _get_preset_items(self, player_id: str) -> list[tuple[int, SlimMediaItem]]:
+ """Return all presets for a player."""
+ preset_items: list[tuple[int, MediaItemType]] = []
+ for preset_index in range(1, 100):
+ if preset_conf := self.mass.config.get_raw_player_config_value(
+ player_id, f"preset_{preset_index}"
+ ):
+ with contextlib.suppress(MusicAssistantError):
+ media_item = await self.mass.music.get_item_by_uri(preset_conf)
+ slim_media_item = get_media_details_from_mass(self.mass, media_item)
+ preset_items.append((preset_index, slim_media_item))
+ else:
+ break
+ return preset_items
+
+
+def dict_to_strings(source: dict) -> list[str]:
+ """Convert dict to key:value strings (used in slimproto cli)."""
+ result: list[str] = []
+
+ for key, value in source.items():
+ if value in (None, ""):
+ continue
+ if isinstance(value, list):
+ for subval in value:
+ if isinstance(subval, dict):
+ result += dict_to_strings(subval)
+ else:
+ result.append(str(subval))
+ elif isinstance(value, dict):
+ result += dict_to_strings(subval)
+ else:
+ result.append(f"{key}:{str(value)}")
+ return result
"name": "Slimproto",
"description": "Support for slimproto based players (e.g. squeezebox, squeezelite).",
"codeowners": ["@music-assistant"],
- "requirements": ["aioslimproto==2.2.0"],
+ "requirements": ["aioslimproto==2.3.0"],
"documentation": "https://github.com/music-assistant/hass-music-assistant/discussions/1123",
"multi_instance": false,
"builtin": false,
- "load_by_default": true,
- "depends_on": "lms_cli"
+ "load_by_default": true
}
--- /dev/null
+"""Models used for the JSON-RPC API."""
+from __future__ import annotations
+
+from typing import TYPE_CHECKING, Any, TypedDict
+
+from music_assistant.common.models.enums import MediaType, PlayerState, RepeatMode
+from music_assistant.common.models.media_items import MediaItemType
+
+if TYPE_CHECKING:
+ from music_assistant.common.models.player import Player
+ from music_assistant.common.models.player_queue import PlayerQueue
+ from music_assistant.common.models.queue_item import QueueItem
+ from music_assistant.server import MusicAssistant
+
+# ruff: noqa: UP013
+
+PLAYMODE_MAP = {
+ PlayerState.IDLE: "stop",
+ PlayerState.PLAYING: "play",
+ PlayerState.OFF: "stop",
+ PlayerState.PAUSED: "pause",
+}
+
+REPEATMODE_MAP = {RepeatMode.OFF: 0, RepeatMode.ONE: 1, RepeatMode.ALL: 2}
+
+
+class CommandMessage(TypedDict):
+ """Representation of Base JSON RPC Command Message."""
+
+ # https://www.jsonrpc.org/specification
+
+ id: int | str
+ method: str
+ params: tuple[str, list[str | int]]
+
+
+class CommandResultMessage(CommandMessage):
+ """Representation of JSON RPC Result Message."""
+
+ result: Any
+
+
+class ErrorDetails(TypedDict):
+ """Representation of JSON RPC ErrorDetails."""
+
+ code: int
+ message: str
+
+
+class CommandErrorMessage(CommandMessage, TypedDict):
+ """Base Representation of JSON RPC Command Message."""
+
+ id: int | str | None
+ error: ErrorDetails
+
+
+class CometDResponse(TypedDict):
+ """CometD Response Message."""
+
+ channel: str
+ id: str
+ data: dict[str, Any]
+
+
+class SlimSubscribeData(CometDResponse):
+ """CometD SlimSubscribe Data."""
+
+ response: str # e.g. '/slim/serverstatus', the channel all messages should be sent back on
+ request: tuple[str, list[str | int]] # [ '', [ 'serverstatus', 0, 50, 'subscribe:60' ]
+ priority: int # # optional priority value, is passed-through with the response
+
+
+class SlimSubscribeMessage(CometDResponse):
+ """CometD SlimSubscribe Message."""
+
+ channel: str
+ id: str
+ data: SlimSubscribeData
+
+
+PlayerItem = TypedDict(
+ "PlayerItem",
+ {
+ "playerindex": str,
+ "playerid": str,
+ "name": str,
+ "modelname": str,
+ "connected": int,
+ "isplaying": int,
+ "power": int,
+ "model": str,
+ "canpoweroff": int,
+ "firmware": str,
+ "isplayer": int,
+ "displaytype": str,
+ "uuid": str | None,
+ "seq_no": str,
+ "ip": str,
+ },
+)
+
+
+def player_item_from_mass(playerindex: int, player: Player) -> PlayerItem:
+ """Parse PlayerItem for the Json RPC interface from MA QueueItem."""
+ return {
+ "playerindex": str(playerindex),
+ "playerid": player.player_id,
+ "name": player.display_name,
+ "modelname": player.device_info.model,
+ "connected": int(player.available),
+ "isplaying": 1 if player.state == PlayerState.PLAYING else 0,
+ "power": int(player.powered),
+ "model": player.provider,
+ "canpoweroff": 1,
+ "firmware": "unknown",
+ "isplayer": 1,
+ "displaytype": "none",
+ "uuid": player.extra_data.get("uuid"),
+ "seq_no": str(player.extra_data.get("seq_no", 0)),
+ "ip": player.device_info.address,
+ }
+
+
+PlayersResponse = TypedDict(
+ "PlayersResponse",
+ {
+ "count": int,
+ "players_loop": list[PlayerItem],
+ },
+)
+
+
+PlaylistItem = TypedDict(
+ "PlaylistItem",
+ {
+ "playlist index": int,
+ "id": str,
+ "title": str,
+ "artist": str,
+ "remote": int,
+ "remote_title": str,
+ "artwork_url": str,
+ "bitrate": str,
+ "duration": str | int | None,
+ "coverid": str,
+ "params": dict,
+ },
+)
+
+
+def playlist_item_from_mass(
+ mass: MusicAssistant, queue_item: QueueItem, index: int = 0, is_cur_index: bool = False
+) -> PlaylistItem:
+ """Parse PlaylistItem for the Json RPC interface from MA QueueItem."""
+ if queue_item.media_item:
+ # media item
+ media_details = get_media_details_from_mass(mass, queue_item.media_item)
+ else:
+ # fallback/generic queue item
+ media_details = {
+ "text": queue_item.name,
+ "style": "itemplay",
+ "trackType": "radio",
+ "icon": mass.metadata.get_image_url(queue_item.image, 512) if queue_item.image else "",
+ "params": {
+ "playlist_index": index,
+ "item_id": queue_item.queue_item_id,
+ "uri": queue_item.uri,
+ },
+ }
+ if (
+ is_cur_index
+ and queue_item.streamdetails
+ and queue_item.streamdetails.stream_title
+ and " - " in queue_item.streamdetails.stream_title
+ ):
+ # radio with remote stream title present
+ # artist and title parsed from stream title
+ artist, track = queue_item.streamdetails.stream_title.split(" - ")
+ media_details["artist"] = artist
+ media_details["track"] = track
+ media_details["album"] = queue_item.name
+ media_details["text"] = f"{track}\n{artist} - {queue_item.name}"
+ # remove default item actions
+ media_details.pop("actions")
+ media_details["params"]["playlist_index"] = index
+ return media_details
+
+
+class SlimMediaItem(TypedDict):
+ """Representation of MediaItem details."""
+
+ style: str
+ track: str
+ album: str
+ trackType: str # noqa: N815
+ icon: str
+ artist: str
+ text: str
+ params: dict
+ type: str
+ actions: dict
+
+
+def get_media_details_from_mass(mass: MusicAssistant, media_item: MediaItemType) -> SlimMediaItem:
+ """Get media item details formatted to display on Squeezebox hardware."""
+ if media_item.media_type == MediaType.TRACK:
+ # track with all metadata
+ artist = media_item.artists[0].name if media_item.artists else ""
+ album = media_item.album.name if media_item.album else ""
+ title = media_item.name
+ text = f"{title}\n{artist} - {album}" if album else f"{title}\n{artist}"
+ elif media_item.media_type == MediaType.ALBUM:
+ # album with all metadata
+ artist = media_item.artists[0].name if media_item.artists else ""
+ title = media_item.name
+ text = f"{title}\n{artist}" if artist else f"{title}\nalbum"
+ elif media_item and media_item.metadata.description:
+ # (radio) item with description field
+ album = media_item.metadata.description
+ artist = ""
+ title = media_item.name
+ text = f"{media_item.metadata.description}\n{media_item.name}"
+ else:
+ title = media_item.name
+ artist = ""
+ album = media_item.media_type.value
+ text = f"{title}\n{album}"
+ image_url = mass.metadata.get_image_url(media_item.image, 512) if media_item.image else ""
+ if media_item.media_type in (MediaType.TRACK, MediaType.RADIO):
+ go_action = {
+ "cmd": ["playlistcontrol"],
+ "itemsParams": "commonParams",
+ "params": {"uri": media_item.uri, "cmd": "play"},
+ "player": 0,
+ "nextWindow": "nowPlaying",
+ }
+ else:
+ go_action = {
+ "params": {
+ "uri": media_item.uri,
+ "mode": media_item.media_type.value,
+ },
+ "itemsParams": "commonParams",
+ "player": 0,
+ "cmd": ["browselibrary", "items"],
+ }
+ details = SlimMediaItem(
+ track=title,
+ album=album,
+ trackType="radio",
+ icon=image_url,
+ artist=artist,
+ text=text,
+ params={"item_id": media_item.item_id, "uri": media_item.uri},
+ type=media_item.media_type.value,
+ actions={
+ "go": go_action,
+ "add": {
+ "player": 0,
+ "itemsParams": "commonParams",
+ "params": {"uri": media_item.uri, "cmd": "add"},
+ "cmd": ["playlistcontrol"],
+ "nextWindow": "refresh",
+ },
+ "more": {
+ "player": 0,
+ "itemsParams": "commonParams",
+ "params": {"uri": media_item.uri, "cmd": "add"},
+ "cmd": ["playlistcontrol"],
+ "nextWindow": "refresh",
+ },
+ "play": {
+ "cmd": ["playlistcontrol"],
+ "itemsParams": "commonParams",
+ "params": {
+ "uri": media_item.uri,
+ "cmd": "load" if media_item.media_type == MediaType.PLAYLIST else "play",
+ },
+ "player": 0,
+ "nextWindow": "nowPlaying",
+ },
+ "play-hold": {
+ "cmd": ["playlistcontrol"],
+ "itemsParams": "commonParams",
+ "params": {"uri": media_item.uri, "cmd": "load"},
+ "player": 0,
+ "nextWindow": "nowPlaying",
+ },
+ "add-hold": {
+ "itemsParams": "commonParams",
+ "params": {"uri": media_item.uri, "cmd": "insert"},
+ "player": 0,
+ "cmd": ["playlistcontrol"],
+ "nextWindow": "refresh",
+ },
+ },
+ )
+ if media_item.media_type in (MediaType.TRACK, MediaType.RADIO):
+ details["style"] = "itemplay"
+ details["nextWindow"] = "nowPlaying"
+ return details
+
+
+PlayerStatusResponse = TypedDict(
+ "PlayerStatusResponse",
+ {
+ "time": int,
+ "mode": str,
+ "sync_slaves": str,
+ "playlist_cur_index": int | None,
+ "player_name": str,
+ "sync_master": str,
+ "player_connected": int,
+ "power": int,
+ "mixer volume": int,
+ "playlist repeat": int,
+ "playlist shuffle": int,
+ "playlist mode": str,
+ "player_ip": str,
+ "remoteMeta": dict | None,
+ "digital_volume_control": int,
+ "playlist_timestamp": float,
+ "current_title": str,
+ "duration": int,
+ "seq_no": int,
+ "remote": int,
+ "can_seek": int,
+ "signalstrength": int,
+ "rate": int,
+ "playlist_tracks": int,
+ "item_loop": list[PlaylistItem],
+ "uuid": str,
+ },
+)
+
+
+def player_status_from_mass(
+ mass: MusicAssistant,
+ player: Player,
+ queue: PlayerQueue,
+ queue_items: list[QueueItem],
+ offset: int | str,
+ presets: list[tuple[int, SlimMediaItem]],
+) -> PlayerStatusResponse:
+ """Parse PlayerStatusResponse for the Json RPC interface from MA info."""
+ if queue.current_item:
+ cur_item = playlist_item_from_mass(mass, queue.current_item, queue.current_index, True)
+ remote_meta = {
+ **cur_item,
+ "id": cur_item["params"]["item_id"],
+ "title": cur_item["text"],
+ "artwork_url": cur_item["icon"],
+ "coverid": cur_item["params"]["item_id"],
+ "remote": 1,
+ }
+ else:
+ remote_meta = None
+ # handle preset data
+ preset_data: list[dict] = []
+ preset_loop: list[int] = []
+ for _, media_item in presets:
+ preset_data.append(
+ {
+ "URL": media_item["params"]["uri"],
+ "text": media_item["track"],
+ "type": "audio",
+ }
+ )
+ preset_loop.append(1)
+ while len(preset_loop) < 10:
+ preset_data.append({})
+ preset_loop.append(0)
+ return {
+ "alarm_next": 0,
+ "playlist repeat": REPEATMODE_MAP[queue.repeat_mode],
+ "signalstrength": 0,
+ "remoteMeta": remote_meta,
+ "rate": 1,
+ "player_name": player.display_name,
+ "preset_loop": preset_loop,
+ "mode": PLAYMODE_MAP[queue.state],
+ "playlist_cur_index": queue.current_index,
+ "playlist shuffle": int(queue.shuffle_enabled),
+ "time": queue.elapsed_time,
+ "alarm_version": 2,
+ "mixer volume": player.volume_level,
+ "player_connected": int(player.available),
+ "sync_slaves": ",".join(player.group_childs),
+ "playlist_tracks": queue.items,
+ # "count": queue.items,
+ # some players have trouble grabbing a very large list so limit it for now
+ "count": len(queue_items),
+ "base": {"actions": {}},
+ "seq_no": player.extra_data.get("seq_no", 0),
+ "player_ip": player.device_info.address,
+ "alarm_state": "none",
+ "duration": queue.current_item.duration if queue.current_item else 0,
+ "alarm_snooze_seconds": 540,
+ "digital_volume_control": 1,
+ "power": int(player.powered),
+ "playlist_timestamp": queue.elapsed_time_last_updated,
+ "offset": offset,
+ "can_seek": 1,
+ "alarm_timeout_seconds": 3600,
+ "current_title": None,
+ "remote": 1,
+ "preset_data": preset_data,
+ "playlist mode": "off",
+ "item_loop": [
+ playlist_item_from_mass(
+ mass,
+ item,
+ queue.current_index + index,
+ queue.current_index == (queue.current_index + index),
+ )
+ for index, item in enumerate(queue_items)
+ ],
+ }
+
+
+ServerStatusResponse = TypedDict(
+ "ServerStatusMessage",
+ {
+ "ip": str,
+ "httpport": str,
+ "version": str,
+ "uuid": str,
+ "info total genres": int,
+ "sn player count": int,
+ "lastscan": str,
+ "info total duration": int,
+ "info total albums": int,
+ "info total songs": int,
+ "info total artists": int,
+ "players_loop": list[PlayerItem],
+ "player count": int,
+ "other player count": int,
+ "other_players_loop": list[PlayerItem],
+ },
+)
from soco.events_base import SubscriptionBase
from soco.groups import ZoneGroup
-from music_assistant.common.models.config_entries import (
- CONF_ENTRY_OUTPUT_CODEC,
- ConfigEntry,
- ConfigValueType,
-)
+from music_assistant.common.models.config_entries import ConfigEntry, ConfigValueType
from music_assistant.common.models.enums import (
ContentType,
MediaType,
from music_assistant.common.models.errors import PlayerUnavailableError, QueueEmpty
from music_assistant.common.models.player import DeviceInfo, Player
from music_assistant.common.models.queue_item import QueueItem
-from music_assistant.constants import CONF_OUTPUT_CODEC, CONF_PLAYERS
+from music_assistant.constants import CONF_PLAYERS
from music_assistant.server.helpers.didl_lite import create_didl_metadata
from music_assistant.server.models.player_provider import PlayerProvider
PlayerFeature.VOLUME_MUTE,
PlayerFeature.VOLUME_SET,
)
-PLAYER_CONFIG_ENTRIES = (CONF_ENTRY_OUTPUT_CODEC,)
async def setup(
for player in self.sonosplayers.values():
player.soco.end_direct_control_session
- def get_player_config_entries(self, player_id: str) -> tuple[ConfigEntry, ...]: # noqa: ARG002
- """Return all (provider/player specific) Config Entries for the given player (if any)."""
- return PLAYER_CONFIG_ENTRIES
-
def on_player_config_changed(
self, config: PlayerConfig, changed_keys: set[str] # noqa: ARG002
) -> None:
await asyncio.to_thread(sonos_player.soco.stop)
await asyncio.to_thread(sonos_player.soco.clear_queue)
- output_codec = self.mass.config.get_player_config_value(player_id, CONF_OUTPUT_CODEC)
radio_mode = (
flow_mode or not queue_item.duration or queue_item.media_type == MediaType.RADIO
)
player_id=sonos_player.player_id,
seek_position=seek_position,
fade_in=fade_in,
- content_type=ContentType.MP3 if radio_mode else ContentType(output_codec),
flow_mode=flow_mode,
+ output_codec=ContentType.MP3 if radio_mode else None,
)
if radio_mode:
sonos_player.radio_mode_started = time.time()
await asyncio.to_thread(set_crossfade)
# send queue item to sonos queue
- output_codec = self.mass.config.get_player_config_value(
- sonos_player.player_id, CONF_OUTPUT_CODEC
- )
is_radio = next_item.media_type != MediaType.TRACK
url = await self.mass.streams.resolve_stream_url(
queue_item=next_item,
player_id=sonos_player.player_id,
- content_type=ContentType.MP3 if is_radio else ContentType(output_codec),
# Sonos pre-caches pretty aggressively so do not yet start the runner
auto_start_runner=False,
+ output_codec=ContentType.MP3 if is_radio else None,
)
await self._enqueue_item(sonos_player, queue_item=next_item, url=url)
return
self.mass.players.remove(self.instance_id)
- def get_player_config_entries(self, player_id: str) -> tuple[ConfigEntry]: # noqa: ARG002
+ async def get_player_config_entries(self, player_id: str) -> tuple[ConfigEntry]: # noqa: ARG002
"""Return all (provider/player specific) Config Entries for the given player (if any)."""
return (
CONF_ENTRY_HIDE_GROUP_MEMBERS,
async def cmd_power(self, player_id: str, powered: bool) -> None:
"""Send POWER command to given player."""
- group_power_on = self.mass.config.get_player_config_value(player_id, CONF_GROUPED_POWER_ON)
+ group_power_on = await self.mass.config.get_player_config_value(
+ player_id, CONF_GROUPED_POWER_ON
+ )
async def set_child_power(child_player: Player) -> None:
await self.mass.players.cmd_power(child_player.player_id, powered)
aiofiles==23.1.0
aiohttp==3.8.4
aiorun==2022.11.1
-aioslimproto==2.2.0
+aioslimproto==2.3.0
aiosqlite==0.19.0
async-upnp-client==0.33.2
asyncio-throttle==1.0.2