def handle_exception(self, loop, context):
''' global exception handler '''
loop.default_exception_handler(context)
- LOGGER.exception(f"Caught exception: {context}")
+ #LOGGER.exception(f"Caught exception: {context}")
async def signal_event(self, msg, msg_details=None):
''' signal (systemwide) event '''
async def setup(self):
''' perform async setup '''
+ if not self.enabled:
+ return
self.http_session = aiohttp.ClientSession(
loop=self.mass.event_loop, connector=aiohttp.TCPConnector())
self.mass.event_loop.create_task(self.__hass_websocket())
import io
import aiohttp
import subprocess
+
+from .constants import EVENT_STREAM_STARTED, EVENT_STREAM_ENDED
from .utils import LOGGER, try_parse_int, get_ip, run_async_background_task, run_periodic, get_folder_size
from .models.media_types import TrackQuality, MediaType
from .models.playerstate import PlayerState
await resp.write(chunk)
buf_queue.task_done()
except (asyncio.CancelledError, asyncio.TimeoutError):
+ LOGGER.debug("stream interrupted")
cancelled.set()
# wait for bg_task
- await asyncio.sleep(2)
+ await asyncio.gather(bg_task)
del buf_queue
raise asyncio.CancelledError()
return resp
self.mass.music.providers[prov_media['provider']].get_stream_details(prov_media['item_id']),
self.mass.event_loop).result()
if streamdetails:
+ streamdetails['player_id'] = player.player_id
queue_item.streamdetails = streamdetails
queue_item.item_id = prov_media['item_id']
queue_item.provider = prov_media['provider']
# determine how to proceed based on input file ype
if streamdetails["content_type"] == 'aac':
# support for AAC created with ffmpeg in between
- args = 'ffmpeg -i "%s" -f flac - | sox -t flac - -t %s - %s' % (streamdetails["path"], outputfmt, sox_effects)
+ args = 'ffmpeg -v quiet -i "%s" -f flac - | sox -t flac - -t %s - %s' % (streamdetails["path"], outputfmt, sox_effects)
elif streamdetails['type'] == 'url':
args = 'sox -t %s "%s" -t %s - %s' % (streamdetails["content_type"],
streamdetails["path"], outputfmt, sox_effects)
# we use normal subprocess instead of asyncio because of bug with executor
# this should be fixed with python 3.8
process = subprocess.Popen(args, shell=True, stdout=subprocess.PIPE)
-
# fire event that streaming has started for this track (needed by some streaming providers)
- streamdetails["provider"] = queue_item.provider
- streamdetails["track_id"] = queue_item.item_id
- streamdetails["player_id"] = player.player_id
asyncio.run_coroutine_threadsafe(
- self.mass.signal_event('streaming_started',
+ self.mass.signal_event(EVENT_STREAM_STARTED,
streamdetails), self.mass.event_loop)
# yield chunks from stdout
# we keep 1 chunk behind to detect end of stream properly
while True:
# read exactly buffersize of data
if cancelled.is_set():
- process.terminate()
+ process.kill()
data = process.stdout.read(chunksize)
if not data:
# last bytes received
else:
buf += data
del buf
- if cancelled.is_set():
- return
# fire event that streaming has ended for this track (needed by some streaming providers)
if resample:
bytes_per_second = resample * (32/8) * 2
seconds_streamed = queue_item.duration
streamdetails["seconds"] = seconds_streamed
asyncio.run_coroutine_threadsafe(
- self.mass.signal_event('streaming_ended', streamdetails), self.mass.event_loop)
+ self.mass.signal_event(EVENT_STREAM_ENDED, streamdetails), self.mass.event_loop)
# send task to background to analyse the audio
- asyncio.run_coroutine_threadsafe(
- self.__analyze_audio(queue_item), self.mass.event_loop)
+ asyncio.ensure_future(self.__analyze_audio(queue_item), loop=self.mass.event_loop)
async def __get_player_sox_options(self, player, queue_item):
''' get player specific sox effect options '''
sox_effects = []
- # volume normalisation enabled but not natively handled by player so handle with sox
- if not player.supports_replay_gain and player.settings['volume_normalisation']:
- target_gain = int(player.settings['target_volume'])
- fallback_gain = int(player.settings['fallback_gain_correct'])
- track_loudness = asyncio.run_coroutine_threadsafe(
- self.mass.db.get_track_loudness(queue_item.item_id, queue_item.provider),
- self.mass.event_loop).result()
- if track_loudness == None:
- gain_correct = fallback_gain
- else:
- gain_correct = target_gain - track_loudness
- gain_correct = round(gain_correct,2)
+ # volume normalisation
+ gain_correct = asyncio.run_coroutine_threadsafe(
+ self.mass.players.get_gain_correct(
+ player.player_id, queue_item.item_id, queue_item.provider),
+ self.mass.event_loop).result()
+ if gain_correct != 0:
sox_effects.append('vol %s dB ' % gain_correct)
- else:
- gain_correct = ''
# downsample if needed
if player.settings['max_sample_rate']:
max_sample_rate = try_parse_int(player.settings['max_sample_rate'])
''' analyze track audio, for now we only calculate EBU R128 loudness '''
if queue_item.media_type != MediaType.Track:
# TODO: calculate loudness average for web radio ?
+ LOGGER.debug("analyze is only supported for tracks")
return
item_key = '%s%s' %(queue_item.item_id, queue_item.provider)
if item_key in self.analyze_jobs:
return # prevent multiple analyze jobs for same track
self.analyze_jobs[item_key] = True
- streamdetails = queue_item.stream_details
+ streamdetails = queue_item.streamdetails
track_loudness = await self.mass.db.get_track_loudness(
queue_item.item_id, queue_item.provider)
if track_loudness == None:
async with session.get(streamdetails["path"], verify_ssl=False) as resp:
audio_data = await resp.read()
elif streamdetails['type'] == 'executable':
- process = await asyncio.create_subprocess_shell(streamdetails["path"],
- stdout=asyncio.subprocess.PIPE)
- audio_data, stderr = await process.communicate()
+ audio_data = subprocess.check_output(streamdetails["path"], shell=True)
# calculate BS.1770 R128 integrated loudness
- if track_loudness == None:
- with io.BytesIO(audio_data) as tmpfile:
- data, rate = soundfile.read(tmpfile)
- meter = pyloudnorm.Meter(rate) # create BS.1770 meter
- loudness = meter.integrated_loudness(data) # measure loudness
- del data
- LOGGER.debug("Integrated loudness of track %s is: %s" %(item_key, loudness))
- await self.mass.db.set_track_loudness(queue_item.item_id, queue_item.provider, loudness)
+ with io.BytesIO(audio_data) as tmpfile:
+ data, rate = soundfile.read(tmpfile)
+ meter = pyloudnorm.Meter(rate) # create BS.1770 meter
+ loudness = meter.integrated_loudness(data) # measure loudness
+ del data
+ await self.mass.db.set_track_loudness(queue_item.item_id, queue_item.provider, loudness)
del audio_data
- LOGGER.debug('Finished analyzing track %s' % item_key)
+ LOGGER.debug("Integrated loudness of track %s is: %s" %(item_key, loudness))
self.analyze_jobs.pop(item_key, None)
def __crossfade_pcm_parts(self, fade_in_part, fade_out_part, pcm_args, fade_length):
def __init__(self, mass):
self.mass = mass
+ self.cache = mass.cache
async def setup(self):
''' perform async setup '''
def __init__(self, mass):
self.mass = mass
+ self.cache = mass.cache
async def setup(self):
''' perform async setup '''
play item at index X on player's queue
:attrib index: (int) index of the queue item that should start playing
'''
- raise NotImplementedError
+ item = await self.queue.get_item(index)
+ if item:
+ return await self.cmd_play_uri(item.uri)
async def cmd_queue_load(self, queue_items):
'''
load/overwrite given items in the player's own queue implementation
:param queue_items: a list of QueueItems
'''
- raise NotImplementedError
+ pass
async def cmd_queue_insert(self, queue_items, offset=0):
'''
:param queue_items: a list of QueueItems
:param offset: offset from current queue position to insert new items
'''
- raise NotImplementedError
+ pass
async def cmd_queue_append(self, queue_items):
'''
+ [OVERRIDE IF SUPPORTED]
append new items at the end of the queue
:param queue_items: a list of QueueItems
'''
- raise NotImplementedError
+ pass
async def cmd_play_uri(self, uri:str):
'''
self.supports_queue = True # has native support for a queue
self.supports_gapless = False # has native gapless support
self.supports_crossfade = False # has native crossfading support
- self.supports_replay_gain = False # has native support for replaygain volume leveling
# if home assistant support is enabled, register state listener
if self.mass.hass.enabled:
self.mass.event_loop.create_task(
self._players[player.player_id] = player
await self.mass.signal_event('player added', player)
# TODO: turn on player if it was previously turned on ?
+ LOGGER.info(f"New player added: {player.player_provider}/{player.player_id}")
return player
async def remove_player(self, player_id):
''' handle a player remove '''
self._players.pop(player_id, None)
await self.mass.signal_event('player removed', player_id)
+ LOGGER.info(f"Player removed: {player_id}")
async def trigger_update(self, player_id):
''' manually trigger update for a player '''
return await player.queue.insert(queue_items, 0)
elif queue_opt == 'add':
return await player.queue.append(queue_items)
-
\ No newline at end of file
+
+ async def get_gain_correct(self, player_id, item_id, provider_id, replaygain=False):
+ ''' get gain correction for given player / track combination '''
+ player = self._players[player_id]
+ if not player.settings['volume_normalisation']:
+ return 0
+ target_gain = int(player.settings['target_volume'])
+ fallback_gain = int(player.settings['fallback_gain_correct'])
+ track_loudness = await self.mass.db.get_track_loudness(item_id, provider_id)
+ if track_loudness == None:
+ gain_correct = fallback_gain
+ else:
+ gain_correct = target_gain - track_loudness
+ gain_correct = round(gain_correct,2)
+ LOGGER.info(f"Loudness level for track {provider_id}/{item_id} is {track_loudness} - calculated replayGain is {gain_correct}")
+ return gain_correct
\ No newline at end of file
if self._discovery_running:
return
self._discovery_running = True
- LOGGER.info("Chromecast discovery started...")
+ LOGGER.debug("Chromecast discovery started...")
# remove any disconnected players...
removed_players = []
for player in self.players:
if not player.cc.socket_client or not player.cc.socket_client.is_connected:
- LOGGER.info("%s is disconnected" % player.name)
+ LOGGER.warning("%s is disconnected" % player.name)
# cleanup cast object
del player.cc
removed_players.append(player.player_id)
self.get_player(player_id),
self.mass.event_loop).result()
if not player:
- LOGGER.info("discovered chromecast: %s - %s:%s" % (friendly_name, ip_address, port))
asyncio.run_coroutine_threadsafe(
self.__chromecast_discovered(player_id, discovery_info), self.mass.event_loop)
listener, browser = start_discovery(discovered_callback)
await asyncio.sleep(15) # run discovery for 15 seconds
stop_discovery(browser)
- LOGGER.info("Chromecast discovery completed...")
+ LOGGER.debug("Chromecast discovery completed...")
self._discovery_running = False
async def __chromecast_discovered(self, player_id, discovery_info):
self.supports_queue = True
self.supports_gapless = False
self.supports_crossfade = False
- self.supports_replay_gain = False
if chromecast.cast_type == 'group':
player.is_group = True
mz = MultizoneController(chromecast.uuid)
# keep reading bytes from the socket
while True:
data = await reader.read(64)
+ if not data:
+ # connection lost with client
+ break
# handle incoming data from socket
buffer = buffer + data
if len(buffer) > 8:
except Exception as exc:
# connection lost ?
LOGGER.warning(exc)
- # disconnect
- await self.mass.players.remove_player(player)
+ finally:
+ # disconnect and cleanup
+ if player:
+ if player._heartbeat_task:
+ player._heartbeat_task.cancel()
+ await self.mass.players.remove_player(player)
class PySqueezePlayer(Player):
''' Squeezebox socket client '''
self.supports_queue = True
self.supports_gapless = True
self.supports_crossfade = True
- self.supports_replay_gain = False
self._writer = writer
self.buffer = b''
self.name = "%s - %s" %(dev_type, player_id)
:attrib index: (int) index of the queue item that should start playing
'''
new_track = await self.queue.get_item(index)
- self.flush()
- self.__play_uri(new_track.uri)
+ if new_track:
+ self.__send_flush()
+ await self.__send_play(new_track.uri)
async def cmd_queue_load(self, queue_items):
'''
load/overwrite given items in the player's own queue implementation
:param queue_items: a list of QueueItems
'''
- self.flush()
- self.__play_uri(queue_items[0].uri)
-
- async def cmd_queue_insert(self, queue_items, offset=0):
- ''' nothing to do, handled by built-in queue '''
- pass
-
- async def cmd_queue_append(self, queue_items):
- ''' nothing to do, handled by built-in queue '''
- pass
+ self.__send_flush()
+ await self.__send_play(queue_items[0].uri)
async def cmd_play_uri(self, uri:str):
'''
[MUST OVERRIDE]
tell player to start playing a single uri
'''
- self.flush()
- self.__play_uri(uri)
+ self.__send_flush()
+ await self.__send_play(uri)
- def flush(self):
+ def __send_flush(self):
data = self.pack_stream(b"f", autostart=b"0", flags=0)
self.send_frame(b"strm", data)
- def __play_uri(self, uri):
- # TODO: replaygain
+ async def __send_play(self, uri):
+ ''' play uri '''
self.cur_uri = uri
self.powered = True
enable_crossfade = self.settings["crossfade_duration"] > 0
transDuration = self.settings["crossfade_duration"]
formatbyte = b'f' # fixed to flac
uri = '/stream' + uri.split('/stream')[1]
- data = self.pack_stream(command, autostart=autostart, flags=0x00, formatbyte=formatbyte, transType=transType, transDuration=transDuration)
+ data = self.pack_stream(command, autostart=autostart, flags=0x00,
+ formatbyte=formatbyte, transType=transType,
+ transDuration=transDuration)
headers = "Connection: close\r\nAccept: */*\r\nHost: %s:%s\r\n" %(self.mass.web.local_ip, self.mass.web.http_port)
request = "GET %s HTTP/1.0\r\n%s\r\n" % (uri, headers)
data = data + request.encode("utf-8")
def __delete__(self, instance):
''' make sure the heartbeat task is deleted '''
- self._heartbeat_task.cancel()
+ if self._heartbeat_task:
+ self._heartbeat_task.cancel()
@run_periodic(5)
async def __send_heartbeat(self):
LOGGER.debug("ACK aude - Received player power: %s" % powered)
def stat_audg(self, data):
+ # TODO: process volume level
LOGGER.info("Received volume_level from player %s" % data)
self.volume_level = self._volume.volume
LOGGER.debug("Status Message: Connect")
def stat_STMd(self, data):
- LOGGER.info("Decoder Ready for next track")
+ LOGGER.debug("Decoder Ready for next track")
next_item = self.queue.next_item
- self.__play_uri(next_item.uri)
+ if next_item:
+ self.mass.event_loop.create_task(
+ self.__send_play(next_item.uri))
def stat_STMe(self, data):
- LOGGER.idebugnfo("Connection established")
+ LOGGER.debug("Connection established")
def stat_STMf(self, data):
LOGGER.debug("Status Message: Connection closed")
jiffies, output_buffer_size, output_buffer_fullness,
elapsed_seconds, voltage, cur_time_milliseconds,
server_timestamp, error_code) = struct.unpack("!BBBLLLLHLLLLHLLH", data)
- if elapsed_seconds != self.cur_time:
+ if self.state == PlayerState.Playing and elapsed_seconds != self.cur_time:
self.cur_time = elapsed_seconds
self._cur_time_milliseconds = cur_time_milliseconds
LOGGER.debug("META received")
def process_DSCO(self, data):
- LOGGER.info("Data Stream Disconnected")
+ LOGGER.debug("Data Stream Disconnected")
def process_DBUG(self, data):
LOGGER.debug("DBUG received")
group = socket.inet_aton('239.255.255.250')
mreq = struct.pack('4sL', group, socket.INADDR_ANY)
sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
+
+ def connection_lost(self, *args, **kwargs):
+ LOGGER.warning("Connection lost to discovery")
def build_TLV_response(self, requestdata):
responsedata = OrderedDict()
cmd = request.match_info.get('cmd')
cmd_args = request.match_info.get('cmd_args')
player_cmd = getattr(player, cmd, None)
- if player_cmd and cmd_args:
+ if player_cmd and cmd_args != None:
result = await player_cmd(cmd_args)
elif player_cmd:
result = await player_cmd()
mounted() { },
created() {
this.connectWS();
- this.updateProgress();
},
computed: {
return this.players[this.active_player_id];
else
return {
- name: 'no player selected',
+ name: $t('no_player'),
cur_item: null,
cur_time: 0,
player_id: '',
return true;
return false;
},
- updateProgress: function(){
- this.intervalid2 = setInterval(function(){
- if (this.active_player.state == 'playing')
- this.active_player.cur_time +=1;
- }.bind(this), 1000);
- },
setPlayerVolume: function(player_id, new_volume) {
this.players[player_id].volume_level = new_volume;
if (new_volume == 'up')
return playersLst;
}
},
- watch: {
- // 'conf': {
- // handler: _.debounce(function (val, oldVal) {
- // if (oldVal.base) {
- // console.log("save config needed!");
- // this.saveConfig();
- // this.$toasted.show(this.$t('conf.conf_saved'))
- // }
- // }, 5000),
- // deep: true
- // }
- },
+ watch: {},
created() {
this.$globals.windowtitle = this.$t('settings');
this.getPlayers();
remove_library: "Remove from library",
add_playlist: "Add to playlist...",
remove_playlist: "Remove from playlist",
+ no_player: "No player selected",
// settings strings
conf: {
enabled: "Enabled",
remove_library: "Verwijder uit bibliotheek",
add_playlist: "Aan playlist toevoegen...",
remove_playlist: "Verwijder uit playlist",
+ no_player: "Geen speler geselecteerd",
// settings strings
conf: {
enabled: "Ingeschakeld",
audio_cache_folder: "Map om te gebruiken voor cache bestanden",
audio_cache_max_size_gb: "Maximale grootte van de cache map in GB.",
gapless_enabled: "Schakel ondersteuning voor gapless in.",
- crossfade_duration: "Crossfade in (seconden, 0 om uit te schakelen)."
+ crossfade_duration: "Crossfade (in seconden, 0 om uit te schakelen)."
},
// player strings
players: "Spelers",