CONF_KEY_PROVIDERS,
CONF_NAME,
EVENT_CONFIG_CHANGED,
+ CONF_CROSSFADE_DURATION,
+ CONF_FALLBACK_GAIN_CORRECT
)
# from music_assistant.mass import MusicAssistant
depends_on="volume_normalisation"
),
ConfigEntry(
- entry_key="fallback_gain_correct",
+ entry_key=CONF_FALLBACK_GAIN_CORRECT,
entry_type=ConfigEntryType.INT,
range=(-20, 0),
default_value=-12,
- description_key="fallback_gain_correct",
+ description_key=CONF_FALLBACK_GAIN_CORRECT,
depends_on="volume_normalisation"
),
ConfigEntry(
- entry_key="crossfade_duration",
+ entry_key=CONF_CROSSFADE_DURATION,
entry_type=ConfigEntryType.INT,
range=(0, 10),
default_value=0,
- description_key="crossfade_duration",
+ description_key=CONF_CROSSFADE_DURATION,
),
]
CONF_TOKEN = "token"
CONF_URL = "url"
CONF_NAME = "name"
+CONF_CROSSFADE_DURATION = "crossfade_duration"
+CONF_FALLBACK_GAIN_CORRECT = "fallback_gain_correct"
+
CONF_KEY_BASE = "base"
help_key: Optional[str] = None # key in the translations file
multi_value: bool = False # allow multiple values from the list
depends_on: str = "" # entry_key that needs to be set before this setting shows up in frontend
+ hidden: bool = False # hide from UI
value: Optional[Any] = None # set by the configuration manager
class DeviceInfo:
"""Model for a player's deviceinfo."""
- model: Optional[str]
- address: Optional[str]
- manufacturer: Optional[str]
+ model: Optional[str] = ""
+ address: Optional[str] = ""
+ manufacturer: Optional[str] = ""
class PlayerFeature(int, Enum):
--- /dev/null
+"""Squeezebox emulated player provider."""
+
+import asyncio
+import decimal
+import logging
+import os
+import random
+import socket
+import struct
+import sys
+import time
+from collections import OrderedDict
+from typing import List
+
+from music_assistant.constants import CONF_CROSSFADE_DURATION
+from music_assistant.models.config_entry import ConfigEntry, ConfigEntryType
+from music_assistant.models.player import DeviceInfo, Player, PlayerFeature, PlayerState
+from music_assistant.models.player_queue import QueueItem
+from music_assistant.models.playerprovider import PlayerProvider
+from music_assistant.utils import get_hostname, get_ip, run_periodic, try_parse_int
+
+from .constants import PROV_ID, PROV_NAME
+from .discovery import DiscoveryProtocol
+from .socket_client import Event, SqueezeSocketClient
+
+CONF_LAST_POWER = "last_power"
+CONF_LAST_VOLUME = "last_volume"
+
+LOGGER = logging.getLogger(PROV_ID)
+
+CONFIG_ENTRIES = []
+PLAYER_FEATURES = [PlayerFeature.QUEUE, PlayerFeature.CROSSFADE, PlayerFeature.GAPLESS]
+PLAYER_CONFIG_ENTRIES = [
+ ConfigEntry(entry_key=CONF_LAST_POWER, entry_type=ConfigEntryType.BOOL, hidden=True),
+ ConfigEntry(entry_key=CONF_LAST_VOLUME, entry_type=ConfigEntryType.INT, hidden=True),
+]
+
+
+async def async_setup(mass):
+ """Perform async setup of this Plugin/Provider."""
+ prov = PySqueezeProvider()
+ await mass.async_register_provider(prov)
+
+
+class PySqueezeProvider(PlayerProvider):
+ """Python implementation of SlimProto server"""
+
+ _socket_clients = {}
+ _tasks = []
+
+ @property
+ def id(self) -> str:
+ """Return provider ID for this provider."""
+ return PROV_ID
+
+ @property
+ def name(self) -> str:
+ """Return provider Name for this provider."""
+ return PROV_NAME
+
+ @property
+ def config_entries(self) -> List[ConfigEntry]:
+ """Return Config Entries for this provider."""
+ return CONFIG_ENTRIES
+
+ async def async_on_start(self) -> bool:
+ """Called on startup. Handle initialization of the provider."""
+ # start slimproto server
+ self._tasks.append(
+ self.mass.add_job(asyncio.start_server(self.__async_client_connected, "0.0.0.0", 3483))
+ )
+ # setup discovery
+ self._tasks.append(self.mass.add_job(self.async_start_discovery()))
+
+ async def async_on_stop(self):
+ """Called on shutdown. Handle correct close/cleanup of the provider on exit."""
+ for task in self._tasks:
+ task.cancel()
+ for client in self._socket_clients.values():
+ client.close()
+
+ async def async_cmd_play_uri(self, player_id: str, uri: str):
+ """
+ Play the specified uri/url on the goven player.
+ :param player_id: player_id of the player to handle the command.
+ """
+ socket_client = self._socket_clients.get(player_id)
+ if socket_client:
+ crossfade = self.mass.config.player_settings[player_id][CONF_CROSSFADE_DURATION]
+ await socket_client.async_cmd_play_uri(
+ uri, send_flush=True, crossfade_duration=crossfade
+ )
+ else:
+ LOGGER.warning("Received command for unavailable player: %s", player_id)
+
+ async def async_cmd_stop(self, player_id: str):
+ """
+ Send STOP command to given player.
+ :param player_id: player_id of the player to handle the command.
+ """
+ socket_client = self._socket_clients.get(player_id)
+ if socket_client:
+ await socket_client.async_cmd_stop()
+ else:
+ LOGGER.warning("Received command for unavailable player: %s", player_id)
+
+ async def async_cmd_play(self, player_id: str):
+ """
+ Send STOP command to given player.
+ :param player_id: player_id of the player to handle the command.
+ """
+ socket_client = self._socket_clients.get(player_id)
+ if socket_client:
+ await socket_client.async_cmd_play()
+ else:
+ LOGGER.warning("Received command for unavailable player: %s", player_id)
+
+ async def async_cmd_pause(self, player_id: str):
+ """
+ Send PAUSE command to given player.
+ :param player_id: player_id of the player to handle the command.
+ """
+ socket_client = self._socket_clients.get(player_id)
+ if socket_client:
+ await socket_client.async_cmd_pause()
+ else:
+ LOGGER.warning("Received command for unavailable player: %s", player_id)
+
+ async def async_cmd_next(self, player_id: str):
+ """
+ Send NEXT TRACK command to given player.
+ :param player_id: player_id of the player to handle the command.
+ """
+ queue = self.mass.player_manager.get_player_queue(player_id)
+ if queue:
+ new_track = queue.get_item(queue.cur_index + 1)
+ if new_track:
+ await self.async_cmd_play_uri(player_id, new_track.uri)
+
+ async def async_cmd_previous(self, player_id: str):
+ """
+ Send PREVIOUS TRACK command to given player.
+ :param player_id: player_id of the player to handle the command.
+ """
+ queue = self.mass.player_manager.get_player_queue(player_id)
+ if queue:
+ new_track = queue.get_item(queue.cur_index - 1)
+ if new_track:
+ await self.async_cmd_play_uri(player_id, new_track.uri)
+
+ async def async_cmd_power_on(self, player_id: str):
+ """
+ Send POWER ON command to given player.
+ :param player_id: player_id of the player to handle the command.
+ """
+ socket_client = self._socket_clients.get(player_id)
+ if socket_client:
+ await socket_client.async_cmd_power(True)
+ # store last power state as we need it when the player (re)connects
+ self.mass.config.player_settings[player_id][CONF_LAST_POWER] = True
+ else:
+ LOGGER.warning("Received command for unavailable player: %s", player_id)
+
+ async def async_cmd_power_off(self, player_id: str):
+ """
+ Send POWER OFF command to given player.
+ :param player_id: player_id of the player to handle the command.
+ """
+ socket_client = self._socket_clients.get(player_id)
+ if socket_client:
+ await socket_client.async_cmd_power(False)
+ # store last power state as we need it when the player (re)connects
+ self.mass.config.player_settings[player_id][CONF_LAST_POWER] = False
+ else:
+ LOGGER.warning("Received command for unavailable player: %s", player_id)
+
+ async def async_cmd_volume_set(self, player_id: str, volume_level: int):
+ """
+ Send volume level command to given player.
+ :param player_id: player_id of the player to handle the command.
+ :param volume_level: volume level to set (0..100).
+ """
+ socket_client = self._socket_clients.get(player_id)
+ if socket_client:
+ await socket_client.async_cmd_volume_set(volume_level)
+ # store last volume state as we need it when the player (re)connects
+ self.mass.config.player_settings[player_id][CONF_LAST_VOLUME] = volume_level
+ else:
+ LOGGER.warning("Received command for unavailable player: %s", player_id)
+
+ async def async_cmd_volume_mute(self, player_id: str, is_muted=False):
+ """
+ Send volume MUTE command to given player.
+ :param player_id: player_id of the player to handle the command.
+ :param is_muted: bool with new mute state.
+ """
+ socket_client = self._socket_clients.get(player_id)
+ if socket_client:
+ await socket_client.async_cmd_mute(is_muted)
+ else:
+ LOGGER.warning("Received command for unavailable player: %s", player_id)
+
+ async def async_cmd_queue_play_index(self, player_id: str, index: int):
+ """
+ Play item at index X on player's queue
+ :param player_id: player_id of the player to handle the command.
+ :param index: (int) index of the queue item that should start playing
+ """
+ queue = self.mass.player_manager.get_player_queue(player_id)
+ if queue:
+ new_track = queue.get_item(index)
+ if new_track:
+ await self.async_cmd_play_uri(player_id, new_track.uri)
+
+ async def async_cmd_queue_load(self, player_id: str, queue_items: List[QueueItem]):
+ """
+ Load/overwrite given items in the player's queue implementation
+ :param player_id: player_id of the player to handle the command.
+ :param queue_items: a list of QueueItems
+ """
+ if queue_items:
+ await self.async_cmd_play_uri(player_id, queue_items[0].uri)
+
+ async def async_cmd_queue_insert(
+ self, player_id: str, queue_items: List[QueueItem], insert_at_index: int
+ ):
+ """
+ Insert new items at position X into existing queue.
+ If insert_at_index 0 or None, will start playing newly added item(s)
+ :param player_id: player_id of the player to handle the command.
+ :param queue_items: a list of QueueItems
+ :param insert_at_index: queue position to insert new items
+ """
+ # queue handled by built-in queue controller
+ # we only check the start index
+ queue = self.mass.player_manager.get_player_queue(player_id)
+ if queue and insert_at_index == queue.cur_index:
+ return await self.async_cmd_queue_play_index(player_id, insert_at_index)
+
+ async def async_cmd_queue_append(self, player_id: str, queue_items: List[QueueItem]):
+ """
+ Append new items at the end of the queue.
+ :param player_id: player_id of the player to handle the command.
+ :param queue_items: a list of QueueItems
+ """
+ pass # automagically handled by built-in queue controller
+
+ async def async_cmd_queue_update(self, player_id: str, queue_items: List[QueueItem]):
+ """
+ Overwrite the existing items in the queue, used for reordering.
+ :param player_id: player_id of the player to handle the command.
+ :param queue_items: a list of QueueItems
+ """
+ pass # automagically handled by built-in queue controller
+
+ async def async_cmd_queue_clear(self, player_id: str):
+ """
+ Clear the player's queue.
+ :param player_id: player_id of the player to handle the command.
+ """
+ # queue is handled by built-in queue controller but send stop
+ return await self.async_cmd_stop(player_id)
+
+ async def async_start_discovery(self):
+ transport, protocol = await self.mass.loop.create_datagram_endpoint(
+ lambda: DiscoveryProtocol(self.mass.web.http_port),
+ local_addr=("0.0.0.0", 3483),
+ )
+ try:
+ while True:
+ await asyncio.sleep(60) # serve forever
+ finally:
+ transport.close()
+
+ async def __async_client_connected(self, reader, writer):
+ """Handle a client connection on the socket."""
+ addr = writer.get_extra_info("peername")
+ LOGGER.debug("New socket client connected: %s", addr)
+ SqueezeSocketClient(reader, writer, self.__client_event)
+
+ async def __client_event(self, event: str, socket_client: SqueezeSocketClient):
+ """Received event from a socket client."""
+ if event == Event.EVENT_CONNECTED:
+ # set some attributes to make the socket client compatible with mass player format
+ socket_client.should_poll = False
+ socket_client.provider_id = PROV_ID
+ socket_client.available = True
+ socket_client.is_group_player = False
+ socket_client.group_childs = []
+ socket_client.device_info = DeviceInfo(
+ model=socket_client.device_type, address=socket_client.device_address
+ )
+ socket_client.features = PLAYER_FEATURES
+ socket_client.config_entries = PLAYER_CONFIG_ENTRIES
+ # restore power/volume states
+ conf = self.mass.config.player_settings[socket_client.player_id]
+ last_volume = conf.get(CONF_LAST_VOLUME, 40)
+ await socket_client.async_cmd_volume_set(last_volume)
+ last_power = conf.get(CONF_LAST_POWER, False)
+ await socket_client.async_cmd_power(last_power)
+ await self.mass.player_manager.async_add_player(socket_client)
+ self._socket_clients[socket_client.player_id] = socket_client
+ elif event == Event.EVENT_UPDATED:
+ await self.mass.player_manager.async_update_player(socket_client)
+ elif event == Event.EVENT_DISCONNECTED:
+ await self.mass.player_manager.async_remove_player(socket_client.player_id)
+ self._socket_clients.pop(socket_client.player_id)
+ elif event == Event.EVENT_DECODER_READY:
+ # player is ready for the next track (if any)
+ player_id = socket_client.player_id
+ queue = self.mass.player_manager.get_player_queue(socket_client.player_id)
+ if queue:
+ next_item = queue.next_item
+ if next_item:
+ crossfade = self.mass.config.player_settings[player_id][CONF_CROSSFADE_DURATION]
+ await self._socket_clients[player_id].async_cmd_play_uri(
+ next_item.uri, send_flush=False, crossfade_duration=crossfade
+ )
--- /dev/null
+"""Constants for Squeezebox emulation."""
+
+PROV_ID = "squeezebox"
+PROV_NAME = "Squeezebox emulation"
\ No newline at end of file
--- /dev/null
+"""Squeezebox emulation discovery implementation."""
+
+from collections import OrderedDict
+import socket
+import logging
+import struct
+
+from music_assistant.utils import (
+ get_hostname,
+ get_ip
+)
+
+LOGGER = logging.getLogger("squeezebox")
+
+class Datagram():
+ """Description of a discovery datagram."""
+ @classmethod
+ def decode(self, data):
+ if data[0] == "e":
+ return TLVDiscoveryRequestDatagram(data)
+ elif data[0] == "E":
+ return TLVDiscoveryResponseDatagram(data)
+ elif data[0] == "d":
+ return ClientDiscoveryDatagram(data)
+ elif data[0] == "h":
+ pass # Hello!
+ elif data[0] == "i":
+ pass # IR
+ elif data[0] == "2":
+ pass # i2c?
+ elif data[0] == "a":
+ pass # ack!
+
+
+class ClientDiscoveryDatagram(Datagram):
+ """Description of a client discovery datagram."""
+
+ device = None
+ firmware = None
+ client = None
+
+ def __init__(self, data):
+ s = struct.unpack("!cxBB8x6B", data.encode())
+ assert s[0] == "d"
+ self.device = s[1]
+ self.firmware = hex(s[2])
+ self.client = ":".join(["%02x" % (x,) for x in s[3:]])
+
+ def __repr__(self):
+ return "<%s device=%r firmware=%r client=%r>" % (
+ self.__class__.__name__,
+ self.device,
+ self.firmware,
+ self.client,
+ )
+
+
+class DiscoveryResponseDatagram(Datagram):
+ """Description of a discovery response datagram."""
+ def __init__(self, hostname, port):
+ hostname = hostname[:16].encode("UTF-8")
+ hostname += (16 - len(hostname)) * "\x00"
+ self.packet = struct.pack("!c16s", "D", hostname).decode()
+
+
+class TLVDiscoveryRequestDatagram(Datagram):
+ """Description of a discovery request datagram."""
+ def __init__(self, data):
+ requestdata = OrderedDict()
+ assert data[0] == "e"
+ idx = 1
+ length = len(data) - 5
+ while idx <= length:
+ typ, l = struct.unpack_from("4sB", data.encode(), idx)
+ if l:
+ val = data[idx + 5 : idx + 5 + l]
+ idx += 5 + l
+ else:
+ val = None
+ idx += 5
+ typ = typ.decode()
+ requestdata[typ] = val
+ self.data = requestdata
+
+ def __repr__(self):
+ return "<%s data=%r>" % (self.__class__.__name__, self.data.items())
+
+
+class TLVDiscoveryResponseDatagram(Datagram):
+ """Description of a TLV discovery response datagram."""
+ def __init__(self, responsedata):
+ parts = ["E"] # new discovery format
+ for typ, value in responsedata.items():
+ if value is None:
+ value = ""
+ elif len(value) > 255:
+ # Response too long, truncating to 255 bytes
+ value = value[:255]
+ parts.extend((typ, chr(len(value)), value))
+ self.packet = "".join(parts)
+
+
+class DiscoveryProtocol:
+ """Description of a discovery protocol."""
+ def __init__(self, web_port):
+ self.web_port = web_port
+
+ def connection_made(self, transport):
+ self.transport = transport
+ # Allow receiving multicast broadcasts
+ sock = self.transport.get_extra_info("socket")
+ group = socket.inet_aton("239.255.255.250")
+ mreq = struct.pack("4sL", group, socket.INADDR_ANY)
+ sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
+
+ def error_received(self, exc):
+ LOGGER.error(exc)
+
+ def connection_lost(self, *args, **kwargs):
+ LOGGER.debug("Connection lost to discovery")
+
+ def build_TLV_response(self, requestdata):
+ responsedata = OrderedDict()
+ for typ, value in requestdata.items():
+ if typ == "NAME":
+ # send full host name - no truncation
+ value = get_hostname()
+ elif typ == "IPAD":
+ # send ipaddress as a string only if it is set
+ value = get_ip()
+ # :todo: IPv6
+ if value == "0.0.0.0":
+ # do not send back an ip address
+ typ = None
+ elif typ == "JSON":
+ # send port as a string
+ json_port = self.web_port
+ value = str(json_port)
+ elif typ == "VERS":
+ # send server version
+ value = "7.9"
+ elif typ == "UUID":
+ # send server uuid
+ value = "musicassistant"
+ else:
+ LOGGER.debug("Unexpected information request: %r", typ)
+ typ = None
+ if typ:
+ responsedata[typ] = value
+ return responsedata
+
+ def datagram_received(self, data, addr):
+ try:
+ data = data.decode()
+ dgram = Datagram.decode(data)
+ if isinstance(dgram, ClientDiscoveryDatagram):
+ self.sendDiscoveryResponse(addr)
+ elif isinstance(dgram, TLVDiscoveryRequestDatagram):
+ resonsedata = self.build_TLV_response(dgram.data)
+ self.sendTLVDiscoveryResponse(resonsedata, addr)
+ except Exception as exc:
+ LOGGER.exception(exc)
+
+ def sendDiscoveryResponse(self, addr):
+ dgram = DiscoveryResponseDatagram(get_hostname(), 3483)
+ self.transport.sendto(dgram.packet.encode(), addr)
+
+ def sendTLVDiscoveryResponse(self, resonsedata, addr):
+ dgram = TLVDiscoveryResponseDatagram(resonsedata)
+ self.transport.sendto(dgram.packet.encode(), addr)
--- /dev/null
+"""Socketclient implementation for Squeezebox emulated player provider."""
+
+import asyncio
+import decimal
+import logging
+import os
+import random
+import re
+import socket
+import struct
+import sys
+import time
+from collections import OrderedDict
+from enum import Enum
+from typing import Awaitable, List, Tuple
+
+from music_assistant.utils import (
+ callback,
+ get_hostname,
+ get_ip,
+ run_periodic,
+ try_parse_int,
+)
+
+from .constants import PROV_ID
+
+LOGGER = logging.getLogger(PROV_ID)
+
+# from http://wiki.slimdevices.com/index.php/SlimProtoTCPProtocol#HELO
+DEVICE_TYPE = {
+ 2: "squeezebox",
+ 3: "softsqueeze",
+ 4: "squeezebox2",
+ 5: "transporter",
+ 6: "softsqueeze3",
+ 7: "receiver",
+ 8: "squeezeslave",
+ 9: "controller",
+ 10: "boom",
+ 11: "softboom",
+ 12: "squeezeplay",
+}
+
+
+class State(str, Enum):
+ """Enum for the playstate of a squeezebox player."""
+
+ Stopped = "stopped"
+ Paused = "paused"
+ Playing = "playing"
+
+
+class Event(Enum):
+ """Enum with the various events the socket client fires."""
+
+ EVENT_CONNECTED = "connected"
+ EVENT_DISCONNECTED = "disconnected"
+ EVENT_UPDATED = "updated"
+ EVENT_DECODER_READY = "decoder_ready"
+
+
+class SqueezeSocketClient:
+ """Squeezebox socket client"""
+
+ def __init__(
+ self,
+ reader: asyncio.StreamReader,
+ writer: asyncio.StreamWriter,
+ event_callback: Awaitable = None,
+ ):
+ """Initialize the socket client."""
+ self._reader = reader
+ self._writer = writer
+ self._event_callback = event_callback
+ self._player_id = ""
+ self._device_type = ""
+ self._device_name = ""
+ self._last_volume = 0
+ self._last_heartbeat = 0
+ self._cur_time_milliseconds = 0
+ self._volume_control = PySqueezeVolume()
+ self._volume_level = 0
+ self._powered = False
+ self._muted = False
+ self._state = State.Stopped
+ self._elapsed_time = 0
+ self._current_uri = ""
+ self._tasks = [
+ asyncio.create_task(self.__async_socket_reader()),
+ asyncio.create_task(self.__async_send_heartbeat()),
+ ]
+
+ def close(self):
+ """Cleanup when the socket client needs to closed."""
+ for task in self._tasks:
+ if not task.cancelled():
+ task.cancel()
+
+ @property
+ def player_id(self) -> str:
+ """Return player_id (=mac address) of the player."""
+ return self._player_id
+
+ @property
+ def device_type(self) -> str:
+ """Return device type of the player."""
+ return self._device_type
+
+ @property
+ def device_address(self) -> str:
+ """Return device IP address of the player"""
+ dev_address = self._writer.get_extra_info("peername")
+ return dev_address[0] if dev_address else ""
+
+ @property
+ def name(self) -> str:
+ """Return name of the player."""
+ if self._device_name:
+ return self._device_name
+ return f"{self.device_type}: {self.player_id}"
+
+ @property
+ def volume_level(self):
+ """Return current volume level of player."""
+ return self._volume_level
+
+ @property
+ def powered(self):
+ """Return current power state of player."""
+ return self._powered
+
+ @property
+ def muted(self):
+ """Return current mute state of player."""
+ return self._muted
+
+ @property
+ def state(self):
+ """Return current state of player."""
+ return self._state
+
+ @property
+ def elapsed_time(self):
+ """Return elapsed_time of current playing track."""
+ return self._elapsed_time
+
+ @property
+ def current_uri(self):
+ """Return uri of currently loaded track."""
+ return self._current_uri
+
+ async def __async_initialize_player(self):
+ """Set some startup settings for the player."""
+ # send version
+ await self.__async_send_frame(b"vers", b"7.8")
+ await self.__async_send_frame(b"setd", struct.pack("B", 0))
+ await self.__async_send_frame(b"setd", struct.pack("B", 4))
+
+ async def async_cmd_stop(self):
+ """Send stop command to player."""
+ data = self.__pack_stream(b"q", autostart=b"0", flags=0)
+ await self.__async_send_frame(b"strm", data)
+
+ async def async_cmd_play(self):
+ """Send play (unpause) command to player."""
+ data = self.__pack_stream(b"u", autostart=b"0", flags=0)
+ await self.__async_send_frame(b"strm", data)
+
+ async def async_cmd_pause(self):
+ """Send pause command to player."""
+ data = self.__pack_stream(b"p", autostart=b"0", flags=0)
+ await self.__async_send_frame(b"strm", data)
+
+ async def async_cmd_power(self, powered: bool = True):
+ """Send power command to player."""
+ # power is not supported so abuse mute instead
+ power_int = 1 if powered else 0
+ await self.__async_send_frame(b"aude", struct.pack("2B", power_int, 1))
+ self._powered = powered
+ asyncio.create_task(self._event_callback(Event.EVENT_UPDATED, self))
+
+ async def async_cmd_volume_set(self, volume_level: int):
+ """Send new volume level command to player."""
+ self._volume_control.volume = volume_level
+ og = self._volume_control.old_gain()
+ ng = self._volume_control.new_gain()
+ await self.__async_send_frame(b"audg", struct.pack("!LLBBLL", og, og, 1, 255, ng, ng))
+ self._volume_level = volume_level
+
+ async def async_cmd_mute(self, muted: bool = False):
+ """Send mute command to player."""
+ muted_int = 0 if muted else 1
+ await self.__async_send_frame(b"aude", struct.pack("2B", muted_int, 0))
+ self.muted = muted
+
+ async def async_cmd_play_uri(
+ self, uri: str, send_flush: bool = True, crossfade_duration: int = 0
+ ):
+ """Request player to start playing a single uri."""
+ if send_flush:
+ data = self.__pack_stream(b"f", autostart=b"0", flags=0)
+ await self.__async_send_frame(b"strm", data)
+ self._current_uri = uri
+ self._powered = True
+ enable_crossfade = crossfade_duration > 0
+ command = b"s"
+ autostart = (
+ b"3" # we use direct stream for now so let the player do the messy work with buffers
+ )
+ trans_type = b"1" if enable_crossfade else b"0"
+ formatbyte = b"f" # fixed to flac
+ uri = "/stream" + uri.split("/stream")[1]
+ data = self.__pack_stream(
+ command,
+ autostart=autostart,
+ flags=0x00,
+ formatbyte=formatbyte,
+ transType=trans_type,
+ transDuration=crossfade_duration,
+ )
+ # extract host and port from uri
+ regex = "(?:http.*://)?(?P<host>[^:/ ]+).?(?P<port>[0-9]*).*"
+ regex_result = re.search(regex, uri)
+ host = regex_result.group("host") # 'www.abc.com'
+ port = regex_result.group("port") # '123'
+ if not port and uri.startswith("https"):
+ port = 443
+ elif not port:
+ port = 80
+ headers = f"Connection: close\r\nAccept: */*\r\nHost: {host}:{port}\r\n"
+ request = "GET %s HTTP/1.0\r\n%s\r\n" % (uri, headers)
+ data = data + request.encode("utf-8")
+ await self.__async_send_frame(b"strm", data)
+
+ @run_periodic(5)
+ async def __async_send_heartbeat(self):
+ """Send periodic heartbeat message to player."""
+ timestamp = int(time.time())
+ data = self.__pack_stream(b"t", replayGain=timestamp, flags=0)
+ await self.__async_send_frame(b"strm", data)
+
+ async def __async_send_frame(self, command, data):
+ """Send command to Squeeze player."""
+ packet = struct.pack("!H", len(data) + 4) + command + data
+ self._writer.write(packet)
+ await self._writer.drain()
+
+ async def __async_socket_reader(self):
+ """Handle incoming data from socket."""
+ buffer = b""
+ # keep reading bytes from the socket
+ while not self._reader.at_eof():
+ data = await self._reader.read(64)
+ # handle incoming data from socket
+ buffer = buffer + data
+ del data
+ if len(buffer) > 8:
+ # construct operation and
+ operation, length = buffer[:4], buffer[4:8]
+ plen = struct.unpack("!I", length)[0] + 8
+ if len(buffer) >= plen:
+ packet, buffer = buffer[8:plen], buffer[plen:]
+ operation = operation.strip(b"!").strip().decode().lower()
+ handler = getattr(self, f"_process_{operation}", None)
+ if handler is None:
+ LOGGER.warning("No handler for %s" % operation)
+ else:
+ handler(packet)
+ # EOF reached: socket is disconnected
+ LOGGER.info("Socket disconnected: %s", self._writer.get_extra_info("peername"))
+ self.close()
+ asyncio.create_task(self._event_callback(Event.EVENT_DISCONNECTED, self))
+
+ @callback
+ def __pack_stream(
+ self,
+ command,
+ autostart=b"1",
+ formatbyte=b"o",
+ pcmargs=(b"?", b"?", b"?", b"?"),
+ threshold=200,
+ spdif=b"0",
+ transDuration=0,
+ transType=b"0",
+ flags=0x40,
+ outputThreshold=0,
+ replayGain=0,
+ serverPort=8095,
+ serverIp=0,
+ ):
+ """Create stream request message based on given arguments."""
+ return struct.pack(
+ "!cccccccBcBcBBBLHL",
+ command,
+ autostart,
+ formatbyte,
+ *pcmargs,
+ threshold,
+ spdif,
+ transDuration,
+ transType,
+ flags,
+ outputThreshold,
+ 0,
+ replayGain,
+ serverPort,
+ serverIp,
+ )
+
+ @callback
+ def _process_helo(self, data):
+ """Process incoming HELO event from player (player connected)."""
+ # player connected
+ (dev_id, rev, mac) = struct.unpack("BB6s", data[:8])
+ device_mac = ":".join("%02x" % x for x in mac)
+ self._player_id = str(device_mac).lower()
+ self._device_type = DEVICE_TYPE.get(dev_id, "unknown device")
+ LOGGER.info("Player connected: %s", self.name)
+ asyncio.create_task(self.__async_initialize_player())
+ asyncio.create_task(self._event_callback(Event.EVENT_CONNECTED, self))
+
+ @callback
+ def _process_stat(self, data):
+ """Redirect incoming STAT event from player to correct method."""
+ event = data[:4].decode()
+ event_data = data[4:]
+ if event == b"\x00\x00\x00\x00":
+ # Presumed informational stat message
+ return
+ event_handler = getattr(self, "_process_stat_%s" % event.lower(), None)
+ if event_handler is None:
+ LOGGER.debug("Unhandled event: %s - event_data: %s", event, event_data)
+ else:
+ event_handler(data[4:])
+
+ @callback
+ def _process_stat_aude(self, data):
+ """Process incoming stat AUDe message (power level and mute)."""
+ LOGGER.debug("AUDe received (spdif_enable, dac_enable): %s", data)
+ (spdif_enable, dac_enable) = struct.unpack("2B", data[:4])
+ powered = spdif_enable or dac_enable
+ self._powered = powered
+ self._muted = not powered
+ asyncio.create_task(self._event_callback(Event.EVENT_UPDATED, self))
+
+ @callback
+ def _process_stat_audg(self, data):
+ """Process incoming stat AUDg message (volume level)."""
+ # TODO: process volume level
+ LOGGER.debug("AUDg received - Volume level: %s", data)
+ self._volume_level = self._volume_control.volume
+ asyncio.create_task(self._event_callback(Event.EVENT_UPDATED, self))
+
+ @callback
+ def _process_stat_stmd(self, data):
+ """Process incoming stat STMd message (decoder ready)."""
+ LOGGER.debug("STMu received - Decoder Ready for next track: %s", data)
+ asyncio.create_task(self._event_callback(Event.EVENT_DECODER_READY, self))
+
+ @callback
+ def _process_stat_stmf(self, data):
+ """Process incoming stat STMf message (connection closed)."""
+ LOGGER.debug("STMf received - connection closed: %s", data)
+ self._state = State.Stopped
+ asyncio.create_task(self._event_callback(Event.EVENT_UPDATED, self))
+
+ @callback
+ def _process_stat_stmo(self, data):
+ """Process incoming stat STMo message:
+ No more decoded (uncompressed) data to play; triggers rebuffering."""
+ LOGGER.debug("STMo received - output underrun: %s", data)
+ LOGGER.debug("Output Underrun")
+
+ @callback
+ def _process_stat_stmp(self, data):
+ """Process incoming stat STMp message: Pause confirmed."""
+ LOGGER.debug("STMp received - pause confirmed: %s", data)
+ self._state = State.Paused
+ asyncio.create_task(self._event_callback(Event.EVENT_UPDATED, self))
+
+ @callback
+ def _process_stat_stmr(self, data):
+ """Process incoming stat STMr message: Resume confirmed."""
+ LOGGER.debug("STMr received - resume confirmed: %s", data)
+ self._state = State.Playing
+ asyncio.create_task(self._event_callback(Event.EVENT_UPDATED, self))
+
+ @callback
+ def _process_stat_stms(self, data):
+ """Process incoming stat STMs message: Playback of new track has started."""
+ LOGGER.debug("STMs received - playback of new track has started: %s", data)
+ self._state = State.Playing
+ asyncio.create_task(self._event_callback(Event.EVENT_UPDATED, self))
+
+ @callback
+ def _process_stat_stmt(self, data):
+ """Process incoming stat STMt message: heartbeat from client"""
+ # pylint: disable=unused-variable
+ timestamp = time.time()
+ self._last_heartbeat = timestamp
+ (
+ num_crlf,
+ mas_initialized,
+ mas_mode,
+ rptr,
+ wptr,
+ bytes_received_h,
+ bytes_received_l,
+ signal_strength,
+ jiffies,
+ output_buffer_size,
+ output_buffer_fullness,
+ elapsed_seconds,
+ voltage,
+ elapsed_milliseconds,
+ server_timestamp,
+ error_code,
+ ) = struct.unpack("!BBBLLLLHLLLLHLLH", data)
+ if self._state == State.Playing and elapsed_seconds != self._elapsed_time:
+ self._elapsed_time = elapsed_seconds
+ asyncio.create_task(self._event_callback(Event.EVENT_UPDATED, self))
+
+ @callback
+ def _process_stat_stmu(self, data):
+ """Process incoming stat STMu message: Buffer underrun: Normal end of playback."""
+ LOGGER.debug("STMu received - end of playback: %s", data)
+ self.state = State.Stopped
+ asyncio.create_task(self._event_callback(Event.EVENT_UPDATED, self))
+
+ @callback
+ def _process_resp(self, data):
+ """Process incoming RESP message: Response received at player."""
+ LOGGER.debug("RESP received: %s", data)
+ # send continue
+ asyncio.create_task(self.__async_send_frame(b"cont", b"0"))
+
+ @callback
+ def _process_setd(self, data):
+ """Process incoming SETD message: Get/set player firmware settings."""
+ LOGGER.debug("SETD received %s", data)
+ cmd_id = data[0]
+ if cmd_id == 0:
+ # received player name
+ data = data[1:].decode()
+ self._device_name = data
+
+
+class PySqueezeVolume(object):
+
+ """Represents a sound volume. This is an awful lot more complex than it
+ sounds."""
+
+ minimum = 0
+ maximum = 100
+ step = 1
+
+ # this map is taken from Slim::Player::Squeezebox2 in the squeezecenter source
+ # i don't know how much magic it contains, or any way I can test it
+ old_map = [
+ 0,
+ 1,
+ 1,
+ 1,
+ 2,
+ 2,
+ 2,
+ 3,
+ 3,
+ 4,
+ 5,
+ 5,
+ 6,
+ 6,
+ 7,
+ 8,
+ 9,
+ 9,
+ 10,
+ 11,
+ 12,
+ 13,
+ 14,
+ 15,
+ 16,
+ 16,
+ 17,
+ 18,
+ 19,
+ 20,
+ 22,
+ 23,
+ 24,
+ 25,
+ 26,
+ 27,
+ 28,
+ 29,
+ 30,
+ 32,
+ 33,
+ 34,
+ 35,
+ 37,
+ 38,
+ 39,
+ 40,
+ 42,
+ 43,
+ 44,
+ 46,
+ 47,
+ 48,
+ 50,
+ 51,
+ 53,
+ 54,
+ 56,
+ 57,
+ 59,
+ 60,
+ 61,
+ 63,
+ 65,
+ 66,
+ 68,
+ 69,
+ 71,
+ 72,
+ 74,
+ 75,
+ 77,
+ 79,
+ 80,
+ 82,
+ 84,
+ 85,
+ 87,
+ 89,
+ 90,
+ 92,
+ 94,
+ 96,
+ 97,
+ 99,
+ 101,
+ 103,
+ 104,
+ 106,
+ 108,
+ 110,
+ 112,
+ 113,
+ 115,
+ 117,
+ 119,
+ 121,
+ 123,
+ 125,
+ 127,
+ 128,
+ ]
+
+ # new gain parameters, from the same place
+ total_volume_range = -50 # dB
+ step_point = -1 # Number of steps, up from the bottom, where a 2nd volume ramp kicks in.
+ step_fraction = 1 # fraction of totalVolumeRange where alternate volume ramp kicks in.
+
+ def __init__(self):
+ self.volume = 50
+
+ def increment(self):
+ """Increment the volume"""
+ self.volume += self.step
+ if self.volume > self.maximum:
+ self.volume = self.maximum
+
+ def decrement(self):
+ """Decrement the volume"""
+ self.volume -= self.step
+ if self.volume < self.minimum:
+ self.volume = self.minimum
+
+ def old_gain(self):
+ """Return the "Old" gain value as required by the squeezebox"""
+ return self.old_map[self.volume]
+
+ def decibels(self):
+ """Return the "new" gain value."""
+
+ step_db = self.total_volume_range * self.step_fraction
+ max_volume_db = 0 # different on the boom?
+
+ # Equation for a line:
+ # y = mx+b
+ # y1 = mx1+b, y2 = mx2+b.
+ # y2-y1 = m(x2 - x1)
+ # y2 = m(x2 - x1) + y1
+ slope_high = max_volume_db - step_db / (100.0 - self.step_point)
+ slope_low = step_db - self.total_volume_range / (self.step_point - 0.0)
+ x2 = self.volume
+ if x2 > self.step_point:
+ m = slope_high
+ x1 = 100
+ y1 = max_volume_db
+ else:
+ m = slope_low
+ x1 = 0
+ y1 = self.total_volume_range
+ return m * (x2 - x1) + y1
+
+ def new_gain(self):
+ db = self.decibels()
+ floatmult = 10 ** (db / 20.0)
+ # avoid rounding errors somehow
+ if -30 <= db <= 0:
+ return int(floatmult * (1 << 8) + 0.5) * (1 << 8)
+ else:
+ return int((floatmult * (1 << 16)) + 0.5)
+++ /dev/null
-#!/usr/bin/env python3
-# -*- coding:utf-8 -*-
-
-import asyncio
-from collections import OrderedDict
-import decimal
-import os
-import random
-import socket
-import logging
-import struct
-import sys
-import time
-from typing import List
-
-from music_assistant.models.player import DeviceInfo, Player, PlayerFeature, PlayerState
-from music_assistant.models.player_queue import PlayerQueue, QueueItem
-from music_assistant.models.playerprovider import PlayerProvider
-from music_assistant.utils import (
- get_hostname,
- get_ip,
- run_periodic,
- try_parse_int,
-)
-
-PROV_ID = "squeezebox"
-PROV_NAME = "Squeezebox emulation"
-
-CONFIG_ENTRIES = []
-PLAYER_FEATURES = [PlayerFeature.QUEUE, PlayerFeature.CROSSFADE, PlayerFeature.GAPLESS]
-
-LOGGER = logging.getLogger(PROV_ID)
-
-async def async_setup(mass):
- """Perform async setup of this Plugin/Provider."""
- prov = PySqueezeProvider()
- await mass.async_register_provider(prov)
-
-class PySqueezeProvider(PlayerProvider):
- """Python implementation of SlimProto server"""
-
- @property
- def id(self) -> str:
- """Return provider ID for this provider."""
- return PROV_ID
-
- @property
- def name(self) -> str:
- """Return provider Name for this provider."""
- return PROV_NAME
-
- @property
- def config_entries(self) -> List[ConfigEntry]:
- """Return Config Entries for this provider."""
- return CONFIG_ENTRIES
-
- async def async_on_start(self) -> bool:
- """Called on startup. Handle initialization of the provider based on config."""
- # start slimproto server
- self.mass.add_job(
- asyncio.start_server(self.__async_handle_socket_client, "0.0.0.0", 3483)
- )
- # setup discovery
- self.mass.add_job(self.async_start_discovery())
-
- async def async_start_discovery(self):
- transport, protocol = await self.mass.loop.create_datagram_endpoint(
- lambda: DiscoveryProtocol(self.mass.web.http_port),
- local_addr=("0.0.0.0", 3483),
- )
- try:
- while True:
- await asyncio.sleep(60) # serve forever
- finally:
- transport.close()
-
- async def __async_handle_socket_client(self, reader, writer):
- """handle a client connection on the socket"""
- buffer = b""
- player = None
- try:
- # keep reading bytes from the socket
- while True:
- data = await reader.read(64)
- if not data:
- # connection lost with client
- break
- # handle incoming data from socket
- buffer = buffer + data
- del data
- if len(buffer) > 8:
- operation, length = buffer[:4], buffer[4:8]
- plen = struct.unpack("!I", length)[0] + 8
- if len(buffer) >= plen:
- packet, buffer = buffer[8:plen], buffer[plen:]
- operation = operation.strip(b"!").strip().decode()
- if operation == "HELO":
- # player connected
- (dev_id, rev, mac) = struct.unpack("BB6s", packet[:8])
- device_mac = ":".join("%02x" % x for x in mac)
- player_id = str(device_mac).lower()
- device_type = devices.get(dev_id, "unknown device")
- player = PySqueezePlayer(
- self.mass, player_id, PROV_ID, device_type, writer
- )
- await self.mass.player_manager.add_player(player)
- elif player != None:
- await player.process_msg(operation, packet)
- except Exception as exc:
- # connection lost ?
- LOGGER.debug(exc)
- finally:
- # disconnect and cleanup
- if player:
- if player._heartbeat_task:
- player._heartbeat_task.cancel()
- await self.mass.player_manager.remove_player(player.player_id)
- self.mass.config.save()
-
-
-class PySqueezePlayer():
- """Squeezebox socket client"""
-
- def __init__(self, mass, player_id, prov_id, dev_type, writer):
- """Initialize the player."""
- self.player_id = player_id
- self.mass = mass
- self._writer = writer
- self.buffer = b""
- self.name = "%s - %s" % (dev_type, player_id)
- self._volume = PySqueezeVolume()
- self._last_volume = 0
- self._last_heartbeat = 0
- self._cur_time_milliseconds = 0
- # initialize player
- self.mass.add_job(self.async_initialize_player())
- self._heartbeat_task = self.mass.add_job(self.__send_heartbeat())
-
- async def async_initialize_player(self):
- """set some startup settings for the player."""
- # send version
- await self.__async_send_frame(b"vers", b"7.8")
- await self.__async_send_frame(b"setd", struct.pack("B", 0))
- await self.__async_send_frame(b"setd", struct.pack("B", 4))
- # TODO: handle display stuff
- # await self.setBrightness()
- # restore last volume and power state
- if self.settings.get("last_volume"):
- await self.async_volume_set(self.settings["last_volume"])
- else:
- await self.async_volume_set(40)
- if self.settings.get("last_power"):
- await self.power(self.settings["last_power"])
- else:
- await self.power_off()
-
- async def async_cmd_stop(self):
- """Send stop command to player."""
- data = await self.__pack_stream(b"q", autostart=b"0", flags=0)
- await self.__async_send_frame(b"strm", data)
-
- async def async_cmd_play(self):
- """Send play (unpause) command to player."""
- data = await self.__pack_stream(b"u", autostart=b"0", flags=0)
- await self.__async_send_frame(b"strm", data)
-
- async def async_cmd_pause(self):
- """Send pause command to player."""
- data = await self.__pack_stream(b"p", autostart=b"0", flags=0)
- await self.__async_send_frame(b"strm", data)
-
- async def async_cmd_power_on(self):
- """Send power ON command to player."""
- await self.__async_send_frame(b"aude", struct.pack("2B", 1, 1))
- self.settings["last_power"] = True
- self.powered = True
-
- async def async_cmd_power_off(self):
- """Send power TOGGLE command to player."""
- await self.cmd_stop()
- await self.__async_send_frame(b"aude", struct.pack("2B", 0, 0))
- self.settings["last_power"] = False
- self.powered = False
-
- async def async_cmd_volume_set(self, volume_level):
- """Send new volume level command to player."""
- self._volume.volume = volume_level
- og = self._volume.old_gain()
- ng = self._volume.new_gain()
- await self.__async_send_frame(b"audg", struct.pack("!LLBBLL", og, og, 1, 255, ng, ng))
- self.settings["last_volume"] = volume_level
- self.volume_level = volume_level
-
- async def async_cmd_volume_mute(self, is_muted=False):
- """Send mute command to player."""
- if is_muted:
- await self.__async_send_frame(b"aude", struct.pack("2B", 0, 0))
- else:
- await self.__async_send_frame(b"aude", struct.pack("2B", 1, 1))
- self.muted = is_muted
-
- async def async_cmd_queue_play_index(self, index: int):
- """
- play item at index X on player's queue
- :param index: (int) index of the queue item that should start playing
- """
- new_track = await self.queue.get_item(index)
- if new_track:
- await self.__send_flush()
- await self.__send_play(new_track.uri)
-
- async def async_cmd_queue_load(self, queue_items):
- """
- load/overwrite given items in the player's own queue implementation
- :param queue_items: a list of QueueItems
- """
- await self.__send_flush()
- if queue_items:
- await self.__send_play(queue_items[0].uri)
-
- async def async_cmd_queue_insert(self, queue_items, insert_at_index):
- # queue handled by built-in queue controller
- # we only check the start index
- if insert_at_index == self.queue.cur_index:
- return await self.cmd_queue_play_index(insert_at_index)
-
- async def async_cmd_queue_append(self, queue_items):
- pass # automagically handled by built-in queue controller
-
- async def async_cmd_play_uri(self, uri: str):
- """
- [MUST OVERRIDE]
- tell player to start playing a single uri
- """
- await self.__send_flush()
- await self.__send_play(uri)
-
- async def __async_send_flush(self):
- data = await self.__pack_stream(b"f", autostart=b"0", flags=0)
- await self.__async_send_frame(b"strm", data)
-
- async def __async_send_play(self, uri):
- """Play uri"""
- self.current_uri = uri
- self.powered = True
- enable_crossfade = self.settings["crossfade_duration"] > 0
- command = b"s"
- autostart = (
- b"3"
- ) # we use direct stream for now so let the player do the messy work with buffers
- transType = b"1" if enable_crossfade else b"0"
- transDuration = self.settings["crossfade_duration"]
- formatbyte = b"f" # fixed to flac
- uri = "/stream" + uri.split("/stream")[1]
- data = await self.__pack_stream(
- command,
- autostart=autostart,
- flags=0x00,
- formatbyte=formatbyte,
- transType=transType,
- transDuration=transDuration,
- )
- headers = "Connection: close\r\nAccept: */*\r\nHost: %s:%s\r\n" % (
- self.mass.web.local_ip,
- self.mass.web.http_port,
- )
- request = "GET %s HTTP/1.0\r\n%s\r\n" % (uri, headers)
- data = data + request.encode("utf-8")
- await self.__async_send_frame(b"strm", data)
-
- def __delete__(self, instance):
- """make sure the heartbeat task is deleted"""
- if self._heartbeat_task:
- self._heartbeat_task.cancel()
-
- @run_periodic(5)
- async def __async_send_heartbeat(self):
- """Send periodic heartbeat message to player."""
- timestamp = int(time.time())
- data = await self.__pack_stream(b"t", replayGain=timestamp, flags=0)
- await self.__async_send_frame(b"strm", data)
-
- async def __async_send_frame(self, command, data):
- """Send command to Squeeze player."""
- packet = struct.pack("!H", len(data) + 4) + command + data
- self._writer.write(packet)
- await self._writer.drain()
-
- async def __async_pack_stream(
- self,
- command,
- autostart=b"1",
- formatbyte=b"o",
- pcmargs=(b"?", b"?", b"?", b"?"),
- threshold=200,
- spdif=b"0",
- transDuration=0,
- transType=b"0",
- flags=0x40,
- outputThreshold=0,
- replayGain=0,
- serverPort=8095,
- serverIp=0,
- ):
- return struct.pack(
- "!cccccccBcBcBBBLHL",
- command,
- autostart,
- formatbyte,
- *pcmargs,
- threshold,
- spdif,
- transDuration,
- transType,
- flags,
- outputThreshold,
- 0,
- replayGain,
- serverPort,
- serverIp
- )
-
- async def async_displayTrack(self, track):
- await self.render("%s by %s" % (track.title, track.artist))
-
- async def async_setBrightness(self, level=4):
- assert 0 <= level <= 4
- await self.__async_send_frame(b"grfb", struct.pack("!H", level))
-
- async def async_set_visualisation(self, visualisation):
- await self.__async_send_frame(b"visu", visualisation.pack())
-
- async def async_render(self, text):
- # self.display.clear()
- # self.display.renderText(text, "DejaVu-Sans", 16, (0,0))
- # self.updateDisplay(self.display.frame())
- pass
-
- async def async_updateDisplay(self, bitmap, transition="c", offset=0, param=0):
- frame = struct.pack("!Hcb", offset, transition, param) + bitmap
- await self.__async_send_frame(b"grfe", frame)
-
- async def async_process_msg(self, operation, packet):
- handler = getattr(self, "process_%s" % operation, None)
- if handler is None:
- LOGGER.debug("No handler for %s" % operation)
- else:
- await handler(packet)
-
- async def async_process_STAT(self, data):
- """process incoming event from player."""
- event = data[:4].decode()
- event_data = data[4:]
- if event == b"\x00\x00\x00\x00":
- # Presumed informational stat message
- return
- event_handler = getattr(self, "stat_%s" % event, None)
- if event_handler is None:
- LOGGER.debug("Got event %s - event_data: %s" % (event, event_data))
- else:
- await event_handler(data[4:])
-
- async def async_stat_aude(self, data):
- (spdif_enable, dac_enable) = struct.unpack("2B", data[:4])
- powered = spdif_enable or dac_enable
- self.powered = powered
- self.muted = not powered
- LOGGER.debug("ACK aude - Received player power: %s" % powered)
-
- async def async_stat_audg(self, data):
- # TODO: process volume level
- LOGGER.info("Received volume_level from player %s" % data)
- self.volume_level = self._volume.volume
-
- async def async_stat_STMd(self, data):
- LOGGER.debug("Decoder Ready for next track")
- next_item = self.queue.next_item
- if next_item:
- await self.__send_play(next_item.uri)
-
- async def async_stat_STMf(self, data):
- LOGGER.debug("Status Message: Connection closed")
- self.state = PlayerState.Stopped
-
- async def async_stat_STMo(self, data):
- """No more decoded (uncompressed) data to play; triggers rebuffering."""
- LOGGER.debug("Output Underrun")
-
- async def async_stat_STMp(self, data):
- """Pause confirmed"""
- self.state = PlayerState.Paused
-
- async def async_stat_STMr(self, data):
- """Resume confirmed"""
- self.state = PlayerState.Playing
-
- async def async_stat_STMs(self, data):
- """Playback of new track has started"""
- self.state = PlayerState.Playing
-
- async def async_stat_STMt(self, data):
- """heartbeat from client"""
- timestamp = time.time()
- self._last_heartbeat = timestamp
- (
- num_crlf,
- mas_initialized,
- mas_mode,
- rptr,
- wptr,
- bytes_received_h,
- bytes_received_l,
- signal_strength,
- jiffies,
- output_buffer_size,
- output_buffer_fullness,
- elapsed_seconds,
- voltage,
- cur_time_milliseconds,
- server_timestamp,
- error_code,
- ) = struct.unpack("!BBBLLLLHLLLLHLLH", data)
- if self.state == PlayerState.Playing and elapsed_seconds != self.cur_time:
- self.cur_time = elapsed_seconds
- self._cur_time_milliseconds = cur_time_milliseconds
-
- async def async_stat_STMu(self, data):
- """Buffer underrun: Normal end of playback"""
- self.state = PlayerState.Stopped
-
- async def async_process_RESP(self, data):
- """response received at player, send continue"""
- LOGGER.debug("RESP received")
- await self.__async_send_frame(b"cont", b"0")
-
- async def async_process_IR(self, data):
- """Slightly involved codepath here. This raises an event, which may
- be picked up by the service and then the process_remote_* function in
- this player will be called. This is mostly relevant for volume changes
- - most other button presses will require some context to operate."""
- (time, code) = struct.unpack("!IxxI", data)
- LOGGER.info("IR code %s" % code)
- # command = Remote.codes.get(code, None)
- # if command is not None:
- # LOGGER.info("IR received: %r, %r" % (code, command))
- # #self.service.evreactor.fireEvent(RemoteButtonPressed(self, command))
- # else:
- # LOGGER.info("Unknown IR received: %r, %r" % (time, code))
-
- async def async_process_SETD(self, data):
- """Get/set player firmware settings"""
- LOGGER.debug("SETD received %s" % data)
- cmd_id = data[0]
- if cmd_id == 0:
- # received player name
- data = data[1:].decode()
- self.name = data
-
-
-# from http://wiki.slimdevices.com/index.php/SlimProtoTCPProtocol#HELO
-devices = {
- 2: "squeezebox",
- 3: "softsqueeze",
- 4: "squeezebox2",
- 5: "transporter",
- 6: "softsqueeze3",
- 7: "receiver",
- 8: "squeezeslave",
- 9: "controller",
- 10: "boom",
- 11: "softboom",
- 12: "squeezeplay",
-}
-
-
-class PySqueezeVolume(object):
-
- """Represents a sound volume. This is an awful lot more complex than it
- sounds."""
-
- minimum = 0
- maximum = 100
- step = 1
-
- # this map is taken from Slim::Player::Squeezebox2 in the squeezecenter source
- # i don't know how much magic it contains, or any way I can test it
- old_map = [
- 0,
- 1,
- 1,
- 1,
- 2,
- 2,
- 2,
- 3,
- 3,
- 4,
- 5,
- 5,
- 6,
- 6,
- 7,
- 8,
- 9,
- 9,
- 10,
- 11,
- 12,
- 13,
- 14,
- 15,
- 16,
- 16,
- 17,
- 18,
- 19,
- 20,
- 22,
- 23,
- 24,
- 25,
- 26,
- 27,
- 28,
- 29,
- 30,
- 32,
- 33,
- 34,
- 35,
- 37,
- 38,
- 39,
- 40,
- 42,
- 43,
- 44,
- 46,
- 47,
- 48,
- 50,
- 51,
- 53,
- 54,
- 56,
- 57,
- 59,
- 60,
- 61,
- 63,
- 65,
- 66,
- 68,
- 69,
- 71,
- 72,
- 74,
- 75,
- 77,
- 79,
- 80,
- 82,
- 84,
- 85,
- 87,
- 89,
- 90,
- 92,
- 94,
- 96,
- 97,
- 99,
- 101,
- 103,
- 104,
- 106,
- 108,
- 110,
- 112,
- 113,
- 115,
- 117,
- 119,
- 121,
- 123,
- 125,
- 127,
- 128,
- ]
-
- # new gain parameters, from the same place
- total_volume_range = -50 # dB
- step_point = (
- -1
- ) # Number of steps, up from the bottom, where a 2nd volume ramp kicks in.
- step_fraction = (
- 1
- ) # fraction of totalVolumeRange where alternate volume ramp kicks in.
-
- def __init__(self):
- self.volume = 50
-
- def increment(self):
- """Increment the volume"""
- self.volume += self.step
- if self.volume > self.maximum:
- self.volume = self.maximum
-
- def decrement(self):
- """Decrement the volume"""
- self.volume -= self.step
- if self.volume < self.minimum:
- self.volume = self.minimum
-
- def old_gain(self):
- """Return the "Old" gain value as required by the squeezebox"""
- return self.old_map[self.volume]
-
- def decibels(self):
- """Return the "new" gain value."""
-
- step_db = self.total_volume_range * self.step_fraction
- max_volume_db = 0 # different on the boom?
-
- # Equation for a line:
- # y = mx+b
- # y1 = mx1+b, y2 = mx2+b.
- # y2-y1 = m(x2 - x1)
- # y2 = m(x2 - x1) + y1
- slope_high = max_volume_db - step_db / (100.0 - self.step_point)
- slope_low = step_db - self.total_volume_range / (self.step_point - 0.0)
- x2 = self.volume
- if x2 > self.step_point:
- m = slope_high
- x1 = 100
- y1 = max_volume_db
- else:
- m = slope_low
- x1 = 0
- y1 = self.total_volume_range
- return m * (x2 - x1) + y1
-
- def new_gain(self):
- db = self.decibels()
- floatmult = 10 ** (db / 20.0)
- # avoid rounding errors somehow
- if -30 <= db <= 0:
- return int(floatmult * (1 << 8) + 0.5) * (1 << 8)
- else:
- return int((floatmult * (1 << 16)) + 0.5)
-
-
-##### UDP DISCOVERY STUFF #############
-
-
-class Datagram(object):
- @classmethod
- def decode(self, data):
- if data[0] == "e":
- return TLVDiscoveryRequestDatagram(data)
- elif data[0] == "E":
- return TLVDiscoveryResponseDatagram(data)
- elif data[0] == "d":
- return ClientDiscoveryDatagram(data)
- elif data[0] == "h":
- pass # Hello!
- elif data[0] == "i":
- pass # IR
- elif data[0] == "2":
- pass # i2c?
- elif data[0] == "a":
- pass # ack!
-
-
-class ClientDiscoveryDatagram(Datagram):
-
- device = None
- firmware = None
- client = None
-
- def __init__(self, data):
- s = struct.unpack("!cxBB8x6B", data.encode())
- assert s[0] == "d"
- self.device = s[1]
- self.firmware = hex(s[2])
- self.client = ":".join(["%02x" % (x,) for x in s[3:]])
-
- def __repr__(self):
- return "<%s device=%r firmware=%r client=%r>" % (
- self.__class__.__name__,
- self.device,
- self.firmware,
- self.client,
- )
-
-
-class DiscoveryResponseDatagram(Datagram):
- def __init__(self, hostname, port):
- hostname = hostname[:16].encode("UTF-8")
- hostname += (16 - len(hostname)) * "\x00"
- self.packet = struct.pack("!c16s", "D", hostname).decode()
-
-
-class TLVDiscoveryRequestDatagram(Datagram):
- def __init__(self, data):
- requestdata = OrderedDict()
- assert data[0] == "e"
- idx = 1
- length = len(data) - 5
- while idx <= length:
- typ, l = struct.unpack_from("4sB", data.encode(), idx)
- if l:
- val = data[idx + 5 : idx + 5 + l]
- idx += 5 + l
- else:
- val = None
- idx += 5
- typ = typ.decode()
- requestdata[typ] = val
- self.data = requestdata
-
- def __repr__(self):
- return "<%s data=%r>" % (self.__class__.__name__, self.data.items())
-
-
-class TLVDiscoveryResponseDatagram(Datagram):
- def __init__(self, responsedata):
- parts = ["E"] # new discovery format
- for typ, value in responsedata.items():
- if value is None:
- value = ""
- elif len(value) > 255:
- # Response too long, truncating to 255 bytes
- value = value[:255]
- parts.extend((typ, chr(len(value)), value))
- self.packet = "".join(parts)
-
-
-class DiscoveryProtocol:
- def __init__(self, web_port):
- self.web_port = web_port
-
- def connection_made(self, transport):
- self.transport = transport
- # Allow receiving multicast broadcasts
- sock = self.transport.get_extra_info("socket")
- group = socket.inet_aton("239.255.255.250")
- mreq = struct.pack("4sL", group, socket.INADDR_ANY)
- sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
-
- def error_received(self, exc):
- LOGGER.error(exc)
-
- def connection_lost(self, *args, **kwargs):
- LOGGER.debug("Connection lost to discovery")
-
- def build_TLV_response(self, requestdata):
- responsedata = OrderedDict()
- for typ, value in requestdata.items():
- if typ == "NAME":
- # send full host name - no truncation
- value = get_hostname()
- elif typ == "IPAD":
- # send ipaddress as a string only if it is set
- value = get_ip()
- # :todo: IPv6
- if value == "0.0.0.0":
- # do not send back an ip address
- typ = None
- elif typ == "JSON":
- # send port as a string
- json_port = self.web_port
- value = str(json_port)
- elif typ == "VERS":
- # send server version
- value = "7.9"
- elif typ == "UUID":
- # send server uuid
- value = "musicassistant"
- else:
- LOGGER.debug("Unexpected information request: %r", typ)
- typ = None
- if typ:
- responsedata[typ] = value
- return responsedata
-
- def datagram_received(self, data, addr):
- try:
- data = data.decode()
- dgram = Datagram.decode(data)
- if isinstance(dgram, ClientDiscoveryDatagram):
- self.sendDiscoveryResponse(addr)
- elif isinstance(dgram, TLVDiscoveryRequestDatagram):
- resonsedata = self.build_TLV_response(dgram.data)
- self.sendTLVDiscoveryResponse(resonsedata, addr)
- except Exception as exc:
- LOGGER.exception(exc)
-
- def sendDiscoveryResponse(self, addr):
- dgram = DiscoveryResponseDatagram(get_hostname(), 3483)
- self.transport.sendto(dgram.packet.encode(), addr)
-
- def sendTLVDiscoveryResponse(self, resonsedata, addr):
- dgram = TLVDiscoveryResponseDatagram(resonsedata)
- self.transport.sendto(dgram.packet.encode(), addr)