super().__init__(mass)
self.prov_id = 'squeezebox'
self.name = 'Squeezebox'
- self._squeeze_players = {}
- self.buffer = b''
- self.last_msg_received = 0
-
# start slimproto server
- mass.event_loop.create_task(asyncio.start_server(self.__handle_socket_client, '0.0.0.0', 3483))
+ mass.event_loop.create_task(
+ asyncio.start_server(self.__handle_socket_client, '0.0.0.0', 3483))
# setup discovery
mass.event_loop.create_task(self.start_discovery())
async def start_discovery(self):
transport, protocol = await self.mass.event_loop.create_datagram_endpoint(
- lambda: DiscoveryProtocol(self.mass.web._http_port),
+ lambda: DiscoveryProtocol(self.mass.web.http_port),
local_addr=('0.0.0.0', 3483))
try:
while True:
finally:
transport.close()
- async def player_command(self, player_id, cmd:str, cmd_args=None):
- ''' issue command on player (play, pause, next, previous, stop, power, volume, mute) '''
- if cmd == 'play':
- if self._players[player_id].state == PlayerState.Stopped:
- await self.__queue_play(player_id, None)
- else:
- self._squeeze_players[player_id].unpause()
- elif cmd == 'pause':
- self._squeeze_players[player_id].pause()
- elif cmd == 'stop':
- self._squeeze_players[player_id].stop()
- elif cmd == 'next':
- self._squeeze_players[player_id].next()
- elif cmd == 'previous':
- await self.__queue_previous(player_id)
- elif cmd == 'power' and cmd_args == 'off':
- self._squeeze_players[player_id].power_off()
- elif cmd == 'power':
- self._squeeze_players[player_id].power_on()
- elif cmd == 'volume':
- self._squeeze_players[player_id].volume_set(try_parse_int(cmd_args))
- elif cmd == 'mute' and cmd_args == 'off':
- self._squeeze_players[player_id].unmute()
- elif cmd == 'mute':
- self._squeeze_players[player_id].mute()
-
- async def play_media(self, player_id, media_items, queue_opt='play'):
- '''
- play media on a player
- '''
- player = await self.get_player(player_id)
- cur_index = player.cur_queue_index
-
- if queue_opt == 'replace' or not player.queue:
- # overwrite queue with new items
- player.queue = media_items
- await self.__queue_play(player_id, 0, send_flush=True)
- elif queue_opt == 'play':
- # replace current item with new item(s)
- player.queue = player.queue[player_id][:cur_index] + media_items + player.queue[player_id][cur_index+1:]
- await self.__queue_play(player_id, cur_index, send_flush=True)
- elif queue_opt == 'next':
- # insert new items at current index +1
- player.queue[player_id] = player.queue[player_id][:cur_index+1] + media_items + player.queue[player_id][cur_index+1:]
- elif queue_opt == 'add':
- # add new items at end of queue
- player.queue[player_id] = player.queue[player_id] + media_items
-
- ### Provider specific (helper) methods #####
-
- async def __queue_play(self, player_id, index, send_flush=False):
- ''' send play command to player '''
- if not player_id in player.queue or not player_id in player.queue_index:
- return
- if not player.queue[player_id]:
- return
- if index == None:
- index = player.queue_index[player_id]
- if len(player.queue[player_id]) >= index:
- track = player.queue[player_id][index]
- if send_flush:
- self._squeeze_players[player_id].flush()
- self._squeeze_players[player_id].play(track.uri)
- player.queue_index[player_id] = index
-
- async def __queue_next(self, player_id):
- ''' request next track from queue '''
- if not player_id in player.queue or not player_id in player.queue:
- return
- cur_queue_index = player.queue_index[player_id]
- if len(player.queue[player_id]) > cur_queue_index:
- new_queue_index = cur_queue_index + 1
- elif self._players[player_id].repeat_enabled:
- new_queue_index = 0
- else:
- LOGGER.warning("next track requested but no more tracks in queue")
- return
- return await self.__queue_play(player_id, new_queue_index)
-
- async def __queue_previous(self, player_id):
- ''' request previous track from queue '''
- if not player_id in player.queue:
- return
- cur_queue_index = player.queue_index[player_id]
- if cur_queue_index == 0 and len(player.queue[player_id]) > 1:
- new_queue_index = len(player.queue[player_id]) -1
- elif cur_queue_index == 0:
- new_queue_index = cur_queue_index
- else:
- new_queue_index -= 1
- player.queue_index[player_id] = new_queue_index
- return await self.__queue_play(player_id, new_queue_index)
-
- async def __handle_player_event(self, player_id, event, event_data=None):
- ''' handle event from player '''
- if not player_id:
- return
- LOGGER.debug("Event from player %s: %s - event_data: %s" %(player_id, event, str(event_data)))
- Squeeze_player = self._squeeze_players[player_id]
- if event == "next_track":
- return await self.__queue_next(player_id)
- player
- if not player_id in self._players:
- player = MusicPlayer()
- player.player_id = player_id
- player.player_provider = self.prov_id
- self._players[player_id] = player
- if not player_id in player.queue:
- player.queue[player_id] = []
- if not player_id in player.queue_index:
- player.queue_index[player_id] = 0
- else:
- player = self._players[player_id]
- # update player properties
- player.name = Squeeze_player.player_name
- player.volume_level = Squeeze_player.volume_level
- player.cur_time = Squeeze_player._elapsed_seconds
- if event == "disconnected":
- return await self.mass.player.remove_player(player_id)
- elif event == "power":
- player.powered = event_data
- elif event == "state":
- player.state = event_data
- if player.queue[player_id]:
- cur_queue_index = player.queue_index[player_id]
- player.cur_item = player.queue[player_id][cur_queue_index]
- # update player details
- await self.mass.player.update_player(player)
-
async def __handle_socket_client(self, reader, writer):
''' handle a client connection on the socket'''
- LOGGER.debug("new socket client connected")
- Squeeze_player = PySqueezePlayer(stream_host, stream_port)
-
- def send_frame(command, data):
- ''' send command to Squeeze player'''
- packet = struct.pack('!H', len(data) + 4) + command + data
- writer.write(packet)
-
- def handle_event(event, event_data=None):
- ''' handle events from player'''
- if event == "connected":
- self._squeeze_players[Squeeze_player.player_id] = Squeeze_player
- Squeeze_player.player_settings = self.mass.config['player_settings'][Squeeze_player.player_id]
- asyncio.create_task(self.__handle_player_event(Squeeze_player.player_id, event, event_data))
-
- try:
- @run_periodic(5)
- async def send_heartbeat():
- timestamp = int(time.time())
- data = Squeeze_player.pack_stream(b"t", replayGain=timestamp, flags=0)
- Squeeze_player.send_frame(b"strm", data)
-
- Squeeze_player.send_frame = send_frame
- Squeeze_player.send_event = handle_event
- heartbeat_task = asyncio.create_task(send_heartbeat())
-
+ buffer = b''
+ player = None
+
+ def client_connected(data):
+ ''' client connected '''
+ (dev_id, rev, mac) = struct.unpack('BB6s', data[:8])
+ device_mac = ':'.join("%02x" % x for x in mac)
+ player_id = str(device_mac).lower()
+ device_type = devices.get(dev_id, 'unknown device')
+ player = PySqueezePlayer(self.mass, player_id, self.prov_id, device_type, writer)
+ self.mass.event_loop.create_task(self.mass.player.add_player(player))
+
+ try:
# keep reading bytes from the socket
while True:
data = await reader.read(64)
- if data:
- Squeeze_player.dataReceived(data)
- else:
- break
+ # handle incoming data from socket
+ buffer = buffer + data
+ if len(buffer) > 8:
+ operation, length = buffer[:4], buffer[4:8]
+ plen = struct.unpack('!I', length)[0] + 8
+ if len(buffer) >= plen:
+ packet, buffer = buffer[8:plen], buffer[plen:]
+ operation = operation.strip(b"!").strip().decode()
+ if operation == 'HELO':
+ client_connected(packet)
+ elif player:
+ _handler = getattr(player, "process_%s" % operation, None)
+ if _handler:
+ _handler(packet)
+
except Exception as exc:
# connection lost ?
LOGGER.warning(exc)
# disconnect
- heartbeat_task.cancel()
- asyncio.create_task(self.__handle_player_event(Squeeze_player.player_id, 'disconnected'))
-
+ await self.mass.player.remove_player(player)
class PySqueezePlayer(Player):
''' Squeezebox socket client '''
- def __init__(self, stream_host, stream_port):
+ def __init__(self, mass, player_id, prov_id, dev_type, writer):
+ super().__init__(mass, player_id, prov_id)
+ self.supports_queue = True
+ self.supports_gapless = True
+ self.supports_crossfade = True
+ self.supports_replay_gain = True
+ self._writer = writer
self.buffer = b''
- #self.display = Display()
- self.send_frame = None
- self.send_event = None
- self.stream_host = stream_host
- self.stream_port = stream_port
- self.player_settings = {}
- self.playback_millis = 0
+ self.name = "%s - %s" %(dev_type, player_id)
self._volume = PySqueezeVolume()
- self._device_type = None
- self._mac_address = None
- self._player_name = None
self._last_volume = 0
self._last_heartbeat = 0
- self._elapsed_seconds = 0
- self._elapsed_milliseconds = 0
-
- @property
- def player_name(self):
- if self._player_name:
- return self._player_name
- return "%s - %s" %(self._device_type, self._mac_address)
-
- @property
- def player_id(self):
- return self._mac_address
-
- @property
- def volume_level(self):
- return self._volume.volume
-
- def dataReceived(self, data):
- self.buffer = self.buffer + data
- if len(self.buffer) > 8:
- operation, length = self.buffer[:4], self.buffer[4:8]
- length = struct.unpack('!I', length)[0]
- plen = length + 8
- if len(self.buffer) >= plen:
- packet, self.buffer = self.buffer[8:plen], self.buffer[plen:]
- operation = operation.strip(b"!").strip().decode()
- #LOGGER.info("operation: %s" % operation)
- handler = getattr(self, "process_%s" % operation, None)
- if handler is None:
- raise NotImplementedError
- handler(packet)
-
- def send_version(self):
- self.send_frame(b'vers', b'7.8')
+ self._cur_time_milliseconds = 0
+ # initialize player
+ self.send_version()
+ self.setBrightness()
+ #self.set_visualisation(SpectrumAnalyser())
+ #self.display = Display()
+ self.send_frame(b"setd", struct.pack("B", 0))
+ self.send_frame(b"setd", struct.pack("B", 4))
- def pack_stream(self, command, autostart=b"1", formatbyte = b'o', pcmargs = (b'?',b'?',b'?',b'?'), threshold = 200,
- spdif = b'0', transDuration = 0, transType = b'0', flags = 0x40, outputThreshold = 0,
- replayGain=0, serverPort = 8095, serverIp = 0):
- return struct.pack("!cccccccBcBcBBBLHL",
- command, autostart, formatbyte, *pcmargs,
- threshold, spdif, transDuration, transType,
- flags, outputThreshold, 0, replayGain, serverPort, serverIp)
+ # TODO: remember last volume and power state
+ self.mass.event_loop.create_task(self.volume_set(40))
+ self.mass.event_loop.create_task(self.power_off())
+ self._heartbeat_task = asyncio.create_task(self.__send_heartbeat())
- def stop(self):
+ async def cmd_stop(self):
+ ''' send stop command to player '''
data = self.pack_stream(b"q", autostart=b"0", flags=0)
self.send_frame(b"strm", data)
- def flush(self):
- data = self.pack_stream(b"f", autostart=b"0", flags=0)
- self.send_frame(b"strm", data)
-
- def pause(self):
- data = self.pack_stream(b"p", autostart=b"0", flags=0)
- self.send_frame(b"strm", data)
- LOGGER.info("Sending pause request")
-
- def unpause(self):
+ async def cmd_play(self):
+ ''' send play (unpause) command to player '''
data = self.pack_stream(b"u", autostart=b"0", flags=0)
self.send_frame(b"strm", data)
- LOGGER.info("Sending unpause request")
- def next(self):
- data = self.pack_stream(b"f", autostart=b"0", flags=0)
- self.send_frame(b"strm", data)
- self.send_event("next_track")
-
- def previous(self):
- data = self.pack_stream(b"f", autostart=b"0", flags=0)
+ async def cmd_pause(self):
+ ''' send pause command to player '''
+ data = self.pack_stream(b"p", autostart=b"0", flags=0)
self.send_frame(b"strm", data)
- self.send_event("previous_track")
-
- def power_on(self):
+
+ async def cmd_power_on(self):
+ ''' [MUST OVERRIDE] send power ON command to player '''
self.send_frame(b"aude", struct.pack("2B", 1, 1))
- self.send_event("power", True)
+ self.powered = True
- def power_off(self):
- self.stop()
+ async def cmd_power_off(self):
+ ''' [MUST OVERRIDE] send power TOGGLE command to player '''
+ await self.cmd_stop()
self.send_frame(b"aude", struct.pack("2B", 0, 0))
- self.send_event("power", False)
+ self.powered = False
- def mute_on(self):
- self.send_frame(b"aude", struct.pack("2B", 0, 0))
- self.send_event("mute", True)
+ async def cmd_volume_set(self, volume_level):
+ ''' [MUST OVERRIDE] send new volume level command to player '''
+ self._volume.volume = volume_level
+ og = self._volume.old_gain()
+ ng = self._volume.new_gain()
+ self.send_frame(b"audg", struct.pack("!LLBBLL", og, og, 1, 255, ng, ng))
+ self.volume_level = volume_level
+
+ async def cmd_volume_mute(self, is_muted=False):
+ ''' [MUST OVERRIDE] send mute command to player '''
+ if is_muted:
+ self.send_frame(b"aude", struct.pack("2B", 0, 0))
+ else:
+ self.send_frame(b"aude", struct.pack("2B", 1, 1))
+ self.muted = is_muted
- def mute_off(self):
- self.send_frame(b"aude", struct.pack("2B", 1, 1))
- self.send_event("mute", False)
+ async def cmd_queue_play_index(self, index:int):
+ '''
+ play item at index X on player's queue
+ :attrib index: (int) index of the queue item that should start playing
+ '''
+ new_track = await self.queue.get_item(index)
+ self.flush()
+ self.play(new_track.uri)
+
+ async def cmd_queue_load(self, queue_items):
+ '''
+ load/overwrite given items in the player's own queue implementation
+ :param queue_items: a list of QueueItems
+ '''
+ self.flush()
+ self.play(queue_items[0].uri)
- def volume_up(self):
- self._volume.increment()
- self.send_volume()
+ async def cmd_queue_insert(self, queue_items, offset=0):
+ ''' nothing to do, handled by built-in queue '''
+ pass
- def volume_down(self):
- self._volume.decrement()
- self.send_volume()
+ async def cmd_queue_append(self, queue_items):
+ ''' nothing to do, handled by built-in queue '''
+ pass
- def volume_set(self, new_vol):
- self._volume.volume = new_vol
- self.send_volume()
+ async def cmd_play_uri(self, uri:str):
+ '''
+ [MUST OVERRIDE]
+ tell player to start playing a single uri
+ '''
+ self.flush()
+ self.play(uri)
+
+ def flush(self):
+ data = self.pack_stream(b"f", autostart=b"0", flags=0)
+ self.send_frame(b"strm", data)
def play(self, uri):
- enable_crossfade = self.player_settings["crossfade_duration"] > 0
+ enable_crossfade = self.settings["crossfade_duration"] > 0
command = b's'
autostart = b'3' # we use direct stream for now so let the player do the messy work with buffers
transType= b'1' if enable_crossfade else b'0'
- transDuration = self.player_settings["crossfade_duration"]
+ transDuration = self.settings["crossfade_duration"]
formatbyte = b'f' # fixed to flac
uri = '/stream' + uri.split('/stream')[1]
data = self.pack_stream(command, autostart=autostart, flags=0x00, formatbyte=formatbyte, transType=transType, transDuration=transDuration)
- headers = "Connection: close\r\nAccept: */*\r\nHost: %s:%s\r\n" %(self.stream_host, self.stream_port)
+ headers = "Connection: close\r\nAccept: */*\r\nHost: %s:%s\r\n" %(self.mass.web.local_ip, self.mass.web.http_port)
request = "GET %s HTTP/1.0\r\n%s\r\n" % (uri, headers)
data = data + request.encode("utf-8")
self.send_frame(b'strm', data)
LOGGER.info("Requesting play from squeezebox" )
+ def __delete__(self, instance):
+ ''' make sure the heartbeat task is deleted '''
+ self._heartbeat_task.cancel()
+
+ @run_periodic(5)
+ async def __send_heartbeat(self):
+ ''' send periodic heartbeat message to player '''
+ timestamp = int(time.time())
+ data = self.pack_stream(b"t", replayGain=timestamp, flags=0)
+ self.send_frame(b"strm", data)
+
+ def send_frame(self, command, data):
+ ''' send command to Squeeze player'''
+ packet = struct.pack('!H', len(data) + 4) + command + data
+ self._writer.write(packet)
+
+ def send_version(self):
+ self.send_frame(b'vers', b'7.8')
+
+ def pack_stream(self, command, autostart=b"1", formatbyte = b'o',
+ pcmargs = (b'?',b'?',b'?',b'?'), threshold = 200,
+ spdif = b'0', transDuration = 0, transType = b'0',
+ flags = 0x40, outputThreshold = 0,
+ replayGain=0, serverPort = 8095, serverIp = 0):
+ return struct.pack("!cccccccBcBcBBBLHL",
+ command, autostart, formatbyte, *pcmargs,
+ threshold, spdif, transDuration, transType,
+ flags, outputThreshold, 0, replayGain, serverPort, serverIp)
+
+
def displayTrack(self, track):
self.render("%s by %s" % (track.title, track.artist))
-
- def process_HELO(self, data):
- (devId, rev, mac) = struct.unpack('BB6s', data[:8])
- device_mac = ':'.join("%02x" % x for x in mac)
- self._device_type = devices.get(devId, 'unknown device')
- self._mac_address = str(device_mac).lower()
- LOGGER.debug("HELO received from %s %s" % (self._mac_address, self._device_type))
- self.init_client()
-
- def init_client(self):
- ''' initialize a new connected client '''
- self.send_event("connected")
- self.send_version()
- self.stop()
- self.setBrightness()
- #self.set_visualisation(SpectrumAnalyser())
- self.send_frame(b"setd", struct.pack("B", 0))
- self.send_frame(b"setd", struct.pack("B", 4))
- self.power_on()
- self.volume_set(40) # TODO: remember last volume
- def send_volume(self):
- og = self._volume.old_gain()
- ng = self._volume.new_gain()
- LOGGER.info("Volume set to %d (%d/%d)" % (self._volume.volume, og, ng))
- d = self.send_frame(b"audg", struct.pack("!LLBBLL", og, og, 1, 255, ng, ng))
- self.send_event("volume", self._volume.volume)
-
def setBrightness(self, level=4):
assert 0 <= level <= 4
self.send_frame(b"grfb", struct.pack("!H", level))
def stat_aude(self, data):
(spdif_enable, dac_enable) = struct.unpack("2B", data[:4])
powered = spdif_enable or dac_enable
- self.send_event("power", powered)
+ self.powered = powered
+ self.muted = not powered
LOGGER.debug("ACK aude - Received player power: %s" % powered)
def stat_audg(self, data):
- LOGGER.info("Received volume_level from player %s" % data)
- self.send_event("volume", self._volume.volume)
+ LOGGER.debug("Received volume_level from player %s" % data)
+ self.volume_level = self._volume.volume
def stat_strm(self, data):
LOGGER.debug("ACK strm")
def stat_STMd(self, data):
LOGGER.debug("Decoder Ready for next track")
- self.send_event("next_track")
+ next_item = self.queue.next_item
+ self.play(next_item.uri)
def stat_STMe(self, data):
LOGGER.info("Connection established")
def stat_STMf(self, data):
LOGGER.info("Status Message: Connection closed")
- self.send_event("state", PlayerState.Stopped)
+ self.state = PlayerState.Stopped
def stat_STMh(self, data):
LOGGER.info("Status Message: End of headers")
def stat_STMp(self, data):
'''Pause confirmed'''
- self.send_event("state", PlayerState.Paused)
+ self.state = PlayerState.Paused
def stat_STMr(self, data):
'''Resume confirmed'''
- self.send_event("state", PlayerState.Playing)
+ self.state = PlayerState.Playing
def stat_STMs(self, data):
'''Playback of new track has started'''
- self.send_event("state", PlayerState.Playing)
+ self.state = PlayerState.Playing
def stat_STMt(self, data):
""" heartbeat from client """
timestamp = time.time()
self._last_heartbeat = timestamp
(num_crlf, mas_initialized, mas_mode, rptr, wptr,
- bytes_received_h, bytes_received_l, signal_strength,
- jiffies, output_buffer_size, output_buffer_fullness,
- elapsed_seconds, voltage, elapsed_milliseconds,
- server_timestamp, error_code) = struct.unpack("!BBBLLLLHLLLLHLLH", data)
- if elapsed_seconds != self._elapsed_seconds:
- self.send_event("progress")
- self._elapsed_seconds = elapsed_seconds
- self._elapsed_milliseconds = elapsed_milliseconds
+ bytes_received_h, bytes_received_l, signal_strength,
+ jiffies, output_buffer_size, output_buffer_fullness,
+ elapsed_seconds, voltage, cur_time_milliseconds,
+ server_timestamp, error_code) = struct.unpack("!BBBLLLLHLLLLHLLH", data)
+ if elapsed_seconds != self.cur_time:
+ self.cur_time = elapsed_seconds
+ self._cur_time_milliseconds = cur_time_milliseconds
def stat_STMu(self, data):
'''Normal end of playback'''
LOGGER.info("End of playback - Underrun")
- self.send_event("state", PlayerState.Stopped)
+ self.state = PlayerState.Stopped
def process_BYE(self, data):
LOGGER.info("BYE received")
- self.send_event("disconnected")
def process_RESP(self, data):
LOGGER.info("RESP received")
def process_SETD(self, data):
''' Get/set player firmware settings '''
- LOGGER.debug("SETD received %s" % data)
+ LOGGER.info("SETD received %s" % data)
cmd_id = data[0]
if cmd_id == 0:
# received player name
data = data[1:].decode()
- self._player_name = data
- self.send_event("name")
+ self.name = data
def process_UREQ(self, data):
LOGGER.info("UREQ received")