CONF_HIDE_PLAYER,
CONF_LOG_LEVEL,
CONF_OUTPUT_CHANNELS,
+ CONF_SYNC_ADJUST,
CONF_VOLUME_NORMALIZATION,
CONF_VOLUME_NORMALIZATION_TARGET,
SECURE_STRING_SUBSTITUTE,
"Only enable when needed. Saves some bandwidth at the cost of audio quality.",
advanced=True,
)
+
+CONF_ENTRY_SYNC_ADJUST = ConfigEntry(
+ key=CONF_SYNC_ADJUST,
+ type=ConfigEntryType.INTEGER,
+ range=(-500, 500),
+ default_value=0,
+ label="Audio synchronization delay correction",
+ description="If this player is playing audio synced with other players "
+ "and you always hear the audio too early or late on this player, "
+ "you can shift the audio a bit.",
+ advanced=True,
+)
def __post_init__(self):
"""Execute actions after init."""
- if not self.output_format_str:
+ if not self.output_format_str and self.content_type.is_pcm():
+ self.output_format_str = (
+ f"pcm;codec=pcm;rate={self.sample_rate};"
+ f"bitrate={self.bit_depth};channels={self.channels}"
+ )
+ elif not self.output_format_str:
self.output_format_str = self.content_type.value
@property
CONF_GROUP_MEMBERS: Final[str] = "group_members"
CONF_HIDE_PLAYER: Final[str] = "hide_player"
CONF_ENFORCE_MP3: Final[str] = "enforce_mp3"
+CONF_SYNC_ADJUST: Final[str] = "sync_adjust"
# config default values
DEFAULT_HOST: Final[str] = "0.0.0.0"
) -> str:
"""Resolve the stream URL for the given QueueItem."""
fmt = output_codec.value
- # handle raw pcm
- if output_codec.is_pcm():
- msg = "PCM is not possible as output format"
- raise RuntimeError(msg)
+ # handle raw pcm without exact format specifiers
+ if output_codec.is_pcm() and ";" not in fmt:
+ fmt += f";codec=pcm;rate={44100};bitrate={16};channels={2}"
query_params = {}
base_path = "flow" if flow_mode else "single"
url = f"{self._server.base_url}/{queue_item.queue_id}/{base_path}/{queue_item.queue_item_id}.{fmt}" # noqa: E501
from music_assistant.common.models.config_entries import (
CONF_ENTRY_CROSSFADE,
CONF_ENTRY_CROSSFADE_DURATION,
+ CONF_ENTRY_SYNC_ADJUST,
ConfigEntry,
ConfigValueType,
)
from music_assistant.common.models.media_items import AudioFormat
from music_assistant.common.models.player import DeviceInfo, Player
from music_assistant.common.models.player_queue import PlayerQueue
+from music_assistant.constants import CONF_SYNC_ADJUST
from music_assistant.server.helpers.process import check_output
from music_assistant.server.models.player_provider import PlayerProvider
CONF_ENCRYPTION = "encryption"
CONF_ALAC_ENCODE = "alac_encode"
CONF_VOLUME_START = "volume_start"
-CONF_SYNC_ADJUST = "sync_adjust"
CONF_PASSWORD = "password"
+
+
PLAYER_CONFIG_ENTRIES = (
CONF_ENTRY_CROSSFADE,
CONF_ENTRY_CROSSFADE_DURATION,
"(lossless) ALAC at the cost of a bit CPU.",
advanced=True,
),
- ConfigEntry(
- key=CONF_SYNC_ADJUST,
- type=ConfigEntryType.INTEGER,
- range=(-500, 500),
- default_value=0,
- label="Audio synchronization delay correction",
- description="If this player is playing audio synced with other players "
- "and you always hear the audio too early or late on this player, "
- "you can shift the audio a bit.",
- advanced=True,
- ),
+ CONF_ENTRY_SYNC_ADJUST,
ConfigEntry(
key=CONF_PASSWORD,
- type=ConfigEntryType.STRING,
+ type=ConfigEntryType.SECURE_STRING,
default_value=None,
required=False,
label="Device password",
from __future__ import annotations
import asyncio
+import logging
import statistics
import time
from collections import deque
+from collections.abc import Iterator
from contextlib import suppress
from dataclasses import dataclass
-from typing import TYPE_CHECKING, Any
+from typing import TYPE_CHECKING
from aioslimproto.client import PlayerState as SlimPlayerState
from aioslimproto.client import SlimClient
from aioslimproto.client import TransitionType as SlimTransition
-from aioslimproto.const import EventType as SlimEventType
-from aioslimproto.discovery import start_discovery
+from aioslimproto.models import EventType as SlimEventType
+from aioslimproto.models import Preset as SlimPreset
+from aioslimproto.models import VisualisationType as SlimVisualisationType
+from aioslimproto.server import SlimServer
from music_assistant.common.models.config_entries import (
CONF_ENTRY_CROSSFADE,
+ CONF_ENTRY_CROSSFADE_DURATION,
+ CONF_ENTRY_ENFORCE_MP3,
CONF_ENTRY_EQ_BASS,
CONF_ENTRY_EQ_MID,
CONF_ENTRY_EQ_TREBLE,
CONF_ENTRY_OUTPUT_CHANNELS,
+ CONF_ENTRY_SYNC_ADJUST,
ConfigEntry,
ConfigValueOption,
ConfigValueType,
+ PlayerConfig,
)
from music_assistant.common.models.enums import (
ConfigEntryType,
PlayerState,
PlayerType,
ProviderFeature,
+ RepeatMode,
)
-from music_assistant.common.models.errors import QueueEmpty, SetupFailedError
+from music_assistant.common.models.errors import MusicAssistantError, SetupFailedError
from music_assistant.common.models.player import DeviceInfo, Player
-from music_assistant.constants import CONF_CROSSFADE, CONF_CROSSFADE_DURATION, CONF_PORT
+from music_assistant.constants import (
+ CONF_CROSSFADE,
+ CONF_CROSSFADE_DURATION,
+ CONF_ENFORCE_MP3,
+ CONF_PORT,
+ CONF_SYNC_ADJUST,
+ MASS_LOGO_ONLINE,
+)
from music_assistant.server.models.player_provider import PlayerProvider
-from .cli import LmsCli
-
if TYPE_CHECKING:
- from collections.abc import Callable, Coroutine
+ from aioslimproto.models import SlimEvent
from music_assistant.common.models.config_entries import ProviderConfig
from music_assistant.common.models.provider import ProviderManifest
from music_assistant.server.models import ProviderInstanceType
-# monkey patch the SlimClient
-SlimClient._process_stat_stmf = lambda x, y: None # noqa: ARG005
-
CACHE_KEY_PREV_STATE = "slimproto_prev_state"
-# sync constants
-MIN_DEVIATION_ADJUST = 6 # 6 milliseconds
-MIN_REQ_PLAYPOINTS = 8 # we need at least 8 measurements
-
-# TODO: Implement display support
STATE_MAP = {
SlimPlayerState.BUFFERING: PlayerState.PLAYING,
SlimPlayerState.PLAYING: PlayerState.PLAYING,
SlimPlayerState.STOPPED: PlayerState.IDLE,
}
+REPEATMODE_MAP = {RepeatMode.OFF: 0, RepeatMode.ONE: 1, RepeatMode.ALL: 2}
+
+# sync constants
+MIN_DEVIATION_ADJUST = 6 # 6 milliseconds
+MIN_REQ_PLAYPOINTS = 8 # we need at least 8 measurements
+MAX_SKIP_AHEAD_MS = 1500 # 1.5 seconds
@dataclass
diff: int
-CONF_SYNC_ADJUST = "sync_adjust"
CONF_CLI_TELNET = "cli_telnet"
CONF_CLI_JSON = "cli_json"
CONF_DISCOVERY = "discovery"
+CONF_DISPLAY = "display"
+CONF_VISUALIZATION = "visualization"
+
DEFAULT_PLAYER_VOLUME = 20
DEFAULT_SLIMPROTO_PORT = 3483
+DEFAULT_VISUALIZATION = SlimVisualisationType.SPECTRUM_ANALYZER.value
+
-CONF_ENTRY_CROSSFADE_DURATION = ConfigEntry(
- key=CONF_CROSSFADE_DURATION,
- type=ConfigEntryType.INTEGER,
- range=(1, 10),
- default_value=8,
- label="Crossfade duration",
- description="Duration in seconds of the crossfade between tracks (if enabled)",
+CONF_ENTRY_DISPLAY = ConfigEntry(
+ key=CONF_DISPLAY,
+ type=ConfigEntryType.BOOLEAN,
+ default_value=True,
+ required=False,
+ label="Enable display support",
+ description="Enable/disable native display support on " "squeezebox or squeezelite32 hardware.",
advanced=True,
)
+CONF_ENTRY_VISUALIZATION = ConfigEntry(
+ key=CONF_VISUALIZATION,
+ type=ConfigEntryType.STRING,
+ default_value=DEFAULT_VISUALIZATION,
+ options=tuple(
+ ConfigValueOption(title=x.name.replace("_", " ").title(), value=x.value)
+ for x in SlimVisualisationType
+ ),
+ required=False,
+ label="Visualization type",
+ description="The type of visualization to show on the display "
+ "during playback if the device supports this.",
+ advanced=True,
+ depends_on=CONF_DISPLAY,
+)
async def setup(
"""
# ruff: noqa: ARG001
return (
- ConfigEntry(
- key=CONF_PORT,
- type=ConfigEntryType.INTEGER,
- default_value=DEFAULT_SLIMPROTO_PORT,
- label="Slimproto port",
- description="The TCP/UDP port to run the slimproto sockets server. "
- "The default is 3483 and using a different port is not supported by "
- "hardware squeezebox players. Only adjust this port if you want to "
- "use other slimproto based servers side by side with software players, "
- "such as squeezelite.\n\n"
- "NOTE that the Airplay provider in MA (which relies on slimproto), does not seem "
- "to support a different slimproto port.",
- advanced=True,
- ),
ConfigEntry(
key=CONF_CLI_TELNET,
type=ConfigEntryType.BOOLEAN,
default_value=True,
label="Enable classic Squeezebox Telnet CLI",
description="Some slimproto based players require the presence of the telnet CLI "
- " to request more information. For example the Airplay provider "
- "(which relies on slimproto) uses this to fetch the album cover and other metadata."
+ " to request more information. "
"By default this Telnet CLI is hosted on port 9090 but another port will be chosen if "
"that port is already taken. \n\n"
"Commands allowed on this interface are very limited and just enough to satisfy "
"on your network and/or you don't want clients to auto connect to this server.",
advanced=True,
),
+ ConfigEntry(
+ key=CONF_PORT,
+ type=ConfigEntryType.INTEGER,
+ default_value=DEFAULT_SLIMPROTO_PORT,
+ label="Slimproto port",
+ description="The TCP/UDP port to run the slimproto sockets server. "
+ "The default is 3483 and using a different port is not supported by "
+ "hardware squeezebox players. Only adjust this port if you want to "
+ "use other slimproto based servers side by side with (squeezelite) software players.",
+ advanced=True,
+ ),
)
class SlimprotoProvider(PlayerProvider):
"""Base/builtin provider for players using the SLIM protocol (aka slimproto)."""
- _socket_servers: list[asyncio.Server | asyncio.BaseTransport]
- _socket_clients: dict[str, SlimClient]
+ slimproto: SlimServer
_sync_playpoints: dict[str, deque[SyncPlayPoint]]
- _virtual_providers: dict[str, tuple[Coroutine, Callable]]
_do_not_resync_before: dict[str, float]
- _cli: LmsCli
- port: int = DEFAULT_SLIMPROTO_PORT
@property
def supported_features(self) -> tuple[ProviderFeature, ...]:
async def handle_async_init(self) -> None:
"""Handle async initialization of the provider."""
- self._socket_clients = {}
self._sync_playpoints = {}
- self._virtual_providers = {}
self._do_not_resync_before = {}
- self.port = self.config.get_value(CONF_PORT)
self._resync_handle: asyncio.TimerHandle | None = None
- # start slimproto socket server
- try:
- self._socket_servers = [
- await asyncio.start_server(self._create_client, "0.0.0.0", self.port)
- ]
- self.logger.info("Started SLIMProto server on port %s", self.port)
- except OSError:
- msg = f"Unable to start the Slimproto server - is port {self.port} already taken ?"
- raise SetupFailedError(msg)
-
- # start CLI interface(s)
+ control_port = self.config.get_value(CONF_PORT)
enable_telnet = self.config.get_value(CONF_CLI_TELNET)
enable_json = self.config.get_value(CONF_CLI_JSON)
- if enable_json or enable_telnet:
- self._cli = LmsCli(self, enable_telnet, enable_json)
- await self._cli.setup()
-
- # start discovery
- if self.config.get_value(CONF_DISCOVERY):
- self._socket_servers.append(
- await start_discovery(
- self.mass.streams.publish_ip,
- self.port,
- self._cli.cli_port if enable_telnet else None,
- self.mass.streams.publish_port if enable_json else None,
- "Music Assistant",
- self.mass.server_id,
- )
- )
+ logging.getLogger("aioslimproto").setLevel(self.logger.level)
+ self.slimproto = SlimServer(
+ cli_port=0 if enable_telnet else None,
+ cli_port_json=0 if enable_json else None,
+ ip_address=self.mass.streams.publish_ip,
+ name="Music Assistant",
+ control_port=control_port,
+ )
+ # start slimproto socket server
+ try:
+ await self.slimproto.start()
+ except OSError as err:
+ msg = f"Unable to start the Slimproto server - is port {control_port} already taken ?"
+ raise SetupFailedError(msg) from err
+
+ async def loaded_in_mass(self) -> None:
+ """Call after the provider has been loaded."""
+ self.slimproto.subscribe(self._client_callback)
async def unload(self) -> None:
"""Handle close/cleanup of the provider."""
- if getattr(self, "_virtual_providers", None):
- msg = "Virtual providers loaded"
- raise RuntimeError(msg)
- if hasattr(self, "_socket_clients"):
- for client in list(self._socket_clients.values()):
- with suppress(RuntimeError):
- # this may fail due to a race condition sometimes
- client.disconnect()
- self._socket_clients = {}
- if hasattr(self, "_socket_servers"):
- for _server in self._socket_servers:
- _server.close()
- if hasattr(self, "_cli"):
- await self._cli.unload()
- self._socket_servers = []
-
- async def _create_client(
- self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter
- ) -> None:
- """Create player from new connection on the socket."""
- if self.mass.closing:
- return
- addr = writer.get_extra_info("peername")
- self.logger.debug("Socket client connected: %s", addr)
-
- def client_callback(
- event_type: SlimEventType,
- client: SlimClient,
- data: Any = None,
- ) -> None:
- if event_type == SlimEventType.PLAYER_DISCONNECTED:
- self.mass.create_task(self._handle_disconnected(client))
- return
-
- if event_type == SlimEventType.PLAYER_CONNECTED:
- self.mass.create_task(self._handle_connected(client))
- return
-
- if event_type == SlimEventType.PLAYER_DECODER_READY:
- self.mass.create_task(self._handle_decoder_ready(client))
- return
-
- if event_type == SlimEventType.PLAYER_BUFFER_READY:
- self.mass.create_task(self._handle_buffer_ready(client))
- return
-
- if event_type == SlimEventType.PLAYER_OUTPUT_UNDERRUN:
- # player ran out of buffer
- self.mass.create_task(self._handle_output_underrun(client))
- return
-
- if event_type == SlimEventType.PLAYER_HEARTBEAT:
- self._handle_player_heartbeat(client)
- return
-
- # forward player update to MA player controller
- self.mass.create_task(self._handle_player_update(client))
-
- # construct SlimClient from socket client
- SlimClient(reader, writer, client_callback)
+ await self.slimproto.stop()
async def get_player_config_entries(self, player_id: str) -> tuple[ConfigEntry]:
"""Return all (provider/player specific) Config Entries for the given player (if any)."""
base_entries = await super().get_player_config_entries(player_id)
- if not (client := self._socket_clients.get(player_id)):
- return base_entries
# create preset entries (for players that support it)
preset_entries = ()
- if client.device_model not in self._virtual_providers:
- presets = []
- async for playlist in self.mass.music.playlists.iter_library_items(True):
- presets.append(ConfigValueOption(playlist.name, playlist.uri))
- async for radio in self.mass.music.radio.iter_library_items(True):
- presets.append(ConfigValueOption(radio.name, radio.uri))
- # dynamically extend the amount of presets when needed
- if self.mass.config.get_raw_player_config_value(player_id, "preset_15"):
- preset_count = 20
- elif self.mass.config.get_raw_player_config_value(player_id, "preset_10"):
- preset_count = 15
- elif self.mass.config.get_raw_player_config_value(player_id, "preset_5"):
- preset_count = 10
- else:
- preset_count = 5
- preset_entries = tuple(
- ConfigEntry(
- key=f"preset_{index}",
- type=ConfigEntryType.STRING,
- options=presets,
- label=f"Preset {index}",
- description="Assign a playable item to the player's preset. "
- "Only supported on real squeezebox hardware or jive(lite) based emulators.",
- advanced=False,
- required=False,
- )
- for index in range(1, preset_count + 1)
+ presets = []
+ async for playlist in self.mass.music.playlists.iter_library_items(True):
+ presets.append(ConfigValueOption(playlist.name, playlist.uri))
+ async for radio in self.mass.music.radio.iter_library_items(True):
+ presets.append(ConfigValueOption(radio.name, radio.uri))
+ preset_count = 10
+ preset_entries = tuple(
+ ConfigEntry(
+ key=f"preset_{index}",
+ type=ConfigEntryType.STRING,
+ options=presets,
+ label=f"Preset {index}",
+ description="Assign a playable item to the player's preset. "
+ "Only supported on real squeezebox hardware or jive(lite) based emulators.",
+ advanced=False,
+ required=False,
)
+ for index in range(1, preset_count + 1)
+ )
return (
base_entries
CONF_ENTRY_EQ_TREBLE,
CONF_ENTRY_OUTPUT_CHANNELS,
CONF_ENTRY_CROSSFADE_DURATION,
- ConfigEntry(
- key=CONF_SYNC_ADJUST,
- type=ConfigEntryType.INTEGER,
- range=(0, 1500),
- default_value=0,
- label="Audio synchronization delay correction",
- description="If this player is playing audio synced with other players "
- "and you always hear the audio too late on this player, "
- "you can shift the audio a bit.",
- advanced=True,
- ),
+ CONF_ENTRY_ENFORCE_MP3,
+ CONF_ENTRY_SYNC_ADJUST,
+ CONF_ENTRY_DISPLAY,
+ CONF_ENTRY_VISUALIZATION,
)
)
+ def on_player_config_changed(self, config: PlayerConfig, changed_keys: set[str]) -> None:
+ """Call (by config manager) when the configuration of a player changes."""
+ super().on_player_config_changed(config, changed_keys)
+
+ if slimplayer := self.slimproto.get_player(config.player_id):
+ self.mass.create_task(self._set_preset_items(slimplayer))
+ self.mass.create_task(self._set_display(slimplayer))
+
async def cmd_stop(self, player_id: str) -> None:
"""Send STOP command to given player."""
# forward command to player and any connected sync members
- for client in self._get_sync_clients(player_id):
- if client.state == SlimPlayerState.STOPPED:
- continue
- await client.stop()
- # workaround: some players do not send an event when playback stopped
- client._process_stat_stmu(b"")
+ async with asyncio.TaskGroup() as tg:
+ for slimplayer in self._get_sync_clients(player_id):
+ tg.create_task(slimplayer.stop())
async def cmd_play(self, player_id: str) -> None:
"""Send PLAY command to given player."""
# forward command to player and any connected sync members
async with asyncio.TaskGroup() as tg:
- for client in self._get_sync_clients(player_id):
- if client.state not in (
- SlimPlayerState.PAUSED,
- SlimPlayerState.BUFFERING,
- SlimPlayerState.BUFFER_READY,
- ):
- continue
- tg.create_task(client.play())
+ for slimplayer in self._get_sync_clients(player_id):
+ tg.create_task(slimplayer.play())
async def play_media(
self,
if player.synced_to:
msg = "A synced player cannot receive play commands directly"
raise RuntimeError(msg)
- # stop any existing streams first
- await self.cmd_stop(player_id)
+ enforce_mp3 = await self.mass.config.get_player_config_value(player_id, CONF_ENFORCE_MP3)
if player.group_childs:
- # player has sync members, we need to start a multi client stream job
+ # player has sync members, we need to start a multi slimplayer stream job
stream_job = await self.mass.streams.create_multi_client_stream_job(
queue_id=queue_item.queue_id,
start_queue_item=queue_item,
# forward command to player and any connected sync members
sync_clients = self._get_sync_clients(player_id)
async with asyncio.TaskGroup() as tg:
- for client in sync_clients:
+ for slimplayer in sync_clients:
tg.create_task(
self._handle_play_url(
- client,
+ slimplayer,
url=stream_job.resolve_stream_url(
- client.player_id, output_codec=ContentType.FLAC
+ slimplayer.player_id,
+ output_codec=ContentType.MP3 if enforce_mp3 else ContentType.FLAC,
),
queue_item=None,
send_flush=True,
)
else:
# regular, single player playback
- client = self._socket_clients[player_id]
+ slimplayer = self.slimproto.get_player(player_id)
+ if not slimplayer:
+ return
url = await self.mass.streams.resolve_stream_url(
queue_item=queue_item,
# for now just hardcode flac as we assume that every (modern)
# slimproto based player can handle that just fine
- output_codec=ContentType.FLAC,
+ output_codec=ContentType.MP3 if enforce_mp3 else ContentType.FLAC,
seek_position=seek_position,
fade_in=fade_in,
flow_mode=False,
)
await self._handle_play_url(
- client,
+ slimplayer,
url=url,
queue_item=queue_item,
send_flush=True,
This is a special feature from the Universal Group provider.
"""
+ enforce_mp3 = await self.mass.config.get_player_config_value(player_id, CONF_ENFORCE_MP3)
# fix race condition where resync and play media are called at more or less the same time
if self._resync_handle:
self._resync_handle.cancel()
# forward command to player and any connected sync members
sync_clients = self._get_sync_clients(player_id)
async with asyncio.TaskGroup() as tg:
- for client in sync_clients:
+ for slimplayer in sync_clients:
tg.create_task(
self._handle_play_url(
- client,
+ slimplayer,
url=stream_job.resolve_stream_url(
- client.player_id, output_codec=ContentType.FLAC
+ slimplayer.player_id,
+ output_codec=ContentType.MP3 if enforce_mp3 else ContentType.FLAC,
),
queue_item=None,
send_flush=True,
async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem) -> None:
"""Handle enqueuing of the next queue item on the player."""
- # we don't have to do anything,
- # enqueuing the next item is handled in the buffer ready callback
+ if not (slimplayer := self.slimproto.get_player(player_id)):
+ return
+ enforce_mp3 = await self.mass.config.get_player_config_value(player_id, CONF_ENFORCE_MP3)
+ url = await self.mass.streams.resolve_stream_url(
+ queue_item=queue_item,
+ output_codec=ContentType.MP3 if enforce_mp3 else ContentType.FLAC,
+ flow_mode=False,
+ )
+ await self._handle_play_url(
+ slimplayer,
+ url=url,
+ queue_item=queue_item,
+ enqueue=True,
+ send_flush=False,
+ auto_play=True,
+ )
async def _handle_play_url(
self,
- client: SlimClient,
+ slimplayer: SlimClient,
url: str,
queue_item: QueueItem | None,
+ enqueue: bool = False,
send_flush: bool = True,
auto_play: bool = False,
) -> None:
"""Handle playback of an url on slimproto player(s)."""
- player_id = client.player_id
+ player_id = slimplayer.player_id
if crossfade := await self.mass.config.get_player_config_value(player_id, CONF_CROSSFADE):
transition_duration = await self.mass.config.get_player_config_value(
player_id, CONF_CROSSFADE_DURATION
else:
transition_duration = 0
- await client.play_url(
+ if queue_item and queue_item.media_item:
+ album = getattr(queue_item.media_item, "album", None)
+ metadata = {
+ "item_id": queue_item.queue_item_id,
+ "title": queue_item.media_item.name,
+ "album": album.name if album else "",
+ "artist": getattr(queue_item.media_item, "artist_str", "Music Assistant"),
+ "image_url": self.mass.metadata.get_image_url(
+ queue_item.image,
+ size=512,
+ prefer_proxy=True,
+ )
+ if queue_item.image
+ else MASS_LOGO_ONLINE,
+ "duration": queue_item.duration,
+ }
+ elif queue_item:
+ metadata = {
+ "item_id": queue_item.queue_item_id,
+ "title": queue_item.name,
+ "artist": "Music Assistant",
+ "image_url": self.mass.metadata.get_image_url(
+ queue_item.image,
+ size=512,
+ prefer_proxy=True,
+ )
+ if queue_item.image
+ else MASS_LOGO_ONLINE,
+ "duration": queue_item.duration,
+ }
+ else:
+ metadata = {
+ "item_id": "flow",
+ "title": "Music Assistant",
+ "image_url": MASS_LOGO_ONLINE,
+ }
+ queue = self.mass.player_queues.get(queue_item.queue_id if queue_item else player_id)
+ slimplayer.extra_data["playlist repeat"] = REPEATMODE_MAP[queue.repeat_mode]
+ slimplayer.extra_data["playlist shuffle"] = int(queue.shuffle_enabled)
+ # slimplayer.extra_data["can_seek"] = 1 if queue_item else 0
+ await slimplayer.play_url(
url=url,
mime_type=f"audio/{url.split('.')[-1].split('?')[0]}",
- metadata=(
- {"item_id": queue_item.queue_item_id, "title": queue_item.name}
- if queue_item
- else {"item_id": client.player_id, "title": "Music Assistant"}
- ),
+ metadata=metadata,
+ enqueue=enqueue,
send_flush=send_flush,
transition=SlimTransition.CROSSFADE if crossfade else SlimTransition.NONE,
transition_duration=transition_duration,
"""Send PAUSE command to given player."""
# forward command to player and any connected sync members
async with asyncio.TaskGroup() as tg:
- for client in self._get_sync_clients(player_id):
- if client.state not in (
- SlimPlayerState.PLAYING,
- SlimPlayerState.BUFFERING,
- SlimPlayerState.BUFFER_READY,
- ):
- continue
- tg.create_task(client.pause())
+ for slimplayer in self._get_sync_clients(player_id):
+ tg.create_task(slimplayer.pause())
async def cmd_power(self, player_id: str, powered: bool) -> None:
"""Send POWER command to given player."""
- if client := self._socket_clients.get(player_id):
- await client.power(powered)
+ if slimplayer := self.slimproto.get_player(player_id):
+ await slimplayer.power(powered)
# store last state in cache
await self.mass.cache.set(
- f"{CACHE_KEY_PREV_STATE}.{player_id}", (powered, client.volume_level)
+ f"{CACHE_KEY_PREV_STATE}.{player_id}", (powered, slimplayer.volume_level)
)
async def cmd_volume_set(self, player_id: str, volume_level: int) -> None:
"""Send VOLUME_SET command to given player."""
- if client := self._socket_clients.get(player_id):
- await client.volume_set(volume_level)
+ if slimplayer := self.slimproto.get_player(player_id):
+ await slimplayer.volume_set(volume_level)
# store last state in cache
await self.mass.cache.set(
- f"{CACHE_KEY_PREV_STATE}.{player_id}", (client.powered, volume_level)
+ f"{CACHE_KEY_PREV_STATE}.{player_id}", (slimplayer.powered, volume_level)
)
+ async def cmd_volume_mute(self, player_id: str, muted: bool) -> None:
+ """Send VOLUME MUTE command to given player."""
+ if slimplayer := self.slimproto.get_player(player_id):
+ await slimplayer.mute(muted)
+
async def cmd_sync(self, player_id: str, target_player: str) -> None:
"""Handle SYNC command for given player."""
child_player = self.mass.players.get(player_id)
# check if we should (re)start or join a stream session
active_queue = self.mass.player_queues.get_active_queue(parent_player.player_id)
if active_queue.state == PlayerState.PLAYING:
- # playback needs to be restarted to form a new multi client stream session
+ # playback needs to be restarted to form a new multi slimplayer stream session
def resync() -> None:
self._resync_handle = None
self.mass.create_task(
self.mass.players.update(child_player.player_id)
self.mass.players.update(parent_player.player_id)
- def register_virtual_provider(
+ def _client_callback(
self,
- player_model: str,
- register_callback: Coroutine,
- update_callback: Callable,
+ event: SlimEvent,
) -> None:
- """Register a virtual provider based on slimproto, such as the airplay bridge."""
- self._virtual_providers[player_model] = (
- register_callback,
- update_callback,
- )
+ if self.mass.closing:
+ return
- def unregister_virtual_provider(
- self,
- player_model: str,
- ) -> None:
- """Unregister a virtual provider."""
- self._virtual_providers.pop(player_model, None)
+ if event.type == SlimEventType.PLAYER_DISCONNECTED:
+ if mass_player := self.mass.players.get(event.player_id):
+ mass_player.available = False
+ self.mass.players.update(mass_player.player_id)
+ return
+
+ if not (slimplayer := self.slimproto.get_player(event.player_id)):
+ return
- async def _handle_player_update(self, client: SlimClient) -> None:
+ if event.type == SlimEventType.PLAYER_CONNECTED:
+ self.mass.create_task(self._handle_connected(slimplayer))
+ return
+
+ if event.type == SlimEventType.PLAYER_BUFFER_READY:
+ self.mass.create_task(self._handle_buffer_ready(slimplayer))
+ return
+
+ if event.type == SlimEventType.PLAYER_HEARTBEAT:
+ self._handle_player_heartbeat(slimplayer)
+ return
+
+ if event.type in (SlimEventType.PLAYER_BTN_EVENT, SlimEventType.PLAYER_CLI_EVENT):
+ self.mass.create_task(self._handle_player_cli_event(slimplayer, event))
+ return
+
+ # forward player update to MA player controller
+ self.mass.create_task(self._handle_player_update(slimplayer))
+
+ async def _handle_player_update(self, slimplayer: SlimClient) -> None:
"""Process SlimClient update/add to Player controller."""
- player_id = client.player_id
- virtual_provider_info = self._virtual_providers.get(client.device_model)
+ player_id = slimplayer.player_id
player = self.mass.players.get(player_id, raise_unavailable=False)
if not player:
# player does not yet exist, create it
player_id=player_id,
provider=self.instance_id,
type=PlayerType.PLAYER,
- name=client.name,
+ name=slimplayer.name,
available=True,
- powered=client.powered,
+ powered=slimplayer.powered,
device_info=DeviceInfo(
- model=client.device_model,
- address=client.device_address,
- manufacturer=client.device_type,
+ model=slimplayer.device_model,
+ address=slimplayer.device_address,
+ manufacturer=slimplayer.device_type,
),
supported_features=(
PlayerFeature.POWER,
PlayerFeature.SYNC,
PlayerFeature.VOLUME_SET,
PlayerFeature.PAUSE,
+ PlayerFeature.VOLUME_MUTE,
+ PlayerFeature.ENQUEUE_NEXT,
+ ),
+ max_sample_rate=int(slimplayer.max_sample_rate),
+ supports_24bit=int(slimplayer.max_sample_rate) > 44100,
+ can_sync_with=tuple(
+ x.player_id for x in self.slimproto.players if x.player_id != player_id
),
- max_sample_rate=int(client.max_sample_rate),
- supports_24bit=int(client.max_sample_rate) > 44100,
)
- if virtual_provider_info:
- # if this player is part of a virtual provider run the callback
- await virtual_provider_info[0](player)
self.mass.players.register_or_update(player)
# update player state on player events
player.available = True
player.current_item_id = (
- client.current_metadata.get("item_id")
- if client.current_metadata
- else client.current_url
+ slimplayer.current_media.metadata.get("item_id")
+ if slimplayer.current_media and slimplayer.current_media.metadata
+ else slimplayer.current_url
)
player.active_source = player.player_id
- player.name = client.name
- player.powered = client.powered
- player.state = STATE_MAP[client.state]
- player.volume_level = client.volume_level
- # set all existing player ids in `can_sync_with` field
- player.can_sync_with = tuple(
- x.player_id for x in self._socket_clients.values() if x.player_id != player_id
- )
- if virtual_provider_info:
- # if this player is part of a virtual provider run the callback
- virtual_provider_info[1](player)
+ player.name = slimplayer.name
+ player.powered = slimplayer.powered
+ player.state = STATE_MAP[slimplayer.state]
+ player.volume_level = slimplayer.volume_level
+ player.volume_muted = slimplayer.muted
self.mass.players.update(player_id)
- def _handle_player_heartbeat(self, client: SlimClient) -> None:
+ def _handle_player_heartbeat(self, slimplayer: SlimClient) -> None:
"""Process SlimClient elapsed_time update."""
- if client.state == SlimPlayerState.STOPPED:
+ if slimplayer.state == SlimPlayerState.STOPPED:
# ignore server heartbeats when stopped
return
# elapsed time change on the player will be auto picked up
# by the player manager.
- player = self.mass.players.get(client.player_id)
- player.elapsed_time = client.elapsed_seconds
+ player = self.mass.players.get(slimplayer.player_id)
+ player.elapsed_time = slimplayer.elapsed_seconds
player.elapsed_time_last_updated = time.time()
# handle sync
if player.synced_to:
- self._handle_client_sync(client)
-
- async def _handle_output_underrun(self, client: SlimClient) -> None:
- """Process SlimClient Output Underrun Event."""
- player = self.mass.players.get(client.player_id)
- self.logger.error("Player %s ran out of buffer", player.display_name)
- player.state = PlayerState.IDLE
- self.mass.players.update(client.player_id)
-
- def _handle_client_sync(self, client: SlimClient) -> None:
- """Synchronize audio of a sync client."""
- player = self.mass.players.get(client.player_id)
+ self._handle_client_sync(slimplayer)
+
+ async def _handle_player_cli_event(self, slimplayer: SlimClient, event: SlimEvent) -> None:
+ """Process CLI Event."""
+ if not event.data:
+ return
+ queue = self.mass.player_queues.get_active_queue(slimplayer.player_id)
+ if event.data.startswith("button preset_") and event.data.endswith(".single"):
+ preset_id = event.data.split("preset_")[1].split(".")[0]
+ preset_index = int(preset_id) - 1
+ if len(slimplayer.presets) >= preset_index + 1:
+ preset = slimplayer.presets[preset_index]
+ await self.mass.player_queues.play_media(queue.queue_id, preset.uri)
+ elif event.data == "button repeat":
+ if queue.repeat_mode == RepeatMode.OFF:
+ repeat_mode = RepeatMode.ONE
+ elif queue.repeat_mode == RepeatMode.ONE:
+ repeat_mode = RepeatMode.ALL
+ else:
+ repeat_mode = RepeatMode.OFF
+ self.mass.player_queues.set_repeat(queue.queue_id, repeat_mode)
+ slimplayer.extra_data["playlist repeat"] = REPEATMODE_MAP[queue.repeat_mode]
+ slimplayer.signal_update()
+ elif event.data == "button shuffle":
+ self.mass.player_queues.set_shuffle(queue.queue_id, not queue.shuffle_enabled)
+ slimplayer.extra_data["playlist shuffle"] = int(queue.shuffle_enabled)
+ slimplayer.signal_update()
+ elif event.data.startswith("time "):
+ # seek request
+ _, param = event.data.split(" ", 1)
+ if param.isnumeric():
+ await self.mass.player_queues.seek(queue.queue_id, int(param))
+ self.logger.debug("CLI Event: %s", event.data)
+
+ def _handle_client_sync(self, slimplayer: SlimClient) -> None:
+ """Synchronize audio of a sync slimplayer."""
+ player = self.mass.players.get(slimplayer.player_id)
sync_master_id = player.synced_to
if not sync_master_id:
# we only correct sync members, not the sync master itself
return
- if sync_master_id not in self._socket_clients:
+ if not (sync_master := self.slimproto.get_player(sync_master_id)):
return # just here as a guard as bad things can happen
- sync_master = self._socket_clients[sync_master_id]
-
if sync_master.state != SlimPlayerState.PLAYING:
return
- if client.state != SlimPlayerState.PLAYING:
+ if slimplayer.state != SlimPlayerState.PLAYING:
return
- if backoff_time := self._do_not_resync_before.get(client.player_id):
+ if backoff_time := self._do_not_resync_before.get(slimplayer.player_id):
# player has set a timestamp we should backoff from syncing it
if time.time() < backoff_time:
return
# we collect a few playpoints of the player to determine
# average lag/drift so we can adjust accordingly
sync_playpoints = self._sync_playpoints.setdefault(
- client.player_id, deque(maxlen=MIN_REQ_PLAYPOINTS)
+ slimplayer.player_id, deque(maxlen=MIN_REQ_PLAYPOINTS)
)
- active_queue = self.mass.player_queues.get_active_queue(client.player_id)
+ active_queue = self.mass.player_queues.get_active_queue(slimplayer.player_id)
stream_job = self.mass.streams.multi_client_jobs.get(active_queue.queue_id)
if not stream_job:
# should not happen, but just in case
diff = int(
self._get_corrected_elapsed_milliseconds(sync_master)
- - self._get_corrected_elapsed_milliseconds(client)
+ - self._get_corrected_elapsed_milliseconds(slimplayer)
)
# we can now append the current playpoint to our list
# get the average diff
avg_diff = statistics.fmean(x.diff for x in sync_playpoints)
- delta = abs(avg_diff)
+ delta = int(abs(avg_diff))
if delta < MIN_DEVIATION_ADJUST:
return
# resync the player by skipping ahead or pause for x amount of (milli)seconds
sync_playpoints.clear()
- if avg_diff > 0:
+ self._do_not_resync_before[slimplayer.player_id] = time.time() + (delta / 1000) + 2
+ if avg_diff > MAX_SKIP_AHEAD_MS:
+ # player lagging behind more than MAX_SKIP_AHEAD_MS,
+ # we need to correct the sync_master
+ self.logger.warning(
+ "%s is lagging behind more than %s milliseconds!",
+ player.display_name,
+ MAX_SKIP_AHEAD_MS,
+ )
+ self.mass.create_task(sync_master.pause_for(delta))
+ elif avg_diff > 0:
# handle player lagging behind, fix with skip_ahead
self.logger.debug("%s resync: skipAhead %sms", player.display_name, delta)
- self._do_not_resync_before[client.player_id] = time.time() + 2
- self.mass.create_task(self._skip_over(client.player_id, delta))
+ self.mass.create_task(slimplayer.skip_over(delta))
else:
# handle player is drifting too far ahead, use pause_for to adjust
self.logger.debug("%s resync: pauseFor %sms", player.display_name, delta)
- self._do_not_resync_before[client.player_id] = time.time() + (delta / 1000) + 2
- self.mass.create_task(self._pause_for(client.player_id, delta))
+ self.mass.create_task(slimplayer.pause_for(delta))
- async def _handle_decoder_ready(self, client: SlimClient) -> None:
- """Handle decoder ready event, player is ready for the next track."""
- if not client.current_metadata:
- return
- player = self.mass.players.get(client.player_id)
- if player.synced_to:
- # handled by sync master
- return
- if client.state == SlimPlayerState.STOPPED:
- return
- if player.active_source != player.player_id:
- return
- with suppress(QueueEmpty):
- next_item = await self.mass.player_queues.preload_next_item(client.player_id)
- url = await self.mass.streams.resolve_stream_url(
- queue_item=next_item,
- output_codec=ContentType.FLAC,
- flow_mode=False,
- )
- await self._handle_play_url(
- client,
- url=url,
- queue_item=next_item,
- send_flush=False,
- auto_play=True,
- )
+ async def _handle_buffer_ready(self, slimplayer: SlimClient) -> None:
+ """Handle buffer ready event, player has buffered a (new) track.
- async def _handle_buffer_ready(self, client: SlimClient) -> None:
- """Handle buffer ready event, player has buffered a (new) track."""
- player = self.mass.players.get(client.player_id)
+ Only used when autoplay=0 for coordinated start of synced players.
+ """
+ player = self.mass.players.get(slimplayer.player_id)
if player.synced_to:
# unpause of sync child is handled by sync master
return
if not player.group_childs:
# not a sync group, continue
- await client.play()
+ await slimplayer.play()
return
count = 0
while count < 40:
# all child's ready (or timeout) - start play
async with asyncio.TaskGroup() as tg:
for _client in self._get_sync_clients(player.player_id):
- timestamp = _client.jiffies + 20
+ timestamp = _client.jiffies + 200
sync_delay = self.mass.config.get_raw_player_config_value(
_client.player_id, CONF_SYNC_ADJUST, 0
)
timestamp -= sync_delay
self._do_not_resync_before[_client.player_id] = time.time() + 1
- tg.create_task(client.send_strm(b"u", replay_gain=int(timestamp)))
-
- async def _handle_connected(self, client: SlimClient) -> None:
- """Handle a client connected event."""
- player_id = client.player_id
- self.logger.debug("Player %s connected", client.name or player_id)
- if existing := self._socket_clients.pop(player_id, None):
- # race condition: new socket client connected while
- # the old one has not yet been cleaned up
- self.logger.warning(
- "Player %s connected while previous session still existing!",
- client.name or player_id,
- )
- with suppress(RuntimeError):
- existing.disconnect()
-
- self._socket_clients[player_id] = client
+ tg.create_task(_client.unpause_at(int(timestamp)))
+
+ async def _handle_connected(self, slimplayer: SlimClient) -> None:
+ """Handle a slimplayer connected event."""
+ player_id = slimplayer.player_id
+ self.logger.info("Player %s connected", slimplayer.name or player_id)
+ # set presets and display
+ await self._set_preset_items(slimplayer)
+ await self._set_display(slimplayer)
# update all attributes
- await self._handle_player_update(client)
+ await self._handle_player_update(slimplayer)
# update existing players so they can update their `can_sync_with` field
- for item in list(self._socket_clients.values()):
- if item.player_id == player_id:
- continue
- await self._handle_player_update(item)
+ for _player in self.players:
+ _player.can_sync_with = tuple(
+ x.player_id for x in self.slimproto.players if x.player_id != _player.player_id
+ )
+ self.mass.players.update(_player.player_id)
# restore volume and power state
if last_state := await self.mass.cache.get(f"{CACHE_KEY_PREV_STATE}.{player_id}"):
init_power = last_state[0]
else:
init_volume = DEFAULT_PLAYER_VOLUME
init_power = False
- await client.power(init_power)
- await client.volume_set(init_volume)
+ await slimplayer.power(init_power)
+ await slimplayer.volume_set(init_volume)
- async def _handle_disconnected(self, client: SlimClient) -> None:
- """Handle a client disconnected event."""
- if self.mass.closing:
- return
- player_id = client.player_id
- if client := self._socket_clients.pop(player_id, None):
- # store last state in cache
- await self.mass.cache.set(
- f"{CACHE_KEY_PREV_STATE}.{player_id}",
- (client.powered, client.volume_level),
- )
- self.logger.info(
- "Player %s disconnected",
- client.name or player_id,
- )
- if player := self.mass.players.get(player_id):
- player.available = False
- self.mass.players.update(player_id)
-
- async def _pause_for(self, client_id: str, millis: int) -> None:
- """Handle pause for x amount of time to help with syncing."""
- client = self._socket_clients[client_id]
- # https://wiki.slimdevices.com/index.php/SlimProto_TCP_protocol.html#u.2C_p.2C_a_.26_t_commands_and_replay_gain_field§
- await client.send_strm(b"p", replay_gain=int(millis))
-
- async def _skip_over(self, client_id: str, millis: int) -> None:
- """Handle skip for x amount of time to help with syncing."""
- client = self._socket_clients[client_id]
- # https://wiki.slimdevices.com/index.php/SlimProto_TCP_protocol.html#u.2C_p.2C_a_.26_t_commands_and_replay_gain_field
- await client.send_strm(b"a", replay_gain=int(millis))
-
- def _get_sync_clients(self, player_id: str) -> list[SlimClient]:
+ def _get_sync_clients(self, player_id: str) -> Iterator[SlimClient]:
"""Get all sync clients for a player."""
player = self.mass.players.get(player_id)
- sync_clients: list[SlimClient] = []
# we need to return the player itself too
group_child_ids = {player_id}
group_child_ids.update(player.group_childs)
for child_id in group_child_ids:
- if client := self._socket_clients.get(child_id):
- sync_clients.append(client)
- return sync_clients
+ if slimplayer := self.slimproto.get_player(child_id):
+ yield slimplayer
- def _get_corrected_elapsed_milliseconds(self, client: SlimClient) -> int:
+ def _get_corrected_elapsed_milliseconds(self, slimplayer: SlimClient) -> int:
"""Return corrected elapsed milliseconds."""
sync_delay = self.mass.config.get_raw_player_config_value(
- client.player_id, CONF_SYNC_ADJUST, 0
+ slimplayer.player_id, CONF_SYNC_ADJUST, 0
+ )
+ return slimplayer.elapsed_milliseconds - sync_delay
+
+ async def _set_preset_items(self, slimplayer: SlimClient) -> None:
+ """Set the presets for a player."""
+ preset_items: list[SlimPreset] = []
+ for preset_index in range(1, 11):
+ if preset_conf := self.mass.config.get_raw_player_config_value(
+ slimplayer.player_id, f"preset_{preset_index}"
+ ):
+ try:
+ media_item = await self.mass.music.get_item_by_uri(preset_conf)
+ preset_items.append(
+ SlimPreset(
+ uri=media_item.uri,
+ text=media_item.name,
+ icon=self.mass.metadata.get_image_url(media_item.image),
+ )
+ )
+ except MusicAssistantError:
+ # non-existing media item or some other edge case
+ preset_items.append(
+ SlimPreset(
+ uri=f"preset_{preset_index}",
+ text=f"ERROR <preset {preset_index}>",
+ icon="",
+ )
+ )
+ else:
+ break
+ slimplayer.presets = preset_items
+
+ async def _set_display(self, slimplayer: SlimClient) -> None:
+ """Set the display config for a player."""
+ display_enabled = self.mass.config.get_raw_player_config_value(
+ slimplayer.player_id, CONF_DISPLAY, True
+ )
+ visualization = self.mass.config.get_raw_player_config_value(
+ slimplayer.player_id, CONF_VISUALIZATION, DEFAULT_VISUALIZATION
+ )
+ await slimplayer.configure_display(
+ visualisation=SlimVisualisationType(visualization), disabled=not display_enabled
)
- current_millis = client.elapsed_milliseconds
- if sync_delay != 0:
- return current_millis - sync_delay
- return current_millis
+++ /dev/null
-"""
-CLI interface which is more or less compatible with Logitech Media Server.
-
-Implemented protocols: CometD, Telnet and JSON-RPC.
-
-NOTE: This only implements the bare minimum to have functional players.
-Output is adjusted to conform to Music Assistant logic or just for simplification.
-Goal is player compatibility, not API compatibility.
-Users that need more, should just stay with a full blown LMS server.
-"""
-
-from __future__ import annotations
-
-import asyncio
-import contextlib
-import time
-import urllib.parse
-from dataclasses import dataclass, field
-from typing import TYPE_CHECKING, Any
-
-import shortuuid
-from aiohttp import web
-
-from music_assistant.common.helpers.json import json_dumps, json_loads
-from music_assistant.common.helpers.util import empty_queue, select_free_port
-from music_assistant.common.models.enums import (
- EventType,
- PlayerState,
- QueueOption,
- RepeatMode,
-)
-from music_assistant.common.models.errors import MusicAssistantError
-
-from .models import (
- PLAYMODE_MAP,
- REPEATMODE_MAP,
- CometDResponse,
- CommandErrorMessage,
- CommandMessage,
- CommandResultMessage,
- PlayerItem,
- PlayersResponse,
- PlayerStatusResponse,
- ServerStatusResponse,
- SlimMenuItem,
- SlimSubscribeMessage,
- menu_item_from_media_item,
- menu_item_from_queue_item,
- player_item_from_mass,
- playlist_item_from_mass,
-)
-
-if TYPE_CHECKING:
- from collections.abc import Callable
-
- from music_assistant.common.models.config_entries import (
- ConfigEntry,
- ConfigValueType,
- )
- from music_assistant.common.models.event import MassEvent
- from music_assistant.common.models.media_items import MediaItemType
- from music_assistant.common.models.queue_item import QueueItem
- from music_assistant.server import MusicAssistant
-
- from . import SlimprotoProvider
-
-
-# ruff: noqa: ARG002, E501, ERA001
-# pylint: disable=keyword-arg-before-vararg
-
-ArgsType = list[int | str]
-KwargsType = dict[str, Any]
-
-
-@dataclass
-class CometDClient:
- """Representation of a connected CometD client."""
-
- client_id: str
- player_id: str = ""
- queue: asyncio.Queue[CometDResponse] = field(default_factory=asyncio.Queue)
- last_seen: int = int(time.time())
- first_event: CometDResponse | None = None
- meta_subscriptions: set[str] = field(default_factory=set)
- slim_subscriptions: dict[str, SlimSubscribeMessage] = field(default_factory=dict)
-
-
-async def get_config_entries(
- mass: MusicAssistant,
- instance_id: str | None = None,
- action: str | None = None,
- values: dict[str, ConfigValueType] | None = None,
-) -> tuple[ConfigEntry, ...]:
- """
- Return Config entries to setup this provider.
-
- instance_id: id of an existing provider instance (None if new instance setup).
- action: [optional] action key called from config entries UI.
- values: the (intermediate) raw values for config entries sent with the action.
- """
- # ruff: noqa: ARG001
- return () # we do not have any config entries (yet)
-
-
-def parse_value(raw_value: int | str) -> int | str | tuple[str, int | str]:
- """
- Transform API param into a usable value.
-
- Integer values are sometimes sent as string so we try to parse that.
- """
- if isinstance(raw_value, str):
- if ":" in raw_value:
- # this is a key:value value
- key, val = raw_value.split(":", 1)
- if val.isnumeric():
- val = int(val)
- return (key, val)
- if raw_value.isnumeric():
- # this is an integer sent as string
- return int(raw_value)
- return raw_value
-
-
-def parse_args(raw_values: list[int | str]) -> tuple[ArgsType, KwargsType]:
- """Pargse Args and Kwargs from raw CLI params."""
- args: ArgsType = []
- kwargs: KwargsType = {}
- for raw_value in raw_values:
- value = parse_value(raw_value)
- if isinstance(value, tuple):
- kwargs[value[0]] = value[1]
- else:
- args.append(value)
- return (args, kwargs)
-
-
-class LmsCli:
- """Basic LMS CLI (json rpc and telnet) implementation, (partly) compatible with Logitech Media Server."""
-
- cli_port: int = 9090
- _unsub_callback: Callable | None = None
- _periodic_task: asyncio.Task | None = None
-
- def __init__(
- self, slimproto: SlimprotoProvider, enable_telnet: bool, enable_json: bool
- ) -> None:
- """Initialize."""
- self.slimproto = slimproto
- self.enable_telnet = enable_telnet
- self.enable_json = enable_json
- self.logger = self.slimproto.logger.getChild("cli")
- self.mass = self.slimproto.mass
- self._cometd_clients: dict[str, CometDClient] = {}
-
- async def setup(self) -> None:
- """Handle async initialization of the plugin."""
- if self.enable_json:
- self.logger.info("Registering jsonrpc endpoints on the webserver")
- self.mass.streams.register_dynamic_route("/jsonrpc.js", self._handle_jsonrpc)
- self.mass.streams.register_dynamic_route("/cometd", self._handle_cometd)
- self._unsub_callback = self.mass.subscribe(
- self._on_mass_event,
- (EventType.PLAYER_UPDATED, EventType.QUEUE_UPDATED),
- )
- self._periodic_task = self.mass.create_task(self._do_periodic())
- if self.enable_telnet:
- self.cli_port = await select_free_port(9090, 9190)
- self.logger.info("Starting (telnet) CLI on port %s", self.cli_port)
- await asyncio.start_server(self._handle_cli_client, "0.0.0.0", self.cli_port)
-
- async def unload(self) -> None:
- """
- Handle unload/close of the provider.
-
- Called when provider is deregistered (e.g. MA exiting or config reloading).
- """
- self.mass.streams.unregister_dynamic_route("/jsonrpc.js")
- self.mass.streams.unregister_dynamic_route("/cometd")
- if self._unsub_callback:
- self._unsub_callback()
- self._unsub_callback = None
- if self._periodic_task:
- self._periodic_task.cancel()
- self._periodic_task = None
-
- async def _handle_cli_client(
- self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter
- ) -> None:
- """Handle new connection on the legacy CLI."""
- # https://raw.githubusercontent.com/Logitech/slimserver/public/7.8/HTML/EN/html/docs/cli-api.html
- # https://github.com/elParaguayo/LMS-CLI-Documentation/blob/master/LMS-CLI.md
- self.logger.debug("Client connected on Telnet CLI")
- try:
- while True:
- raw_request = await reader.readline()
- raw_request = raw_request.strip().decode("utf-8")
- if not raw_request:
- break
- # request comes in as url encoded strings, separated by space
- raw_params = [urllib.parse.unquote(x) for x in raw_request.split(" ")]
- # the first param is either a macaddress or a command
- if ":" in raw_params[0]:
- # assume this is a mac address (=player_id)
- player_id = raw_params[0]
- command = raw_params[1]
- command_params = raw_params[2:]
- else:
- player_id = ""
- command = raw_params[0]
- command_params = raw_params[1:]
-
- args, kwargs = parse_args(command_params)
-
- response: str = raw_request
-
- # check if we have a handler for this command
- # note that we only have support for very limited commands
- # just enough for compatibility with players but not to be used as api
- # with 3rd party tools!
- if handler := getattr(self, f"_handle_{command}", None):
- self.logger.debug(
- "Handling CLI-request (player: %s command: %s - args: %s - kwargs: %s)",
- player_id,
- command,
- str(args),
- str(kwargs),
- )
- cmd_result: list[str] = handler(player_id, *args, **kwargs)
- if asyncio.iscoroutine(cmd_result):
- cmd_result = await cmd_result
-
- if isinstance(cmd_result, dict):
- result_parts = dict_to_strings(cmd_result)
- result_str = " ".join(urllib.parse.quote(x) for x in result_parts)
- elif not cmd_result:
- result_str = ""
- else:
- result_str = str(cmd_result)
- response += " " + result_str
- else:
- self.logger.warning(
- "No handler for %s (player: %s - args: %s - kwargs: %s)",
- command,
- player_id,
- str(args),
- str(kwargs),
- )
- # echo back the request and the result (if any)
- response += "\n"
- writer.write(response.encode("utf-8"))
- await writer.drain()
- except ConnectionResetError:
- pass
- except Exception as err:
- self.logger.debug("Error handling CLI command", exc_info=err)
- finally:
- self.logger.debug("Client disconnected from Telnet CLI")
-
- async def _handle_jsonrpc(self, request: web.Request) -> web.Response:
- """Handle request on JSON-RPC endpoint."""
- command_msg: CommandMessage = await request.json(loads=json_loads)
- self.logger.debug("Received request: %s", command_msg)
- cmd_result = await self._handle_request(command_msg["params"])
- if cmd_result is None:
- result: CommandErrorMessage = {
- **command_msg,
- "error": {"code": -1, "message": "Invalid command"},
- }
- else:
- result: CommandResultMessage = {
- **command_msg,
- "result": cmd_result,
- }
- # return the response to the client
- return web.json_response(result, dumps=json_dumps)
-
- async def _handle_cometd(self, request: web.Request) -> web.Response:
- """
- Handle CometD request on the json CLI.
-
- https://github.com/Logitech/slimserver/blob/public/8.4/Slim/Web/Cometd.pm
- """
- logger = self.logger.getChild("cometd")
- # ruff: noqa: PLR0915
- clientid: str = ""
- response = []
- streaming = False
- json_msg: list[dict[str, Any]] = await request.json()
- # cometd message is an array of commands/messages
- for cometd_msg in json_msg:
- channel = cometd_msg.get("channel")
- # try to figure out clientid
- if not clientid:
- clientid = cometd_msg.get("clientId")
- if not clientid and channel == "/meta/handshake":
- # generate new clientid
- clientid = shortuuid.uuid()
- self._cometd_clients[clientid] = CometDClient(
- client_id=clientid,
- )
- elif not clientid and channel in ("/slim/subscribe", "/slim/request"):
- # pull clientId out of response channel
- clientid = cometd_msg["data"]["response"].split("/")[1]
- elif not clientid and channel == "/slim/unsubscribe":
- # pull clientId out of unsubscribe
- clientid = cometd_msg["data"]["unsubscribe"].split("/")[1]
- assert clientid, "No clientID provided"
- logger.debug("Incoming message for channel '%s' - clientid: %s", channel, clientid)
-
- # messageid is optional but if provided we must pass it along
- msgid = cometd_msg.get("id", "")
-
- if clientid not in self._cometd_clients:
- # If a client sends any request and we do not have a valid clid record
- # because the streaming connection has been lost for example, re-handshake them
- return web.json_response(
- [
- {
- "id": msgid,
- "channel": channel,
- "clientId": None,
- "successful": False,
- "timestamp": time.strftime("%a, %d %b %Y %H:%M:%S %Z", time.gmtime()),
- "error": "invalid clientId",
- "advice": {
- "reconnect": "handshake",
- "interval": 0,
- },
- }
- ]
- )
-
- # get the cometd_client object for the clientid
- cometd_client = self._cometd_clients[clientid]
- cometd_client.last_seen = int(time.time())
-
- if channel == "/meta/handshake":
- # handshake message
- response.append(
- {
- "id": msgid,
- "channel": channel,
- "version": "1.0",
- "supportedConnectionTypes": ["long-polling", "streaming"],
- "clientId": clientid,
- "successful": True,
- "advice": {
- "reconnect": "retry", # one of "none", "retry", "handshake"
- "interval": 0, # initial interval is 0 to support long-polling's connect request
- "timeout": 60000,
- },
- }
- )
- # playerid (mac) and uuid belonging to the client is sent in the ext field
- if player_id := cometd_msg.get("ext", {}).get("mac"):
- cometd_client.player_id = player_id
- if (uuid := cometd_msg.get("ext", {}).get("uuid")) and (
- player := self.mass.players.get(player_id)
- ):
- player.extra_data["uuid"] = uuid
-
- elif channel in ("/meta/connect", "/meta/reconnect"):
- # (re)connect message
- logger.debug("Client (re-)connected: %s", clientid)
- streaming = cometd_msg["connectionType"] == "streaming"
- # confirm the connection
- response.append(
- {
- "id": msgid,
- "channel": channel,
- "clientId": clientid,
- "successful": True,
- "timestamp": time.strftime("%a, %d %b %Y %H:%M:%S %Z", time.gmtime()),
- "advice": {
- # update interval for streaming mode
- "interval": 5000 if streaming else 0
- },
- }
- )
- # TODO: do we want to implement long-polling support too ?
- # https://github.com/Logitech/slimserver/blob/d9ebda7ebac41e82f1809dd85b0e4446e0c9be36/Slim/Web/Cometd.pm#L292
-
- elif channel == "/meta/disconnect":
- # disconnect message
- logger.debug("CometD Client disconnected: %s", clientid)
- self._cometd_clients.pop(clientid)
- return web.json_response(
- [
- {
- "id": msgid,
- "channel": channel,
- "clientId": clientid,
- "successful": True,
- "timestamp": time.strftime("%a, %d %b %Y %H:%M:%S %Z", time.gmtime()),
- }
- ]
- )
-
- elif channel == "/meta/subscribe":
- cometd_client.meta_subscriptions.add(cometd_msg["subscription"])
- response.append(
- {
- "id": msgid,
- "channel": channel,
- "clientId": clientid,
- "successful": True,
- "subscription": cometd_msg["subscription"],
- }
- )
-
- elif channel == "/meta/unsubscribe":
- if cometd_msg["subscription"] in cometd_client.meta_subscriptions:
- cometd_client.meta_subscriptions.remove(cometd_msg["subscription"])
- response.append(
- {
- "id": msgid,
- "channel": channel,
- "clientId": clientid,
- "successful": True,
- "subscription": cometd_msg["subscription"],
- }
- )
- elif channel == "/slim/subscribe":
- # A request to execute & subscribe to some Logitech Media Server event
- # A valid /slim/subscribe message looks like this:
- # {
- # channel => '/slim/subscribe',
- # id => <unique id>,
- # data => {
- # response => '/slim/serverstatus', # the channel all messages should be sent back on
- # request => [ '', [ 'serverstatus', 0, 50, 'subscribe:60' ],
- # priority => <value>, # optional priority value, is passed-through with the response
- # }
- response.append(
- {
- "id": msgid,
- "channel": channel,
- "clientId": clientid,
- "successful": True,
- }
- )
- cometd_client.slim_subscriptions[cometd_msg["data"]["response"]] = cometd_msg
- # Return one-off result now, rest is handled by the subscription logic
- self._handle_cometd_request(cometd_client, cometd_msg)
-
- elif channel == "/slim/unsubscribe":
- # A request to unsubscribe from a Logitech Media Server event, this is not the same as /meta/unsubscribe
- # A valid /slim/unsubscribe message looks like this:
- # {
- # channel => '/slim/unsubscribe',
- # data => {
- # unsubscribe => '/slim/serverstatus',
- # }
- response.append(
- {
- "id": msgid,
- "channel": channel,
- "clientId": clientid,
- "successful": True,
- }
- )
- cometd_client.slim_subscriptions.pop(cometd_msg["data"]["unsubscribe"], None)
-
- elif channel == "/slim/request":
- # A request to execute a one-time Logitech Media Server event
- # A valid /slim/request message looks like this:
- # {
- # channel => '/slim/request',
- # id => <unique id>, (optional)
- # data => {
- # response => '/slim/<clientId>/request',
- # request => [ '', [ 'menu', 0, 100, ],
- # priority => <value>, # optional priority value, is passed-through with the response
- # }
- if not msgid:
- # If the caller does not want the response, id will be undef
- logger.debug("Not sending response to request, caller does not want it")
- else:
- # This response is optional, but we do it anyway
- response.append(
- {
- "id": msgid,
- "channel": channel,
- "clientId": clientid,
- "successful": True,
- }
- )
- self._handle_cometd_request(cometd_client, cometd_msg)
- else:
- logger.warning("Unhandled channel %s", channel)
- # always reply with the (default) response to every message
- response.append(
- {
- "channel": channel,
- "id": msgid,
- "clientId": clientid,
- "successful": True,
- }
- )
- # append any remaining messages from the queue
- while True:
- try:
- msg = cometd_client.queue.get_nowait()
- response.append(msg)
- except asyncio.QueueEmpty:
- break
- # send response
- headers = {
- "Server": "Logitech Media Server (7.9.9 - 1667251155)",
- "Cache-Control": "no-cache",
- "Pragma": "no-cache",
- "Expires": "-1",
- "Connection": "keep-alive",
- }
- # regular command/handshake messages are just replied and connection closed
- if not streaming:
- return web.json_response(response, headers=headers)
-
- # streaming mode: send messages from the queue to the client
- # the subscription connection is kept open and events are streamed to the client
- headers.update(
- {
- "Content-Type": "application/json",
- }
- )
- resp = web.StreamResponse(
- status=200,
- reason="OK",
- headers=headers,
- )
- resp.enable_chunked_encoding()
- await resp.prepare(request)
- chunk = json_dumps(response).encode("utf8")
- await resp.write(chunk)
-
- # keep delivering messages to the client until it disconnects
- # keep sending messages/events from the client's queue
- while True:
- # make sure we always send an array of messages
- msg = [await cometd_client.queue.get()]
- try:
- chunk = json_dumps(msg).encode("utf8")
- await resp.write(chunk)
- cometd_client.last_seen = int(time.time())
- except ConnectionResetError:
- return resp
- return resp
-
- def _handle_cometd_request(self, client: CometDClient, cometd_request: dict[str, Any]) -> None:
- """Handle request for CometD client (and put result on client queue)."""
-
- async def _handle() -> None:
- result = await self._handle_request(cometd_request["data"]["request"])
- await client.queue.put(
- {
- "channel": cometd_request["data"]["response"],
- "id": cometd_request["id"],
- "data": result,
- "ext": {"priority": cometd_request["data"].get("priority")},
- }
- )
-
- self.mass.create_task(_handle())
-
- async def _handle_request(self, params: tuple[str, list[str | int]]) -> Any:
- """Handle command for either JSON or CometD request."""
- # Slim request handler
- # {"method":"slim.request","id":1,"params":["aa:aa:ca:5a:94:4c",["status","-", 2, "tags:xcfldatgrKN"]]}
- self.logger.debug(
- "Handling request: %s",
- str(params),
- )
- player_id = params[0]
- command = str(params[1][0])
- args, kwargs = parse_args(params[1][1:])
- if player_id and "seq_no" in kwargs and (player := self.mass.players.get(player_id)):
- player.extra_data["seq_no"] = int(kwargs["seq_no"])
- if handler := getattr(self, f"_handle_{command}", None):
- # run handler for command
- cmd_result = handler(player_id, *args, **kwargs)
- if asyncio.iscoroutine(cmd_result):
- cmd_result = await cmd_result
- if cmd_result is None:
- cmd_result = {}
- elif not isinstance(cmd_result, dict):
- # individual values are returned with underscore ?!
- cmd_result = {f"_{command}": cmd_result}
- return cmd_result
- # no handler found
- self.logger.warning("No handler for %s", command)
- return None
-
- def _handle_players(
- self,
- player_id: str,
- start_index: int | str = 0,
- limit: int = 999,
- **kwargs,
- ) -> PlayersResponse:
- """Handle players command."""
- players: list[PlayerItem] = []
- for index, mass_player in enumerate(self.mass.players.all()):
- if not isinstance(start_index, int):
- start_index = 0
- if isinstance(start_index, int) and index < start_index:
- continue
- if len(players) > limit:
- break
- players.append(player_item_from_mass(start_index + index, mass_player))
- return PlayersResponse(count=len(players), players_loop=players)
-
- async def _handle_status(
- self,
- player_id: str,
- offset: int | str = "-",
- limit: int = 2,
- menu: str = "",
- useContextMenu: int | bool = False, # noqa: N803
- tags: str = "xcfldatgrKN",
- **kwargs,
- ) -> PlayerStatusResponse:
- """Handle player status command."""
- player = self.mass.players.get(player_id)
- if player is None:
- return None
- queue = self.mass.player_queues.get_active_queue(player_id)
- assert queue is not None
- start_index = queue.current_index or 0 if offset == "-" else offset
- queue_items: list[QueueItem] = []
- index = 0
- async for item in self.mass.player_queues.items(queue.queue_id):
- if index >= start_index:
- queue_items.append(item)
- if len(queue_items) == limit:
- break
- index += 1
- # base details
- result = {
- "player_name": player.display_name,
- "player_connected": int(player.available),
- "player_needs_upgrade": False,
- "player_is_upgrading": False,
- "power": int(player.powered),
- "signalstrength": 0,
- "waitingToPlay": 0, # TODO?
- }
- # additional details if player powered
- if player.powered:
- result = {
- **result,
- "mode": PLAYMODE_MAP[queue.state],
- "remote": 1,
- "current_title": "Music Assistant",
- "time": queue.elapsed_time,
- "rate": 1,
- "duration": queue.current_item.duration if queue.current_item else 0,
- "sleep": 0,
- "will_sleep_in": 0,
- "sync_master": player.synced_to,
- "sync_slaves": ",".join(x for x in player.group_childs if x != player_id),
- "mixer volume": player.volume_level,
- "playlist repeat": REPEATMODE_MAP[queue.repeat_mode],
- "playlist shuffle": int(queue.shuffle_enabled),
- "playlist_timestamp": queue.elapsed_time_last_updated,
- "playlist_cur_index": queue.current_index,
- "playlist_tracks": queue.items,
- "seq_no": player.extra_data.get("seq_no", 0),
- "player_ip": player.device_info.address,
- "digital_volume_control": 1,
- "can_seek": 1,
- "playlist mode": "off",
- "playlist_loop": [
- playlist_item_from_mass(
- self.mass,
- item,
- queue.current_index + index,
- queue.current_index == (queue.current_index + index),
- )
- for index, item in enumerate(queue_items)
- ],
- }
- # additional details if menu requested
- if menu == "menu":
- # in menu-mode the regular playlist_loop is replaced by item_loop
- result.pop("playlist_loop", None)
- presets = await self._get_preset_items(player_id)
- preset_data: list[dict] = []
- preset_loop: list[int] = []
- for _, media_item in presets:
- preset_data.append(
- {
- "URL": media_item["params"]["uri"],
- "text": media_item["track"],
- "type": "audio",
- }
- )
- preset_loop.append(1)
- while len(preset_loop) < 10:
- preset_data.append({})
- preset_loop.append(0)
- result = {
- **result,
- "alarm_state": "none",
- "alarm_snooze_seconds": 540,
- "alarm_timeout_seconds": 3600,
- "count": len(queue_items),
- "offset": offset,
- "base": {
- "actions": {
- "more": {
- "itemsParams": "params",
- "window": {"isContextMenu": 1},
- "cmd": ["contextmenu"],
- "player": 0,
- "params": {"context": "playlist", "menu": "track"},
- }
- }
- },
- "preset_loop": preset_loop,
- "preset_data": preset_data,
- "item_loop": [
- menu_item_from_queue_item(
- self.mass,
- item,
- queue.current_index + index,
- queue.current_index == (queue.current_index + index),
- )
- for index, item in enumerate(queue_items)
- ],
- }
- # additional details if contextmenu requested
- if bool(useContextMenu):
- result = {
- **result,
- # TODO ?!,
- }
-
- return result
-
- async def _handle_serverstatus(
- self,
- player_id: str,
- start_index: int = 0,
- limit: int = 2,
- **kwargs,
- ) -> ServerStatusResponse:
- """Handle server status command."""
- if start_index == "-":
- start_index = 0
- players: list[PlayerItem] = []
- for index, mass_player in enumerate(self.mass.players.all()):
- if isinstance(start_index, int) and index < start_index:
- continue
- if len(players) > limit:
- break
- players.append(player_item_from_mass(start_index + index, mass_player))
- return ServerStatusResponse(
- {
- "httpport": self.mass.streams.publish_port,
- "ip": self.mass.streams.publish_ip,
- "version": "7.999.999",
- "uuid": self.mass.server_id,
- # TODO: set these vars ?
- "info total duration": 0,
- "info total genres": 0,
- "sn player count": 0,
- "lastscan": 1685548099,
- "info total albums": 0,
- "info total songs": 0,
- "info total artists": 0,
- "players_loop": players,
- "player count": len(players),
- "other player count": 0,
- "other_players_loop": [],
- }
- )
-
- async def _handle_firmwareupgrade(
- self,
- player_id: str,
- *args,
- **kwargs,
- ) -> ServerStatusResponse:
- """Handle firmwareupgrade command."""
- return {
- "firmwareUpgrade": 0,
- "relativeFirmwareUrl": "/firmware/baby_7.7.3_r16676.bin",
- }
-
- async def _handle_artworkspec(
- self,
- player_id: str,
- *args,
- **kwargs,
- ) -> ServerStatusResponse:
- """Handle firmwareupgrade command."""
- # https://github.com/Logitech/slimserver/blob/e9c2f88e7ca60b3648b66116240f3f5fe6ca3188/Slim/Control/Commands.pm#L224
- return None
-
- def _handle_mixer(
- self,
- player_id: str,
- subcommand: str,
- *args,
- **kwargs,
- ) -> int | None:
- """Handle player mixer command."""
- arg = args[0] if args else "?"
- player = self.mass.players.get(player_id)
- assert player is not None
-
- # <playerid> mixer volume <0 .. 100|-100 .. +100|?>
- if subcommand == "volume" and isinstance(arg, int):
- if "seq_no" in kwargs:
- # handle a (jive based) squeezebox that already executed the command
- # itself and just reports the new state
- player.volume_level = arg
- # self.mass.players.update(player_id)
- else:
- self.mass.create_task(self.mass.players.cmd_volume_set, player_id, arg)
- return None
- if subcommand == "volume" and arg == "?":
- return player.volume_level
- if subcommand == "volume" and "+" in arg:
- volume_level = min(100, player.volume_level + int(arg.split("+")[1]))
- self.mass.create_task(self.mass.players.cmd_volume_set, player_id, volume_level)
- return None
- if subcommand == "volume" and "-" in arg:
- volume_level = max(0, player.volume_level - int(arg.split("-")[1]))
- self.mass.create_task(self.mass.players.cmd_volume_set, player_id, volume_level)
- return None
-
- # <playerid> mixer muting <0|1|toggle|?|>
- if subcommand == "muting" and isinstance(arg, int):
- self.mass.create_task(self.mass.players.cmd_volume_mute, player_id, int(arg))
- return None
- if subcommand == "muting" and arg == "toggle":
- self.mass.create_task(
- self.mass.players.cmd_volume_mute, player_id, not player.volume_muted
- )
- return None
- if subcommand == "muting":
- return int(player.volume_muted)
- self.logger.warning(
- "No handler for mixer/%s (player: %s - args: %s - kwargs: %s)",
- subcommand,
- player_id,
- str(args),
- str(kwargs),
- )
- return None
-
- def _handle_time(self, player_id: str, number: str | int) -> int | None:
- """Handle player `time` command."""
- # <playerid> time <number|-number|+number|?>
- # The "time" command allows you to query the current number of seconds that the
- # current song has been playing by passing in a "?".
- # You may jump to a particular position in a song by specifying a number of seconds
- # to seek to. You may also jump to a relative position within a song by putting an
- # explicit "-" or "+" character before a number of seconds you would like to seek.
- player_queue = self.mass.player_queues.get_active_queue(player_id)
- assert player_queue is not None
-
- if number == "?":
- return int(player_queue.corrected_elapsed_time)
-
- if isinstance(number, str) and ("+" in number or "-" in number):
- jump = int(number.split("+")[1])
- self.mass.create_task(self.mass.player_queues.skip, player_queue.queue_id, jump)
- return None
- else:
- self.mass.create_task(self.mass.player_queues.seek, player_queue.queue_id, int(number))
- return None
-
- def _handle_power(self, player_id: str, value: str | int, *args, **kwargs) -> int | None:
- """Handle player `time` command."""
- # <playerid> power <0|1|?|>
- # The "power" command turns the player on or off.
- # Use 0 to turn off, 1 to turn on, ? to query and
- # no parameter to toggle the power state of the player.
- player = self.mass.players.get(player_id)
- assert player is not None
-
- if value == "?":
- return int(player.powered)
- if "seq_no" in kwargs:
- # handle a (jive based) squeezebox that already executed the command
- # itself and just reports the new state
- player.powered = bool(value)
- # self.mass.players.update(player_id)
- return None
-
- self.mass.create_task(self.mass.players.cmd_power, player_id, bool(value))
- return None
-
- def _handle_playlist(
- self,
- player_id: str,
- subcommand: str,
- *args,
- **kwargs,
- ) -> int | None:
- """Handle player `playlist` command."""
- arg = args[0] if args else "?"
- queue = self.mass.player_queues.get_active_queue(player_id)
- assert queue is not None
-
- # <playerid> playlist index <index|+index|-index|?> <fadeInSecs>
- if subcommand == "index" and isinstance(arg, int):
- self.mass.create_task(self.mass.player_queues.play_index, player_id, arg)
- return None
- if subcommand == "index" and arg == "?":
- return queue.current_index
- if subcommand == "index" and "+" in arg:
- next_index = (queue.current_index or 0) + int(arg.split("+")[1])
- self.mass.create_task(self.mass.player_queues.play_index, player_id, next_index)
- return None
- if subcommand == "index" and "-" in arg:
- next_index = (queue.current_index or 0) - int(arg.split("-")[1])
- self.mass.create_task(self.mass.player_queues.play_index, player_id, next_index)
- return None
- if subcommand == "shuffle" and arg == "?":
- return queue.shuffle_enabled
- if subcommand == "shuffle":
- self.mass.player_queues.set_shuffle(queue.queue_id, bool(arg))
- return None
- if subcommand == "repeat" and arg == "?":
- return str(REPEATMODE_MAP[queue.repeat_mode])
- if subcommand == "repeat":
- repeat_map = {val: key for key, val in REPEATMODE_MAP.items()}
- new_repeat_mode = repeat_map.get(int(arg))
- self.mass.player_queues.set_repeat(queue.queue_id, new_repeat_mode)
- return None
-
- self.logger.warning("Unhandled command: playlist/%s", subcommand)
- return None
-
- def _handle_playlistcontrol(
- self,
- player_id: str,
- *args,
- cmd: str,
- uri: str,
- **kwargs,
- ) -> int | None:
- """Handle player `playlistcontrol` command."""
- queue = self.mass.player_queues.get_active_queue(player_id)
- if cmd == "play":
- self.mass.create_task(
- self.mass.player_queues.play_media(queue.queue_id, uri, QueueOption.PLAY)
- )
- return
- if cmd == "load":
- self.mass.create_task(
- self.mass.player_queues.play_media(queue.queue_id, uri, QueueOption.REPLACE)
- )
- return
- if cmd == "add":
- self.mass.create_task(
- self.mass.player_queues.play_media(queue.queue_id, uri, QueueOption.ADD)
- )
- return
- if cmd == "insert":
- self.mass.create_task(
- self.mass.player_queues.play_media(queue.queue_id, uri, QueueOption.NEXT)
- )
- return
- self.logger.warning("Unhandled command: playlistcontrol/%s", cmd)
-
- def _handle_play(
- self,
- player_id: str,
- *args,
- **kwargs,
- ) -> int | None:
- """Handle player `play` command."""
- queue = self.mass.player_queues.get_active_queue(player_id)
- assert queue is not None
- self.mass.create_task(self.mass.player_queues.play, player_id)
-
- def _handle_stop(
- self,
- player_id: str,
- *args,
- **kwargs,
- ) -> int | None:
- """Handle player `stop` command."""
- queue = self.mass.player_queues.get_active_queue(player_id)
- assert queue is not None
- self.mass.create_task(self.mass.player_queues.stop, player_id)
-
- def _handle_pause(
- self,
- player_id: str,
- force: int = 0,
- *args,
- **kwargs,
- ) -> int | None:
- """Handle player `stop` command."""
- queue = self.mass.player_queues.get_active_queue(player_id)
- assert queue is not None
-
- if force or queue.state == PlayerState.PLAYING:
- self.mass.create_task(self.mass.player_queues.pause, player_id)
- else:
- self.mass.create_task(self.mass.player_queues.play, player_id)
-
- def _handle_mode(
- self,
- player_id: str,
- subcommand: str,
- *args,
- **kwargs,
- ) -> int | None:
- """Handle player 'mode' command."""
- if subcommand == "play":
- return self._handle_play(player_id, *args, **kwargs)
- if subcommand == "pause":
- return self._handle_pause(player_id, *args, **kwargs)
- if subcommand == "stop":
- return self._handle_stop(player_id, *args, **kwargs)
-
- self.logger.warning(
- "No handler for mode/%s (player: %s - args: %s - kwargs: %s)",
- subcommand,
- player_id,
- str(args),
- str(kwargs),
- )
- return None
-
- def _handle_button(
- self,
- player_id: str,
- subcommand: str,
- *args,
- **kwargs,
- ) -> int | None:
- """Handle player 'button' command."""
- player = self.mass.players.get(player_id)
- assert player is not None
-
- if subcommand == "volup":
- self.mass.create_task(self.mass.players.cmd_volume_up, player_id)
- return
- if subcommand == "voldown":
- self.mass.create_task(self.mass.players.cmd_volume_down, player_id)
- return
- if subcommand == "power":
- self.mass.create_task(self.mass.players.cmd_power, player_id, not player.powered)
- return
- # queue related button commands
- queue = self.mass.player_queues.get_active_queue(player_id)
- if subcommand == "fwd":
- self.mass.create_task(self.mass.player_queues.next, queue.queue_id)
- return
- if subcommand == "rew":
- self.mass.create_task(self.mass.player_queues.previous, queue.queue_id)
- return
- if subcommand == "jump_fwd":
- self.mass.create_task(self.mass.player_queues.skip, queue.queue_id, 10)
- return
- if subcommand == "jump_rew":
- self.mass.create_task(self.mass.player_queues.skip, queue.queue_id, -10)
- return
- if subcommand == "shuffle":
- self.mass.player_queues.set_shuffle(queue.queue_id, not queue.shuffle_enabled)
- return
- if subcommand == "repeat":
- if queue.repeat_mode == RepeatMode.ALL:
- new_repeat_mode = RepeatMode.OFF
- elif queue.repeat_mode == RepeatMode.OFF:
- new_repeat_mode = RepeatMode.ONE
- else:
- new_repeat_mode = RepeatMode.ALL
- self.mass.player_queues.set_repeat(queue.queue_id, new_repeat_mode)
- return
- if subcommand.startswith("preset_"):
- preset_index = subcommand.split("preset_")[1].split(".")[0]
- if preset_uri := self.mass.config.get_raw_player_config_value(
- player_id, f"preset_{preset_index}"
- ):
- option = QueueOption.REPLACE if "playlist" in preset_uri else QueueOption.PLAY
- self.mass.create_task(
- self.mass.player_queues.play_media,
- queue.queue_id,
- preset_uri,
- option,
- )
- return
-
- self.logger.warning(
- "No handler for button/%s (player: %s - args: %s - kwargs: %s)",
- subcommand,
- player_id,
- str(args),
- str(kwargs),
- )
-
- async def _handle_menu(
- self,
- player_id: str,
- offset: int = 0,
- limit: int = 10,
- **kwargs,
- ) -> dict[str, Any]:
- """Handle menu request from CLI."""
- menu_items = []
- # we keep it simple for now and only add the presets to the 'My Music' menu
- for preset_id, media_item in await self._get_preset_items(player_id):
- menu_items.append(
- {
- **media_item,
- "id": f"preset_{preset_id}",
- "node": "myMusic",
- # prefer short title in menu structure
- "text": media_item["track"],
- "homeMenuText": media_item["text"],
- "weight": 80,
- }
- )
- # add basic queue settings such as shuffle and repeat
- menu_items += [
- {
- "node": "settings",
- "isANode": 1,
- "id": "settingsAudio",
- "text": "Audio",
- "weight": 35,
- },
- {
- "selectedIndex": 1,
- "actions": {
- "do": {
- "choices": [
- {"player": 0, "cmd": ["playlist", "repeat", "0"]},
- {"player": 0, "cmd": ["playlist", "repeat", "1"]},
- {"player": 0, "cmd": ["playlist", "repeat", "2"]},
- ]
- }
- },
- "choiceStrings": ["Off", "Song", "Playlist"],
- "id": "settingsRepeat",
- "node": "settings",
- "text": "Repeat",
- "weight": 20,
- },
- {
- "actions": {
- "do": {
- "choices": [
- {"cmd": ["playlist", "shuffle", "0"], "player": 0},
- {"cmd": ["playlist", "shuffle", "1"], "player": 0},
- ]
- }
- },
- "choiceStrings": ["Off", "On"],
- "selectedIndex": 1,
- "id": "settingsShuffle",
- "node": "settings",
- "weight": 10,
- "text": "Shuffle",
- },
- {
- "actions": {
- "do": {
- "choices": [
- {"cmd": ["playlist", "crossfade", "0"], "player": 0},
- {"cmd": ["playlist", "crossfade", "1"], "player": 0},
- ]
- }
- },
- "choiceStrings": ["Off", "On"],
- "selectedIndex": 1,
- "iconStyle": "hm_settingsAudio",
- "id": "settingsXfade",
- "node": "settings",
- "weight": 10,
- "text": "Crossfade",
- },
- ]
- return {
- "item_loop": menu_items[offset:limit],
- "offset": offset,
- "count": len(menu_items[offset:limit]),
- }
-
- async def _handle_browselibrary(
- self,
- player_id: str,
- subcommand: str,
- offset: int = 0,
- limit: int = 10,
- mode: str = "playlists",
- *args,
- **kwargs,
- ) -> dict[str, Any]:
- """Handle menustatus request from CLI."""
- if mode == "albumartists":
- items = (
- await self.mass.music.artists.album_artists(True, limit=limit, offset=offset)
- ).items
- elif mode == "artists":
- items = (
- await self.mass.music.artists.library_items(True, limit=limit, offset=offset)
- ).items
- elif mode == "artist" and "uri" in kwargs:
- artist = await self.mass.music.get_item_by_uri(kwargs["uri"])
- items = await self.mass.music.artists.tracks(artist.item_id, artist.provider)
- elif mode == "albums":
- items = (
- await self.mass.music.albums.library_items(True, limit=limit, offset=offset)
- ).items
- elif mode == "album" and "uri" in kwargs:
- album = await self.mass.music.get_item_by_uri(kwargs["uri"])
- items = await self.mass.music.albums.tracks(album.item_id, album.provider)
- elif mode == "playlists":
- items = (
- await self.mass.music.playlists.library_items(True, limit=limit, offset=offset)
- ).items
- elif mode == "radios":
- items = (
- await self.mass.music.radio.library_items(True, limit=limit, offset=offset)
- ).items
- elif mode == "playlist" and "uri" in kwargs:
- playlist = await self.mass.music.get_item_by_uri(kwargs["uri"])
- items = [
- x
- async for x in self.mass.music.playlists.tracks(playlist.item_id, playlist.provider)
- ]
- else:
- items = []
- return {
- "base": {
- "actions": {
- "go": {
- "params": {"menu": 1, "mode": "playlisttracks"},
- "itemsParams": "commonParams",
- "player": 0,
- "cmd": ["browselibrary", "items"],
- },
- "add": {
- "player": 0,
- "itemsParams": "commonParams",
- "params": {"menu": 1, "cmd": "add"},
- "cmd": ["playlistcontrol"],
- },
- "more": {
- "player": 0,
- "itemsParams": "commonParams",
- "params": {"menu": 1, "cmd": "add"},
- "cmd": ["playlistcontrol"],
- },
- "play": {
- "cmd": ["playlistcontrol"],
- "itemsParams": "commonParams",
- "params": {"menu": 1, "cmd": "play"},
- "player": 0,
- "nextWindow": "nowPlaying",
- },
- "play-hold": {
- "cmd": ["playlistcontrol"],
- "itemsParams": "commonParams",
- "params": {"menu": 1, "cmd": "load"},
- "player": 0,
- "nextWindow": "nowPlaying",
- },
- "add-hold": {
- "itemsParams": "commonParams",
- "params": {"menu": 1, "cmd": "insert"},
- "player": 0,
- "cmd": ["playlistcontrol"],
- },
- }
- },
- "window": {"windowStyle": "icon_list"},
- "item_loop": [
- {
- **menu_item_from_media_item(self.mass, item, include_actions=True),
- "presetParams": {
- "favorites_title": item.name,
- "favorites_url": item.uri,
- "favorites_type": item.media_type.value,
- "icon": (
- self.mass.metadata.get_image_url(item.image, 256) if item.image else ""
- ),
- },
- "textkey": item.name[0].upper(),
- "commonParams": {
- "uri": item.uri,
- "noEdit": 1,
- f"{item.media_type.value}_id": item.item_id,
- },
- }
- for item in items
- ],
- "offset": offset,
- "count": len(items),
- }
-
- def _handle_menustatus(
- self,
- player_id: str,
- *args,
- **kwargs,
- ) -> dict[str, Any]:
- """Handle menustatus request from CLI."""
- return None
-
- def _handle_displaystatus(
- self,
- player_id: str,
- *args,
- **kwargs,
- ) -> dict[str, Any]:
- """Handle displaystatus request from CLI."""
- return None
-
- def _handle_date(
- self,
- player_id: str,
- *args,
- **kwargs,
- ) -> dict[str, Any]:
- """Handle date request from CLI."""
- return {"date_epoch": int(time.time()), "date": "0000-00-00T00:00:00+00:00"}
-
- async def _on_mass_event(self, event: MassEvent) -> None:
- """Forward ."""
- player_id = event.object_id
- if not player_id:
- return
- for client in self._cometd_clients.values():
- if sub := client.slim_subscriptions.get(
- f"/{client.client_id}/slim/playerstatus/{player_id}"
- ):
- self._handle_cometd_request(client, sub)
-
- async def _do_periodic(self) -> None:
- """Execute periodic sending of state and cleanup."""
- while True:
- # cleanup orphaned clients
- disconnected_clients = set()
- for cometd_client in self._cometd_clients.values():
- if (time.time() - cometd_client.last_seen) > 80:
- disconnected_clients.add(cometd_client.client_id)
- continue
- for clientid in disconnected_clients:
- client = self._cometd_clients.pop(clientid)
- empty_queue(client.queue)
- self.logger.debug("Cleaned up disconnected CometD Client: %s", clientid)
- # handle client subscriptions
- for cometd_client in self._cometd_clients.values():
- for sub in cometd_client.slim_subscriptions.values():
- self._handle_cometd_request(cometd_client, sub)
-
- await asyncio.sleep(60)
-
- async def _get_preset_items(self, player_id: str) -> list[tuple[int, SlimMenuItem]]:
- """Return all presets for a player."""
- preset_items: list[tuple[int, MediaItemType]] = []
- for preset_index in range(1, 100):
- if preset_conf := self.mass.config.get_raw_player_config_value(
- player_id, f"preset_{preset_index}"
- ):
- with contextlib.suppress(MusicAssistantError):
- media_item = await self.mass.music.get_item_by_uri(preset_conf)
- slim_media_item = menu_item_from_media_item(self.mass, media_item, True)
- preset_items.append((preset_index, slim_media_item))
- else:
- break
- return preset_items
-
-
-def dict_to_strings(source: dict) -> list[str]:
- """Convert dict to key:value strings (used in slimproto cli)."""
- result: list[str] = []
-
- for key, value in source.items():
- if value in (None, ""):
- continue
- if isinstance(value, list):
- for subval in value:
- if isinstance(subval, dict):
- result += dict_to_strings(subval)
- else:
- result.append(str(subval))
- elif isinstance(value, dict):
- result += dict_to_strings(value)
- else:
- result.append(f"{key}:{value!s}")
- return result
{
"type": "player",
"domain": "slimproto",
- "name": "Slimproto",
+ "name": "Slimproto (Squeezebox players)",
"description": "Support for slimproto based players (e.g. squeezebox, squeezelite).",
"codeowners": [
"@music-assistant"
],
"requirements": [
- "aioslimproto==2.3.3"
+ "aioslimproto==3.0.0"
],
"documentation": "https://music-assistant.io/player-support/slimproto/",
"multi_instance": false,
+++ /dev/null
-"""Models used for the JSON-RPC API."""
-
-from __future__ import annotations
-
-from typing import TYPE_CHECKING, Any, TypedDict
-
-from music_assistant.common.models.enums import MediaType, PlayerState, RepeatMode
-
-if TYPE_CHECKING:
- from music_assistant.common.models.media_items import MediaItemType
- from music_assistant.common.models.player import Player
- from music_assistant.common.models.queue_item import QueueItem
- from music_assistant.server import MusicAssistant
-
-# ruff: noqa: UP013
-
-PLAYMODE_MAP = {
- PlayerState.IDLE: "stop",
- PlayerState.PLAYING: "play",
- PlayerState.PAUSED: "pause",
-}
-
-REPEATMODE_MAP = {RepeatMode.OFF: 0, RepeatMode.ONE: 1, RepeatMode.ALL: 2}
-
-
-class CommandMessage(TypedDict):
- """Representation of Base JSON RPC Command Message."""
-
- # https://www.jsonrpc.org/specification
-
- id: int | str
- method: str
- params: tuple[str, list[str | int]]
-
-
-class CommandResultMessage(CommandMessage):
- """Representation of JSON RPC Result Message."""
-
- result: Any
-
-
-class ErrorDetails(TypedDict):
- """Representation of JSON RPC ErrorDetails."""
-
- code: int
- message: str
-
-
-class CommandErrorMessage(CommandMessage, TypedDict):
- """Base Representation of JSON RPC Command Message."""
-
- id: int | str | None
- error: ErrorDetails
-
-
-class CometDResponse(TypedDict):
- """CometD Response Message."""
-
- channel: str
- id: str
- data: dict[str, Any]
-
-
-class SlimSubscribeData(CometDResponse):
- """CometD SlimSubscribe Data."""
-
- response: str # e.g. '/slim/serverstatus', the channel all messages should be sent back on
- request: tuple[str, list[str | int]] # [ '', [ 'serverstatus', 0, 50, 'subscribe:60' ]
- priority: int # # optional priority value, is passed-through with the response
-
-
-class SlimSubscribeMessage(CometDResponse):
- """CometD SlimSubscribe Message."""
-
- channel: str
- id: str
- data: SlimSubscribeData
-
-
-PlayerItem = TypedDict(
- "PlayerItem",
- {
- "playerindex": str,
- "playerid": str,
- "name": str,
- "modelname": str,
- "connected": int,
- "isplaying": int,
- "power": int,
- "model": str,
- "canpoweroff": int,
- "firmware": str,
- "isplayer": int,
- "displaytype": str,
- "uuid": str | None,
- "seq_no": str,
- "ip": str,
- },
-)
-
-
-def player_item_from_mass(playerindex: int, player: Player) -> PlayerItem:
- """Parse PlayerItem for the Json RPC interface from MA QueueItem."""
- return {
- "playerindex": str(playerindex),
- "playerid": player.player_id,
- "name": player.display_name,
- "modelname": player.device_info.model,
- "connected": int(player.available),
- "isplaying": 1 if player.state == PlayerState.PLAYING else 0,
- "power": int(player.powered),
- "model": player.provider,
- "canpoweroff": 1,
- "firmware": "unknown",
- "isplayer": 1,
- "displaytype": "none",
- "uuid": player.extra_data.get("uuid"),
- "seq_no": str(player.extra_data.get("seq_no", 0)),
- "ip": player.device_info.address,
- }
-
-
-PlayersResponse = TypedDict(
- "PlayersResponse",
- {
- "count": int,
- "players_loop": list[PlayerItem],
- },
-)
-
-
-PlaylistItem = TypedDict(
- "PlaylistItem",
- {
- "playlist index": int,
- "id": str,
- "title": str,
- "artist": str,
- "remote": int,
- "remote_title": str,
- "artwork_url": str,
- "bitrate": str,
- "samplerate": str,
- "samplesize": str,
- "duration": str | int | None,
- "coverid": str,
- "params": dict,
- },
-)
-
-
-def playlist_item_from_mass(
- mass: MusicAssistant, queue_item: QueueItem, index: int = 0, is_cur_index: bool = False
-) -> PlaylistItem:
- """Parse PlaylistItem for the Json RPC interface from MA QueueItem."""
- samplerate = "44100"
- samplesize = "16"
- bitrate = "1411"
- if (
- is_cur_index
- and queue_item.streamdetails
- and queue_item.streamdetails.stream_title
- and " - " in queue_item.streamdetails.stream_title
- ):
- # radio with remote stream title present
- # artist and title parsed from stream title
- artist, title = queue_item.streamdetails.stream_title.split(" - ")
- album = queue_item.name
- elif queue_item.media_item and queue_item.media_item.media_type == MediaType.TRACK:
- # track with all metadata
- artist = queue_item.media_item.artists[0].name if queue_item.media_item.artists else ""
- album = queue_item.media_item.album.name if queue_item.media_item.album else ""
- title = queue_item.media_item.name
- elif queue_item.media_item and queue_item.media_item.metadata.description:
- # (radio) item with description field
- album = queue_item.media_item.metadata.description
- artist = ""
- title = queue_item.media_item.name
- else:
- title = queue_item.name
- artist = ""
- album = queue_item.media_type.value
- if queue_item.streamdetails:
- samplerate = str(queue_item.streamdetails.audio_format.sample_rate)
- samplesize = str(queue_item.streamdetails.audio_format.bit_depth)
- bitrate = str(queue_item.streamdetails.audio_format.bit_rate)
- return {
- "playlist index": index,
- "id": "-187651250107376",
- "title": title,
- "artist": artist,
- "album": album,
- "remote": 1,
- "artwork_url": (
- mass.metadata.get_image_url(queue_item.image, 512) if queue_item.image else ""
- ),
- "coverid": "-187651250107376",
- "duration": queue_item.duration,
- "bitrate": bitrate,
- "samplerate": samplerate,
- "samplesize": samplesize,
- }
-
-
-MenuItemParams = TypedDict(
- "MediaItemParams",
- {
- "track_id": str | int,
- "playlist_index": int,
- },
-)
-
-
-class SlimMenuItem(TypedDict):
- """Representation of MediaItem details."""
-
- style: str
- track: str
- album: str
- trackType: str
- icon: str
- artist: str
- text: str
- params: MenuItemParams
- type: str
- actions: dict # optional
-
-
-def menu_item_from_queue_item(
- mass: MusicAssistant, queue_item: QueueItem, index: int = 0, is_cur_index: bool = False
-) -> SlimMenuItem:
- """Parse SlimMenuItem from MA QueueItem."""
- if queue_item.media_item:
- # media item
- media_details = menu_item_from_media_item(mass, queue_item.media_item)
- media_details["params"]["playlist_index"] = index
- else:
- # fallback/generic queue item
- media_details = SlimMenuItem(
- style="itemplay",
- track=queue_item.name,
- album="",
- trackType="radio",
- icon=mass.metadata.get_image_url(queue_item.image, 512) if queue_item.image else "",
- artist="",
- text=queue_item.name,
- params={
- "playlist_index": index,
- "item_id": queue_item.queue_item_id,
- "uri": queue_item.uri,
- },
- type=queue_item.media_type,
- )
- if (
- is_cur_index
- and queue_item.streamdetails
- and queue_item.streamdetails.stream_title
- and " - " in queue_item.streamdetails.stream_title
- ):
- # radio with remote stream title present
- # artist and title parsed from stream title
- artist, track = queue_item.streamdetails.stream_title.split(" - ")
- media_details["artist"] = artist
- media_details["track"] = track
- media_details["album"] = queue_item.name
- media_details["text"] = f"{track}\n{artist} - {queue_item.name}"
- return media_details
-
-
-def menu_item_from_media_item(
- mass: MusicAssistant, media_item: MediaItemType, include_actions: bool = False
-) -> PlaylistItem:
- """Parse (menu) MediaItem from MA MediaItem."""
- if media_item.media_type == MediaType.TRACK:
- # track with all metadata
- artist = media_item.artists[0].name if media_item.artists else ""
- album = media_item.album.name if media_item.album else ""
- title = media_item.name
- text = f"{title}\n{artist} - {album}" if album else f"{title}\n{artist}"
- elif media_item.media_type == MediaType.ALBUM:
- # album with all metadata
- artist = media_item.artists[0].name if media_item.artists else ""
- title = media_item.name
- text = f"{title}\n{artist}" if artist else f"{title}\nalbum"
- elif media_item and media_item.metadata.description:
- # (radio) item with description field
- album = media_item.metadata.description
- artist = ""
- title = media_item.name
- text = f"{media_item.metadata.description}\n{media_item.name}"
- else:
- title = media_item.name
- artist = ""
- album = media_item.media_type.value
- text = f"{title}\n{album}"
- image_url = mass.metadata.get_image_url(media_item.image, 512) if media_item.image else ""
- if media_item.media_type in (MediaType.TRACK, MediaType.RADIO):
- go_action = {
- "cmd": ["playlistcontrol"],
- "itemsParams": "commonParams",
- "params": {"uri": media_item.uri, "cmd": "play"},
- "player": 0,
- "nextWindow": "nowPlaying",
- }
- else:
- go_action = {
- "params": {
- "uri": media_item.uri,
- "mode": media_item.media_type.value,
- },
- "itemsParams": "commonParams",
- "player": 0,
- "cmd": ["browselibrary", "items"],
- }
- details = SlimMenuItem(
- track=title,
- album=album,
- trackType="radio",
- icon=image_url,
- artist=artist,
- text=text,
- params={
- "track_id": media_item.item_id,
- "item_id": media_item.item_id,
- "uri": media_item.uri,
- },
- type=media_item.media_type.value,
- )
- # optionally include actions
- if include_actions:
- details["actions"] = {
- "go": go_action,
- "add": {
- "player": 0,
- "itemsParams": "commonParams",
- "params": {"uri": media_item.uri, "cmd": "add"},
- "cmd": ["playlistcontrol"],
- "nextWindow": "refresh",
- },
- "more": {
- "player": 0,
- "itemsParams": "commonParams",
- "params": {"uri": media_item.uri, "cmd": "add"},
- "cmd": ["playlistcontrol"],
- "nextWindow": "refresh",
- },
- "play": {
- "cmd": ["playlistcontrol"],
- "itemsParams": "commonParams",
- "params": {
- "uri": media_item.uri,
- "cmd": "load" if media_item.media_type == MediaType.PLAYLIST else "play",
- },
- "player": 0,
- "nextWindow": "nowPlaying",
- },
- "play-hold": {
- "cmd": ["playlistcontrol"],
- "itemsParams": "commonParams",
- "params": {"uri": media_item.uri, "cmd": "load"},
- "player": 0,
- "nextWindow": "nowPlaying",
- },
- "add-hold": {
- "itemsParams": "commonParams",
- "params": {"uri": media_item.uri, "cmd": "insert"},
- "player": 0,
- "cmd": ["playlistcontrol"],
- "nextWindow": "refresh",
- },
- }
- if media_item.media_type in (MediaType.TRACK, MediaType.RADIO):
- details["style"] = "itemplay"
- details["nextWindow"] = "nowPlaying"
- return details
-
-
-PlayerStatusResponse = TypedDict(
- "PlayerStatusResponse",
- {
- "time": int,
- "mode": str,
- "sync_slaves": str,
- "playlist_cur_index": int | None,
- "player_name": str,
- "sync_master": str,
- "player_connected": int,
- "power": int,
- "mixer volume": int,
- "playlist repeat": int,
- "playlist shuffle": int,
- "playlist mode": str,
- "player_ip": str,
- "remoteMeta": dict | None,
- "digital_volume_control": int,
- "playlist_timestamp": float,
- "current_title": str,
- "duration": int,
- "seq_no": int,
- "remote": int,
- "can_seek": int,
- "signalstrength": int,
- "rate": int,
- "uuid": str,
- "playlist_tracks": int,
- "item_loop": list[PlaylistItem],
- },
-)
-
-
-ServerStatusResponse = TypedDict(
- "ServerStatusMessage",
- {
- "ip": str,
- "httpport": str,
- "version": str,
- "uuid": str,
- "info total genres": int,
- "sn player count": int,
- "lastscan": str,
- "info total duration": int,
- "info total albums": int,
- "info total songs": int,
- "info total artists": int,
- "players_loop": list[PlayerItem],
- "player count": int,
- "other player count": int,
- "other_players_loop": list[PlayerItem],
- },
-)
aiofiles==23.2.1
aiohttp==3.9.3
aiorun==2023.7.2
-aioslimproto==2.3.3
+aioslimproto==3.0.0
aiosqlite==0.20.0
async-upnp-client==0.38.2
asyncio-throttle==1.0.2