CONF_KEY = 'homeassistant'
CONF_PUBLISH_PLAYERS = "publish_players"
EVENT_HASS_CHANGED = "hass entity changed"
-CONFIG_ENTRIES = [
+
+### auto detect hassio for auto config ####
+if os.path.isfile('/data/options.json'):
+ IS_HASSIO = True
+ CONFIG_ENTRIES = [
+ (CONF_ENABLED, False, CONF_ENABLED)]
+else:
+ IS_HASSIO = False
+ CONFIG_ENTRIES = [
(CONF_ENABLED, False, CONF_ENABLED),
(CONF_URL, 'localhost', 'hass_url'),
(CONF_TOKEN, '<password>', 'hass_token'),
- (CONF_PUBLISH_PLAYERS, True, 'hass_publish')
- ]
+ (CONF_PUBLISH_PLAYERS, True, 'hass_publish')]
+
class HomeAssistant():
'''
# load/create/update config
config = self.mass.config.create_module_config(CONF_KEY, CONFIG_ENTRIES)
self.enabled = config[CONF_ENABLED]
- if self.enabled and (not config[CONF_URL] or
- not config[CONF_TOKEN]):
+ if (self.enabled and not IS_HASSIO and not
+ (config[CONF_URL] or config[CONF_TOKEN])):
LOGGER.warning("Invalid configuration for Home Assistant")
self.enabled = False
self._token = config[CONF_TOKEN]
- url = config[CONF_URL]
- if url.startswith('https://'):
- self._use_ssl = True
- self._host = url.replace('https://','').split('/')[0]
- else:
+ if IS_HASSIO:
self._use_ssl = False
- self._host = url.replace('http://','').split('/')[0]
+ self._host = 'hassio/homeassistant'
+ else:
+ url = config[CONF_URL]
+ if url.startswith('https://'):
+ self._use_ssl = True
+ self._host = url.replace('https://','').split('/')[0]
+ else:
+ self._use_ssl = False
+ self._host = url.replace('http://','').split('/')[0]
if self.enabled:
LOGGER.info('Homeassistant integration is enabled')
try:
while True:
chunk = await buf_queue.get()
- if not chunk:
+ if chunk:
+ await resp.write(chunk)
+ buf_queue.task_done()
+ del chunk
+ else:
buf_queue.task_done()
break
- await resp.write(chunk)
- buf_queue.task_done()
except (asyncio.CancelledError, asyncio.TimeoutError):
LOGGER.debug("stream interrupted")
cancelled.set()
return resp
async def __stream_single(self, player, queue_item, buffer, cancelled):
- ''' start streaming single track from provider '''
- LOGGER.debug("stream single track started for track %s on player %s" % (queue_item.name, player.name))
+ ''' start streaming single queue track '''
+ LOGGER.debug("stream single queue track started for track %s on player %s" % (queue_item.name, player.name))
audio_stream = self.__get_audio_stream(player, queue_item, cancelled)
async for is_last_chunk, audio_chunk in audio_stream:
+ if cancelled.is_set():
+ # http session ended
+ # we must consume the data to prevent hanging subprocess instances
+ continue
+ # put chunk in buffer
asyncio.run_coroutine_threadsafe(
buffer.put(audio_chunk),
self.mass.event_loop)
- # indicate EOF if no more data
- asyncio.run_coroutine_threadsafe(
- buffer.put(b''),
- self.mass.event_loop)
+ # this should be garbage collected but just in case...
+ del audio_chunk
+ # wait for the queue to consume the data
+ while not cancelled.is_set() and buffer.qsize() > 5:
+ await asyncio.sleep(0.5)
+ # all chunks received: streaming finished
if cancelled.is_set():
LOGGER.debug("stream single track interrupted for track %s on player %s" % (queue_item.name, player.name))
else:
+ # indicate EOF if no more data
+ asyncio.run_coroutine_threadsafe(
+ buffer.put(b''),
+ self.mass.event_loop)
LOGGER.debug("stream single track finished for track %s on player %s" % (queue_item.name, player.name))
async def __stream_queue(self, player, buffer, cancelled):
fade_length = try_parse_int(player.settings["crossfade_duration"])
if not sample_rate or sample_rate < 44100 or sample_rate > 384000:
sample_rate = 96000
- elif player.player_provider == 'web':
- sample_rate = 41100
if fade_length:
fade_bytes = int(sample_rate * 4 * 2 * fade_length)
else:
stdout=subprocess.PIPE, stdin=subprocess.PIPE)
def fill_buffer():
- sample_size = int(sample_rate * 4 * 2)
+ chunk_size = int(sample_rate * 4 * 2)
while sox_proc.returncode == None:
- chunk = sox_proc.stdout.read(sample_size)
- if not chunk:
- # no more data
- break
- if not cancelled.is_set():
+ chunk = sox_proc.stdout.read(chunk_size)
+ if chunk and not cancelled.is_set():
asyncio.run_coroutine_threadsafe(
- buffer.put(chunk),
- self.mass.event_loop)
+ buffer.put(chunk), self.mass.event_loop)
+ del chunk
# indicate EOF if no more data
if not cancelled.is_set():
asyncio.run_coroutine_threadsafe(
- buffer.put(b''),
- self.mass.event_loop)
+ buffer.put(b''), self.mass.event_loop)
+ # start fill buffer task in background
threading.Thread(target=fill_buffer).start()
-
LOGGER.info("Start Queue Stream for player %s " %(player.name))
is_start = True
last_fadeout_data = b''
bytes_written = 0
# handle incoming audio chunks
async for is_last_chunk, chunk in self.__get_audio_stream(
- player, queue_track, cancelled, chunksize=fade_bytes, resample=sample_rate):
+ player, queue_track, cancelled, chunksize=fade_bytes,
+ resample=sample_rate):
cur_chunk += 1
### HANDLE FIRST PART OF TRACK
# no fadeout_part available so just pass it to the output directly
sox_proc.stdin.write(chunk)
bytes_written += len(chunk)
+ del chunk
elif cur_chunk == 1 and last_fadeout_data:
prev_chunk = chunk
+ del chunk
### HANDLE CROSSFADE OF PREVIOUS TRACK FADE_OUT AND THIS TRACK FADE_IN
elif cur_chunk == 2 and last_fadeout_data:
# combine the first 2 chunks and strip off silence
sox_proc.stdin.write(remaining_bytes)
bytes_written += len(remaining_bytes)
del remaining_bytes
+ del chunk
prev_chunk = None # needed to prevent this chunk being sent again
### HANDLE LAST PART OF TRACK
elif prev_chunk and is_last_chunk:
sox_proc.stdin.write(last_part)
bytes_written += len(last_part)
del last_part
+ del chunk
else:
# handle crossfading support
if len(last_part) < fade_bytes:
bytes_written += len(remaining_bytes)
del last_part
del remaining_bytes
+ del chunk
### MIDDLE PARTS OF TRACK
else:
# middle part of the track
# break out the loop if the http session is cancelled
break
else:
- # WIP: update actual duration to the queue for more accurate now playing info
+ # update actual duration to the queue for more accurate now playing info
accurate_duration = bytes_written / int(sample_rate * 4 * 2)
queue_track.duration = accurate_duration
LOGGER.debug("Finished Streaming queue track: %s (%s) on player %s" % (queue_track.item_id, queue_track.name, player.name))
if last_fadeout_data and not cancelled.is_set():
sox_proc.stdin.write(last_fadeout_data)
del last_fadeout_data
+ ### END OF QUEUE STREAM
sox_proc.stdin.close()
sox_proc.terminate()
LOGGER.info("streaming of queue for player %s completed" % player.name)
while True:
# read exactly buffersize of data
if cancelled.is_set():
- process.kill()
+ # http session ended
+ # send terminate and pick up left over bytes
+ process.terminate()
+ # try to read as much data as possible from stdin
data = process.stdout.read(chunksize)
if not data:
- # last bytes received
+ # no more data
+ # yield (empty) chunk as lust chunk
yield (True, buf)
bytes_sent += len(buf)
+ del buf
break
elif len(buf) + len(data) >= chunksize:
+ # enough data to send a chunk
new_data = buf + data
chunk = new_data[:chunksize]
yield (False, chunk)
bytes_sent += len(chunk)
buf = new_data[chunksize:]
del chunk
+ del data
+ del new_data
else:
buf += data
- del buf
+ del data
# fire event that streaming has ended
asyncio.run_coroutine_threadsafe(
self.mass.signal_event(EVENT_STREAM_ENDED, queue_item), self.mass.event_loop)
- # send task to background to analyse the audio
+ # send task to main event loop to analyse the audio
self.mass.event_loop.call_soon_threadsafe(
asyncio.ensure_future, self.__analyze_audio(queue_item))
}
},
createAudioPlayer(data) {
+ if (navigator.userAgent.includes("WebKit"))
+ return // streaming flac not supported on webkit ?!
if (localStorage.getItem('audio_player_id'))
// get player id from local storage
this.audioPlayerId = localStorage.getItem('audio_player_id');
// add event handlers
this.audioPlayer.addEventListener("canplaythrough", event => {
/* the audio is now playable; play it if permissions allow */
- console.log("canplaythrough")
this.audioPlayer.play();
});
- this.audioPlayer.addEventListener("canplay", event => {
- /* the audio is now playable; play it if permissions allow */
- console.log("canplay");
- //this.audioPlayer.play();
- //msg_details['cur_uri'] = this.audioPlayer.src;
- //this.ws.send(JSON.stringify({message:'webplayer state', message_details: msg_details}));
- });
- this.audioPlayer.addEventListener("emptied", event => {
- /* the audio is now playable; play it if permissions allow */
- console.log("emptied");
- //this.audioPlayer.play();
- });
const timeupdateHandler = (event) => {
// currenTime of player updated, sent state (throttled at 1 sec)
msg_details['cur_time'] = Math.round(this.audioPlayer.currentTime);