import logging
from typing import List
+from music_assistant.constants import CONF_CROSSFADE_DURATION
+from music_assistant.helpers.typing import MusicAssistantType
from music_assistant.models.config_entry import ConfigEntry
+from music_assistant.models.player import (
+ DeviceInfo,
+ PlaybackState,
+ Player,
+ PlayerFeature,
+)
+from music_assistant.models.player_queue import QueueItem
from music_assistant.models.playerprovider import PlayerProvider
+from music_assistant.utils import callback
from .constants import PROV_ID, PROV_NAME
from .discovery import DiscoveryProtocol
-from .socket_client import SqueezeSocketClient
+from .socket_client import SqueezeEvent, SqueezeSocketClient
CONF_LAST_POWER = "last_power"
CONF_LAST_VOLUME = "last_volume"
LOGGER = logging.getLogger(PROV_ID)
CONFIG_ENTRIES = [] # we don't have any provider config entries (for now)
+PLAYER_FEATURES = [PlayerFeature.QUEUE, PlayerFeature.CROSSFADE, PlayerFeature.GAPLESS]
+PLAYER_CONFIG_ENTRIES = [] # we don't have any player config entries (for now)
async def async_setup(mass):
async def __async_client_connected(self, reader, writer):
"""Handle a client connection on the socket."""
addr = writer.get_extra_info("peername")
- LOGGER.debug("New socket client connected: %s", addr)
+ LOGGER.debug("Socket client connected: %s", addr)
socket_client = SqueezeSocketClient(reader, writer)
- socket_client.mass = self.mass
+
+ def handle_event(event: SqueezeEvent, socket_client: SqueezeSocketClient):
+ player_id = socket_client.player_id
+ if not player_id:
+ return
+ # always check if we already have this player as it might be reconnected
+ player_state = self.mass.player_manager.get_player(player_id)
+ if player_state:
+ player = player_state.player
+ else:
+ player = SqueezePlayer(self.mass, socket_client)
+ player.set_socket_client(socket_client)
+ # just update, the playermanager will take care of adding it if it's a new player
+ player.handle_socket_client_event(event)
+
+ socket_client.register_callback(handle_event)
+
+
+class SqueezePlayer(Player):
+ """Squeezebox player."""
+
+ def __init__(self, mass: MusicAssistantType, socket_client: SqueezeSocketClient):
+ """Initialize."""
+ self.mass = mass
+ self._socket_client = socket_client
+
+ @property
+ def available(self) -> bool:
+ """Return current availablity of player."""
+ return self._socket_client.connected
+
+ @property
+ def should_poll(self) -> bool:
+ """Return True if this player should be polled for state updates."""
+ return False
+
+ @property
+ def socket_client(self):
+ """Return the uinderluing socket client for the player."""
+ return self._socket_client
+
+ def set_socket_client(self, socket_client: SqueezeSocketClient):
+ """Set a (new) socket client to this player."""
+ self._socket_client = socket_client
+
+ async def async_on_remove(self) -> None:
+ """Call when player is removed from the player manager."""
+ self.socket_client.disconnect()
+
+ @property
+ def player_id(self) -> str:
+ """Return player id (=mac address) of the player."""
+ return self.socket_client.player_id
+
+ @property
+ def provider_id(self) -> str:
+ """Return provider id of this player."""
+ return PROV_ID
+
+ @property
+ def name(self) -> str:
+ """Return name of the player."""
+ return self.socket_client.name
+
+ @property
+ def volume_level(self):
+ """Return current volume level of player."""
+ return self.socket_client.volume_level
+
+ @property
+ def powered(self):
+ """Return current power state of player."""
+ return self.socket_client.powered
+
+ @property
+ def muted(self):
+ """Return current mute state of player."""
+ return self.socket_client.muted
+
+ @property
+ def state(self):
+ """Return current state of player."""
+ return PlaybackState(self.socket_client.state)
+
+ @property
+ def elapsed_time(self):
+ """Return elapsed_time of current playing track in (fractions of) seconds."""
+ return self.socket_client.elapsed_seconds
+
+ @property
+ def elapsed_milliseconds(self) -> int:
+ """Return (realtime) elapsed time of current playing media in milliseconds."""
+ return self.socket_client.elapsed_milliseconds
+
+ @property
+ def current_uri(self):
+ """Return uri of currently loaded track."""
+ return self.socket_client.current_uri
+
+ @property
+ def features(self) -> List[PlayerFeature]:
+ """Return list of features this player supports."""
+ return PLAYER_FEATURES
+
+ @property
+ def config_entries(self) -> List[ConfigEntry]:
+ """Return player specific config entries (if any)."""
+ return PLAYER_CONFIG_ENTRIES
+
+ @property
+ def device_info(self) -> DeviceInfo:
+ """Return the device info for this player."""
+ return DeviceInfo(
+ model=self.socket_client.device_type,
+ address=self.socket_client.device_address,
+ )
+
+ async def async_cmd_stop(self):
+ """Send stop command to player."""
+ return await self.socket_client.async_cmd_stop()
+
+ async def async_cmd_play(self):
+ """Send play (unpause) command to player."""
+ return await self.socket_client.async_cmd_play()
+
+ async def async_cmd_pause(self):
+ """Send pause command to player."""
+ return await self.socket_client.async_cmd_pause()
+
+ async def async_cmd_power_on(self) -> None:
+ """Send POWER ON command to player."""
+ # save power and volume state in cache
+ cache_str = f"squeezebox_player_state_{self.player_id}"
+ await self.mass.cache.async_set(cache_str, (True, self.volume_level))
+ return await self.socket_client.async_cmd_power(True)
+
+ async def async_cmd_power_off(self) -> None:
+ """Send POWER OFF command to player."""
+ # save power and volume state in cache
+ cache_str = f"squeezebox_player_state_{self.player_id}"
+ await self.mass.cache.async_set(cache_str, (False, self.volume_level))
+ return await self.socket_client.async_cmd_power(False)
+
+ async def async_cmd_volume_set(self, volume_level: int):
+ """Send new volume level command to player."""
+ return await self.socket_client.async_cmd_volume_set(volume_level)
+
+ async def async_cmd_mute(self, muted: bool = False):
+ """Send mute command to player."""
+ return await self.socket_client.async_cmd_mute(muted)
+
+ async def async_cmd_play_uri(self, uri: str):
+ """Request player to start playing a single uri."""
+ crossfade = self.mass.config.player_settings[self.player_id][
+ CONF_CROSSFADE_DURATION
+ ]
+ return await self.socket_client.async_play_uri(
+ uri, crossfade_duration=crossfade
+ )
+
+ async def async_cmd_next(self):
+ """Send NEXT TRACK command to player."""
+ queue = self.mass.player_manager.get_player_queue(self.player_id)
+ if queue:
+ new_track = queue.get_item(queue.cur_index + 1)
+ if new_track:
+ return await self.async_cmd_play_uri(new_track.uri)
+
+ async def async_cmd_previous(self):
+ """Send PREVIOUS TRACK command to player."""
+ queue = self.mass.player_manager.get_player_queue(self.player_id)
+ if queue:
+ new_track = queue.get_item(queue.cur_index - 1)
+ if new_track:
+ return await self.async_cmd_play_uri(new_track.uri)
+
+ async def async_cmd_queue_play_index(self, index: int):
+ """
+ Play item at index X on player's queue.
+
+ :param index: (int) index of the queue item that should start playing
+ """
+ queue = self.mass.player_manager.get_player_queue(self.player_id)
+ if queue:
+ new_track = queue.get_item(index)
+ if new_track:
+ return await self.async_cmd_play_uri(new_track.uri)
+
+ async def async_cmd_queue_load(self, queue_items: List[QueueItem]):
+ """
+ Load/overwrite given items in the player's queue implementation.
+
+ :param queue_items: a list of QueueItems
+ """
+ if queue_items:
+ await self.async_cmd_play_uri(queue_items[0].uri)
+ return await self.async_cmd_play_uri(queue_items[0].uri)
+
+ async def async_cmd_queue_insert(
+ self, queue_items: List[QueueItem], insert_at_index: int
+ ):
+ """
+ Insert new items at position X into existing queue.
+
+ If insert_at_index 0 or None, will start playing newly added item(s)
+ :param queue_items: a list of QueueItems
+ :param insert_at_index: queue position to insert new items
+ """
+ # queue handled by built-in queue controller
+ # we only check the start index
+ queue = self.mass.player_manager.get_player_queue(self.player_id)
+ if queue and insert_at_index == queue.cur_index:
+ return await self.async_cmd_queue_play_index(insert_at_index)
+
+ async def async_cmd_queue_append(self, queue_items: List[QueueItem]):
+ """
+ Append new items at the end of the queue.
+
+ :param queue_items: a list of QueueItems
+ """
+ # automagically handled by built-in queue controller
+
+ async def async_cmd_queue_update(self, queue_items: List[QueueItem]):
+ """
+ Overwrite the existing items in the queue, used for reordering.
+
+ :param queue_items: a list of QueueItems
+ """
+ # automagically handled by built-in queue controller
+
+ async def async_cmd_queue_clear(self):
+ """Clear the player's queue."""
+ # queue is handled by built-in queue controller but send stop
+ return await self.async_cmd_stop()
+
+ async def async_restore_states(self):
+ """Restore power/volume states."""
+ cache_str = f"squeezebox_player_state_{self.player_id}"
+ cache_data = await self.mass.cache.async_get(cache_str)
+ last_power, last_volume = cache_data if cache_data else (False, 40)
+ await self.socket_client.async_cmd_volume_set(last_volume)
+ await self.socket_client.async_cmd_power(last_power)
+
+ @callback
+ def handle_socket_client_event(self, event: SqueezeEvent):
+ """Process incoming event from the socket client."""
+ if event == SqueezeEvent.CONNECTED:
+ # restore previous power/volume
+ self.mass.add_job(self.async_restore_states())
+ elif event == SqueezeEvent.DECODER_READY:
+ # tell player to load next queue track
+ queue = self.mass.player_manager.get_player_queue(self.player_id)
+ if queue:
+ next_item = queue.next_item
+ if next_item:
+ crossfade = self.mass.config.player_settings[self.player_id][
+ CONF_CROSSFADE_DURATION
+ ]
+ self.mass.add_job(
+ self.socket_client.async_play_uri(
+ next_item.uri,
+ send_flush=False,
+ crossfade_duration=crossfade,
+ )
+ )
+ self.update_state()
import re
import struct
import time
-from typing import List
-
-from music_assistant.constants import CONF_CROSSFADE_DURATION
-from music_assistant.models.config_entry import ConfigEntry
-from music_assistant.models.player import (
- DeviceInfo,
- PlaybackState,
- Player,
- PlayerFeature,
-)
-from music_assistant.models.player_queue import QueueItem
+from enum import Enum
+from typing import Callable
+
from music_assistant.utils import callback, run_periodic
from .constants import PROV_ID
LOGGER = logging.getLogger(PROV_ID)
-PLAYER_FEATURES = [PlayerFeature.QUEUE, PlayerFeature.CROSSFADE, PlayerFeature.GAPLESS]
-PLAYER_CONFIG_ENTRIES = [] # we don't have any player config entries (for now)
# from http://wiki.slimdevices.com/index.php/SlimProtoTCPProtocol#HELO
DEVICE_TYPE = {
12: "squeezeplay",
}
+STATE_PLAYING = "playing"
+STATE_STOPPED = "stopped"
+STATE_PAUSED = "paused"
+
+
+class SqueezeEvent(Enum):
+ """Enum with the events that can happen in the socket client."""
+
+ CONNECTED = 0
+ STATE_UPDATED = 1
+ DECODER_READY = 2
+ DISCONNECTED = 3
-class SqueezeSocketClient(Player):
+
+class SqueezeSocketClient:
"""Squeezebox socket client."""
- def __init__(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
+ def __init__(
+ self,
+ reader: asyncio.StreamReader,
+ writer: asyncio.StreamWriter,
+ event_callback: Callable = None,
+ ):
"""Initialize the socket client."""
self._reader = reader
self._writer = writer
self._last_volume = 0
self._last_heartbeat = 0
self._volume_control = PySqueezeVolume()
- self._volume_level = 0
self._powered = False
self._muted = False
- self._state = PlaybackState.Stopped
+ self._state = STATE_STOPPED
self._elapsed_seconds = 0
self._elapsed_milliseconds = 0
self._current_uri = ""
+ self._connected = True
+ self._event_callbacks = []
self._tasks = [
asyncio.create_task(self.__async_socket_reader()),
asyncio.create_task(self.__async_send_heartbeat()),
]
- async def async_on_remove(self) -> None:
- """Call when player is removed from the player manager."""
+ def disconnect(self) -> None:
+ """Disconnect socket client."""
for task in self._tasks:
if not task.cancelled():
task.cancel()
+ def register_callback(self, callb: Callable):
+ """Register event callback. Returns function to deregister."""
+
+ def unregister():
+ self._event_callbacks.remove(callb)
+
+ self._event_callbacks.append(callb)
+ return unregister
+
+ def signal_event(self, event):
+ """Signal event to registered listeners."""
+ for listener in self._event_callbacks:
+ listener(event, self)
+
+ @property
+ def connected(self):
+ """Return connection state of the socket."""
+ return self._connected
+
@property
def player_id(self) -> str:
"""Return player id (=mac address) of the player."""
return self._player_id
- @property
- def provider_id(self) -> str:
- """Return provider id of this player."""
- return PROV_ID
-
@property
def device_type(self) -> str:
"""Return device type of the player."""
@property
def volume_level(self):
"""Return current volume level of player."""
- return self._volume_level
+ return self._volume_control.volume
@property
def powered(self):
return self._state
@property
- def elapsed_time(self):
+ def elapsed_seconds(self):
"""Return elapsed_time of current playing track in (fractions of) seconds."""
return self._elapsed_seconds
"""Return uri of currently loaded track."""
return self._current_uri
- @property
- def features(self) -> List[PlayerFeature]:
- """Return list of features this player supports."""
- return PLAYER_FEATURES
-
- @property
- def config_entries(self) -> List[ConfigEntry]:
- """Return player specific config entries (if any)."""
- return PLAYER_CONFIG_ENTRIES
-
- @property
- def device_info(self) -> DeviceInfo:
- """Return the device info for this player."""
- return DeviceInfo(model=self.device_type, address=self.device_address)
-
async def __async_initialize_player(self):
"""Set some startup settings for the player."""
# send version
data = self.__pack_stream(b"p", autostart=b"0", flags=0)
await self.__async_send_frame(b"strm", data)
- async def async_cmd_power_on(self) -> None:
- """Send POWER ON command to player."""
- return await self.async_cmd_power(True)
-
- async def async_cmd_power_off(self) -> None:
- """Send POWER OFF command to player."""
- await self.async_cmd_stop()
- return await self.async_cmd_power(False)
-
async def async_cmd_power(self, powered: bool = True):
"""Send power command to player."""
# power is not supported so abuse mute instead
power_int = 1 if powered else 0
await self.__async_send_frame(b"aude", struct.pack("2B", power_int, 1))
self._powered = powered
- self.update_state()
- # save power and volume state in cache
- cache_str = f"squeezebox_player_state_{self.player_id}"
- await self.mass.cache.async_set(cache_str, (True, self.volume_level))
async def async_cmd_volume_set(self, volume_level: int):
"""Send new volume level command to player."""
b"audg",
struct.pack("!LLBBLL", old_gain, old_gain, 1, 255, new_gain, new_gain),
)
- self._volume_level = volume_level
async def async_cmd_mute(self, muted: bool = False):
"""Send mute command to player."""
await self.__async_send_frame(b"aude", struct.pack("2B", muted_int, 0))
self.muted = muted
- async def async_cmd_play_uri(self, uri: str):
- """Request player to start playing a single uri."""
- crossfade = self.mass.config.player_settings[self.player_id][
- CONF_CROSSFADE_DURATION
- ]
- await self.__async_cmd_handle_play_uri(
- uri, send_flush=True, crossfade_duration=crossfade
- )
-
- async def __async_cmd_handle_play_uri(
+ async def async_play_uri(
self, uri: str, send_flush: bool = True, crossfade_duration: int = 0
):
"""Request player to start playing a single uri."""
data = data + request.encode("utf-8")
await self.__async_send_frame(b"strm", data)
- async def async_cmd_next(self):
- """Send NEXT TRACK command to player."""
- queue = self.mass.player_manager.get_player_queue(self.player_id)
- if queue:
- new_track = queue.get_item(queue.cur_index + 1)
- if new_track:
- await self.__async_cmd_handle_play_uri(new_track.uri)
-
- async def async_cmd_previous(self):
- """Send PREVIOUS TRACK command to player."""
- queue = self.mass.player_manager.get_player_queue(self.player_id)
- if queue:
- new_track = queue.get_item(queue.cur_index - 1)
- if new_track:
- await self.async_cmd_play_uri(new_track.uri)
-
- async def async_cmd_queue_play_index(self, index: int):
- """
- Play item at index X on player's queue.
-
- :param index: (int) index of the queue item that should start playing
- """
- queue = self.mass.player_manager.get_player_queue(self.player_id)
- if queue:
- new_track = queue.get_item(index)
- if new_track:
- await self.async_cmd_play_uri(new_track.uri)
-
- async def async_cmd_queue_load(self, queue_items: List[QueueItem]):
- """
- Load/overwrite given items in the player's queue implementation.
-
- :param queue_items: a list of QueueItems
- """
- if queue_items:
- await self.async_cmd_play_uri(queue_items[0].uri)
-
- async def async_cmd_queue_insert(
- self, queue_items: List[QueueItem], insert_at_index: int
- ):
- """
- Insert new items at position X into existing queue.
-
- If insert_at_index 0 or None, will start playing newly added item(s)
- :param queue_items: a list of QueueItems
- :param insert_at_index: queue position to insert new items
- """
- # queue handled by built-in queue controller
- # we only check the start index
- queue = self.mass.player_manager.get_player_queue(self.player_id)
- if queue and insert_at_index == queue.cur_index:
- return await self.async_cmd_queue_play_index(insert_at_index)
-
- async def async_cmd_queue_append(self, queue_items: List[QueueItem]):
- """
- Append new items at the end of the queue.
-
- :param queue_items: a list of QueueItems
- """
- # automagically handled by built-in queue controller
-
- async def async_cmd_queue_update(self, queue_items: List[QueueItem]):
- """
- Overwrite the existing items in the queue, used for reordering.
-
- :param queue_items: a list of QueueItems
- """
- # automagically handled by built-in queue controller
-
- async def async_cmd_queue_clear(self):
- """Clear the player's queue."""
- # queue is handled by built-in queue controller but send stop
- return await self.async_cmd_stop()
-
@run_periodic(5)
async def __async_send_heartbeat(self):
"""Send periodic heartbeat message to player."""
+ if not self._connected:
+ return
timestamp = int(time.time())
data = self.__pack_stream(b"t", replay_gain=timestamp, flags=0)
await self.__async_send_frame(b"strm", data)
"""Send command to Squeeze player."""
if self._reader.at_eof() or self._writer.is_closing():
LOGGER.debug("Socket is disconnected.")
- await self.mass.player_manager.async_remove_player(self.player_id)
+ self._connected = False
+ return
packet = struct.pack("!H", len(data) + 4) + command + data
try:
self._writer.write(packet)
await self._writer.drain()
except ConnectionResetError:
- pass
+ self._connected = False
+ self.signal_event(SqueezeEvent.DISCONNECTED)
async def __async_socket_reader(self):
"""Handle incoming data from socket."""
else:
handler(packet)
# EOF reached: socket is disconnected
- LOGGER.info("Socket disconnected: %s", self._writer.get_extra_info("peername"))
- await self.mass.player_manager.async_remove_player(self.player_id)
+ LOGGER.debug("Socket disconnected: %s", self._writer.get_extra_info("peername"))
+ self._connected = False
+ self.signal_event(SqueezeEvent.DISCONNECTED)
@callback
@staticmethod
device_mac = ":".join("%02x" % x for x in mac)
self._player_id = str(device_mac).lower()
self._device_type = DEVICE_TYPE.get(dev_id, "unknown device")
- LOGGER.info("Player connected: %s", self.name)
+ LOGGER.debug("Player connected: %s", self.name)
asyncio.create_task(self.__async_initialize_player())
- # add player to player manager
- asyncio.create_task(self.mass.player_manager.async_add_player(self))
- asyncio.create_task(self.async_restore_states())
-
- async def async_restore_states(self):
- """Restore power/volume states."""
- cache_str = f"squeezebox_player_state_{self.player_id}"
- cache_data = await self.mass.cache.async_get(cache_str)
- last_power, last_volume = cache_data if cache_data else (False, 40)
- await self.async_cmd_volume_set(last_volume)
- await self.async_cmd_power(last_power)
+ self.signal_event(SqueezeEvent.CONNECTED)
@callback
def _process_stat(self, data):
powered = spdif_enable or dac_enable
self._powered = powered
self._muted = not powered
- self.update_state()
+ self.signal_event(SqueezeEvent.STATE_UPDATED)
@callback
def _process_stat_audg(self, data):
"""Process incoming stat AUDg message (volume level)."""
# TODO: process volume level
LOGGER.debug("AUDg received - Volume level: %s", data)
- self._volume_level = self._volume_control.volume
- self.update_state()
+ self.signal_event(SqueezeEvent.STATE_UPDATED)
@callback
def _process_stat_stmd(self, data):
"""Process incoming stat STMd message (decoder ready)."""
# pylint: disable=unused-argument
LOGGER.debug("STMu received - Decoder Ready for next track.")
- queue = self.mass.player_manager.get_player_queue(self.player_id)
- if queue:
- next_item = queue.next_item
- if next_item:
- crossfade = self.mass.config.player_settings[self.player_id][
- CONF_CROSSFADE_DURATION
- ]
- asyncio.create_task(
- self.__async_cmd_handle_play_uri(
- next_item.uri, send_flush=False, crossfade_duration=crossfade
- )
- )
+ self.signal_event(SqueezeEvent.DECODER_READY)
@callback
def _process_stat_stmf(self, data):
"""Process incoming stat STMf message (connection closed)."""
# pylint: disable=unused-argument
LOGGER.debug("STMf received - connection closed.")
- self._state = PlaybackState.Stopped
+ self._state = STATE_STOPPED
self._elapsed_milliseconds = 0
self._elapsed_seconds = 0
- self.update_state()
+ self.signal_event(SqueezeEvent.STATE_UPDATED)
@callback
@classmethod
"""
# pylint: disable=unused-argument
LOGGER.debug("STMo received - output underrun.")
- LOGGER.debug("Output Underrun")
@callback
def _process_stat_stmp(self, data):
"""Process incoming stat STMp message: Pause confirmed."""
# pylint: disable=unused-argument
LOGGER.debug("STMp received - pause confirmed.")
- self._state = PlaybackState.Paused
- self.update_state()
+ self._state = STATE_PAUSED
+ self.signal_event(SqueezeEvent.STATE_UPDATED)
@callback
def _process_stat_stmr(self, data):
"""Process incoming stat STMr message: Resume confirmed."""
# pylint: disable=unused-argument
LOGGER.debug("STMr received - resume confirmed.")
- self._state = PlaybackState.Playing
- self.update_state()
+ self._state = STATE_PLAYING
+ self.signal_event(SqueezeEvent.STATE_UPDATED)
@callback
def _process_stat_stms(self, data):
# pylint: disable=unused-argument
"""Process incoming stat STMs message: Playback of new track has started."""
LOGGER.debug("STMs received - playback of new track has started.")
- self._state = PlaybackState.Playing
- self.update_state()
+ self._state = STATE_PLAYING
+ self.signal_event(SqueezeEvent.STATE_UPDATED)
@callback
def _process_stat_stmt(self, data):
server_timestamp,
error_code,
) = struct.unpack("!BBBLLLLHLLLLHLLH", data)
- if self.state == PlaybackState.Playing:
+ if self.state == STATE_PLAYING:
# elapsed seconds is weird when player is buffering etc.
# only rely on it if player is playing
self._elapsed_milliseconds = elapsed_milliseconds
if self._elapsed_seconds != elapsed_seconds:
self._elapsed_seconds = elapsed_seconds
- self.update_state()
+ self.signal_event(SqueezeEvent.STATE_UPDATED)
@callback
def _process_stat_stmu(self, data):
"""Process incoming stat STMu message: Buffer underrun: Normal end of playback."""
# pylint: disable=unused-argument
LOGGER.debug("STMu received - end of playback.")
- self.state = PlaybackState.Stopped
- self.update_state()
+ self.state = STATE_STOPPED
+ self.signal_event(SqueezeEvent.STATE_UPDATED)
@callback
def _process_resp(self, data):
# received player name
data = data[1:].decode()
self._device_name = data
- self.update_state()
+ self.signal_event(SqueezeEvent.STATE_UPDATED)
class PySqueezeVolume: