From: marcelveldt Date: Sun, 13 Oct 2019 20:55:40 +0000 (+0200) Subject: fix squeeze support X-Git-Url: https://git.kitaultman.com/?a=commitdiff_plain;h=6e7cc4b4de410ed2f3cf8db3f74198f99907eb2d;p=music-assistant-server.git fix squeeze support --- diff --git a/music_assistant/http_streamer.py b/music_assistant/http_streamer.py index f9cac55a..f2b35e8e 100755 --- a/music_assistant/http_streamer.py +++ b/music_assistant/http_streamer.py @@ -9,7 +9,6 @@ import threading import urllib from memory_tempfile import MemoryTempfile import io -import soundfile as sf import pyloudnorm as pyln import aiohttp from .utils import LOGGER, try_parse_int, get_ip, run_async_background_task, run_periodic, get_folder_size @@ -396,6 +395,11 @@ class HTTPStreamer(): stdout=asyncio.subprocess.PIPE) audio_data, stderr = await process.communicate() # calculate BS.1770 R128 integrated loudness + try: + import soundfile as sf + except Exception as exc: + LOGGER.exception("Could not import soundfile, skip analyze job") + return if track_loudness == None: with io.BytesIO(audio_data) as tmpfile: data, rate = sf.read(tmpfile) diff --git a/music_assistant/playerproviders/squeezebox.py b/music_assistant/playerproviders/squeezebox.py index bbe93941..d885b2db 100644 --- a/music_assistant/playerproviders/squeezebox.py +++ b/music_assistant/playerproviders/squeezebox.py @@ -38,12 +38,9 @@ class PySqueezeServer(PlayerProvider): super().__init__(mass) self.prov_id = 'squeezebox' self.name = 'Squeezebox' - self._squeeze_players = {} - self.buffer = b'' - self.last_msg_received = 0 - # start slimproto server - mass.event_loop.create_task(asyncio.start_server(self.__handle_socket_client, '0.0.0.0', 3483)) + mass.event_loop.create_task( + asyncio.start_server(self.__handle_socket_client, '0.0.0.0', 3483)) # setup discovery mass.event_loop.create_task(self.start_discovery()) @@ -51,7 +48,7 @@ class PySqueezeServer(PlayerProvider): async def start_discovery(self): transport, protocol = await self.mass.event_loop.create_datagram_endpoint( - lambda: DiscoveryProtocol(self.mass.web._http_port), + lambda: DiscoveryProtocol(self.mass.web.http_port), local_addr=('0.0.0.0', 3483)) try: while True: @@ -59,341 +56,201 @@ class PySqueezeServer(PlayerProvider): finally: transport.close() - async def player_command(self, player_id, cmd:str, cmd_args=None): - ''' issue command on player (play, pause, next, previous, stop, power, volume, mute) ''' - if cmd == 'play': - if self._players[player_id].state == PlayerState.Stopped: - await self.__queue_play(player_id, None) - else: - self._squeeze_players[player_id].unpause() - elif cmd == 'pause': - self._squeeze_players[player_id].pause() - elif cmd == 'stop': - self._squeeze_players[player_id].stop() - elif cmd == 'next': - self._squeeze_players[player_id].next() - elif cmd == 'previous': - await self.__queue_previous(player_id) - elif cmd == 'power' and cmd_args == 'off': - self._squeeze_players[player_id].power_off() - elif cmd == 'power': - self._squeeze_players[player_id].power_on() - elif cmd == 'volume': - self._squeeze_players[player_id].volume_set(try_parse_int(cmd_args)) - elif cmd == 'mute' and cmd_args == 'off': - self._squeeze_players[player_id].unmute() - elif cmd == 'mute': - self._squeeze_players[player_id].mute() - - async def play_media(self, player_id, media_items, queue_opt='play'): - ''' - play media on a player - ''' - player = await self.get_player(player_id) - cur_index = player.cur_queue_index - - if queue_opt == 'replace' or not player.queue: - # overwrite queue with new items - player.queue = media_items - await self.__queue_play(player_id, 0, send_flush=True) - elif queue_opt == 'play': - # replace current item with new item(s) - player.queue = player.queue[player_id][:cur_index] + media_items + player.queue[player_id][cur_index+1:] - await self.__queue_play(player_id, cur_index, send_flush=True) - elif queue_opt == 'next': - # insert new items at current index +1 - player.queue[player_id] = player.queue[player_id][:cur_index+1] + media_items + player.queue[player_id][cur_index+1:] - elif queue_opt == 'add': - # add new items at end of queue - player.queue[player_id] = player.queue[player_id] + media_items - - ### Provider specific (helper) methods ##### - - async def __queue_play(self, player_id, index, send_flush=False): - ''' send play command to player ''' - if not player_id in player.queue or not player_id in player.queue_index: - return - if not player.queue[player_id]: - return - if index == None: - index = player.queue_index[player_id] - if len(player.queue[player_id]) >= index: - track = player.queue[player_id][index] - if send_flush: - self._squeeze_players[player_id].flush() - self._squeeze_players[player_id].play(track.uri) - player.queue_index[player_id] = index - - async def __queue_next(self, player_id): - ''' request next track from queue ''' - if not player_id in player.queue or not player_id in player.queue: - return - cur_queue_index = player.queue_index[player_id] - if len(player.queue[player_id]) > cur_queue_index: - new_queue_index = cur_queue_index + 1 - elif self._players[player_id].repeat_enabled: - new_queue_index = 0 - else: - LOGGER.warning("next track requested but no more tracks in queue") - return - return await self.__queue_play(player_id, new_queue_index) - - async def __queue_previous(self, player_id): - ''' request previous track from queue ''' - if not player_id in player.queue: - return - cur_queue_index = player.queue_index[player_id] - if cur_queue_index == 0 and len(player.queue[player_id]) > 1: - new_queue_index = len(player.queue[player_id]) -1 - elif cur_queue_index == 0: - new_queue_index = cur_queue_index - else: - new_queue_index -= 1 - player.queue_index[player_id] = new_queue_index - return await self.__queue_play(player_id, new_queue_index) - - async def __handle_player_event(self, player_id, event, event_data=None): - ''' handle event from player ''' - if not player_id: - return - LOGGER.debug("Event from player %s: %s - event_data: %s" %(player_id, event, str(event_data))) - Squeeze_player = self._squeeze_players[player_id] - if event == "next_track": - return await self.__queue_next(player_id) - player - if not player_id in self._players: - player = MusicPlayer() - player.player_id = player_id - player.player_provider = self.prov_id - self._players[player_id] = player - if not player_id in player.queue: - player.queue[player_id] = [] - if not player_id in player.queue_index: - player.queue_index[player_id] = 0 - else: - player = self._players[player_id] - # update player properties - player.name = Squeeze_player.player_name - player.volume_level = Squeeze_player.volume_level - player.cur_time = Squeeze_player._elapsed_seconds - if event == "disconnected": - return await self.mass.player.remove_player(player_id) - elif event == "power": - player.powered = event_data - elif event == "state": - player.state = event_data - if player.queue[player_id]: - cur_queue_index = player.queue_index[player_id] - player.cur_item = player.queue[player_id][cur_queue_index] - # update player details - await self.mass.player.update_player(player) - async def __handle_socket_client(self, reader, writer): ''' handle a client connection on the socket''' - LOGGER.debug("new socket client connected") - Squeeze_player = PySqueezePlayer(stream_host, stream_port) - - def send_frame(command, data): - ''' send command to Squeeze player''' - packet = struct.pack('!H', len(data) + 4) + command + data - writer.write(packet) - - def handle_event(event, event_data=None): - ''' handle events from player''' - if event == "connected": - self._squeeze_players[Squeeze_player.player_id] = Squeeze_player - Squeeze_player.player_settings = self.mass.config['player_settings'][Squeeze_player.player_id] - asyncio.create_task(self.__handle_player_event(Squeeze_player.player_id, event, event_data)) - - try: - @run_periodic(5) - async def send_heartbeat(): - timestamp = int(time.time()) - data = Squeeze_player.pack_stream(b"t", replayGain=timestamp, flags=0) - Squeeze_player.send_frame(b"strm", data) - - Squeeze_player.send_frame = send_frame - Squeeze_player.send_event = handle_event - heartbeat_task = asyncio.create_task(send_heartbeat()) - + buffer = b'' + player = None + + def client_connected(data): + ''' client connected ''' + (dev_id, rev, mac) = struct.unpack('BB6s', data[:8]) + device_mac = ':'.join("%02x" % x for x in mac) + player_id = str(device_mac).lower() + device_type = devices.get(dev_id, 'unknown device') + player = PySqueezePlayer(self.mass, player_id, self.prov_id, device_type, writer) + self.mass.event_loop.create_task(self.mass.player.add_player(player)) + + try: # keep reading bytes from the socket while True: data = await reader.read(64) - if data: - Squeeze_player.dataReceived(data) - else: - break + # handle incoming data from socket + buffer = buffer + data + if len(buffer) > 8: + operation, length = buffer[:4], buffer[4:8] + plen = struct.unpack('!I', length)[0] + 8 + if len(buffer) >= plen: + packet, buffer = buffer[8:plen], buffer[plen:] + operation = operation.strip(b"!").strip().decode() + if operation == 'HELO': + client_connected(packet) + elif player: + _handler = getattr(player, "process_%s" % operation, None) + if _handler: + _handler(packet) + except Exception as exc: # connection lost ? LOGGER.warning(exc) # disconnect - heartbeat_task.cancel() - asyncio.create_task(self.__handle_player_event(Squeeze_player.player_id, 'disconnected')) - + await self.mass.player.remove_player(player) class PySqueezePlayer(Player): ''' Squeezebox socket client ''' - def __init__(self, stream_host, stream_port): + def __init__(self, mass, player_id, prov_id, dev_type, writer): + super().__init__(mass, player_id, prov_id) + self.supports_queue = True + self.supports_gapless = True + self.supports_crossfade = True + self.supports_replay_gain = True + self._writer = writer self.buffer = b'' - #self.display = Display() - self.send_frame = None - self.send_event = None - self.stream_host = stream_host - self.stream_port = stream_port - self.player_settings = {} - self.playback_millis = 0 + self.name = "%s - %s" %(dev_type, player_id) self._volume = PySqueezeVolume() - self._device_type = None - self._mac_address = None - self._player_name = None self._last_volume = 0 self._last_heartbeat = 0 - self._elapsed_seconds = 0 - self._elapsed_milliseconds = 0 - - @property - def player_name(self): - if self._player_name: - return self._player_name - return "%s - %s" %(self._device_type, self._mac_address) - - @property - def player_id(self): - return self._mac_address - - @property - def volume_level(self): - return self._volume.volume - - def dataReceived(self, data): - self.buffer = self.buffer + data - if len(self.buffer) > 8: - operation, length = self.buffer[:4], self.buffer[4:8] - length = struct.unpack('!I', length)[0] - plen = length + 8 - if len(self.buffer) >= plen: - packet, self.buffer = self.buffer[8:plen], self.buffer[plen:] - operation = operation.strip(b"!").strip().decode() - #LOGGER.info("operation: %s" % operation) - handler = getattr(self, "process_%s" % operation, None) - if handler is None: - raise NotImplementedError - handler(packet) - - def send_version(self): - self.send_frame(b'vers', b'7.8') + self._cur_time_milliseconds = 0 + # initialize player + self.send_version() + self.setBrightness() + #self.set_visualisation(SpectrumAnalyser()) + #self.display = Display() + self.send_frame(b"setd", struct.pack("B", 0)) + self.send_frame(b"setd", struct.pack("B", 4)) - def pack_stream(self, command, autostart=b"1", formatbyte = b'o', pcmargs = (b'?',b'?',b'?',b'?'), threshold = 200, - spdif = b'0', transDuration = 0, transType = b'0', flags = 0x40, outputThreshold = 0, - replayGain=0, serverPort = 8095, serverIp = 0): - return struct.pack("!cccccccBcBcBBBLHL", - command, autostart, formatbyte, *pcmargs, - threshold, spdif, transDuration, transType, - flags, outputThreshold, 0, replayGain, serverPort, serverIp) + # TODO: remember last volume and power state + self.mass.event_loop.create_task(self.volume_set(40)) + self.mass.event_loop.create_task(self.power_off()) + self._heartbeat_task = asyncio.create_task(self.__send_heartbeat()) - def stop(self): + async def cmd_stop(self): + ''' send stop command to player ''' data = self.pack_stream(b"q", autostart=b"0", flags=0) self.send_frame(b"strm", data) - def flush(self): - data = self.pack_stream(b"f", autostart=b"0", flags=0) - self.send_frame(b"strm", data) - - def pause(self): - data = self.pack_stream(b"p", autostart=b"0", flags=0) - self.send_frame(b"strm", data) - LOGGER.info("Sending pause request") - - def unpause(self): + async def cmd_play(self): + ''' send play (unpause) command to player ''' data = self.pack_stream(b"u", autostart=b"0", flags=0) self.send_frame(b"strm", data) - LOGGER.info("Sending unpause request") - def next(self): - data = self.pack_stream(b"f", autostart=b"0", flags=0) - self.send_frame(b"strm", data) - self.send_event("next_track") - - def previous(self): - data = self.pack_stream(b"f", autostart=b"0", flags=0) + async def cmd_pause(self): + ''' send pause command to player ''' + data = self.pack_stream(b"p", autostart=b"0", flags=0) self.send_frame(b"strm", data) - self.send_event("previous_track") - - def power_on(self): + + async def cmd_power_on(self): + ''' [MUST OVERRIDE] send power ON command to player ''' self.send_frame(b"aude", struct.pack("2B", 1, 1)) - self.send_event("power", True) + self.powered = True - def power_off(self): - self.stop() + async def cmd_power_off(self): + ''' [MUST OVERRIDE] send power TOGGLE command to player ''' + await self.cmd_stop() self.send_frame(b"aude", struct.pack("2B", 0, 0)) - self.send_event("power", False) + self.powered = False - def mute_on(self): - self.send_frame(b"aude", struct.pack("2B", 0, 0)) - self.send_event("mute", True) + async def cmd_volume_set(self, volume_level): + ''' [MUST OVERRIDE] send new volume level command to player ''' + self._volume.volume = volume_level + og = self._volume.old_gain() + ng = self._volume.new_gain() + self.send_frame(b"audg", struct.pack("!LLBBLL", og, og, 1, 255, ng, ng)) + self.volume_level = volume_level + + async def cmd_volume_mute(self, is_muted=False): + ''' [MUST OVERRIDE] send mute command to player ''' + if is_muted: + self.send_frame(b"aude", struct.pack("2B", 0, 0)) + else: + self.send_frame(b"aude", struct.pack("2B", 1, 1)) + self.muted = is_muted - def mute_off(self): - self.send_frame(b"aude", struct.pack("2B", 1, 1)) - self.send_event("mute", False) + async def cmd_queue_play_index(self, index:int): + ''' + play item at index X on player's queue + :attrib index: (int) index of the queue item that should start playing + ''' + new_track = await self.queue.get_item(index) + self.flush() + self.play(new_track.uri) + + async def cmd_queue_load(self, queue_items): + ''' + load/overwrite given items in the player's own queue implementation + :param queue_items: a list of QueueItems + ''' + self.flush() + self.play(queue_items[0].uri) - def volume_up(self): - self._volume.increment() - self.send_volume() + async def cmd_queue_insert(self, queue_items, offset=0): + ''' nothing to do, handled by built-in queue ''' + pass - def volume_down(self): - self._volume.decrement() - self.send_volume() + async def cmd_queue_append(self, queue_items): + ''' nothing to do, handled by built-in queue ''' + pass - def volume_set(self, new_vol): - self._volume.volume = new_vol - self.send_volume() + async def cmd_play_uri(self, uri:str): + ''' + [MUST OVERRIDE] + tell player to start playing a single uri + ''' + self.flush() + self.play(uri) + + def flush(self): + data = self.pack_stream(b"f", autostart=b"0", flags=0) + self.send_frame(b"strm", data) def play(self, uri): - enable_crossfade = self.player_settings["crossfade_duration"] > 0 + enable_crossfade = self.settings["crossfade_duration"] > 0 command = b's' autostart = b'3' # we use direct stream for now so let the player do the messy work with buffers transType= b'1' if enable_crossfade else b'0' - transDuration = self.player_settings["crossfade_duration"] + transDuration = self.settings["crossfade_duration"] formatbyte = b'f' # fixed to flac uri = '/stream' + uri.split('/stream')[1] data = self.pack_stream(command, autostart=autostart, flags=0x00, formatbyte=formatbyte, transType=transType, transDuration=transDuration) - headers = "Connection: close\r\nAccept: */*\r\nHost: %s:%s\r\n" %(self.stream_host, self.stream_port) + headers = "Connection: close\r\nAccept: */*\r\nHost: %s:%s\r\n" %(self.mass.web.local_ip, self.mass.web.http_port) request = "GET %s HTTP/1.0\r\n%s\r\n" % (uri, headers) data = data + request.encode("utf-8") self.send_frame(b'strm', data) LOGGER.info("Requesting play from squeezebox" ) + def __delete__(self, instance): + ''' make sure the heartbeat task is deleted ''' + self._heartbeat_task.cancel() + + @run_periodic(5) + async def __send_heartbeat(self): + ''' send periodic heartbeat message to player ''' + timestamp = int(time.time()) + data = self.pack_stream(b"t", replayGain=timestamp, flags=0) + self.send_frame(b"strm", data) + + def send_frame(self, command, data): + ''' send command to Squeeze player''' + packet = struct.pack('!H', len(data) + 4) + command + data + self._writer.write(packet) + + def send_version(self): + self.send_frame(b'vers', b'7.8') + + def pack_stream(self, command, autostart=b"1", formatbyte = b'o', + pcmargs = (b'?',b'?',b'?',b'?'), threshold = 200, + spdif = b'0', transDuration = 0, transType = b'0', + flags = 0x40, outputThreshold = 0, + replayGain=0, serverPort = 8095, serverIp = 0): + return struct.pack("!cccccccBcBcBBBLHL", + command, autostart, formatbyte, *pcmargs, + threshold, spdif, transDuration, transType, + flags, outputThreshold, 0, replayGain, serverPort, serverIp) + + def displayTrack(self, track): self.render("%s by %s" % (track.title, track.artist)) - - def process_HELO(self, data): - (devId, rev, mac) = struct.unpack('BB6s', data[:8]) - device_mac = ':'.join("%02x" % x for x in mac) - self._device_type = devices.get(devId, 'unknown device') - self._mac_address = str(device_mac).lower() - LOGGER.debug("HELO received from %s %s" % (self._mac_address, self._device_type)) - self.init_client() - - def init_client(self): - ''' initialize a new connected client ''' - self.send_event("connected") - self.send_version() - self.stop() - self.setBrightness() - #self.set_visualisation(SpectrumAnalyser()) - self.send_frame(b"setd", struct.pack("B", 0)) - self.send_frame(b"setd", struct.pack("B", 4)) - self.power_on() - self.volume_set(40) # TODO: remember last volume - def send_volume(self): - og = self._volume.old_gain() - ng = self._volume.new_gain() - LOGGER.info("Volume set to %d (%d/%d)" % (self._volume.volume, og, ng)) - d = self.send_frame(b"audg", struct.pack("!LLBBLL", og, og, 1, 255, ng, ng)) - self.send_event("volume", self._volume.volume) - def setBrightness(self, level=4): assert 0 <= level <= 4 self.send_frame(b"grfb", struct.pack("!H", level)) @@ -424,12 +281,13 @@ class PySqueezePlayer(Player): def stat_aude(self, data): (spdif_enable, dac_enable) = struct.unpack("2B", data[:4]) powered = spdif_enable or dac_enable - self.send_event("power", powered) + self.powered = powered + self.muted = not powered LOGGER.debug("ACK aude - Received player power: %s" % powered) def stat_audg(self, data): - LOGGER.info("Received volume_level from player %s" % data) - self.send_event("volume", self._volume.volume) + LOGGER.debug("Received volume_level from player %s" % data) + self.volume_level = self._volume.volume def stat_strm(self, data): LOGGER.debug("ACK strm") @@ -440,14 +298,15 @@ class PySqueezePlayer(Player): def stat_STMd(self, data): LOGGER.debug("Decoder Ready for next track") - self.send_event("next_track") + next_item = self.queue.next_item + self.play(next_item.uri) def stat_STMe(self, data): LOGGER.info("Connection established") def stat_STMf(self, data): LOGGER.info("Status Message: Connection closed") - self.send_event("state", PlayerState.Stopped) + self.state = PlayerState.Stopped def stat_STMh(self, data): LOGGER.info("Status Message: End of headers") @@ -461,38 +320,36 @@ class PySqueezePlayer(Player): def stat_STMp(self, data): '''Pause confirmed''' - self.send_event("state", PlayerState.Paused) + self.state = PlayerState.Paused def stat_STMr(self, data): '''Resume confirmed''' - self.send_event("state", PlayerState.Playing) + self.state = PlayerState.Playing def stat_STMs(self, data): '''Playback of new track has started''' - self.send_event("state", PlayerState.Playing) + self.state = PlayerState.Playing def stat_STMt(self, data): """ heartbeat from client """ timestamp = time.time() self._last_heartbeat = timestamp (num_crlf, mas_initialized, mas_mode, rptr, wptr, - bytes_received_h, bytes_received_l, signal_strength, - jiffies, output_buffer_size, output_buffer_fullness, - elapsed_seconds, voltage, elapsed_milliseconds, - server_timestamp, error_code) = struct.unpack("!BBBLLLLHLLLLHLLH", data) - if elapsed_seconds != self._elapsed_seconds: - self.send_event("progress") - self._elapsed_seconds = elapsed_seconds - self._elapsed_milliseconds = elapsed_milliseconds + bytes_received_h, bytes_received_l, signal_strength, + jiffies, output_buffer_size, output_buffer_fullness, + elapsed_seconds, voltage, cur_time_milliseconds, + server_timestamp, error_code) = struct.unpack("!BBBLLLLHLLLLHLLH", data) + if elapsed_seconds != self.cur_time: + self.cur_time = elapsed_seconds + self._cur_time_milliseconds = cur_time_milliseconds def stat_STMu(self, data): '''Normal end of playback''' LOGGER.info("End of playback - Underrun") - self.send_event("state", PlayerState.Stopped) + self.state = PlayerState.Stopped def process_BYE(self, data): LOGGER.info("BYE received") - self.send_event("disconnected") def process_RESP(self, data): LOGGER.info("RESP received") @@ -539,13 +396,12 @@ class PySqueezePlayer(Player): def process_SETD(self, data): ''' Get/set player firmware settings ''' - LOGGER.debug("SETD received %s" % data) + LOGGER.info("SETD received %s" % data) cmd_id = data[0] if cmd_id == 0: # received player name data = data[1:].decode() - self._player_name = data - self.send_event("name") + self.name = data def process_UREQ(self, data): LOGGER.info("UREQ received")