import os
from utils import LOGGER, try_parse_int, get_ip, run_async_background_task, run_periodic, get_folder_size
from models import TrackQuality, MediaType, PlayerState
-import shutil
-import xml.etree.ElementTree as ET
-import random
-import base64
import operator
from aiohttp import web
import threading
import urllib
-import math
from memory_tempfile import MemoryTempfile
-import tempfile
import io
import soundfile as sf
import pyloudnorm as pyln
import aiohttp
-AUDIO_TEMP_DIR = "/tmp/audio_tmp"
-AUDIO_CACHE_DIR = "/tmp/audio_cache"
class HTTPStreamer():
''' Built-in streamer using sox and webserver '''
self.create_config_entries()
self.local_ip = get_ip()
self.analyze_jobs = {}
- self._audio_cache_dir = self.mass.config['base']['http_streamer']['audio_cache_folder']
- # create needed temp/cache dirs
- if self.mass.config['base']['http_streamer']['enable_cache'] and not os.path.isdir(self._audio_cache_dir):
- self._audio_cache_dir = self.mass.config['base']['http_streamer']['audio_cache_folder']
- os.makedirs(self._audio_cache_dir)
- if not os.path.isdir(AUDIO_TEMP_DIR):
- os.makedirs(AUDIO_TEMP_DIR)
- mass.event_loop.create_task(self.__cache_cleanup())
def create_config_entries(self):
''' sets the config entries for this module (list with key/value pairs)'''
config_entries = [
('volume_normalisation', True, 'enable_r128_volume_normalisation'),
('target_volume', '-23', 'target_volume_lufs'),
- ('fallback_gain_correct', '-12', 'fallback_gain_correct'),
- ('enable_cache', True, 'enable_audio_cache'),
- ('trim_silence', True, 'trim_silence'),
- ('audio_cache_folder', '/tmp/audio_cache', 'audio_cache_folder'),
- ('audio_cache_max_size_gb', 20, 'audio_cache_max_size_gb')
+ ('fallback_gain_correct', '-12', 'fallback_gain_correct')
]
if not self.mass.config['base'].get('http_streamer'):
self.mass.config['base']['http_streamer'] = {}
track_id, provider, player_id, chunksize=fade_bytes, outputfmt=pcm_args,
sox_effects='rate -v %s' % sample_rate ):
cur_chunk += 1
- if cur_chunk == 1 and not last_fadeout_data:
+ if cur_chunk <= 2 and not last_fadeout_data:
# fade-in part but this is the first track so just pass it to the final file
sox_proc.stdin.write(chunk)
await sox_proc.stdin.drain()
bytes_written += len(chunk)
elif cur_chunk == 1 and last_fadeout_data:
- # create fade-out part
- args = 'sox --ignore-length -t %s - -t %s - reverse fade t %s reverse' % (pcm_args, pcm_args, fade_length)
+ prev_chunk = chunk
+ elif cur_chunk == 2 and last_fadeout_data:
+ # combine the first 2 chunks and strip off silence
+ args = 'sox --ignore-length -t %s - -t %s - silence 1 0.1 1%%' % (pcm_args, pcm_args)
process = await asyncio.create_subprocess_shell(args,
stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE)
- last_fadeout_data, stderr = await process.communicate(last_fadeout_data)
- LOGGER.debug("Got %s bytes in memory for fade_out_part after sox" % len(last_fadeout_data))
- # create fade-in part
- args = 'sox --ignore-length -t %s - -t %s - fade t %s' % (pcm_args, pcm_args, fade_length)
- process = await asyncio.create_subprocess_shell(args,
- stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE)
- fade_in_part, stderr = await process.communicate(chunk)
- LOGGER.debug("Got %s bytes in memory for fadein_part after sox" % len(fade_in_part))
- # create crossfade using sox and some temp files
- # TODO: figure out how to make this less complex and without the tempfiles
- fadeinfile = MemoryTempfile(fallback=True).NamedTemporaryFile(buffering=0)
- fadeinfile.write(fade_in_part)
- fadeoutfile = MemoryTempfile(fallback=True).NamedTemporaryFile(buffering=0)
- fadeoutfile.write(last_fadeout_data)
- args = 'sox -m -v 1.0 -t %s %s -v 1.0 -t %s %s -t %s -' % (pcm_args, fadeoutfile.name, pcm_args, fadeinfile.name, pcm_args)
- process = await asyncio.create_subprocess_shell(args,
- stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE)
- crossfade_part, stderr = await process.communicate(fade_in_part)
- LOGGER.debug("Got %s bytes in memory for crossfade_part after sox" % len(crossfade_part))
- # write the crossfade part to the sox player
+ first_part, stderr = await process.communicate(prev_chunk + chunk)
+ fade_in_part = first_part[:fade_bytes]
+ remaining_bytes = first_part[fade_bytes:]
+ del first_part
+ # do crossfade
+ crossfade_part = await self.__crossfade_pcm_parts(fade_in_part, last_fadeout_data, pcm_args, fade_length)
sox_proc.stdin.write(crossfade_part)
await sox_proc.stdin.drain()
bytes_written += len(crossfade_part)
- fadeinfile.close()
- fadeoutfile.close()
del crossfade_part
- fade_in_part = None
+ del fade_in_part
last_fadeout_data = b''
+ # also write the leftover bytes from the strip action
+ sox_proc.stdin.write(remaining_bytes)
+ await sox_proc.stdin.drain()
+ bytes_written += len(remaining_bytes)
+ del remaining_bytes
elif prev_chunk and is_last_chunk:
- # last chunk received so create the fadeout with the previous chunk and this chunk
- last_part = prev_chunk + chunk
+ # last chunk received so create the fadeout_part with the previous chunk and this chunk
+ # and strip off silence
+ args = 'sox --ignore-length -t %s - -t %s - reverse silence 1 0.1 1%% reverse' % (pcm_args, pcm_args)
+ process = await asyncio.create_subprocess_shell(args,
+ stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE)
+ last_part, stderr = await process.communicate(prev_chunk + chunk)
last_fadeout_data = last_part[-fade_bytes:]
- bytes_remaining = last_part[:-fade_bytes]
- sox_proc.stdin.write(bytes_remaining)
- bytes_written += len(bytes_remaining)
+ remaining_bytes = last_part[:-fade_bytes]
+ # write remaining bytes
+ sox_proc.stdin.write(remaining_bytes)
+ bytes_written += len(remaining_bytes)
await sox_proc.stdin.drain()
+ del last_part
+ del remaining_bytes
else:
# middle part of the track
# keep previous chunk in memory so we have enough samples to perform the crossfade
prev_chunk = chunk
else:
prev_chunk = chunk
- # pre-analyse the next track - to ensure smooth transitions
- try:
- queue_tracks = await self.mass.player.player_queue(player_id, queue_index+1, queue_index+2)
- queue_track = queue_tracks[0]
- params = urllib.parse.parse_qs(queue_track.uri.split('?')[1])
- track_id = params['track_id'][0]
- provider = params['provider'][0]
- self.mass.event_loop.create_task(self.__analyze_audio(track_id, provider))
- except:
- pass
# wait for the queue to consume the data
# this prevents that the entire track is sitting in memory
# and it helps a bit in the quest to follow where we are in the queue
else:
gain_correct = ''
sox_effects += await self.__get_player_sox_options(track_id, provider, player_id, False)
-
- cachefile = self.__get_track_cache_filename(track_id, provider)
- if os.path.isfile(cachefile):
- # stream from cachefile
- args = 'sox -t sox "%s" -t %s - %s %s' % (cachefile, outputfmt, gain_correct, sox_effects)
- else:
- # stream directly from provider
- streamdetails = asyncio.run_coroutine_threadsafe(
- self.mass.music.providers[provider].get_stream_details(track_id), self.mass.event_loop).result()
- if not streamdetails:
- yield b''
- return
- if streamdetails['type'] == 'url':
- args = 'sox -t %s "%s" -t %s - %s %s' % (streamdetails["content_type"], streamdetails["path"], outputfmt, gain_correct, sox_effects)
- elif streamdetails['type'] == 'executable':
- args = '%s | sox -t %s - -t %s - %s %s' % (streamdetails["path"], streamdetails["content_type"], outputfmt, gain_correct, sox_effects)
+ # stream audio from provider
+ streamdetails = asyncio.run_coroutine_threadsafe(
+ self.mass.music.providers[provider].get_stream_details(track_id), self.mass.event_loop).result()
+ if not streamdetails:
+ yield b''
+ return
+ if streamdetails['type'] == 'url':
+ args = 'sox -t %s "%s" -t %s - %s %s' % (streamdetails["content_type"], streamdetails["path"], outputfmt, gain_correct, sox_effects)
+ elif streamdetails['type'] == 'executable':
+ args = '%s | sox -t %s - -t %s - %s %s' % (streamdetails["path"], streamdetails["content_type"], outputfmt, gain_correct, sox_effects)
LOGGER.debug("Running sox with args: %s" % args)
process = await asyncio.create_subprocess_shell(args,
stdout=asyncio.subprocess.PIPE)
return # prevent multiple analyze jobs for same tracks
self.analyze_jobs[track_key] = True
streamdetails = await self.mass.music.providers[provider].get_stream_details(track_id)
- cachefile = self.__get_track_cache_filename(track_id, provider)
- enable_cache = self.mass.config['base']['http_streamer']['enable_cache']
- needs_cachefile = enable_cache and not os.path.isfile(cachefile)
track_loudness = await self.mass.db.get_track_loudness(track_id, provider)
- if needs_cachefile or track_loudness == None:
+ if track_loudness == None:
# only when needed we do the analyze stuff
LOGGER.info('Start analyzing track %s' % track_id)
if streamdetails['type'] == 'url':
del data
LOGGER.debug("Integrated loudness of track %s is: %s" %(track_id, loudness))
await self.mass.db.set_track_loudness(track_id, provider, loudness)
- if needs_cachefile:
- # use sox to store cache file (strip silence from start and end for better transitions)
- cmd = 'sox -t %s - -t sox %s silence 1 0.1 1%% reverse silence 1 0.1 1%% reverse' %(streamdetails['content_type'], cachefile)
- process = await asyncio.create_subprocess_shell(cmd, stdin=asyncio.subprocess.PIPE)
- await process.communicate(audio_data)
del audio_data
LOGGER.info('Finished analyzing track %s' % track_id)
self.analyze_jobs.pop(track_key, None)
gain_correct = target_gain - track_loudness
return round(gain_correct,2)
- async def __fill_audio_buffer(self, buf, track_id, provider, content_type):
- ''' get audio data from provider and write to buffer'''
- # fill the buffer with audio data
- # a tempfile is created so we can do audio analysis
- try:
- tmpfile = os.path.join(AUDIO_TEMP_DIR, '%s%s%s.tmp' % (random.randint(0, 999), track_id, random.randint(0, 999)))
- fd = open(tmpfile, 'wb')
- async for chunk in self.mass.music.providers[provider].get_audio_stream(track_id):
- if not chunk:
- break
- buf.write(chunk)
- await buf.drain()
- fd.write(chunk)
- LOGGER.info("fill_audio_buffer complete for track %s" % track_id)
- # successfull completion, process temp file for analysis
- self.mass.event_loop.create_task(
- self.__analyze_audio(tmpfile, track_id, provider, content_type))
- except Exception as exc:
- LOGGER.exception("fill_audio_buffer failed for track %s" % track_id)
- finally:
- buf.write_eof()
- await buf.drain()
- fd.close()
-
- def __get_track_cache_filename(self, track_id, provider):
- ''' get filename for a track to use as cache file '''
- filename = '%s%s' %(provider, track_id.split(os.sep)[-1])
- filename = base64.b64encode(filename.encode()).decode()
- return os.path.join(self._audio_cache_dir, filename)
-
- @run_periodic(3600)
- async def __cache_cleanup(self):
- ''' calculate size of cache folder and cleanup if needed '''
- def cleanup():
- size_limit = self.mass.config['base']['http_streamer']['audio_cache_max_size_gb']
- total_size_gb = get_folder_size(self._audio_cache_dir)
- LOGGER.info("current size of cache folder is %s GB" % total_size_gb)
- if size_limit and total_size_gb > size_limit:
- LOGGER.info("Cache folder size exceeds threshold, start cleanup...")
- from pathlib import Path
- import time
- days = 14
- while total_size_gb > size_limit:
- time_in_secs = time.time() - (days * 24 * 60 * 60)
- for i in Path(self._audio_cache_dir).iterdir():
- if i.is_file():
- if i.stat().st_atime <= time_in_secs:
- total_size_gb -= i.stat().st_size/float(1<<30)
- i.unlink()
- if total_size_gb < size_limit:
- break
- days -= 1
- LOGGER.info("Cache folder size cleanup completed")
- await self.mass.event_loop.run_in_executor(None, cleanup)
-
- @staticmethod
- def __get_bs1770_binary():
- ''' get the path to the bs1770 binary for the current OS '''
- import platform
- bs1770_binary = None
- if platform.system() == "Windows":
- bs1770_binary = os.path.join(os.path.dirname(__file__), "bs1770gain", "win64", "bs1770gain")
- elif platform.system() == "Darwin":
- # macos binary is x86_64 intel
- bs1770_binary = os.path.join(os.path.dirname(__file__), "bs1770gain", "osx", "bs1770gain")
- elif platform.system() == "Linux":
- architecture = platform.machine()
- if architecture.startswith('AMD64') or architecture.startswith('x86_64'):
- bs1770_binary = os.path.join(os.path.dirname(__file__), "bs1770gain", "linux64", "bs1770gain")
- # TODO: build armhf binary
- return bs1770_binary
\ No newline at end of file
+ async def __crossfade_pcm_parts(self, fade_in_part, fade_out_part, pcm_args, fade_length):
+ ''' crossfade two chunks of audio using sox '''
+ # create fade-in part
+ fadeinfile = MemoryTempfile(fallback=True).NamedTemporaryFile(buffering=0)
+ args = 'sox --ignore-length -t %s - -t %s %s fade t %s' % (pcm_args, pcm_args, fadeinfile.name, fade_length)
+ process = await asyncio.create_subprocess_shell(args, stdin=asyncio.subprocess.PIPE)
+ await process.communicate(fade_in_part)
+ # create fade-out part
+ fadeoutfile = MemoryTempfile(fallback=True).NamedTemporaryFile(buffering=0)
+ args = 'sox --ignore-length -t %s - -t %s %s reverse fade t %s reverse' % (pcm_args, pcm_args, fadeoutfile.name, fade_length)
+ process = await asyncio.create_subprocess_shell(args,
+ stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE)
+ await process.communicate(fade_out_part)
+ # create crossfade using sox and some temp files
+ # TODO: figure out how to make this less complex and without the tempfiles
+ args = 'sox -m -v 1.0 -t %s %s -v 1.0 -t %s %s -t %s -' % (pcm_args, fadeoutfile.name, pcm_args, fadeinfile.name, pcm_args)
+ process = await asyncio.create_subprocess_shell(args,
+ stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE)
+ crossfade_part, stderr = await process.communicate()
+ LOGGER.info("Got %s bytes in memory for crossfade_part after sox" % len(crossfade_part))
+ return crossfade_part