import math
from memory_tempfile import MemoryTempfile
import tempfile
+import io
+import soundfile as sf
+import pyloudnorm as pyln
+import aiohttp
AUDIO_TEMP_DIR = "/tmp/audio_tmp"
AUDIO_CACHE_DIR = "/tmp/audio_cache"
self.mass = mass
self.create_config_entries()
self.local_ip = get_ip()
+ self.analyze_jobs = {}
self._audio_cache_dir = self.mass.config['base']['http_streamer']['audio_cache_folder']
# create needed temp/cache dirs
if self.mass.config['base']['http_streamer']['enable_cache'] and not os.path.isdir(self._audio_cache_dir):
await resp.prepare(http_request)
if http_request.method.upper() != 'HEAD':
# stream audio
- queue = asyncio.Queue()
cancelled = threading.Event()
- task = run_async_background_task(
- self.mass.bg_executor,
- self.__get_audio_stream, queue, track_id, provider, player_id, cancelled)
+ queue = asyncio.Queue()
+
+ async def fill_buffer():
+                ''' fill the buffer from a background task to prevent deadlocks of the sox executable '''
+ audio_stream = self.__get_audio_stream(track_id, provider, player_id)
+ async for is_last_chunk, audio_chunk in audio_stream:
+ if not cancelled.is_set():
+ await queue.put(audio_chunk)
+ await queue.put(b'') # EOF
+ run_async_background_task(self.mass.bg_executor, fill_buffer)
+
try:
while True:
chunk = await queue.get()
break
await resp.write(chunk)
queue.task_done()
- LOGGER.info("stream_track fininished for %s" % track_id)
except (asyncio.CancelledError, asyncio.TimeoutError):
cancelled.set()
LOGGER.info("stream_track interrupted for %s" % track_id)
raise asyncio.CancelledError()
- return resp
+ else:
+                LOGGER.info("stream_track finished for %s" % track_id)
+ return resp
async def stream_radio(self, http_request):
''' start streaming radio from provider '''
'''
stream all tracks in queue from player with http
loads large part of audiodata in memory so only recommended for high performance servers
- use case is enable crossfade support for chromecast devices
+ use case is enable crossfade/gapless support for chromecast devices
'''
player_id = http_request.query.get('player_id')
startindex = int(http_request.query.get('startindex'))
try:
while True:
chunk = await queue.get()
- await resp.write(chunk)
- queue.task_done()
if not chunk:
+ queue.task_done()
break
+ await resp.write(chunk)
+ queue.task_done()
LOGGER.info("stream_queue fininished for %s" % player_id)
except asyncio.CancelledError:
cancelled.set()
''' start streaming all queue tracks '''
sample_rate = self.mass.config['player_settings'][player_id]['max_sample_rate']
fade_length = self.mass.config['player_settings'][player_id]["crossfade_duration"]
+ fade_bytes = int(sample_rate * 4 * 2 * fade_length)
pcm_args = 'raw -b 32 -c 2 -e signed-integer -r %s' % sample_rate
args = 'sox -t %s - -t flac -C 0 -' % pcm_args
sox_proc = await asyncio.create_subprocess_shell(args,
await buffer.put(b'') # indicate EOF
asyncio.create_task(fill_buffer())
- player = await self.mass.player.player(player_id)
-
# retrieve player object
player = await self.mass.player.player(player_id)
queue_index = startindex
track_id = params['track_id'][0]
provider = params['provider'][0]
LOGGER.info("Start Streaming queue track: %s - %s" % (track_id, queue_track.name))
- fade_bytes = int(sample_rate * 4 * 2 * fade_length)
fade_in_part = b''
cur_chunk = 0
prev_chunk = None
bytes_written = 0
- async for is_last_chunk, chunk in self.__get_raw_audio(track_id, provider, sample_rate, fade_bytes):
+ async for is_last_chunk, chunk in self.__get_audio_stream(
+ track_id, provider, player_id, chunksize=fade_bytes, outputfmt=pcm_args,
+ sox_effects='rate -v %s' % sample_rate ):
cur_chunk += 1
if cur_chunk == 1 and not last_fadeout_data:
# fade-in part but this is the first track so just pass it to the final file
prev_chunk = chunk
else:
prev_chunk = chunk
+                    # pre-analyze the next track to ensure smooth transitions
+ try:
+ queue_tracks = await self.mass.player.player_queue(player_id, queue_index+1, queue_index+2)
+ queue_track = queue_tracks[0]
+ params = urllib.parse.parse_qs(queue_track.uri.split('?')[1])
+ track_id = params['track_id'][0]
+ provider = params['provider'][0]
+ self.mass.event_loop.create_task(self.__analyze_audio(track_id, provider))
+ except:
+ pass
# wait for the queue to consume the data
# this prevents that the entire track is sitting in memory
# and it helps a bit in the quest to follow where we are in the queue
- while buffer.qsize() > 1 and not cancelled.is_set():
+ while buffer.qsize() > 2 and not cancelled.is_set():
await asyncio.sleep(1)
# end of the track reached
# WIP: update actual duration to the queue for more accurate now playing info
await sox_proc.wait()
LOGGER.info("streaming of queue for player %s completed" % player_id)
- async def __get_raw_audio(self, track_id, provider, sample_rate, chunksize):
- ''' get raw pcm data for a track upsampled to given sample_rate packed as wav '''
- cachefile = self.__get_track_cache_filename(track_id, provider)
- pcm_args = 'raw -b 32 -c 2 -e signed-integer'
+ async def __get_audio_stream(self, track_id, provider, player_id,
+ chunksize=512000, outputfmt='flac -C 0', sox_effects=''):
+ ''' get audio stream from provider and apply additional effects/processing where/if needed'''
if self.mass.config['base']['http_streamer']['volume_normalisation']:
gain_correct = await self.__get_track_gain_correct(track_id, provider)
+ gain_correct = 'vol %s dB ' % gain_correct
else:
- gain_correct = -6 # always need some headroom for upsampling and crossfades
+ gain_correct = ''
+ sox_effects += await self.__get_player_sox_options(track_id, provider, player_id, False)
+
+ cachefile = self.__get_track_cache_filename(track_id, provider)
if os.path.isfile(cachefile):
- # we have a cache file for this track which we can use
- args = 'sox -t flac "%s" -t %s - vol %s dB rate -v %s' % (cachefile, pcm_args, gain_correct, sample_rate)
- process = await asyncio.create_subprocess_shell(args, stdout=asyncio.subprocess.PIPE)
+ # stream from cachefile
+ args = 'sox -t sox "%s" -t %s - %s %s' % (cachefile, outputfmt, gain_correct, sox_effects)
else:
- # stream from provider
- input_content_type = await self.mass.music.providers[provider].get_stream_content_type(track_id)
- assert(input_content_type)
- args = 'sox -t %s - -t %s - vol %s dB rate -v %s' % (input_content_type, pcm_args, gain_correct, sample_rate)
- process = await asyncio.create_subprocess_shell(args,
- stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE)
- asyncio.get_event_loop().create_task(
- self.__fill_audio_buffer(process.stdin, track_id, provider, input_content_type))
- # put chunks from stdout into queue
+ # stream directly from provider
+ streamdetails = asyncio.run_coroutine_threadsafe(
+ self.mass.music.providers[provider].get_stream_details(track_id), self.mass.event_loop).result()
+ if not streamdetails:
+ yield b''
+ return
+ if streamdetails['type'] == 'url':
+ args = 'sox -t %s "%s" -t %s - %s %s' % (streamdetails["content_type"], streamdetails["path"], outputfmt, gain_correct, sox_effects)
+ elif streamdetails['type'] == 'executable':
+ args = '%s | sox -t %s - -t %s - %s %s' % (streamdetails["path"], streamdetails["content_type"], outputfmt, gain_correct, sox_effects)
+ LOGGER.debug("Running sox with args: %s" % args)
+ process = await asyncio.create_subprocess_shell(args,
+ stdout=asyncio.subprocess.PIPE)
+ # yield chunks from stdout
# we keep 1 chunk behind to detect end of stream properly
- prev_chunk = None
+ prev_chunk = b''
while not process.stdout.at_eof():
try:
chunk = await process.stdout.readexactly(chunksize)
except asyncio.streams.IncompleteReadError:
chunk = await process.stdout.read(chunksize)
- if not chunk:
- break
if prev_chunk:
yield (False, prev_chunk)
prev_chunk = chunk
# yield last chunk
yield (True, prev_chunk)
await process.wait()
- LOGGER.info("__get_raw_audio for track_id %s completed" % (track_id))
-
- async def __get_audio_stream(self, audioqueue, track_id, provider, player_id=None, cancelled=None):
- ''' get audio stream from provider and apply additional effects/processing where/if needed'''
- cachefile = self.__get_track_cache_filename(track_id, provider)
- sox_effects = await self.__get_player_sox_options(track_id, provider, player_id, False)
- if self.mass.config['base']['http_streamer']['volume_normalisation']:
- gain_correct = await self.__get_track_gain_correct(track_id, provider)
- sox_effects += ' vol %s dB ' % gain_correct
- if os.path.isfile(cachefile):
- # we have a cache file for this track which we can use
- if sox_effects.strip():
- args = 'sox -t flac "%s" -t flac -C 0 - %s' % (cachefile, sox_effects)
- else:
- args = 'sox -t flac "%s" -t flac -C 0 - %s' % cachefile
- LOGGER.debug("Running sox with args: %s" % args)
- process = await asyncio.create_subprocess_shell(args,
- stdout=asyncio.subprocess.PIPE)
- buffer_task = None
- else:
- # stream from provider
- input_content_type = await self.mass.music.providers[provider].get_stream_content_type(track_id)
- assert(input_content_type)
- if sox_effects.strip():
- args = 'sox -t %s - -t flac -C 0 - %s' % (input_content_type, sox_effects)
- else:
- args = 'sox -t %s - -t flac -C 0 -' % (input_content_type)
- LOGGER.debug("Running sox with args: %s" % args)
- process = await asyncio.create_subprocess_shell(args,
- stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE)
- buffer_task = asyncio.get_event_loop().create_task(
- self.__fill_audio_buffer(process.stdin, track_id, provider, input_content_type))
- # put chunks from stdout into queue
- while not process.stdout.at_eof():
- chunk = await process.stdout.read(705600)
- if not chunk:
- break
- if not cancelled.is_set():
- await audioqueue.put(chunk)
- if audioqueue.qsize() > 10:
- await asyncio.sleep(0.1) # cooldown a bit
- await process.wait()
- await audioqueue.put('') # indicate EOF
- if cancelled.is_set():
- LOGGER.warning("__get_audio_stream for track_id %s interrupted" % track_id)
- else:
- LOGGER.debug("__get_audio_stream for track_id %s completed" % track_id)
+ LOGGER.info("__get_audio_stream for track_id %s completed" % track_id)
+        # schedule a background task to analyze the audio
+ self.mass.event_loop.create_task(self.__analyze_audio(track_id, provider))
async def __get_player_sox_options(self, track_id, provider, player_id, is_radio):
''' get player specific sox options '''
sox_effects += ' ' + self.mass.config['player_settings'][player_id]['sox_effects']
return sox_effects
- async def __analyze_audio(self, tmpfile, track_id, provider, content_type):
+ async def __analyze_audio(self, track_id, provider):
''' analyze track audio, for now we only calculate EBU R128 loudness '''
- LOGGER.info('Start analyzing file %s' % tmpfile)
+ track_key = '%s%s' %(track_id, provider)
+ if track_key in self.analyze_jobs:
+ return # prevent multiple analyze jobs for same tracks
+ self.analyze_jobs[track_key] = True
+ streamdetails = await self.mass.music.providers[provider].get_stream_details(track_id)
cachefile = self.__get_track_cache_filename(track_id, provider)
- # not needed to do processing if there already is a cachedfile
- bs1770_binary = self.__get_bs1770_binary()
- if bs1770_binary:
- # calculate integrated r128 loudness with bs1770
- analyse_dir = os.path.join(self.mass.datapath, 'analyse_info')
- analysis_file = os.path.join(analyse_dir, "%s_%s.xml" %(provider, track_id.split(os.sep)[-1]))
- if not os.path.isfile(analysis_file):
- if not os.path.isdir(analyse_dir):
- os.makedirs(analyse_dir)
- cmd = '%s %s --xml --ebu -f %s' % (bs1770_binary, tmpfile, analysis_file)
- process = await asyncio.create_subprocess_shell(cmd)
- await process.wait()
- if self.mass.config['base']['http_streamer']['enable_cache'] and not os.path.isfile(cachefile):
+ enable_cache = self.mass.config['base']['http_streamer']['enable_cache']
+ needs_cachefile = enable_cache and not os.path.isfile(cachefile)
+ track_loudness = await self.mass.db.get_track_loudness(track_id, provider)
+ if needs_cachefile or track_loudness == None:
+            # only perform the (expensive) analysis when actually needed
+ LOGGER.info('Start analyzing track %s' % track_id)
+ if streamdetails['type'] == 'url':
+ async with aiohttp.ClientSession() as session:
+ async with session.get(streamdetails["path"]) as resp:
+ audio_data = await resp.read()
+ elif streamdetails['type'] == 'executable':
+ process = await asyncio.create_subprocess_shell(streamdetails["path"],
+ stdout=asyncio.subprocess.PIPE)
+ audio_data, stderr = await process.communicate()
+ # calculate BS.1770 R128 integrated loudness
+ if track_loudness == None:
+ with io.BytesIO(audio_data) as tmpfile:
+ data, rate = sf.read(tmpfile)
+ meter = pyln.Meter(rate) # create BS.1770 meter
+ loudness = meter.integrated_loudness(data) # measure loudness
+ del data
+ LOGGER.debug("Integrated loudness of track %s is: %s" %(track_id, loudness))
+ await self.mass.db.set_track_loudness(track_id, provider, loudness)
+ if needs_cachefile:
# use sox to store cache file (strip silence from start and end for better transitions)
- cmd = 'sox -t %s %s -t flac -C5 %s silence 1 0.1 1%% reverse silence 1 0.1 1%% reverse' %(content_type, tmpfile, cachefile)
- process = await asyncio.create_subprocess_shell(cmd)
- await process.wait()
-
- # always clean up temp file
- if os.path.isfile(tmpfile):
- os.remove(tmpfile)
- LOGGER.info('Fininished analyzing file %s' % tmpfile)
+ cmd = 'sox -t %s - -t sox %s silence 1 0.1 1%% reverse silence 1 0.1 1%% reverse' %(streamdetails['content_type'], cachefile)
+ process = await asyncio.create_subprocess_shell(cmd, stdin=asyncio.subprocess.PIPE)
+ await process.communicate(audio_data)
+ del audio_data
+ LOGGER.info('Finished analyzing track %s' % track_id)
+ self.analyze_jobs.pop(track_key, None)
async def __get_track_gain_correct(self, track_id, provider):
''' get the gain correction that should be applied to a track '''
target_gain = int(self.mass.config['base']['http_streamer']['target_volume'])
fallback_gain = int(self.mass.config['base']['http_streamer']['fallback_gain_correct'])
- analysis_file = os.path.join(self.mass.datapath, 'analyse_info', "%s_%s.xml" %(provider, track_id.split(os.sep)[-1]))
- if not os.path.isfile(analysis_file):
+ track_loudness = await self.mass.db.get_track_loudness(track_id, provider)
+ if track_loudness == None:
return fallback_gain
- try: # read audio analysis if available
- tree = ET.parse(analysis_file)
- trackinfo = tree.getroot().find("album").find("track")
- track_lufs = trackinfo.find('integrated').get('lufs')
- gain_correct = target_gain - float(track_lufs)
- except Exception as exc:
- LOGGER.error('could not retrieve track gain - %s' % exc)
- gain_correct = fallback_gain # fallback value
- if os.path.isfile(analysis_file):
- os.remove(analysis_file)
- # reschedule analyze task to try again
- cachefile = self.__get_track_cache_filename(track_id, provider)
- self.mass.event_loop.create_task(self.__analyze_audio(cachefile, track_id, provider, 'flac'))
+ gain_correct = target_gain - track_loudness
return round(gain_correct,2)
async def __fill_audio_buffer(self, buf, track_id, provider, content_type):
tmpfile = os.path.join(AUDIO_TEMP_DIR, '%s%s%s.tmp' % (random.randint(0, 999), track_id, random.randint(0, 999)))
fd = open(tmpfile, 'wb')
async for chunk in self.mass.music.providers[provider].get_audio_stream(track_id):
+ if not chunk:
+ break
buf.write(chunk)
await buf.drain()
fd.write(chunk)
- await buf.drain()
LOGGER.info("fill_audio_buffer complete for track %s" % track_id)
# successfull completion, process temp file for analysis
self.mass.event_loop.create_task(