import operator
from aiohttp import web
import threading
+import urllib
+import math
+from memory_tempfile import MemoryTempfile
+import tempfile
AUDIO_TEMP_DIR = "/tmp/audio_tmp"
AUDIO_CACHE_DIR = "/tmp/audio_cache"
break
await resp.write(chunk)
queue.task_done()
- LOGGER.info("Finished streaming %s" % track_id)
+ LOGGER.info("stream_track fininished for %s" % track_id)
except asyncio.CancelledError:
cancelled.set()
- LOGGER.info("Streaming interrupted for %s" % track_id)
+ LOGGER.info("stream_track interrupted for %s" % track_id)
raise asyncio.CancelledError()
return resp
raise asyncio.CancelledError()
return resp
+ async def stream_queue(self, http_request):
+ ''' start streaming radio from provider '''
+ player_id = http_request.query.get('player_id')
+ cancelled = threading.Event()
+ resp = web.StreamResponse(status=200,
+ reason='OK',
+ headers={'Content-Type': 'audio/flac'})
+ await resp.prepare(http_request)
+ if http_request.method.upper() != 'HEAD':
+ # stream audio
+ queue = asyncio.Queue()
+ cancelled = threading.Event()
+ task = run_async_background_task(
+ self.mass.bg_executor,
+ self.__stream_queue, player_id, queue, cancelled)
+ try:
+ while True:
+ chunk = await queue.get()
+ await resp.write(chunk)
+ queue.task_done()
+ if not chunk:
+ break
+ LOGGER.info("stream_queue fininished for %s" % player_id)
+ except asyncio.CancelledError:
+ cancelled.set()
+ LOGGER.info("stream_queue interrupted for %s" % player_id)
+ raise asyncio.CancelledError()
+ return resp
+
+ async def __stream_queue(self, player_id, buffer, cancelled):
+ ''' start streaming radio from provider '''
+ # stream audio with sox
+ queue_tracks = await self.mass.player.player_queue(player_id, 0, 1000)
+ sample_rate = self.mass.config['player_settings'][player_id]['max_sample_rate']
+ fade_length = self.mass.config['player_settings'][player_id]["crossfade_duration"]
+ pcm_args = 'raw -b 64 -c 2 -e floating-point -r %s' % sample_rate
+ args = 'sox -t %s - -t flac -C 2 -' % pcm_args
+ sox_proc = await asyncio.create_subprocess_shell(args,
+ stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE)
+
+ async def fill_buffer():
+ while not sox_proc.stdout.at_eof():
+ chunk = await sox_proc.stdout.read(256000)
+ if not chunk:
+ break
+ await buffer.put(chunk)
+ await sox_proc.wait()
+ await buffer.put('') # indicate EOF
+ LOGGER.info("streaming of queue for player %s completed" % player_id)
+ asyncio.create_task(fill_buffer())
+
+ last_fadeout_data = None
+ for queue_track in queue_tracks:
+
+ while buffer.qsize() > 5 and not cancelled.is_set():
+ await asyncio.sleep(1)
+ if cancelled.is_set():
+ break
+
+ params = urllib.parse.parse_qs(queue_track.uri.split('?')[1])
+ track_id = params['track_id'][0]
+ provider = params['provider'][0]
+ LOGGER.info("Stream queue track: %s - %s" % (track_id, queue_track.name))
+ temp_file = await self.__get_pcm_audio(track_id, provider, sample_rate)
+
+ # get fade in part
+ args = 'sox -t %s %s -t %s - trim 0 %s fade t %s' % (pcm_args, temp_file.name, pcm_args, fade_length, fade_length)
+ process = await asyncio.create_subprocess_shell(args,
+ stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE)
+ fade_in_part, stderr = await process.communicate()
+ LOGGER.debug("Got %s bytes in memory for fadein_part after sox" % len(fade_in_part))
+ if last_fadeout_data:
+ # perform crossfade with previous fadeout samples
+ fadeinfile = MemoryTempfile(fallback=True).NamedTemporaryFile(buffering=0)
+ fadeinfile.write(fade_in_part)
+ fadeoutfile = MemoryTempfile(fallback=True).NamedTemporaryFile(buffering=0)
+ fadeoutfile.write(last_fadeout_data)
+ args = 'sox -m -v 1.0 -t %s %s -v 1.0 -t %s %s -t %s -' % (pcm_args, fadeoutfile.name, pcm_args, fadeinfile.name, pcm_args)
+ process = await asyncio.create_subprocess_shell(args,
+ stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE)
+ crossfade_part, stderr = await process.communicate(fade_in_part)
+ LOGGER.debug("Got %s bytes in memory for crossfade_part after sox" % len(crossfade_part))
+ sox_proc.stdin.write(crossfade_part)
+ await sox_proc.stdin.drain()
+ fadeinfile.close()
+ fadeoutfile.close()
+ else:
+ # simply put the fadein part in the final file
+ sox_proc.stdin.write(fade_in_part)
+ await sox_proc.stdin.drain()
+
+ # get middle frames (main track without the fade-in and fade-out)
+ args = 'sox -t %s %s -t %s - trim %s -%s' % (pcm_args, temp_file.name, pcm_args, fade_length, fade_length)
+ process = await asyncio.create_subprocess_shell(args,
+ stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE)
+ middle_part, stderr = await process.communicate()
+ LOGGER.debug("Got %s bytes in memory for middle_part after sox" % len(middle_part))
+ sox_proc.stdin.write(middle_part)
+ await sox_proc.stdin.drain()
+
+ # get fade out part (all remaining chunks of 1 second)
+ args = 'sox -t %s %s -t %s - reverse trim 0 %s fade t %s reverse ' % (pcm_args, temp_file.name, pcm_args, fade_length, fade_length)
+ process = await asyncio.create_subprocess_shell(args,
+ stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE)
+ fade_out_part, stderr = await process.communicate()
+ LOGGER.debug("Got %s bytes in memory for fade_out_part after sox" % len(fade_out_part))
+ last_fadeout_data = fade_out_part
+ # close temp file
+ temp_file.close()
+ # end of queue reached, pass last fadeout bits to final output
+ if last_fadeout_data:
+ sox_proc.stdin.write(last_fadeout_data)
+ await sox_proc.stdin.drain()
+
+ async def __get_pcm_audio(self, track_id, provider, sample_rate=96000):
+ ''' get raw pcm data for a track upsampled to given sample_rate packed as wav '''
+ temp_audiofile = MemoryTempfile(fallback=True).NamedTemporaryFile(buffering=0)
+ cachefile = self.__get_track_cache_filename(track_id, provider)
+ if self.mass.config['base']['http_streamer']['volume_normalisation']:
+ gain_correct = await self.__get_track_gain_correct(track_id, provider)
+ else:
+ gain_correct = -6 # always need some headroom for upsampling and crossfades
+ if os.path.isfile(cachefile):
+ # we have a cache file for this track which we can use
+ # always convert to 64 bit floating point to do any processing/effects
+ args = 'sox -t flac "%s" -t wav -c 2 -e floating-point -b 64 - | sox -t wav - -t raw %s vol %s dB rate -v %s' % (cachefile, temp_audiofile.name, gain_correct, sample_rate)
+ process = await asyncio.create_subprocess_shell(args)
+ else:
+ # stream from provider
+ # always convert to 64 bit floating point to do any processing/effects
+ input_content_type = await self.mass.music.providers[provider].get_stream_content_type(track_id)
+ assert(input_content_type)
+ args = 'sox -t %s - -t wav -c 2 -e floating-point -b 64 - | sox -t wav - -t raw %s vol %s dB rate -v %s' % (input_content_type, temp_audiofile.name, gain_correct, sample_rate)
+ process = await asyncio.create_subprocess_shell(args,
+ stdin=asyncio.subprocess.PIPE)
+ asyncio.get_event_loop().create_task(
+ self.__fill_audio_buffer(process.stdin, track_id, provider, input_content_type))
+ await process.wait()
+ LOGGER.debug("__get_pcm_audio for track_id %s completed" % track_id)
+ return temp_audiofile
+
async def __get_audio_stream(self, audioqueue, track_id, provider, player_id=None, cancelled=None):
''' get audio stream from provider and apply additional effects/processing where/if needed'''
cachefile = self.__get_track_cache_filename(track_id, provider)
sox_effects = await self.__get_player_sox_options(track_id, provider, player_id, False)
+ if self.mass.config['base']['http_streamer']['volume_normalisation']:
+ gain_correct = await self.__get_track_gain_correct(track_id, provider)
+ sox_effects += ' vol %s dB ' % gain_correct
if os.path.isfile(cachefile):
# we have a cache file for this track which we can use
- args = 'sox -t flac %s -t flac -C 0 - %s' % (cachefile, sox_effects)
+ if sox_effects.strip():
+ # always convert to 64 bit floating point to do any processing/effects
+ args = 'sox -t flac "%s" -t wav -b 64 -e floating-point - | sox -t wav - -t flac -C 2 - %s' % (cachefile, sox_effects)
+ else:
+ args = 'sox -t flac "%s" -t flac -C 2 - %s' % cachefile
LOGGER.info("Running sox with args: %s" % args)
process = await asyncio.create_subprocess_shell(args,
stdout=asyncio.subprocess.PIPE)
# stream from provider
input_content_type = await self.mass.music.providers[provider].get_stream_content_type(track_id)
assert(input_content_type)
- args = 'sox -t %s - -t flac -C 0 - %s' % (input_content_type, sox_effects)
+ if sox_effects.strip():
+ # always convert to 64 bit floating point to do any processing/effects
+ args = 'sox -t %s - -t wav -b 64 -e floating-point - | sox -t wav - -t flac -C 2 - %s' % (input_content_type, sox_effects)
+ else:
+ args = 'sox -t %s - -t flac -C 0 -' % (input_content_type)
LOGGER.info("Running sox with args: %s" % args)
process = await asyncio.create_subprocess_shell(args,
stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE)
self.__fill_audio_buffer(process.stdin, track_id, provider, input_content_type))
# put chunks from stdout into queue
while not process.stdout.at_eof():
- chunk = await process.stdout.read(256000)
+ chunk = await process.stdout.read(705600)
if not chunk:
break
if not cancelled.is_set():
await process.wait()
await audioqueue.put('') # indicate EOF
if cancelled.is_set():
- LOGGER.info("streaming of track_id %s interrupted" % track_id)
+ LOGGER.info("__get_audio_stream for track_id %s interrupted" % track_id)
else:
- LOGGER.info("streaming of track_id %s completed" % track_id)
+ LOGGER.info("__get_audio_stream for track_id %s completed" % track_id)
async def __get_player_sox_options(self, track_id, provider, player_id, is_radio):
''' get player specific sox options '''
sox_effects += 'rate -v 48000'
if player_id and self.mass.config['player_settings'][player_id]['sox_effects']:
sox_effects += ' ' + self.mass.config['player_settings'][player_id]['sox_effects']
- if self.mass.config['base']['http_streamer']['volume_normalisation']:
- gain_correct = await self.__get_track_gain_correct(track_id, provider)
- sox_effects += ' vol %s dB ' % gain_correct
return sox_effects
async def __analyze_audio(self, tmpfile, track_id, provider, content_type):
### Provider specific implementation #####
+ async def player_config_entries(self):
+ ''' get the player config entries for this provider (list with key/value pairs)'''
+ return [
+ ("crossfade_duration", 0, "crossfade_duration"),
+ ]
+
async def player_command(self, player_id, cmd:str, cmd_args=None):
''' issue command on player (play, pause, next, previous, stop, power, volume, mute) '''
count = 0
''' load queue on player with given queue items '''
castplayer = self._chromecasts[player_id]
player = self._players[player_id]
- queue_items = await self.__create_queue_items(new_tracks[:50])
+ queue_items = await self.__create_queue_items(new_tracks[:50], player_id=player_id)
queuedata = {
"type": 'QUEUE_LOAD',
"repeatMode": "REPEAT_ALL" if player.repeat_enabled else "REPEAT_OFF",
async def __queue_insert(self, player_id, new_tracks, insert_before=None):
''' insert item into the player queue '''
castplayer = self._chromecasts[player_id]
- queue_items = await self.__create_queue_items(new_tracks)
+ queue_items = await self.__create_queue_items(new_tracks, player_id=player_id)
for chunk in chunks(queue_items, 50):
queuedata = {
"type": 'QUEUE_INSERT',
tracks = self._player_queue[player_id]
await self.__queue_load(player_id, tracks, queue_index)
- async def __create_queue_items(self, tracks):
+ async def __create_queue_items(self, tracks, player_id):
''' create list of CC queue items from tracks '''
queue_items = []
for track in tracks:
- queue_item = await self.__create_queue_item(track)
+ queue_item = await self.__create_queue_item(track, player_id)
queue_items.append(queue_item)
return queue_items
- async def __create_queue_item(self, track):
+ async def __create_queue_item(self, track, player_id):
'''create queue item from track info '''
+ enable_crossfade = self.mass.config['player_settings'][player_id]["crossfade_duration"] > 0
+ if enable_crossfade:
+ uri = 'http://%s:%s/stream_queue?player_id=%s'% (self.mass.player.local_ip, self.mass.config['base']['web']['http_port'], player_id)
+ else:
+ uri = track.uri
return {
'autoplay' : True,
'preloadTime' : 10,
'startTime' : 0,
'activeTrackIds' : [],
'media': {
- 'contentId': track.uri,
+ 'contentId': uri,
'customData': {
'provider': track.provider,
'uri': track.uri,
elif uri.startswith('qobuz://') and 'qobuz' in self.mass.music.providers:
track_id = uri.replace('qobuz://','').replace('.flac','')
track = await self.mass.music.providers['qobuz'].track(track_id)
- elif uri.startswith('http') and '/stream' in uri:
+ elif uri.startswith('http') and '/stream_track' in uri:
params = urllib.parse.parse_qs(uri.split('?')[1])
track_id = params['track_id'][0]
provider = params['provider'][0]
track = await self.mass.music.providers[provider].track(track_id)
+ elif uri.startswith('http') and '/stream_queue' in uri:
+ track = Track()
+ track.name = "Crossfade Queue streaming"
return track
async def __handle_group_members_update(self, mz, added_player=None, removed_player=None):