def __init__(self, mass, url, token):
self.mass = mass
self._published_players = {}
- self._tracked_states = {}
- self._state_listeners = []
+ self._tracked_entities = {}
+ self._state_listeners = {}
self._sources = []
self._token = token
if url.startswith('https://'):
await self.mass.add_event_listener(self.mass_event, "player updated")
self.mass.event_loop.create_task(self.__get_sources())
- def get_state_sync(self, entity_id, attribute='state', register_listener=None):
+ async def get_state_async(self, entity_id, attribute='state'):
+ ''' get state of a hass entity (async)'''
+ state = self.get_state(entity_id, attribute)
+ if not state:
+ await self.__request_state(entity_id)
+ state = self.get_state(entity_id, attribute)
+ return state
+
+ def get_state(self, entity_id, attribute='state'):
''' get state of a hass entity'''
- if entity_id in self._tracked_states:
- state_obj = self._tracked_states.get(entity_id)
- if not state_obj:
- return None
+ state_obj = self._tracked_entities.get(entity_id)
+ if state_obj:
+ if attribute == 'state':
+ return state_obj['state']
+ elif attribute:
+ return state_obj['attributes'].get(attribute)
+ else:
+ return state_obj
else:
- if register_listener:
- # register state listener
- self._state_listeners.append( (entity_id, register_listener) )
+ self.mass.event_loop.create_task(self.__request_state(entity_id))
return None
- if attribute == 'state':
- return state_obj['state']
- elif not attribute:
- return state_obj
- else:
- return state_obj['attributes'].get(attribute)
-
- async def get_state(self, entity_id, attribute='state', register_listener=None):
+
+ async def __request_state(self, entity_id):
''' get state of a hass entity'''
- if entity_id in self._tracked_states:
- state_obj = self._tracked_states[entity_id]
- else:
- # first request
- state_obj = await self.__get_data('states/%s' % entity_id)
- if register_listener:
- # register state listener
- self._state_listeners.append( (entity_id, register_listener) )
- self._tracked_states[entity_id] = state_obj
- if attribute == 'state':
- return state_obj['state']
- elif not attribute:
- return state_obj
- else:
- return state_obj['attributes'].get(attribute)
+ state_obj = await self.__get_data('states/%s' % entity_id)
+ self._tracked_entities[entity_id] = state_obj
+ self.mass.event_loop.create_task(
+ self.mass.signal_event("hass entity changed", entity_id))
async def mass_event(self, msg, msg_details):
''' received event from mass '''
async def hass_event(self, event_type, event_data):
''' received event from hass '''
if event_type == 'state_changed':
- if event_data['entity_id'] in self._tracked_states:
- self._tracked_states[event_data['entity_id']] = event_data['new_state']
- for entity_id, handler in self._state_listeners:
- if entity_id == event_data['entity_id']:
- self.mass.event_loop.create_task(handler())
+ if event_data['entity_id'] in self._tracked_entities:
+ self._tracked_entities[event_data['entity_id']] = event_data['new_state']
+ self.mass.event_loop.create_task(
+ self.mass.signal_event("hass entity changed", event_data['entity_id']))
elif event_type == 'call_service' and event_data['domain'] == 'media_player':
await self.__handle_player_command(event_data['service'], event_data['service_data'])
elif data['type'] == 'result' and data.get('result'):
# reply to our get_states request
asyncio.create_task(self.hass_event('all_states', data['result']))
- else:
- LOGGER.info(data)
+ # else:
+ # LOGGER.info(data)
elif msg.type == aiohttp.WSMsgType.ERROR:
raise Exception("error in websocket")
except Exception as exc:
if not queue_track:
LOGGER.warning("no (more) tracks left in queue")
break
- LOGGER.info("Start Streaming queue track: %s (%s) on player %s" % (queue_track.item_id, queue_track.name, player.name))
- LOGGER.info(player.state)
+ LOGGER.debug("Start Streaming queue track: %s (%s) on player %s" % (queue_track.item_id, queue_track.name, player.name))
fade_in_part = b''
cur_chunk = 0
prev_chunk = None
# end of the track reached
if cancelled.is_set():
# break out the loop if the http session is cancelled
- LOGGER.warning("session cancelled")
+ LOGGER.debug("session cancelled")
break
else:
# WIP: update actual duration to the queue for more accurate now playing info
accurate_duration = bytes_written / int(sample_rate * 4 * 2)
queue_track.duration = accurate_duration
LOGGER.info("Finished Streaming queue track: %s (%s) on player %s" % (queue_track.item_id, queue_track.name, player.name))
- LOGGER.info("bytes written: %s - duration: %s" % (bytes_written, accurate_duration))
+ LOGGER.debug("bytes written: %s - duration: %s" % (bytes_written, accurate_duration))
# end of queue reached, pass last fadeout bits to final output
if last_fadeout_data and not cancelled.is_set():
sox_proc.stdin.write(last_fadeout_data)
async def __get_audio_stream(self, player, queue_item, cancelled,
chunksize=512000, resample=None):
''' get audio stream from provider and apply additional effects/processing where/if needed'''
+ # get stream details from provider
+ # sort by quality and check track availability
+ for prov_media in sorted(queue_item.provider_ids, key=operator.itemgetter('quality'), reverse=True):
+ streamdetails = asyncio.run_coroutine_threadsafe(
+ self.mass.music.providers[prov_media['provider']].get_stream_details(prov_media['item_id']),
+ self.mass.event_loop).result()
+ if streamdetails:
+ queue_item.streamdetails = streamdetails
+ queue_item.item_id = prov_media['item_id']
+ queue_item.provider = prov_media['provider']
+ queue_item.quality = prov_media['quality']
+ break
+ if not streamdetails:
+ LOGGER.warning("no stream details!")
+ yield (True, b'')
+ return
+ # get sox effects and resample options
sox_effects = await self.__get_player_sox_options(player, queue_item)
outputfmt = 'flac -C 0'
if resample:
outputfmt = 'raw -b 32 -c 2 -e signed-integer'
sox_effects += ' rate -v %s' % resample
- # stream audio from provider
- streamdetails = asyncio.run_coroutine_threadsafe(
- self.mass.music.providers[queue_item.provider].get_stream_details(queue_item.item_id),
- self.mass.event_loop).result()
- if not streamdetails:
- LOGGER.warning("no stream details!")
- yield (True, b'')
- return
+ # determine how to proceed based on input file ype
if streamdetails["content_type"] == 'aac':
# support for AAC created with ffmpeg in between
args = 'ffmpeg -i "%s" -f flac - | sox -t flac - -t %s - %s' % (streamdetails["path"], outputfmt, sox_effects)
elif streamdetails['type'] == 'executable':
args = '%s | sox -t %s - -t %s - %s' % (streamdetails["path"],
streamdetails["content_type"], outputfmt, sox_effects)
-
- LOGGER.info("Running sox with args: %s" % args)
+ # start sox process
process = await asyncio.create_subprocess_shell(args,
stdout=asyncio.subprocess.PIPE)
# fire event that streaming has started for this track (needed by some streaming providers)
process.terminate()
except ProcessLookupError:
pass
- LOGGER.warning("__get_audio_stream for track_id %s interrupted - bytes_sent: %s" % (queue_item.item_id, bytes_sent))
+ LOGGER.debug("__get_audio_stream for track_id %s interrupted - bytes_sent: %s" % (queue_item.item_id, bytes_sent))
else:
- LOGGER.info("__get_audio_stream for track_id %s completed- bytes_sent: %s" % (queue_item.item_id, bytes_sent))
+ LOGGER.debug("__get_audio_stream for track_id %s completed- bytes_sent: %s" % (queue_item.item_id, bytes_sent))
# fire event that streaming has ended for this track (needed by some streaming providers)
if resample:
bytes_per_second = resample * (32/8) * 2
self.mass.signal_event('streaming_ended', streamdetails),
self.mass.event_loop)
# send task to background to analyse the audio
- asyncio.run_coroutine_threadsafe(
- self.__analyze_audio(queue_item.item_id, queue_item.provider),
- self.mass.event_loop)
+ # TODO: send audio data completely
+ if not queue_item.media_type == MediaType.Radio:
+ asyncio.run_coroutine_threadsafe(
+ self.__analyze_audio(queue_item.item_id, queue_item.provider),
+ self.mass.event_loop)
async def __get_player_sox_options(self, player, queue_item):
''' get player specific sox effect options '''
self._muted = False
self._group_parent = None
self._queue = PlayerQueue(mass, self)
+ self._player_settings = None
# public attributes
self.supports_queue = True # has native support for a queue
self.supports_gapless = True # has native gapless support
self.supports_crossfade = False # has native crossfading support
self.supports_replay_gain = False # has native support for replaygain volume leveling
+ # if home assistant support is enabled, register state listener
+ if self.mass.hass:
+ self.mass.event_loop.create_task(
+ self.mass.add_event_listener(self.hass_state_listener, "hass entity changed"))
@property
def player_id(self):
def powered(self):
''' [PROTECTED] return power state for this player '''
# homeassistant integration
- if self.mass.hass and self.settings.get('hass_power_entity') and self.settings.get('hass_power_entity_source'):
- hass_state = self.mass.hass.get_state_sync(
+ if (self.mass.hass and self.settings.get('hass_power_entity') and
+ self.settings.get('hass_power_entity_source')):
+ hass_state = self.mass.hass.get_state(
self.settings['hass_power_entity'],
- attribute='source',
- register_listener=self.update)
+ attribute='source')
return hass_state == self.settings['hass_power_entity_source']
elif self.mass.hass and self.settings.get('hass_power_entity'):
- hass_state = self.mass.hass.get_state_sync(
- self.settings['hass_power_entity'],
- attribute='state',
- register_listener=self.update)
+ hass_state = self.mass.hass.get_state(
+ self.settings['hass_power_entity'])
return hass_state != 'off'
# mute as power
elif self.settings.get('mute_as_power'):
return group_volume
# handle hass integration
elif self.mass.hass and self.settings.get('hass_volume_entity'):
- hass_state = self.mass.hass.get_state_sync(
+ hass_state = self.mass.hass.get_state(
self.settings['hass_volume_entity'],
- attribute='volume_level',
- register_listener=self.update)
+ attribute='volume_level')
return int(try_parse_float(hass_state)*100)
else:
return self._volume_level
if self.settings['mute_as_power']:
await self.volume_mute(False)
# handle hass integration
- if self.mass.hass and self.settings.get('hass_power_entity') and self.settings.get('hass_power_entity_source'):
- cur_source = await self.mass.hass.get_state(self.settings['hass_power_entity'], attribute='source')
+ if (self.mass.hass and
+ self.settings.get('hass_power_entity') and
+ self.settings.get('hass_power_entity_source')):
+ cur_source = await self.mass.hass.get_state_async(
+ self.settings['hass_power_entity'], attribute='source')
if not cur_source:
service_data = {
'entity_id': self.settings['hass_power_entity'],
- 'source':self.settings['hass_power_entity_source']
+ 'source': self.settings['hass_power_entity_source']
}
await self.mass.hass.call_service('media_player', 'select_source', service_data)
elif self.settings.get('hass_power_entity'):
if self.settings['mute_as_power']:
await self.volume_mute(True)
# handle hass integration
- if self.mass.hass and self.settings.get('hass_power_entity') and self.settings.get('hass_power_entity_source'):
- cur_source = await self.mass.hass.get_state(self.settings['hass_power_entity'], attribute='source')
+ if (self.mass.hass and
+ self.settings.get('hass_power_entity') and
+ self.settings.get('hass_power_entity_source')):
+ cur_source = await self.mass.hass.get_state_async(
+ self.settings['hass_power_entity'], attribute='source')
if cur_source == self.settings['hass_power_entity_source']:
service_data = { 'entity_id': self.settings['hass_power_entity'] }
await self.mass.hass.call_service('media_player', 'turn_off', service_data)
elif self.mass.hass and self.settings.get('hass_power_entity'):
domain = self.settings['hass_power_entity'].split('.')[0]
service_data = { 'entity_id': self.settings['hass_power_entity']}
- await self.mass.hass.call_service(domain, 'turn_ff', service_data)
+ await self.mass.hass.call_service(domain, 'turn_off', service_data)
# handle group power
if self.is_group:
# player is group, turn off all childs
await self.queue.update()
LOGGER.debug("player updated: %s" % self.name)
await self.mass.signal_event('player changed', self)
+ self.get_player_settings()
+ async def hass_state_listener(self, msg, msg_details=None):
+ ''' called when tracked entities in hass change state '''
+ if (msg_details == self.settings.get('hass_power_entity') or
+ msg_details == self.settings.get('hass_volume_entity')):
+ await self.update()
+
@property
def settings(self):
+ ''' [PROTECTED] get (or create) player config settings '''
+ if self._player_settings:
+ return self._player_settings
+ else:
+ return self.get_player_settings()
+
+ def get_player_settings(self):
''' [PROTECTED] get (or create) player config settings '''
player_settings = self.mass.config['player_settings'].get(self.player_id,{})
- if player_settings:
- return player_settings
# generate config for the player
config_entries = [ # default config entries for a player
("enabled", True, "player_enabled"),
player_settings[key] = def_value
self.mass.config['player_settings'][self.player_id] = player_settings
self.mass.config['player_settings'][self.player_id]['__desc__'] = config_entries
- return player_settings
+ self._player_settings = self.mass.config['player_settings'][self.player_id]
+ return player_settings
def to_dict(self):
''' instance attributes as dict so it can be serialized to json '''
''' representation of a queue item, simplified version of track '''
def __init__(self, media_item=None):
super().__init__()
- self.quality = TrackQuality.FLAC_LOSSLESS
+ self.streamdetails = {}
self.uri = ""
self.queue_item_id = str(uuid.uuid4())
# if existing media_item given, load those values
:param queue_items: a list of QueueItem
:param offset: offset from current queue position
'''
- if self.cur_index:
- insert_at_index = self.cur_index + offset
- else:
- insert_at_index = 0
+ insert_at_index = self.cur_index + offset
if not self.items or insert_at_index >= len(self.items):
return await self.load(queue_items)
if self.shuffle_enabled:
self._items = self._items[:insert_at_index] + queue_items + self._items[insert_at_index:]
if self.use_queue_stream or not self._player.supports_queue:
if offset == 0:
- return await self.play_index(0)
+ return await self.play_index(insert_at_index)
else:
return await self._player.cmd_queue_insert(queue_items, offset)
async def get_stream_details(self, stream_id):
''' return the content details for the given track when it will be streamed'''
- radio_id, media_type = stream_id.split('--')
+ radio_id = stream_id.split('--')[0]
+ if len(stream_id.split('--')) > 1:
+ media_type = stream_id.split('--')[1]
+ else:
+ media_type = ''
stream_info = await self.__get_stream_urls(radio_id)
for stream in stream_info["body"]:
- if stream['media_type'] == media_type:
+ if stream['media_type'] == media_type or not media_type:
return {
"type": "url",
"path": stream['url'],
- "content_type": media_type,
+ "content_type": stream['media_type'],
"sample_rate": 44100,
"bit_depth": 16
}
# generate uri for this queue item
queue_item.uri = 'http://%s:%s/stream/%s?queue_item_id=%s'% (
self.mass.web.local_ip, self.mass.web.http_port, player_id, queue_item.queue_item_id)
- # sort by quality and check track availability
- for prov_media in sorted(track.provider_ids, key=operator.itemgetter('quality'), reverse=True):
- queue_item.provider = prov_media['provider']
- queue_item.item_id = prov_media['item_id']
- queue_item.quality = prov_media['quality']
- # TODO: check track availability
- # TODO: handle direct stream capability
- queue_items.append(queue_item)
- break
+ queue_items.append(queue_item)
+
# load items into the queue
if queue_opt == 'replace' or (queue_opt in ['next', 'play'] and len(queue_items) > 50):
return await player.queue.load(queue_items)
},
setPlayerVolume: function(player_id, new_volume) {
this.players[player_id].volume_level = new_volume;
- this.playerCommand('volume', new_volume, player_id);
+ if (new_volume == 'up')
+ this.playerCommand('volume_up', null, player_id);
+ else if (new_volume == 'down')
+ this.playerCommand('volume_down', null, player_id);
+ else
+ this.playerCommand('volume_set', new_volume, player_id);
},
togglePlayerPower: function(player_id) {
if (this.players[player_id].powered)
- this.playerCommand('power', 'off', player_id);
+ this.playerCommand('power_off', null, player_id);
else
- this.playerCommand('power', 'on', player_id);
+ this.playerCommand('power_on', null, player_id);
},
connectWS() {
var loc = window.location, new_uri;
this.ws.onmessage = function(e) {
var msg = JSON.parse(e.data);
- var players = [];
- console.log(msg);
- if (msg.message == 'player updated')
- players = [msg.message_details];
- else if (msg.message == 'player removed')
- this.players[msg.message_details].enabled = false;
- else if (msg.message == 'players')
- players = msg.message_details;
-
- for (var item of players)
- if (item.player_id in this.players)
- this.players[item.player_id] = Object.assign({}, this.players[item.player_id], item);
- else
- this.$set(this.players, item.player_id, item)
+ if (msg.message == 'player changed')
+ {
+ Vue.set(this.players, msg.message_details.player_id, msg.message_details);
+ }
+ else if (msg.message == 'player removed') {
+ this.players[msg.message_details.player_id].enabled = false;
+ }
+ else if (msg.message == 'players') {
+ for (var item of msg.message_details) {
+ console.log("new player: " + item.player_id);
+ Vue.set(this.players, item.player_id, item);
+ }
+ }
+ else
+ console.log(msg);
// select new active player
// TODO: store previous player in local storage