From ba61fdeee40ed1a7d2b63dba40e156826bdcdd32 Mon Sep 17 00:00:00 2001 From: marcelveldt Date: Sun, 13 Oct 2019 02:41:47 +0200 Subject: [PATCH] fixes fixes for bugs introduces after refactor --- music_assistant/homeassistant.py | 72 +++++++++++------------- music_assistant/http_streamer.py | 48 +++++++++------- music_assistant/models/player.py | 61 +++++++++++++------- music_assistant/models/player_queue.py | 9 +-- music_assistant/musicproviders/tunein.py | 10 +++- music_assistant/player_manager.py | 11 +--- web/components/player.vue.js | 40 +++++++------ 7 files changed, 137 insertions(+), 114 deletions(-) diff --git a/music_assistant/homeassistant.py b/music_assistant/homeassistant.py index 925a7cf9..45730033 100644 --- a/music_assistant/homeassistant.py +++ b/music_assistant/homeassistant.py @@ -58,8 +58,8 @@ class HomeAssistant(): def __init__(self, mass, url, token): self.mass = mass self._published_players = {} - self._tracked_states = {} - self._state_listeners = [] + self._tracked_entities = {} + self._state_listeners = {} self._sources = [] self._token = token if url.startswith('https://'): @@ -81,41 +81,34 @@ class HomeAssistant(): await self.mass.add_event_listener(self.mass_event, "player updated") self.mass.event_loop.create_task(self.__get_sources()) - def get_state_sync(self, entity_id, attribute='state', register_listener=None): + async def get_state_async(self, entity_id, attribute='state'): + ''' get state of a hass entity (async)''' + state = self.get_state(entity_id, attribute) + if not state: + await self.__request_state(entity_id) + state = self.get_state(entity_id, attribute) + return state + + def get_state(self, entity_id, attribute='state'): ''' get state of a hass entity''' - if entity_id in self._tracked_states: - state_obj = self._tracked_states.get(entity_id) - if not state_obj: - return None + state_obj = self._tracked_entities.get(entity_id) + if state_obj: + if attribute == 'state': + return state_obj['state'] + elif attribute: + return state_obj['attributes'].get(attribute) + else: + return state_obj else: - if register_listener: - # register state listener - self._state_listeners.append( (entity_id, register_listener) ) + self.mass.event_loop.create_task(self.__request_state(entity_id)) return None - if attribute == 'state': - return state_obj['state'] - elif not attribute: - return state_obj - else: - return state_obj['attributes'].get(attribute) - - async def get_state(self, entity_id, attribute='state', register_listener=None): + + async def __request_state(self, entity_id): ''' get state of a hass entity''' - if entity_id in self._tracked_states: - state_obj = self._tracked_states[entity_id] - else: - # first request - state_obj = await self.__get_data('states/%s' % entity_id) - if register_listener: - # register state listener - self._state_listeners.append( (entity_id, register_listener) ) - self._tracked_states[entity_id] = state_obj - if attribute == 'state': - return state_obj['state'] - elif not attribute: - return state_obj - else: - return state_obj['attributes'].get(attribute) + state_obj = await self.__get_data('states/%s' % entity_id) + self._tracked_entities[entity_id] = state_obj + self.mass.event_loop.create_task( + self.mass.signal_event("hass entity changed", entity_id)) async def mass_event(self, msg, msg_details): ''' received event from mass ''' @@ -125,11 +118,10 @@ class HomeAssistant(): async def hass_event(self, event_type, event_data): ''' received event from hass ''' if event_type == 'state_changed': - if event_data['entity_id'] in self._tracked_states: - self._tracked_states[event_data['entity_id']] = event_data['new_state'] - for entity_id, handler in self._state_listeners: - if entity_id == event_data['entity_id']: - self.mass.event_loop.create_task(handler()) + if event_data['entity_id'] in self._tracked_entities: + self._tracked_entities[event_data['entity_id']] = event_data['new_state'] + self.mass.event_loop.create_task( + self.mass.signal_event("hass entity changed", event_data['entity_id'])) elif event_type == 'call_service' and event_data['domain'] == 'media_player': await self.__handle_player_command(event_data['service'], event_data['service_data']) @@ -286,8 +278,8 @@ class HomeAssistant(): elif data['type'] == 'result' and data.get('result'): # reply to our get_states request asyncio.create_task(self.hass_event('all_states', data['result'])) - else: - LOGGER.info(data) + # else: + # LOGGER.info(data) elif msg.type == aiohttp.WSMsgType.ERROR: raise Exception("error in websocket") except Exception as exc: diff --git a/music_assistant/http_streamer.py b/music_assistant/http_streamer.py index 9fcf3717..5d9b819a 100755 --- a/music_assistant/http_streamer.py +++ b/music_assistant/http_streamer.py @@ -128,8 +128,7 @@ class HTTPStreamer(): if not queue_track: LOGGER.warning("no (more) tracks left in queue") break - LOGGER.info("Start Streaming queue track: %s (%s) on player %s" % (queue_track.item_id, queue_track.name, player.name)) - LOGGER.info(player.state) + LOGGER.debug("Start Streaming queue track: %s (%s) on player %s" % (queue_track.item_id, queue_track.name, player.name)) fade_in_part = b'' cur_chunk = 0 prev_chunk = None @@ -212,14 +211,14 @@ class HTTPStreamer(): # end of the track reached if cancelled.is_set(): # break out the loop if the http session is cancelled - LOGGER.warning("session cancelled") + LOGGER.debug("session cancelled") break else: # WIP: update actual duration to the queue for more accurate now playing info accurate_duration = bytes_written / int(sample_rate * 4 * 2) queue_track.duration = accurate_duration LOGGER.info("Finished Streaming queue track: %s (%s) on player %s" % (queue_track.item_id, queue_track.name, player.name)) - LOGGER.info("bytes written: %s - duration: %s" % (bytes_written, accurate_duration)) + LOGGER.debug("bytes written: %s - duration: %s" % (bytes_written, accurate_duration)) # end of queue reached, pass last fadeout bits to final output if last_fadeout_data and not cancelled.is_set(): sox_proc.stdin.write(last_fadeout_data) @@ -231,19 +230,29 @@ class HTTPStreamer(): async def __get_audio_stream(self, player, queue_item, cancelled, chunksize=512000, resample=None): ''' get audio stream from provider and apply additional effects/processing where/if needed''' + # get stream details from provider + # sort by quality and check track availability + for prov_media in sorted(queue_item.provider_ids, key=operator.itemgetter('quality'), reverse=True): + streamdetails = asyncio.run_coroutine_threadsafe( + self.mass.music.providers[prov_media['provider']].get_stream_details(prov_media['item_id']), + self.mass.event_loop).result() + if streamdetails: + queue_item.streamdetails = streamdetails + queue_item.item_id = prov_media['item_id'] + queue_item.provider = prov_media['provider'] + queue_item.quality = prov_media['quality'] + break + if not streamdetails: + LOGGER.warning("no stream details!") + yield (True, b'') + return + # get sox effects and resample options sox_effects = await self.__get_player_sox_options(player, queue_item) outputfmt = 'flac -C 0' if resample: outputfmt = 'raw -b 32 -c 2 -e signed-integer' sox_effects += ' rate -v %s' % resample - # stream audio from provider - streamdetails = asyncio.run_coroutine_threadsafe( - self.mass.music.providers[queue_item.provider].get_stream_details(queue_item.item_id), - self.mass.event_loop).result() - if not streamdetails: - LOGGER.warning("no stream details!") - yield (True, b'') - return + # determine how to proceed based on input file ype if streamdetails["content_type"] == 'aac': # support for AAC created with ffmpeg in between args = 'ffmpeg -i "%s" -f flac - | sox -t flac - -t %s - %s' % (streamdetails["path"], outputfmt, sox_effects) @@ -253,8 +262,7 @@ class HTTPStreamer(): elif streamdetails['type'] == 'executable': args = '%s | sox -t %s - -t %s - %s' % (streamdetails["path"], streamdetails["content_type"], outputfmt, sox_effects) - - LOGGER.info("Running sox with args: %s" % args) + # start sox process process = await asyncio.create_subprocess_shell(args, stdout=asyncio.subprocess.PIPE) # fire event that streaming has started for this track (needed by some streaming providers) @@ -288,9 +296,9 @@ class HTTPStreamer(): process.terminate() except ProcessLookupError: pass - LOGGER.warning("__get_audio_stream for track_id %s interrupted - bytes_sent: %s" % (queue_item.item_id, bytes_sent)) + LOGGER.debug("__get_audio_stream for track_id %s interrupted - bytes_sent: %s" % (queue_item.item_id, bytes_sent)) else: - LOGGER.info("__get_audio_stream for track_id %s completed- bytes_sent: %s" % (queue_item.item_id, bytes_sent)) + LOGGER.debug("__get_audio_stream for track_id %s completed- bytes_sent: %s" % (queue_item.item_id, bytes_sent)) # fire event that streaming has ended for this track (needed by some streaming providers) if resample: bytes_per_second = resample * (32/8) * 2 @@ -302,9 +310,11 @@ class HTTPStreamer(): self.mass.signal_event('streaming_ended', streamdetails), self.mass.event_loop) # send task to background to analyse the audio - asyncio.run_coroutine_threadsafe( - self.__analyze_audio(queue_item.item_id, queue_item.provider), - self.mass.event_loop) + # TODO: send audio data completely + if not queue_item.media_type == MediaType.Radio: + asyncio.run_coroutine_threadsafe( + self.__analyze_audio(queue_item.item_id, queue_item.provider), + self.mass.event_loop) async def __get_player_sox_options(self, player, queue_item): ''' get player specific sox effect options ''' diff --git a/music_assistant/models/player.py b/music_assistant/models/player.py index 49e0cd75..dbbf0d9e 100755 --- a/music_assistant/models/player.py +++ b/music_assistant/models/player.py @@ -111,11 +111,16 @@ class Player(): self._muted = False self._group_parent = None self._queue = PlayerQueue(mass, self) + self._player_settings = None # public attributes self.supports_queue = True # has native support for a queue self.supports_gapless = True # has native gapless support self.supports_crossfade = False # has native crossfading support self.supports_replay_gain = False # has native support for replaygain volume leveling + # if home assistant support is enabled, register state listener + if self.mass.hass: + self.mass.event_loop.create_task( + self.mass.add_event_listener(self.hass_state_listener, "hass entity changed")) @property def player_id(self): @@ -177,17 +182,15 @@ class Player(): def powered(self): ''' [PROTECTED] return power state for this player ''' # homeassistant integration - if self.mass.hass and self.settings.get('hass_power_entity') and self.settings.get('hass_power_entity_source'): - hass_state = self.mass.hass.get_state_sync( + if (self.mass.hass and self.settings.get('hass_power_entity') and + self.settings.get('hass_power_entity_source')): + hass_state = self.mass.hass.get_state( self.settings['hass_power_entity'], - attribute='source', - register_listener=self.update) + attribute='source') return hass_state == self.settings['hass_power_entity_source'] elif self.mass.hass and self.settings.get('hass_power_entity'): - hass_state = self.mass.hass.get_state_sync( - self.settings['hass_power_entity'], - attribute='state', - register_listener=self.update) + hass_state = self.mass.hass.get_state( + self.settings['hass_power_entity']) return hass_state != 'off' # mute as power elif self.settings.get('mute_as_power'): @@ -252,10 +255,9 @@ class Player(): return group_volume # handle hass integration elif self.mass.hass and self.settings.get('hass_volume_entity'): - hass_state = self.mass.hass.get_state_sync( + hass_state = self.mass.hass.get_state( self.settings['hass_volume_entity'], - attribute='volume_level', - register_listener=self.update) + attribute='volume_level') return int(try_parse_float(hass_state)*100) else: return self._volume_level @@ -394,12 +396,15 @@ class Player(): if self.settings['mute_as_power']: await self.volume_mute(False) # handle hass integration - if self.mass.hass and self.settings.get('hass_power_entity') and self.settings.get('hass_power_entity_source'): - cur_source = await self.mass.hass.get_state(self.settings['hass_power_entity'], attribute='source') + if (self.mass.hass and + self.settings.get('hass_power_entity') and + self.settings.get('hass_power_entity_source')): + cur_source = await self.mass.hass.get_state_async( + self.settings['hass_power_entity'], attribute='source') if not cur_source: service_data = { 'entity_id': self.settings['hass_power_entity'], - 'source':self.settings['hass_power_entity_source'] + 'source': self.settings['hass_power_entity_source'] } await self.mass.hass.call_service('media_player', 'select_source', service_data) elif self.settings.get('hass_power_entity'): @@ -423,15 +428,18 @@ class Player(): if self.settings['mute_as_power']: await self.volume_mute(True) # handle hass integration - if self.mass.hass and self.settings.get('hass_power_entity') and self.settings.get('hass_power_entity_source'): - cur_source = await self.mass.hass.get_state(self.settings['hass_power_entity'], attribute='source') + if (self.mass.hass and + self.settings.get('hass_power_entity') and + self.settings.get('hass_power_entity_source')): + cur_source = await self.mass.hass.get_state_async( + self.settings['hass_power_entity'], attribute='source') if cur_source == self.settings['hass_power_entity_source']: service_data = { 'entity_id': self.settings['hass_power_entity'] } await self.mass.hass.call_service('media_player', 'turn_off', service_data) elif self.mass.hass and self.settings.get('hass_power_entity'): domain = self.settings['hass_power_entity'].split('.')[0] service_data = { 'entity_id': self.settings['hass_power_entity']} - await self.mass.hass.call_service(domain, 'turn_ff', service_data) + await self.mass.hass.call_service(domain, 'turn_off', service_data) # handle group power if self.is_group: # player is group, turn off all childs @@ -508,13 +516,25 @@ class Player(): await self.queue.update() LOGGER.debug("player updated: %s" % self.name) await self.mass.signal_event('player changed', self) + self.get_player_settings() + async def hass_state_listener(self, msg, msg_details=None): + ''' called when tracked entities in hass change state ''' + if (msg_details == self.settings.get('hass_power_entity') or + msg_details == self.settings.get('hass_volume_entity')): + await self.update() + @property def settings(self): + ''' [PROTECTED] get (or create) player config settings ''' + if self._player_settings: + return self._player_settings + else: + return self.get_player_settings() + + def get_player_settings(self): ''' [PROTECTED] get (or create) player config settings ''' player_settings = self.mass.config['player_settings'].get(self.player_id,{}) - if player_settings: - return player_settings # generate config for the player config_entries = [ # default config entries for a player ("enabled", True, "player_enabled"), @@ -545,7 +565,8 @@ class Player(): player_settings[key] = def_value self.mass.config['player_settings'][self.player_id] = player_settings self.mass.config['player_settings'][self.player_id]['__desc__'] = config_entries - return player_settings + self._player_settings = self.mass.config['player_settings'][self.player_id] + return player_settings def to_dict(self): ''' instance attributes as dict so it can be serialized to json ''' diff --git a/music_assistant/models/player_queue.py b/music_assistant/models/player_queue.py index de7d555c..60d70226 100755 --- a/music_assistant/models/player_queue.py +++ b/music_assistant/models/player_queue.py @@ -17,7 +17,7 @@ class QueueItem(Track): ''' representation of a queue item, simplified version of track ''' def __init__(self, media_item=None): super().__init__() - self.quality = TrackQuality.FLAC_LOSSLESS + self.streamdetails = {} self.uri = "" self.queue_item_id = str(uuid.uuid4()) # if existing media_item given, load those values @@ -206,10 +206,7 @@ class PlayerQueue(): :param queue_items: a list of QueueItem :param offset: offset from current queue position ''' - if self.cur_index: - insert_at_index = self.cur_index + offset - else: - insert_at_index = 0 + insert_at_index = self.cur_index + offset if not self.items or insert_at_index >= len(self.items): return await self.load(queue_items) if self.shuffle_enabled: @@ -217,7 +214,7 @@ class PlayerQueue(): self._items = self._items[:insert_at_index] + queue_items + self._items[insert_at_index:] if self.use_queue_stream or not self._player.supports_queue: if offset == 0: - return await self.play_index(0) + return await self.play_index(insert_at_index) else: return await self._player.cmd_queue_insert(queue_items, offset) diff --git a/music_assistant/musicproviders/tunein.py b/music_assistant/musicproviders/tunein.py index 71a46cce..84c3f4f9 100644 --- a/music_assistant/musicproviders/tunein.py +++ b/music_assistant/musicproviders/tunein.py @@ -130,14 +130,18 @@ class TuneInProvider(MusicProvider): async def get_stream_details(self, stream_id): ''' return the content details for the given track when it will be streamed''' - radio_id, media_type = stream_id.split('--') + radio_id = stream_id.split('--')[0] + if len(stream_id.split('--')) > 1: + media_type = stream_id.split('--')[1] + else: + media_type = '' stream_info = await self.__get_stream_urls(radio_id) for stream in stream_info["body"]: - if stream['media_type'] == media_type: + if stream['media_type'] == media_type or not media_type: return { "type": "url", "path": stream['url'], - "content_type": media_type, + "content_type": stream['media_type'], "sample_rate": 44100, "bit_depth": 16 } diff --git a/music_assistant/player_manager.py b/music_assistant/player_manager.py index 324c4823..357d7bec 100755 --- a/music_assistant/player_manager.py +++ b/music_assistant/player_manager.py @@ -95,15 +95,8 @@ class PlayerManager(): # generate uri for this queue item queue_item.uri = 'http://%s:%s/stream/%s?queue_item_id=%s'% ( self.mass.web.local_ip, self.mass.web.http_port, player_id, queue_item.queue_item_id) - # sort by quality and check track availability - for prov_media in sorted(track.provider_ids, key=operator.itemgetter('quality'), reverse=True): - queue_item.provider = prov_media['provider'] - queue_item.item_id = prov_media['item_id'] - queue_item.quality = prov_media['quality'] - # TODO: check track availability - # TODO: handle direct stream capability - queue_items.append(queue_item) - break + queue_items.append(queue_item) + # load items into the queue if queue_opt == 'replace' or (queue_opt in ['next', 'play'] and len(queue_items) > 50): return await player.queue.load(queue_items) diff --git a/web/components/player.vue.js b/web/components/player.vue.js index 33247fe1..d8b9859b 100755 --- a/web/components/player.vue.js +++ b/web/components/player.vue.js @@ -238,13 +238,18 @@ Vue.component("player", { }, setPlayerVolume: function(player_id, new_volume) { this.players[player_id].volume_level = new_volume; - this.playerCommand('volume', new_volume, player_id); + if (new_volume == 'up') + this.playerCommand('volume_up', null, player_id); + else if (new_volume == 'down') + this.playerCommand('volume_down', null, player_id); + else + this.playerCommand('volume_set', new_volume, player_id); }, togglePlayerPower: function(player_id) { if (this.players[player_id].powered) - this.playerCommand('power', 'off', player_id); + this.playerCommand('power_off', null, player_id); else - this.playerCommand('power', 'on', player_id); + this.playerCommand('power_on', null, player_id); }, connectWS() { var loc = window.location, new_uri; @@ -264,20 +269,21 @@ Vue.component("player", { this.ws.onmessage = function(e) { var msg = JSON.parse(e.data); - var players = []; - console.log(msg); - if (msg.message == 'player updated') - players = [msg.message_details]; - else if (msg.message == 'player removed') - this.players[msg.message_details].enabled = false; - else if (msg.message == 'players') - players = msg.message_details; - - for (var item of players) - if (item.player_id in this.players) - this.players[item.player_id] = Object.assign({}, this.players[item.player_id], item); - else - this.$set(this.players, item.player_id, item) + if (msg.message == 'player changed') + { + Vue.set(this.players, msg.message_details.player_id, msg.message_details); + } + else if (msg.message == 'player removed') { + this.players[msg.message_details.player_id].enabled = false; + } + else if (msg.message == 'players') { + for (var item of msg.message_details) { + console.log("new player: " + item.player_id); + Vue.set(this.players, item.player_id, item); + } + } + else + console.log(msg); // select new active player // TODO: store previous player in local storage -- 2.34.1