from music_assistant.common.models.errors import MusicAssistantError
from music_assistant.common.models.event import MassEvent
from music_assistant.common.models.media_items import MediaItemType
+from music_assistant.common.models.queue_item import QueueItem
from .models import (
+ PLAYMODE_MAP,
+ REPEATMODE_MAP,
CometDResponse,
CommandErrorMessage,
CommandMessage,
PlayersResponse,
PlayerStatusResponse,
ServerStatusResponse,
- SlimMediaItem,
+ SlimMenuItem,
SlimSubscribeMessage,
- get_media_details_from_mass,
+ menu_item_from_media_item,
+ menu_item_from_queue_item,
player_item_from_mass,
- player_status_from_mass,
+ playlist_item_from_mass,
)
if TYPE_CHECKING:
- from music_assistant.common.models.config_entries import ProviderConfig
- from music_assistant.common.models.provider import ProviderManifest
from music_assistant.server import MusicAssistant
- from music_assistant.server.models import ProviderInstanceType
from . import SlimprotoProvider
slim_subscriptions: dict[str, SlimSubscribeMessage] = field(default_factory=dict)
-async def setup(
- mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
-) -> ProviderInstanceType:
- """Initialize provider(instance) with given configuration."""
- prov = LmsCli(mass, manifest, config)
- await prov.handle_setup()
- return prov
-
-
async def get_config_entries(
mass: MusicAssistant,
instance_id: str | None = None,
player_id: str,
offset: int | str = "-",
limit: int = 2,
+ menu: str = "",
+ useContextMenu: int | bool = False, # noqa: N803
tags: str = "xcfldatgrKN",
**kwargs,
) -> PlayerStatusResponse:
queue = self.mass.players.queues.get_active_queue(player_id)
assert queue is not None
start_index = queue.current_index or 0 if offset == "-" else offset
- queue_items = []
+ queue_items: list[QueueItem] = []
index = 0
async for item in self.mass.players.queues.items(queue.queue_id):
if index >= start_index:
if len(queue_items) == limit:
break
index += 1
- # we ignore the tags, just always send all info
- presets = await self._get_preset_items(player_id)
- return player_status_from_mass(
- self.mass,
- player=player,
- queue=queue,
- queue_items=queue_items,
- offset=offset,
- presets=presets,
- )
+ # base details
+ result = {
+ "player_name": player.display_name,
+ "player_connected": int(player.available),
+ "player_needs_upgrade": False,
+ "player_is_upgrading": False,
+ "power": int(player.powered),
+ "signalstrength": 0,
+ "waitingToPlay": 0, # TODO?
+ }
+ # additional details if player powered
+ if player.powered:
+ result = {
+ **result,
+ "mode": PLAYMODE_MAP[queue.state],
+ "remote": 1,
+ "current_title": "Music Assistant",
+ "time": queue.elapsed_time,
+ "rate": 1,
+ "duration": queue.current_item.duration if queue.current_item else 0,
+ "sleep": 0,
+ "will_sleep_in": 0,
+ "sync_master": player.synced_to,
+ "sync_slaves": ",".join(player.group_childs),
+ "mixer volume": player.volume_level,
+ "playlist repeat": REPEATMODE_MAP[queue.repeat_mode],
+ "playlist shuffle": int(queue.shuffle_enabled),
+ "playlist_timestamp": queue.elapsed_time_last_updated,
+ "playlist_cur_index": queue.current_index,
+ "playlist_tracks": queue.items,
+ "seq_no": player.extra_data.get("seq_no", 0),
+ "player_ip": player.device_info.address,
+ "digital_volume_control": 1,
+ "can_seek": 1,
+ "playlist mode": "off",
+ "playlist_loop": [
+ playlist_item_from_mass(
+ self.mass,
+ item,
+ queue.current_index + index,
+ queue.current_index == (queue.current_index + index),
+ )
+ for index, item in enumerate(queue_items)
+ ],
+ }
+ # additional details if menu requested
+ if menu == "menu":
+ # in menu-mode the regular playlist_loop is replaced by item_loop
+ result.pop("playlist_loop", None)
+ presets = await self._get_preset_items(player_id)
+ preset_data: list[dict] = []
+ preset_loop: list[int] = []
+ for _, media_item in presets:
+ preset_data.append(
+ {
+ "URL": media_item["params"]["uri"],
+ "text": media_item["track"],
+ "type": "audio",
+ }
+ )
+ preset_loop.append(1)
+ while len(preset_loop) < 10:
+ preset_data.append({})
+ preset_loop.append(0)
+ result = {
+ **result,
+ "alarm_state": "none",
+ "alarm_snooze_seconds": 540,
+ "alarm_timeout_seconds": 3600,
+ "count": len(queue_items),
+ "offset": offset,
+ "base": {
+ "actions": {
+ "more": {
+ "itemsParams": "params",
+ "window": {"isContextMenu": 1},
+ "cmd": ["contextmenu"],
+ "player": 0,
+ "params": {"context": "playlist", "menu": "track"},
+ }
+ }
+ },
+ "preset_loop": preset_loop,
+ "preset_data": preset_data,
+ "item_loop": [
+ menu_item_from_queue_item(
+ self.mass,
+ item,
+ queue.current_index + index,
+ queue.current_index == (queue.current_index + index),
+ )
+ for index, item in enumerate(queue_items)
+ ],
+ }
+ # additional details if contextmenu requested
+ if bool(useContextMenu):
+ result = {
+ **result,
+ # TODO ?!,
+ }
+
+ return result
async def _handle_serverstatus(
self,
**kwargs,
) -> ServerStatusResponse:
"""Handle server status command."""
+ if start_index == "-":
+ start_index = 0
players: list[PlayerItem] = []
for index, mass_player in enumerate(self.mass.players.all()):
if isinstance(start_index, int) and index < start_index:
"httpport": self.mass.webserver.port,
"ip": self.mass.base_ip,
"version": "7.999.999",
- # "uuid": self.mass.server_id,
- "uuid": "aioslimproto",
+ "uuid": self.mass.server_id,
# TODO: set these vars ?
"info total duration": 0,
"info total genres": 0,
"window": {"windowStyle": "icon_list"},
"item_loop": [
{
- **get_media_details_from_mass(self.mass, item),
+ **menu_item_from_media_item(self.mass, item, include_actions=True),
"presetParams": {
"favorites_title": item.name,
"favorites_url": item.uri,
return {"date_epoch": int(time.time()), "date": "0000-00-00T00:00:00+00:00"}
async def _on_mass_event(self, event: MassEvent) -> None:
- """Handle incoming Mass Event."""
+ """Forward ."""
player_id = event.object_id
if not player_id:
return
for client in self._cometd_clients.values():
- if sub := client.slim_subscriptions.get(f"/{client.client_id}/slim/serverstatus"):
- await client.queue.put(
- {
- "channel": sub["data"]["response"],
- "id": sub["id"],
- "data": await self._handle_serverstatus(player_id),
- }
- )
if sub := client.slim_subscriptions.get(
f"/{client.client_id}/slim/playerstatus/{player_id}"
):
- await client.queue.put(
- {
- "channel": sub["data"]["response"],
- "id": sub["id"],
- "data": await self._handle_status(player_id),
- }
- )
+ self._handle_cometd_request(client, sub)
async def _do_periodic(self) -> None:
"""Execute periodic sending of state and cleanup."""
await asyncio.sleep(60)
- async def _get_preset_items(self, player_id: str) -> list[tuple[int, SlimMediaItem]]:
+ async def _get_preset_items(self, player_id: str) -> list[tuple[int, SlimMenuItem]]:
"""Return all presets for a player."""
preset_items: list[tuple[int, MediaItemType]] = []
for preset_index in range(1, 100):
):
with contextlib.suppress(MusicAssistantError):
media_item = await self.mass.music.get_item_by_uri(preset_conf)
- slim_media_item = get_media_details_from_mass(self.mass, media_item)
+ slim_media_item = menu_item_from_media_item(self.mass, media_item, True)
preset_items.append((preset_index, slim_media_item))
else:
break
if TYPE_CHECKING:
from music_assistant.common.models.player import Player
- from music_assistant.common.models.player_queue import PlayerQueue
from music_assistant.common.models.queue_item import QueueItem
from music_assistant.server import MusicAssistant
mass: MusicAssistant, queue_item: QueueItem, index: int = 0, is_cur_index: bool = False
) -> PlaylistItem:
"""Parse PlaylistItem for the Json RPC interface from MA QueueItem."""
+ if (
+ is_cur_index
+ and queue_item.streamdetails
+ and queue_item.streamdetails.stream_title
+ and " - " in queue_item.streamdetails.stream_title
+ ):
+ # radio with remote stream title present
+ # artist and title parsed from stream title
+ artist, title = queue_item.streamdetails.stream_title.split(" - ")
+ album = queue_item.name
+ elif queue_item.media_item and queue_item.media_item.media_type == MediaType.TRACK:
+ # track with all metadata
+ artist = queue_item.media_item.artists[0].name if queue_item.media_item.artists else ""
+ album = queue_item.media_item.album.name if queue_item.media_item.album else ""
+ title = queue_item.media_item.name
+ elif queue_item.media_item and queue_item.media_item.metadata.description:
+ # (radio) item with description field
+ album = queue_item.media_item.metadata.description
+ artist = ""
+ title = queue_item.media_item.name
+ else:
+ title = queue_item.name
+ artist = ""
+ album = queue_item.media_type.value
+ return {
+ "playlist index": index,
+ "id": "-187651250107376",
+ "title": title,
+ "artist": artist,
+ "album": album,
+ "remote": 1,
+ "artwork_url": mass.metadata.get_image_url(queue_item.image, 512)
+ if queue_item.image
+ else "",
+ "coverid": "-187651250107376",
+ "duration": queue_item.duration,
+ "bitrate": "",
+ }
+
+
+MenuItemParams = TypedDict(
+ "MediaItemParams",
+ {
+ "track_id": str | int,
+ "playlist_index": int,
+ },
+)
+
+
+class SlimMenuItem(TypedDict):
+ """Representation of MediaItem details."""
+
+ style: str
+ track: str
+ album: str
+ trackType: str # noqa: N815
+ icon: str
+ artist: str
+ text: str
+ params: MenuItemParams
+ type: str
+ actions: dict # optional
+
+
+def menu_item_from_queue_item(
+ mass: MusicAssistant, queue_item: QueueItem, index: int = 0, is_cur_index: bool = False
+) -> SlimMenuItem:
+ """Parse SlimMenuItem from MA QueueItem."""
if queue_item.media_item:
# media item
- media_details = get_media_details_from_mass(mass, queue_item.media_item)
+ media_details = menu_item_from_media_item(mass, queue_item.media_item)
+ media_details["params"]["playlist_index"] = index
else:
# fallback/generic queue item
- media_details = {
- "text": queue_item.name,
- "style": "itemplay",
- "trackType": "radio",
- "icon": mass.metadata.get_image_url(queue_item.image, 512) if queue_item.image else "",
- "params": {
+ media_details = SlimMenuItem(
+ style="itemplay",
+ track=queue_item.name,
+ album="",
+ trackType="radio",
+ icon=mass.metadata.get_image_url(queue_item.image, 512) if queue_item.image else "",
+ artist="",
+ text=queue_item.name,
+ params={
"playlist_index": index,
"item_id": queue_item.queue_item_id,
"uri": queue_item.uri,
},
- }
+ type=queue_item.media_type,
+ )
if (
is_cur_index
and queue_item.streamdetails
media_details["track"] = track
media_details["album"] = queue_item.name
media_details["text"] = f"{track}\n{artist} - {queue_item.name}"
- # remove default item actions
- media_details.pop("actions")
- media_details["params"]["playlist_index"] = index
return media_details
-class SlimMediaItem(TypedDict):
- """Representation of MediaItem details."""
-
- style: str
- track: str
- album: str
- trackType: str # noqa: N815
- icon: str
- artist: str
- text: str
- params: dict
- type: str
- actions: dict
-
-
-def get_media_details_from_mass(mass: MusicAssistant, media_item: MediaItemType) -> SlimMediaItem:
- """Get media item details formatted to display on Squeezebox hardware."""
+def menu_item_from_media_item(
+ mass: MusicAssistant, media_item: MediaItemType, include_actions: bool = False
+) -> PlaylistItem:
+ """Parse (menu) MediaItem from MA MediaItem."""
if media_item.media_type == MediaType.TRACK:
# track with all metadata
artist = media_item.artists[0].name if media_item.artists else ""
"player": 0,
"cmd": ["browselibrary", "items"],
}
- details = SlimMediaItem(
+ details = SlimMenuItem(
track=title,
album=album,
trackType="radio",
icon=image_url,
artist=artist,
text=text,
- params={"item_id": media_item.item_id, "uri": media_item.uri},
+ params={
+ "track_id": media_item.item_id,
+ "item_id": media_item.item_id,
+ "uri": media_item.uri,
+ },
type=media_item.media_type.value,
- actions={
+ )
+ # optionally include actions
+ if include_actions:
+ details["actions"] = {
"go": go_action,
"add": {
"player": 0,
"cmd": ["playlistcontrol"],
"nextWindow": "refresh",
},
- },
- )
+ }
if media_item.media_type in (MediaType.TRACK, MediaType.RADIO):
details["style"] = "itemplay"
details["nextWindow"] = "nowPlaying"
"can_seek": int,
"signalstrength": int,
"rate": int,
+ "uuid": str,
"playlist_tracks": int,
"item_loop": list[PlaylistItem],
- "uuid": str,
},
)
-def player_status_from_mass(
- mass: MusicAssistant,
- player: Player,
- queue: PlayerQueue,
- queue_items: list[QueueItem],
- offset: int | str,
- presets: list[tuple[int, SlimMediaItem]],
-) -> PlayerStatusResponse:
- """Parse PlayerStatusResponse for the Json RPC interface from MA info."""
- if queue.current_item:
- cur_item = playlist_item_from_mass(mass, queue.current_item, queue.current_index, True)
- remote_meta = {
- **cur_item,
- "id": cur_item["params"]["item_id"],
- "title": cur_item["text"],
- "artwork_url": cur_item["icon"],
- "coverid": cur_item["params"]["item_id"],
- "remote": 1,
- }
- else:
- remote_meta = None
- # handle preset data
- preset_data: list[dict] = []
- preset_loop: list[int] = []
- for _, media_item in presets:
- preset_data.append(
- {
- "URL": media_item["params"]["uri"],
- "text": media_item["track"],
- "type": "audio",
- }
- )
- preset_loop.append(1)
- while len(preset_loop) < 10:
- preset_data.append({})
- preset_loop.append(0)
- return {
- "alarm_next": 0,
- "playlist repeat": REPEATMODE_MAP[queue.repeat_mode],
- "signalstrength": 0,
- "remoteMeta": remote_meta,
- "rate": 1,
- "player_name": player.display_name,
- "preset_loop": preset_loop,
- "mode": PLAYMODE_MAP[queue.state],
- "playlist_cur_index": queue.current_index,
- "playlist shuffle": int(queue.shuffle_enabled),
- "time": queue.elapsed_time,
- "alarm_version": 2,
- "mixer volume": player.volume_level,
- "player_connected": int(player.available),
- "sync_slaves": ",".join(player.group_childs),
- "playlist_tracks": queue.items,
- # "count": queue.items,
- # some players have trouble grabbing a very large list so limit it for now
- "count": len(queue_items),
- "base": {"actions": {}},
- "seq_no": player.extra_data.get("seq_no", 0),
- "player_ip": player.device_info.address,
- "alarm_state": "none",
- "duration": queue.current_item.duration if queue.current_item else 0,
- "alarm_snooze_seconds": 540,
- "digital_volume_control": 1,
- "power": int(player.powered),
- "playlist_timestamp": queue.elapsed_time_last_updated,
- "offset": offset,
- "can_seek": 1,
- "alarm_timeout_seconds": 3600,
- "current_title": None,
- "remote": 1,
- "preset_data": preset_data,
- "playlist mode": "off",
- "item_loop": [
- playlist_item_from_mass(
- mass,
- item,
- queue.current_index + index,
- queue.current_index == (queue.current_index + index),
- )
- for index, item in enumerate(queue_items)
- ],
- }
-
-
ServerStatusResponse = TypedDict(
"ServerStatusMessage",
{