From 314700377856abd5f19cfa5baf029d184923bbf4 Mon Sep 17 00:00:00 2001 From: marcelveldt Date: Fri, 25 Oct 2019 19:37:37 +0200 Subject: [PATCH] convert squeezebox to fully async --- music_assistant/database.py | 10 +- music_assistant/http_streamer.py | 9 +- music_assistant/models/musicprovider.py | 4 +- music_assistant/music_manager.py | 7 +- music_assistant/playerproviders/squeezebox.py | 218 ++++++++---------- music_assistant/web/pages/albumdetails.vue.js | 6 +- 6 files changed, 108 insertions(+), 146 deletions(-) diff --git a/music_assistant/database.py b/music_assistant/database.py index fa8a228d..1a2f5544 100755 --- a/music_assistant/database.py +++ b/music_assistant/database.py @@ -191,7 +191,7 @@ class Database(): async with db.execute('SELECT (playlist_id) FROM playlists WHERE name=? AND owner=?;', (playlist.name,playlist.owner)) as cursor: playlist_id = await cursor.fetchone() playlist_id = playlist_id[0] - LOGGER.info('added playlist %s to database: %s' %(playlist.name, playlist_id)) + LOGGER.debug('added playlist %s to database: %s' %(playlist.name, playlist_id)) # add/update metadata await self.__add_prov_ids(playlist_id, MediaType.Playlist, playlist.provider_ids, db) await self.__add_metadata(playlist_id, MediaType.Playlist, playlist.metadata, db) @@ -215,7 +215,7 @@ class Database(): async with db.execute('SELECT (radio_id) FROM radios WHERE name=?;', (radio.name,)) as cursor: radio_id = await cursor.fetchone() radio_id = radio_id[0] - LOGGER.info('added radio station %s to database: %s' %(radio.name, radio_id)) + LOGGER.debug('added radio station %s to database: %s' %(radio.name, radio_id)) # add/update metadata await self.__add_prov_ids(radio_id, MediaType.Radio, radio.provider_ids, db) await self.__add_metadata(radio_id, MediaType.Radio, radio.metadata, db) @@ -320,7 +320,7 @@ class Database(): await self.__add_external_ids(artist_id, MediaType.Artist, artist.external_ids, db) # save await db.commit() - LOGGER.info('added artist %s (%s) to database: %s' %(artist.name, artist.provider_ids, artist_id)) + LOGGER.debug('added artist %s (%s) to database: %s' %(artist.name, artist.provider_ids, artist_id)) return artist_id async def albums(self, filter_query=None, limit=100000, offset=0, orderby='name', fulldata=False, db=None) -> List[Album]: @@ -404,7 +404,7 @@ class Database(): await self.__add_external_ids(album_id, MediaType.Album, album.external_ids, db) # save await db.commit() - LOGGER.info('added album %s (%s) to database: %s' %(album.name, album.provider_ids, album_id)) + LOGGER.debug('added album %s (%s) to database: %s' %(album.name, album.provider_ids, album_id)) return album_id async def tracks(self, filter_query=None, limit=100000, offset=0, orderby='name', fulldata=False, db=None) -> List[Track]: @@ -491,7 +491,7 @@ class Database(): await self.__add_external_ids(track_id, MediaType.Track, track.external_ids, db) # save to db await db.commit() - LOGGER.info('added track %s (%s) to database: %s' %(track.name, track.provider_ids, track_id)) + LOGGER.debug('added track %s (%s) to database: %s' %(track.name, track.provider_ids, track_id)) return track_id async def update_track(self, track_id, column_key, column_value): diff --git a/music_assistant/http_streamer.py b/music_assistant/http_streamer.py index ca19b193..b4ea84dd 100755 --- a/music_assistant/http_streamer.py +++ b/music_assistant/http_streamer.py @@ -99,8 +99,7 @@ class HTTPStreamer(): if fade_length: fade_bytes = int(sample_rate * 4 * 2 * fade_length) else: - fade_length = 1 - fade_bytes = int(sample_rate * 4 * 2) + fade_bytes = int(sample_rate * 4 * 2 * 6) pcm_args = 'raw -b 32 -c 2 -e signed-integer -r %s' % sample_rate args = 'sox -t %s - -t flac -C 0 -' % pcm_args # start sox process @@ -109,9 +108,8 @@ class HTTPStreamer(): stdout=subprocess.PIPE, stdin=subprocess.PIPE) def fill_buffer(): - chunk_size = int(sample_rate * 4 * 2) while True: - chunk = sox_proc.stdout.read(chunk_size) + chunk = sox_proc.stdout.read(128000) if not chunk: break if chunk and not cancelled.is_set(): @@ -268,8 +266,11 @@ class HTTPStreamer(): ''' get audio stream from provider and apply additional effects/processing where/if needed''' # get stream details from provider # sort by quality and check track availability + streamdetails = None for prov_media in sorted(queue_item.provider_ids, key=operator.itemgetter('quality'), reverse=True): + if not prov_media['provider'] in self.mass.music.providers: + continue streamdetails = self.mass.run_task( self.mass.music.providers[prov_media['provider']].get_stream_details(prov_media['item_id']), wait_for_result=True) diff --git a/music_assistant/models/musicprovider.py b/music_assistant/models/musicprovider.py index 67192c74..f40855c7 100755 --- a/music_assistant/models/musicprovider.py +++ b/music_assistant/models/musicprovider.py @@ -257,7 +257,7 @@ class MusicProvider(): searchstr = "%s - %s %s" %(searchalbum.artist.name, searchalbum.name, searchalbum.version) search_results = await self.search(searchstr, [MediaType.Album], limit=5) for item in search_results["albums"]: - if item.name == searchalbum.name and item.version == searchalbum.version and item.artist.name == searchalbum.artist.name: + if item and item.name == searchalbum.name and item.version == searchalbum.version and item.artist.name == searchalbum.artist.name: # just load this item in the database, it will be matched automagically ;-) await self.album(item.item_id, lazy=False) @@ -267,7 +267,7 @@ class MusicProvider(): searchartists = [item.name for item in searchtrack.artists] search_results = await self.search(searchstr, [MediaType.Track], limit=5) for item in search_results["tracks"]: - if item.name == searchtrack.name and item.version == searchtrack.version and item.album.name == searchtrack.album.name: + if item and item.name == searchtrack.name and item.version == searchtrack.version and item.album.name == searchtrack.album.name: # double safety check - artist must match exactly ! for artist in item.artists: if artist.name in searchartists: diff --git a/music_assistant/music_manager.py b/music_assistant/music_manager.py index 641eab9a..632ae3f4 100755 --- a/music_assistant/music_manager.py +++ b/music_assistant/music_manager.py @@ -168,7 +168,7 @@ class MusicManager(): playlist = None if not provider or provider == 'database': playlist = await self.mass.db.playlist(playlist_id) - if playlist and playlist.is_editable: + if playlist: # database synced playlist, return tracks from db... return await self.mass.db.playlist_tracks( playlist.item_id, offset=offset, limit=limit) @@ -345,9 +345,8 @@ class MusicManager(): cur_db_ids.append(db_id) if not db_id in prev_db_ids: await self.mass.db.add_to_library(db_id, MediaType.Playlist, prov_id) - if item.is_editable: - # precache/sync playlist tracks (user owned playlists only) - asyncio.create_task( self.sync_playlist_tracks(db_id, prov_id, item.item_id) ) + # sync playlist tracks + await self.sync_playlist_tracks(db_id, prov_id, item.item_id) # process playlist deletions for db_id in prev_db_ids: if db_id not in cur_db_ids: diff --git a/music_assistant/playerproviders/squeezebox.py b/music_assistant/playerproviders/squeezebox.py index cf012eb9..61a4970b 100644 --- a/music_assistant/playerproviders/squeezebox.py +++ b/music_assistant/playerproviders/squeezebox.py @@ -60,7 +60,6 @@ class PySqueezeProvider(PlayerProvider): ''' handle a client connection on the socket''' buffer = b'' player = None - try: # keep reading bytes from the socket while True: @@ -84,10 +83,9 @@ class PySqueezeProvider(PlayerProvider): player_id = str(device_mac).lower() device_type = devices.get(dev_id, 'unknown device') player = PySqueezePlayer(self.mass, player_id, self.prov_id, device_type, writer) - self.mass.event_loop.create_task(self.mass.players.add_player(player)) + await self.mass.players.add_player(player) elif player != None: - player.process_msg(operation, packet) - + await player.process_msg(operation, packet) except Exception as exc: # connection lost ? LOGGER.debug(exc) @@ -97,6 +95,7 @@ class PySqueezeProvider(PlayerProvider): if player._heartbeat_task: player._heartbeat_task.cancel() await self.mass.players.remove_player(player.player_id) + self.mass.config.save() class PySqueezePlayer(Player): ''' Squeezebox socket client ''' @@ -114,42 +113,53 @@ class PySqueezePlayer(Player): self._last_heartbeat = 0 self._cur_time_milliseconds = 0 # initialize player - self.send_version() - self.setBrightness() - #self.set_visualisation(SpectrumAnalyser()) - #self.display = Display() - self.send_frame(b"setd", struct.pack("B", 0)) - self.send_frame(b"setd", struct.pack("B", 4)) - - # TODO: remember last volume and power state - self.mass.event_loop.create_task(self.volume_set(40)) - self.mass.event_loop.create_task(self.power_off()) - self._heartbeat_task = asyncio.create_task(self.__send_heartbeat()) + self.mass.event_loop.create_task(self.initialize_player()) + self._heartbeat_task = self.mass.event_loop.create_task(self.__send_heartbeat()) + + async def initialize_player(self): + ''' set some startup settings for the player ''' + # send version + await self.__send_frame(b'vers', b'7.8') + await self.__send_frame(b"setd", struct.pack("B", 0)) + await self.__send_frame(b"setd", struct.pack("B", 4)) + # TODO: handle display stuff + #await self.setBrightness() + # restore last volume and power state + if self.settings.get("last_volume"): + await self.volume_set(self.settings["last_volume"]) + else: + await self.volume_set(40) + if self.settings.get("last_power"): + await self.power(self.settings["last_power"]) + else: + await self.power_off() async def cmd_stop(self): ''' send stop command to player ''' - data = self.pack_stream(b"q", autostart=b"0", flags=0) - self.send_frame(b"strm", data) + data = await self.__pack_stream(b"q", autostart=b"0", flags=0) + await self.__send_frame(b"strm", data) async def cmd_play(self): ''' send play (unpause) command to player ''' - data = self.pack_stream(b"u", autostart=b"0", flags=0) - self.send_frame(b"strm", data) + data = await self.__pack_stream(b"u", autostart=b"0", flags=0) + await self.__send_frame(b"strm", data) async def cmd_pause(self): ''' send pause command to player ''' - data = self.pack_stream(b"p", autostart=b"0", flags=0) - self.send_frame(b"strm", data) + data = await self.__pack_stream(b"p", autostart=b"0", flags=0) + await self.__send_frame(b"strm", data) async def cmd_power_on(self): ''' send power ON command to player ''' - self.send_frame(b"aude", struct.pack("2B", 1, 1)) + await self.__send_frame(b"aude", struct.pack("2B", 1, 1)) + self.settings["last_power"] = True self.powered = True async def cmd_power_off(self): ''' send power TOGGLE command to player ''' await self.cmd_stop() - self.send_frame(b"aude", struct.pack("2B", 0, 0)) + await self.__send_frame(b"aude", struct.pack("2B", 0, 0)) + self.settings["last_power"] = False self.powered = False async def cmd_volume_set(self, volume_level): @@ -157,15 +167,16 @@ class PySqueezePlayer(Player): self._volume.volume = volume_level og = self._volume.old_gain() ng = self._volume.new_gain() - self.send_frame(b"audg", struct.pack("!LLBBLL", og, og, 1, 255, ng, ng)) + await self.__send_frame(b"audg", struct.pack("!LLBBLL", og, og, 1, 255, ng, ng)) + self.settings["last_volume"] = volume_level self.volume_level = volume_level async def cmd_volume_mute(self, is_muted=False): ''' send mute command to player ''' if is_muted: - self.send_frame(b"aude", struct.pack("2B", 0, 0)) + await self.__send_frame(b"aude", struct.pack("2B", 0, 0)) else: - self.send_frame(b"aude", struct.pack("2B", 1, 1)) + await self.__send_frame(b"aude", struct.pack("2B", 1, 1)) self.muted = is_muted async def cmd_queue_play_index(self, index:int): @@ -175,7 +186,7 @@ class PySqueezePlayer(Player): ''' new_track = await self.queue.get_item(index) if new_track: - self.__send_flush() + await self.__send_flush() await self.__send_play(new_track.uri) async def cmd_queue_load(self, queue_items): @@ -183,7 +194,7 @@ class PySqueezePlayer(Player): load/overwrite given items in the player's own queue implementation :param queue_items: a list of QueueItems ''' - self.__send_flush() + await self.__send_flush() await self.__send_play(queue_items[0].uri) async def cmd_queue_insert(self, queue_items, insert_at_index): @@ -200,12 +211,12 @@ class PySqueezePlayer(Player): [MUST OVERRIDE] tell player to start playing a single uri ''' - self.__send_flush() + await self.__send_flush() await self.__send_play(uri) - def __send_flush(self): - data = self.pack_stream(b"f", autostart=b"0", flags=0) - self.send_frame(b"strm", data) + async def __send_flush(self): + data = await self.__pack_stream(b"f", autostart=b"0", flags=0) + await self.__send_frame(b"strm", data) async def __send_play(self, uri): ''' play uri ''' @@ -218,13 +229,13 @@ class PySqueezePlayer(Player): transDuration = self.settings["crossfade_duration"] formatbyte = b'f' # fixed to flac uri = '/stream' + uri.split('/stream')[1] - data = self.pack_stream(command, autostart=autostart, flags=0x00, + data = await self.__pack_stream(command, autostart=autostart, flags=0x00, formatbyte=formatbyte, transType=transType, transDuration=transDuration) headers = "Connection: close\r\nAccept: */*\r\nHost: %s:%s\r\n" %(self.mass.web.local_ip, self.mass.web.http_port) request = "GET %s HTTP/1.0\r\n%s\r\n" % (uri, headers) data = data + request.encode("utf-8") - self.send_frame(b'strm', data) + await self.__send_frame(b'strm', data) LOGGER.info("Requesting play from squeezebox" ) def __delete__(self, instance): @@ -236,19 +247,16 @@ class PySqueezePlayer(Player): async def __send_heartbeat(self): ''' send periodic heartbeat message to player ''' timestamp = int(time.time()) - data = self.pack_stream(b"t", replayGain=timestamp, flags=0) - self.send_frame(b"strm", data) + data = await self.__pack_stream(b"t", replayGain=timestamp, flags=0) + await self.__send_frame(b"strm", data) - def send_frame(self, command, data): + async def __send_frame(self, command, data): ''' send command to Squeeze player''' packet = struct.pack('!H', len(data) + 4) + command + data self._writer.write(packet) - self.mass.event_loop.create_task(self._writer.drain()) + await self._writer.drain() - def send_version(self): - self.send_frame(b'vers', b'7.8') - - def pack_stream(self, command, autostart=b"1", formatbyte = b'o', + async def __pack_stream(self, command, autostart=b"1", formatbyte = b'o', pcmargs = (b'?',b'?',b'?',b'?'), threshold = 200, spdif = b'0', transDuration = 0, transType = b'0', flags = 0x40, outputThreshold = 0, @@ -258,99 +266,85 @@ class PySqueezePlayer(Player): threshold, spdif, transDuration, transType, flags, outputThreshold, 0, replayGain, serverPort, serverIp) - def displayTrack(self, track): - self.render("%s by %s" % (track.title, track.artist)) + async def displayTrack(self, track): + await self.render("%s by %s" % (track.title, track.artist)) - def setBrightness(self, level=4): + async def setBrightness(self, level=4): assert 0 <= level <= 4 - self.send_frame(b"grfb", struct.pack("!H", level)) + await self.__send_frame(b"grfb", struct.pack("!H", level)) - def set_visualisation(self, visualisation): - self.send_frame(b"visu", visualisation.pack()) + async def set_visualisation(self, visualisation): + await self.__send_frame(b"visu", visualisation.pack()) - def render(self, text): + async def render(self, text): #self.display.clear() #self.display.renderText(text, "DejaVu-Sans", 16, (0,0)) #self.updateDisplay(self.display.frame()) pass - def updateDisplay(self, bitmap, transition = 'c', offset=0, param=0): + async def updateDisplay(self, bitmap, transition = 'c', offset=0, param=0): frame = struct.pack("!Hcb", offset, transition, param) + bitmap - self.send_frame(b"grfe", frame) + await self.__send_frame(b"grfe", frame) - def process_msg(self, operation, packet): + async def process_msg(self, operation, packet): handler = getattr(self, "process_%s" % operation, None) if handler is None: LOGGER.error("No handler for %s" % operation) else: - handler(packet) - - def process_STAT(self, data): - ev = data[:4] - if ev == b'\x00\x00\x00\x00': - LOGGER.info("Presumed informational stat message") + await handler(packet) + + async def process_STAT(self, data): + '''process incoming event from player''' + event = data[:4].decode() + event_data = data[4:] + if event == b'\x00\x00\x00\x00': + # Presumed informational stat message + return + event_handler = getattr(self, 'stat_%s' %event, None) + if event_handler is None: + LOGGER.debug("Got event %s - event_data: %s" %(event, event_data)) else: - handler = getattr(self, 'stat_%s' % ev.decode(), None) - if handler is None: - raise NotImplementedError("Stat message %r not known" % ev) - handler(data[4:]) + await event_handler(data[4:]) - def stat_aude(self, data): + async def stat_aude(self, data): (spdif_enable, dac_enable) = struct.unpack("2B", data[:4]) powered = spdif_enable or dac_enable self.powered = powered self.muted = not powered LOGGER.debug("ACK aude - Received player power: %s" % powered) - def stat_audg(self, data): + async def stat_audg(self, data): # TODO: process volume level LOGGER.info("Received volume_level from player %s" % data) self.volume_level = self._volume.volume - def stat_strm(self, data): - LOGGER.debug("ACK strm") - #self.send_frame(b"cont", b"0") - - def stat_STMc(self, data): - LOGGER.debug("Status Message: Connect") - - def stat_STMd(self, data): + async def stat_STMd(self, data): LOGGER.debug("Decoder Ready for next track") next_item = self.queue.next_item if next_item: - self.mass.event_loop.create_task( - self.__send_play(next_item.uri)) - - def stat_STMe(self, data): - LOGGER.debug("Connection established") + await self.__send_play(next_item.uri) - def stat_STMf(self, data): + async def stat_STMf(self, data): LOGGER.debug("Status Message: Connection closed") self.state = PlayerState.Stopped - def stat_STMh(self, data): - LOGGER.debug("Status Message: End of headers") - - def stat_STMn(self, data): - LOGGER.error("Decoder does not support file format") - - def stat_STMo(self, data): + async def stat_STMo(self, data): ''' No more decoded (uncompressed) data to play; triggers rebuffering. ''' LOGGER.debug("Output Underrun") - def stat_STMp(self, data): + async def stat_STMp(self, data): '''Pause confirmed''' self.state = PlayerState.Paused - def stat_STMr(self, data): + async def stat_STMr(self, data): '''Resume confirmed''' self.state = PlayerState.Playing - def stat_STMs(self, data): + async def stat_STMs(self, data): '''Playback of new track has started''' self.state = PlayerState.Playing - def stat_STMt(self, data): + async def stat_STMt(self, data): """ heartbeat from client """ timestamp = time.time() self._last_heartbeat = timestamp @@ -363,31 +357,16 @@ class PySqueezePlayer(Player): self.cur_time = elapsed_seconds self._cur_time_milliseconds = cur_time_milliseconds - def stat_STMu(self, data): - '''Normal end of playback''' - LOGGER.debug("End of playback - Underrun") + async def stat_STMu(self, data): + ''' Buffer underrun: Normal end of playback''' self.state = PlayerState.Stopped - def process_BYE(self, data): - LOGGER.debug("BYE received") - - def process_RESP(self, data): + async def process_RESP(self, data): + ''' response received at player, send continue ''' LOGGER.debug("RESP received") - self.send_frame(b"cont", b"0") + await self.__send_frame(b"cont", b"0") - def process_BODY(self, data): - LOGGER.debug("BODY received") - - def process_META(self, data): - LOGGER.debug("META received") - - def process_DSCO(self, data): - LOGGER.debug("Data Stream Disconnected") - - def process_DBUG(self, data): - LOGGER.debug("DBUG received") - - def process_IR(self, data): + async def process_IR(self, data): """ Slightly involved codepath here. This raises an event, which may be picked up by the service and then the process_remote_* function in this player will be called. This is mostly relevant for volume changes @@ -401,20 +380,7 @@ class PySqueezePlayer(Player): # else: # LOGGER.info("Unknown IR received: %r, %r" % (time, code)) - def process_RAWI(self, data): - LOGGER.debug("RAWI received") - - def process_ANIC(self, data): - LOGGER.debug("ANIC received") - - def process_BUTN(self, data): - LOGGER.debug("BUTN received") - - def process_KNOB(self, data): - ''' Transporter only, knob-related ''' - LOGGER.debug("KNOB received") - - def process_SETD(self, data): + async def process_SETD(self, data): ''' Get/set player firmware settings ''' LOGGER.debug("SETD received %s" % data) cmd_id = data[0] @@ -423,10 +389,6 @@ class PySqueezePlayer(Player): data = data[1:].decode() self.name = data - def process_UREQ(self, data): - LOGGER.debug("UREQ received") - - # from http://wiki.slimdevices.com/index.php/SlimProtoTCPProtocol#HELO devices = { diff --git a/music_assistant/web/pages/albumdetails.vue.js b/music_assistant/web/pages/albumdetails.vue.js index 4f60a91e..056bc76a 100755 --- a/music_assistant/web/pages/albumdetails.vue.js +++ b/music_assistant/web/pages/albumdetails.vue.js @@ -63,7 +63,7 @@ var AlbumDetails = Vue.component('AlbumDetails', { methods: { getInfo () { this.$globals.loading = true; - const api_url = '/api/albums/' + this.media_id + const api_url = this.$globals.server + 'api/albums/' + this.media_id axios .get(api_url, { params: { provider: this.provider }}) .then(result => { @@ -77,7 +77,7 @@ var AlbumDetails = Vue.component('AlbumDetails', { }); }, getAlbumTracks () { - const api_url = '/api/albums/' + this.media_id + '/tracks' + const api_url = this.$globals.server + 'api/albums/' + this.media_id + '/tracks' axios .get(api_url, { params: { offset: this.offset, limit: 50, provider: this.provider}}) .then(result => { @@ -90,7 +90,7 @@ var AlbumDetails = Vue.component('AlbumDetails', { }); }, getAlbumVersions () { - const api_url = '/api/search'; + const api_url = this.$globals.server + 'api/search'; var searchstr = this.info.artist.name + " - " + this.info.name axios .get(api_url, { params: { query: searchstr, limit: 50, media_types: 'albums', online: true}}) -- 2.34.1