import asyncio
import os
-from utils import run_periodic, LOGGER, try_parse_int, try_parse_float, get_ip
+from utils import run_periodic, LOGGER, try_parse_int, try_parse_float, get_ip, run_async_background_task
import aiohttp
from difflib import SequenceMatcher as Matcher
from models import MediaType, PlayerState, MusicPlayer
async def get_audio_stream(self, track_id, provider):
''' get audio stream from provider and apply additional effects/processing where needed'''
- input_audio_fmt = self.mass.music.providers[provider].audio_fmt
+ input_content_type = await self.mass.music.providers[provider].get_stream_content_type(track_id)
cachefile = self.__get_track_cache_filename(track_id, provider)
gain_correct = await self.__get_track_gain_correct(track_id, provider)
- sox_effects=['vol', str(gain_correct), 'dB' ]
+ LOGGER.info("apply gain correction of %s" % gain_correct)
+ sox_effects='vol %s dB' % gain_correct
if os.path.isfile(cachefile):
- # we have a temp file for this track which we can use
- args = ['-t', input_audio_fmt, cachefile, '-t', 'flac', '-', *sox_effects]
+ # we have a cache file for this track which we can use
+ args = ['-t', 'flac', cachefile, '-t', 'flac', '-C', '0', '-', *sox_effects.split(' ')]
process = await asyncio.create_subprocess_exec('sox', *args,
stdout=asyncio.subprocess.PIPE)
buffer_task = None
else:
# stream from provider
- args = ['-t', input_audio_fmt, '-', '-t', 'flac', '-', *sox_effects]
+ args = ['-t', input_content_type, '-', '-t', 'flac', '-C', '0', '-', *sox_effects.split(' ')]
process = await asyncio.create_subprocess_exec('sox', *args,
stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE)
buffer_task = asyncio.create_task(
- self.__fill_audio_buffer(process.stdin, track_id, provider))
- try:
- # yield the chunks from stdout
- while not process.stdout.at_eof():
- chunk = await process.stdout.read(2000000)
- if not chunk:
- break
- yield chunk
- except (asyncio.CancelledError, concurrent.futures._base.CancelledError):
- # client disconnected so cleanup
- #if buffer_task:
- # buffer_task.cancel()
- # Could not figure out how to reliably close process without deadlocks
- # so instead just read all data for a clean exit
- while True:
- if not await process.stdout.read(2000000):
- break
- await process.wait()
- LOGGER.info("streaming of track_id %s aborted (client disconnect ?)" % track_id)
- raise asyncio.CancelledError()
- except Exception as exc:
- LOGGER.error(exc)
- else:
- await process.wait()
- LOGGER.info("streaming of track_id %s completed" % track_id)
+ self.__fill_audio_buffer(process.stdin, track_id, provider, input_content_type))
+ # yield the chunks from stdout
+ while not process.stdout.at_eof():
+ chunk = await process.stdout.read(2000000)
+ if not chunk:
+ break
+ yield chunk
+ await process.wait()
+ LOGGER.info("streaming of track_id %s completed" % track_id)
- async def __analyze_track_audio(self, musicfile, track_id, provider):
+ async def __analyze_audio(self, tmpfile, track_id, provider, content_type):
''' analyze track audio, for now we only calculate EBU R128 loudness '''
- import platform
- analyse_dir = os.path.join(self.mass.datapath, 'analyse_info')
- analysis_file = os.path.join(analyse_dir, "%s_%s.xml" %(provider, track_id.split(os.sep)[-1]))
- if not os.path.isdir(analyse_dir):
- os.makedirs(analyse_dir)
- bs1770_binary = None
- if platform.system() == "Windows":
- bs1770_binary = os.path.join(os.path.dirname(__file__), "bs1770gain", "win64", "bs1770gain")
- elif platform.system() == "Darwin":
- # macos binary is x86_64 intel
- bs1770_binary = os.path.join(os.path.dirname(__file__), "bs1770gain", "osx", "bs1770gain")
- elif platform.system() == "Linux":
- architecture = platform.machine()
- if architecture.startswith('AMD64') or architecture.startswith('x86_64'):
- bs1770_binary = os.path.join(os.path.dirname(__file__), "bs1770gain", "linux64", "bs1770gain")
- # TODO: build armhf binary
- cmd = '%s %s --loglevel quiet --xml --ebu -f %s' % (bs1770_binary, musicfile, analysis_file)
- process = await asyncio.create_subprocess_shell(cmd)
- await process.wait()
+ LOGGER.info('Start analyzing file %s' % tmpfile)
+ cachefile = self.__get_track_cache_filename(track_id, provider)
+ strip_silence = True # TODO: attach config setting
+ if not os.path.isfile(cachefile):
+ # not needed to do processing if there already is a cachedfile
+ bs1770_binary = self.__get_bs1770_binary()
+ if bs1770_binary:
+ # calculate integrated r128 loudness with bs1770
+ analyse_dir = os.path.join(self.mass.datapath, 'analyse_info')
+ analysis_file = os.path.join(analyse_dir, "%s_%s.xml" %(provider, track_id.split(os.sep)[-1]))
+ if not os.path.isfile(analysis_file):
+ if not os.path.isdir(analyse_dir):
+ os.makedirs(analyse_dir)
+ cmd = '%s %s --loglevel quiet --xml --ebu -f %s' % (bs1770_binary, tmpfile, analysis_file)
+ process = await asyncio.create_subprocess_shell(cmd)
+ await process.wait()
+ # use sox to store cache file (optionally strip silence from start and end)
+ if strip_silence:
+ cmd = 'sox -t %s %s -t flac -C 5 %s silence 1 0.1 1%% reverse silence 1 0.1 1%% reverse' %(content_type, tmpfile, cachefile)
+ else:
+ # cachefile is always stored as flac
+ cmd = 'sox -t %s %s -t flac -C 5 %s' %(content_type, tmpfile, cachefile)
+ process = await asyncio.create_subprocess_shell(cmd)
+ await process.wait()
+ # always clean up temp file
+ if os.path.isfile(tmpfile):
+ os.remove(tmpfile)
+ LOGGER.info('Fininished analyzing file %s' % tmpfile)
async def __get_track_gain_correct(self, track_id, provider):
''' get the gain correction that should be applied to a track '''
target_gain = -23
fallback_gain = -14 # fallback if no analyse info is available
analysis_file = os.path.join(self.mass.datapath, 'analyse_info', "%s_%s.xml" %(provider, track_id.split(os.sep)[-1]))
+ if not os.path.isfile(analysis_file):
+ return fallback_gain
try: # read audio analysis if available
tree = ET.parse(analysis_file)
trackinfo = tree.getroot().find("album").find("track")
track_lufs = trackinfo.find('integrated').get('lufs')
gain_correct = target_gain - float(track_lufs)
- LOGGER.info("apply gain correction of %s" % gain_correct)
- except Exception:
+ except Exception as exc:
+ LOGGER.error('could not retrieve track gain - %s' % exc)
gain_correct = fallback_gain # fallback value
if os.path.isfile(analysis_file):
os.remove(analysis_file)
- cachefile = self.__get_track_cache_filename(track_id, provider)
+ # cachefile = self.__get_track_cache_filename(track_id, provider)
# reschedule analyze task to try again
- asyncio.create_task(self.__analyze_track_audio(cachefile, track_id, provider))
- return gain_correct
+ # asyncio.create_task(self.__analyze_track_audio(cachefile, track_id, provider))
+ return round(gain_correct,2)
- async def __fill_audio_buffer(self, buf, track_id, provider):
+ async def __fill_audio_buffer(self, buf, track_id, provider, content_type):
''' get audio data from provider and write to buffer'''
# fill the buffer with audio data
# a tempfile is created so we can do audio analysis
- try:
- tmpfile = os.path.join(AUDIO_TEMP_DIR, '%s%s%s.tmp' % (random.randint(0, 999), track_id, random.randint(0, 999)))
- finalfile = self.__get_track_cache_filename(track_id, provider)
- fd = open(tmpfile, 'wb')
- async for chunk in self.mass.music.providers[provider].get_audio_stream(track_id):
- buf.write(chunk)
- await buf.drain()
- fd.write(chunk)
+ tmpfile = os.path.join(AUDIO_TEMP_DIR, '%s%s%s.tmp' % (random.randint(0, 999), track_id, random.randint(0, 999)))
+ fd = open(tmpfile, 'wb')
+ async for chunk in self.mass.music.providers[provider].get_audio_stream(track_id):
+ buf.write(chunk)
await buf.drain()
- buf.write_eof()
- fd.close()
- except Exception as exc:
- LOGGER.error(exc)
- else:
- # successfull completion
- if os.path.isfile(tmpfile) and not os.path.isfile(finalfile):
- shutil.move(tmpfile, finalfile)
- asyncio.create_task(self.__analyze_track_audio(finalfile, track_id, provider))
- LOGGER.info("fill_audio_buffer complete for track %s" % track_id)
- finally:
- # always clean up temp file
- if os.path.isfile(tmpfile):
- of.remove(tmpfile)
+ fd.write(chunk)
+ await buf.drain()
+ buf.write_eof()
+ fd.close()
+ # successfull completion, send tmpfile to be processed in the background
+ #asyncio.create_task(self.__process_audio(tmpfile, track_id, provider))
+ run_async_background_task(self.mass.bg_executor, self.__analyze_audio, tmpfile, track_id, provider, content_type)
+ LOGGER.info("fill_audio_buffer complete for track %s" % track_id)
+ return
@staticmethod
def __get_track_cache_filename(track_id, provider):
''' get filename for a track to use as cache file '''
return os.path.join(AUDIO_CACHE_DIR, '%s_%s' %(provider, track_id.split(os.sep)[-1]))
+ @staticmethod
+ def __get_bs1770_binary():
+ ''' get the path to the bs1770 binary for the current OS '''
+ import platform
+ bs1770_binary = None
+ if platform.system() == "Windows":
+ bs1770_binary = os.path.join(os.path.dirname(__file__), "bs1770gain", "win64", "bs1770gain")
+ elif platform.system() == "Darwin":
+ # macos binary is x86_64 intel
+ bs1770_binary = os.path.join(os.path.dirname(__file__), "bs1770gain", "osx", "bs1770gain")
+ elif platform.system() == "Linux":
+ architecture = platform.machine()
+ if architecture.startswith('AMD64') or architecture.startswith('x86_64'):
+ bs1770_binary = os.path.join(os.path.dirname(__file__), "bs1770gain", "linux64", "bs1770gain")
+ # TODO: build armhf binary
+ return bs1770_binary
def load_providers(self):
''' dynamically load providers '''
async def player_command(self, player_id, cmd:str, cmd_args=None):
''' issue command on player (play, pause, next, previous, stop, power, volume, mute) '''
if cmd == 'play':
- self._chromecasts[player_id].media_controller.play()
+ if self._chromecasts[player_id].media_controller.status.media_session_id:
+ self._chromecasts[player_id].media_controller.play()
+ else:
+ await self.__resume_queue(player_id)
elif cmd == 'pause':
self._chromecasts[player_id].media_controller.pause()
elif cmd == 'stop':
elif cmd == 'next':
self._chromecasts[player_id].media_controller.queue_next()
elif cmd == 'previous':
- self._chromecasts[player_id].media_controller.queue_previous()
+ self._chromecasts[player_id].media_controller.queue_prev()
elif cmd == 'power' and cmd_args == 'off':
- self._players[player_id].powered = False # power is not supported
- await self.mass.player.update_player(self._players[player_id])
+ self._chromecasts[player_id].quit_app() # power is not supported so send quit app instead
elif cmd == 'power':
- self._players[player_id].powered = True # power is not supported
+ self._chromecasts[player_id].media_controller.launch()
elif cmd == 'volume':
self._chromecasts[player_id].set_volume(try_parse_int(cmd_args)/100)
elif cmd == 'mute' and cmd_args == 'off':
self._chromecasts[player_id].set_volume_muted(False)
elif cmd == 'mute':
self._chromecasts[player_id].set_volume_muted(True)
- elif cmd == 'power':
- pass # power is not supported on chromecast
async def player_queue(self, player_id, offset=0, limit=50):
''' return the items in the player's queue '''
startindex = 0
elif queue_opt == 'next':
# play the new items after the current playing item (insert before current next item)
- castplayer.queue = new_queue_items + castplayer.queue[cur_queue_index:] + plcastplayerayer.queue[:cur_queue_index]
+ castplayer.queue = new_queue_items + castplayer.queue[cur_queue_index:] + castplayer.queue[:cur_queue_index]
startindex = cur_queue_index
else:
# overwrite the whole queue with new item(s)
"items": castplayer.queue[:10]
}
await self.__send_player_queue(receiver_ctrl, media_controller, queuedata)
+ await asyncio.sleep(1)
# append the rest of the items in the queue in chunks
for chunk in chunks(castplayer.queue[10:], 100):
- await asyncio.sleep(1)
queuedata = { "type": 'QUEUE_INSERT', "items": chunk }
await self.__send_player_queue(receiver_ctrl, media_controller, queuedata)
+ await asyncio.sleep(0.1)
elif queue_opt == 'add':
# existing queue is playing: simply append items to the end of the queue (in small chunks)
castplayer.queue = castplayer.queue + new_queue_items
for chunk in chunks(new_queue_items, 100):
queuedata = { "type": 'QUEUE_INSERT', "items": chunk }
await self.__send_player_queue(receiver_ctrl, media_controller, queuedata)
- await asyncio.sleep(1)
+ await asyncio.sleep(0.1)
elif queue_opt == 'next':
# play the new items after the current playing item (insert before current next item)
player.queue = castplayer.queue[:cur_queue_index] + new_queue_items + castplayer.queue[cur_queue_index:]
### Provider specific (helper) methods #####
+ async def __resume_queue(self, player_id):
+ ''' resume queue play after power off '''
+ player = self._players[player_id]
+ castplayer = self._chromecasts[player_id]
+ media_controller = castplayer.media_controller
+ receiver_ctrl = media_controller._socket_client.receiver_controller
+ startindex = 0
+ if player.cur_item and player.cur_item.name:
+ for index, item in enumerate(castplayer.queue):
+ if item['media']['metadata']['title'] == player.cur_item.name:
+ startindex = index
+ break
+ queuedata = {
+ "type": 'QUEUE_LOAD',
+ "repeatMode": "REPEAT_ALL" if player.repeat_enabled else "REPEAT_OFF",
+ "shuffle": player.shuffle_enabled,
+ "queueType": "PLAYLIST",
+ "startIndex": startindex, # Item index to play after this request or keep same item if undefined
+ "items": castplayer.queue[:10]
+ }
+ await self.__send_player_queue(receiver_ctrl, media_controller, queuedata)
+ await asyncio.sleep(1)
+ # append the rest of the items in the queue in chunks
+ for chunk in chunks(castplayer.queue[10:], 100):
+ await asyncio.sleep(0.1)
+ queuedata = { "type": 'QUEUE_INSERT', "items": chunk }
+ await self.__send_player_queue(receiver_ctrl, media_controller, queuedata)
+
async def __send_player_queue(self, receiver_ctrl, media_controller, queuedata):
'''send new data to the CC queue'''
def app_launched_callback():
if caststatus:
player.muted = caststatus.volume_muted
player.volume_level = caststatus.volume_level * 100
- player.powered = not caststatus.is_stand_by
+ player.powered = chromecast.media_controller.status.media_session_id != None
if mediastatus:
if mediastatus.player_state in ['PLAYING', 'BUFFERING']:
player.state = PlayerState.Playing