add support for crossfading
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Thu, 6 Jun 2019 12:41:18 +0000 (14:41 +0200)
committerMarcel van der Veldt <m.vanderveldt@outlook.com>
Thu, 6 Jun 2019 12:41:18 +0000 (14:41 +0200)
- use high quality settings for sox for all stream alterations
- allow crossfading on lms and chromecast
- now playing needs fixing when using the crossfade stream

music_assistant/modules/http_streamer.py
music_assistant/modules/music.py
music_assistant/modules/player.py
music_assistant/modules/playerproviders/chromecast.py
music_assistant/modules/playerproviders/lms.py
music_assistant/modules/playerproviders/pylms.py
music_assistant/modules/web.py
music_assistant/web/pages/config.vue.js

index c8e262094df709e6451df7144cffc7b4d122182e..0ded2873556643623947243e5a60d00b9ec76b63 100755 (executable)
@@ -12,6 +12,10 @@ import base64
 import operator
 from aiohttp import web
 import threading
+import urllib
+import math
+from memory_tempfile import MemoryTempfile
+import tempfile
 
 AUDIO_TEMP_DIR = "/tmp/audio_tmp"
 AUDIO_CACHE_DIR = "/tmp/audio_cache"
@@ -74,10 +78,10 @@ class HTTPStreamer():
                         break
                     await resp.write(chunk)
                     queue.task_done()
-                LOGGER.info("Finished streaming %s" % track_id)
+                LOGGER.info("stream_track fininished for %s" % track_id)
             except asyncio.CancelledError:
                 cancelled.set()
-                LOGGER.info("Streaming interrupted for %s" % track_id)
+                LOGGER.info("stream_track interrupted for %s" % track_id)
                 raise asyncio.CancelledError()
         return resp
 
@@ -123,13 +127,161 @@ class HTTPStreamer():
                 raise asyncio.CancelledError()
         return resp
     
+    async def stream_queue(self, http_request):
+        ''' start streaming radio from provider '''
+        player_id = http_request.query.get('player_id')
+        cancelled = threading.Event()
+        resp = web.StreamResponse(status=200,
+                                 reason='OK',
+                                 headers={'Content-Type': 'audio/flac'})
+        await resp.prepare(http_request)
+        if http_request.method.upper() != 'HEAD':
+            # stream audio
+            queue = asyncio.Queue()
+            cancelled = threading.Event()
+            task = run_async_background_task(
+                self.mass.bg_executor, 
+                self.__stream_queue, player_id, queue, cancelled)
+            try:
+                while True:
+                    chunk = await queue.get()
+                    await resp.write(chunk)
+                    queue.task_done()
+                    if not chunk:
+                        break
+                LOGGER.info("stream_queue fininished for %s" % player_id)
+            except asyncio.CancelledError:
+                cancelled.set()
+                LOGGER.info("stream_queue interrupted for %s" % player_id)
+                raise asyncio.CancelledError()
+        return resp
+
+    async def __stream_queue(self, player_id, buffer, cancelled):
+        ''' start streaming radio from provider '''
+        # stream audio with sox
+        queue_tracks = await self.mass.player.player_queue(player_id, 0, 1000)
+        sample_rate = self.mass.config['player_settings'][player_id]['max_sample_rate']
+        fade_length = self.mass.config['player_settings'][player_id]["crossfade_duration"]
+        pcm_args = 'raw -b 64 -c 2 -e floating-point -r %s' % sample_rate
+        args = 'sox -t %s - -t flac -C 2 -' % pcm_args
+        sox_proc = await asyncio.create_subprocess_shell(args, 
+                stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE)
+
+        async def fill_buffer():
+            while not sox_proc.stdout.at_eof():
+                chunk = await sox_proc.stdout.read(256000)
+                if not chunk:
+                    break
+                await buffer.put(chunk)
+            await sox_proc.wait()
+            await buffer.put('') # indicate EOF
+            LOGGER.info("streaming of queue for player %s completed" % player_id)
+        asyncio.create_task(fill_buffer())
+
+        last_fadeout_data = None
+        for queue_track in queue_tracks:
+            
+            while buffer.qsize() > 5 and not cancelled.is_set():
+                await asyncio.sleep(1)
+            if cancelled.is_set():
+                break
+            
+            params = urllib.parse.parse_qs(queue_track.uri.split('?')[1])
+            track_id = params['track_id'][0]
+            provider = params['provider'][0]
+            LOGGER.info("Stream queue track: %s - %s" % (track_id, queue_track.name))
+            temp_file = await self.__get_pcm_audio(track_id, provider, sample_rate)
+            
+            # get fade in part
+            args = 'sox -t %s %s -t %s - trim 0 %s fade t %s' % (pcm_args, temp_file.name, pcm_args,  fade_length, fade_length)
+            process = await asyncio.create_subprocess_shell(args,
+                    stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE)
+            fade_in_part, stderr = await process.communicate()
+            LOGGER.debug("Got %s bytes in memory for fadein_part after sox" % len(fade_in_part))
+            if last_fadeout_data:
+                # perform crossfade with previous fadeout samples
+                fadeinfile = MemoryTempfile(fallback=True).NamedTemporaryFile(buffering=0)
+                fadeinfile.write(fade_in_part)
+                fadeoutfile = MemoryTempfile(fallback=True).NamedTemporaryFile(buffering=0)
+                fadeoutfile.write(last_fadeout_data)
+                args = 'sox -m -v 1.0 -t %s %s -v 1.0 -t %s %s -t %s -' % (pcm_args, fadeoutfile.name, pcm_args, fadeinfile.name, pcm_args)
+                process = await asyncio.create_subprocess_shell(args,
+                        stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE)
+                crossfade_part, stderr = await process.communicate(fade_in_part)
+                LOGGER.debug("Got %s bytes in memory for crossfade_part after sox" % len(crossfade_part))
+                sox_proc.stdin.write(crossfade_part)
+                await sox_proc.stdin.drain()
+                fadeinfile.close()
+                fadeoutfile.close()
+            else:
+                # simply put the fadein part in the final file
+                sox_proc.stdin.write(fade_in_part)
+                await sox_proc.stdin.drain()
+
+            # get middle frames (main track without the fade-in and fade-out)
+            args = 'sox -t %s %s -t %s - trim %s -%s' % (pcm_args, temp_file.name, pcm_args, fade_length, fade_length)
+            process = await asyncio.create_subprocess_shell(args,
+                    stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE)
+            middle_part, stderr = await process.communicate()
+            LOGGER.debug("Got %s bytes in memory for middle_part after sox" % len(middle_part))
+            sox_proc.stdin.write(middle_part)
+            await sox_proc.stdin.drain()
+
+            # get fade out part (all remaining chunks of 1 second)
+            args = 'sox -t %s %s -t %s - reverse trim 0 %s fade t %s reverse ' % (pcm_args, temp_file.name, pcm_args, fade_length, fade_length)
+            process = await asyncio.create_subprocess_shell(args,
+                    stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE)
+            fade_out_part, stderr = await process.communicate()
+            LOGGER.debug("Got %s bytes in memory for fade_out_part after sox" % len(fade_out_part))
+            last_fadeout_data = fade_out_part
+            # close temp file
+            temp_file.close()
+        # end of queue reached, pass last fadeout bits to final output
+        if last_fadeout_data:
+            sox_proc.stdin.write(last_fadeout_data)
+            await sox_proc.stdin.drain()
+
+    async def __get_pcm_audio(self, track_id, provider, sample_rate=96000):
+        ''' get raw pcm data for a track upsampled to given sample_rate packed as wav '''
+        temp_audiofile = MemoryTempfile(fallback=True).NamedTemporaryFile(buffering=0)
+        cachefile = self.__get_track_cache_filename(track_id, provider)
+        if self.mass.config['base']['http_streamer']['volume_normalisation']:
+            gain_correct = await self.__get_track_gain_correct(track_id, provider)
+        else:
+            gain_correct = -6 # always need some headroom for upsampling and crossfades
+        if os.path.isfile(cachefile):
+            # we have a cache file for this track which we can use
+            # always convert to 64 bit floating point to do any processing/effects
+            args = 'sox -t flac "%s" -t wav -c 2 -e floating-point -b 64 - | sox -t wav - -t raw %s vol %s dB rate -v %s' % (cachefile, temp_audiofile.name, gain_correct, sample_rate)
+            process = await asyncio.create_subprocess_shell(args)
+        else:
+            # stream from provider
+            # always convert to 64 bit floating point to do any processing/effects
+            input_content_type = await self.mass.music.providers[provider].get_stream_content_type(track_id)
+            assert(input_content_type)
+            args = 'sox -t %s - -t wav -c 2 -e floating-point -b 64 - | sox -t wav - -t raw %s vol %s dB rate -v %s' % (input_content_type, temp_audiofile.name, gain_correct, sample_rate)
+            process = await asyncio.create_subprocess_shell(args,
+                    stdin=asyncio.subprocess.PIPE)
+            asyncio.get_event_loop().create_task(
+                     self.__fill_audio_buffer(process.stdin, track_id, provider, input_content_type))
+        await process.wait()
+        LOGGER.debug("__get_pcm_audio for track_id %s completed" % track_id)
+        return temp_audiofile
+    
     async def __get_audio_stream(self, audioqueue, track_id, provider, player_id=None, cancelled=None):
         ''' get audio stream from provider and apply additional effects/processing where/if needed'''
         cachefile = self.__get_track_cache_filename(track_id, provider)
         sox_effects = await self.__get_player_sox_options(track_id, provider, player_id, False)
+        if self.mass.config['base']['http_streamer']['volume_normalisation']:
+            gain_correct = await self.__get_track_gain_correct(track_id, provider)
+            sox_effects += ' vol %s dB ' % gain_correct
         if os.path.isfile(cachefile):
             # we have a cache file for this track which we can use
-            args = 'sox -t flac %s -t flac -C 0 - %s' % (cachefile, sox_effects)
+            if sox_effects.strip():
+                # always convert to 64 bit floating point to do any processing/effects
+                args = 'sox -t flac "%s" -t wav -b 64 -e floating-point - | sox -t wav - -t flac -C 2 - %s' % (cachefile, sox_effects)
+            else:
+                args = 'sox -t flac "%s" -t flac -C 2 - %s' % cachefile
             LOGGER.info("Running sox with args: %s" % args)
             process = await asyncio.create_subprocess_shell(args, 
                     stdout=asyncio.subprocess.PIPE)
@@ -138,7 +290,11 @@ class HTTPStreamer():
             # stream from provider
             input_content_type = await self.mass.music.providers[provider].get_stream_content_type(track_id)
             assert(input_content_type)
-            args = 'sox -t %s - -t flac -C 0 - %s' % (input_content_type, sox_effects)
+            if sox_effects.strip():
+                # always convert to 64 bit floating point to do any processing/effects
+                args = 'sox -t %s - -t wav -b 64 -e floating-point - | sox -t wav - -t flac -C 2 - %s' % (input_content_type, sox_effects)
+            else:
+                args = 'sox -t %s - -t flac -C 0 -' % (input_content_type)
             LOGGER.info("Running sox with args: %s" % args)
             process = await asyncio.create_subprocess_shell(args,
                     stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE)
@@ -146,7 +302,7 @@ class HTTPStreamer():
                      self.__fill_audio_buffer(process.stdin, track_id, provider, input_content_type))
         # put chunks from stdout into queue
         while not process.stdout.at_eof():
-            chunk = await process.stdout.read(256000)
+            chunk = await process.stdout.read(705600)
             if not chunk:
                 break
             if not cancelled.is_set():
@@ -156,9 +312,9 @@ class HTTPStreamer():
         await process.wait()
         await audioqueue.put('') # indicate EOF
         if cancelled.is_set():
-            LOGGER.info("streaming of track_id %s interrupted" % track_id)
+            LOGGER.info("__get_audio_stream for track_id %s interrupted" % track_id)
         else:
-            LOGGER.info("streaming of track_id %s completed" % track_id)
+            LOGGER.info("__get_audio_stream for track_id %s completed" % track_id)
 
     async def __get_player_sox_options(self, track_id, provider, player_id, is_radio):
         ''' get player specific sox options '''
@@ -185,9 +341,6 @@ class HTTPStreamer():
                     sox_effects += 'rate -v 48000'
         if player_id and self.mass.config['player_settings'][player_id]['sox_effects']:
             sox_effects += ' ' + self.mass.config['player_settings'][player_id]['sox_effects']
-        if self.mass.config['base']['http_streamer']['volume_normalisation']:
-            gain_correct = await self.__get_track_gain_correct(track_id, provider)
-            sox_effects += ' vol %s dB ' % gain_correct
         return sox_effects
         
     async def __analyze_audio(self, tmpfile, track_id, provider, content_type):
index 3445fb772410edc84f703a0a354f0a3e39633b82..04b62f22250c3b39f094acd6d9bc2319b0dd3fd0 100755 (executable)
@@ -186,7 +186,10 @@ class Music():
             for prov in self.providers.values():
                 prov_results = await prov.search(searchquery, media_types, limit)
                 for item_type, items in prov_results.items():
-                    result[item_type] += items
+                    if not item_type in result:
+                        result[item_type] = items
+                    else:
+                        result[item_type] += items
             # filter out duplicates
             for item_type, items in result.items():
                 items = list(toolz.unique(items, key=operator.attrgetter('item_id')))
index 1928d1d2452ee225be974ab39a406e51b32bceba..33250db90d7037fd92065313fd86ed3dee118ec2 100755 (executable)
@@ -182,8 +182,6 @@ class Player():
         # handle basic player settings
         player_details.enabled = player.settings['enabled']
         player_details.name = player.settings['name'] if player.settings['name'] else player_details.name
-        if player.settings['group_parent']:
-            player_details.group_parent = player.settings['group_parent']
         # handle hass integration
         await self.__update_player_hass_settings(player_details, player.settings)
         # handle mute as power setting
@@ -279,9 +277,11 @@ class Player():
             ("mute_as_power", False, "player_mute_power"),
             ("disable_volume", False, "player_disable_vol"),
             ("sox_effects", '', "http_streamer_sox_effects"),
-            ("max_sample_rate", '96000', "max_sample_rate"),
+            ("max_sample_rate", 96000, "max_sample_rate"),
             ("force_http_streamer", False, "force_http_streamer")
         ]
+        # append provider specific player settings
+        config_entries += await self.mass.player.providers[player_details.player_provider].player_config_entries()
         if player_details.is_group:
             config_entries += [ # group player settings
                 ("apply_group_volume", False, "player_group_vol"),
index 58d658d1bff6fc595df45ad0bc08b918d31526c5..d383a2e222d7f5a09b43f53829484dc88a2dee34 100644 (file)
@@ -58,6 +58,12 @@ class ChromecastProvider(PlayerProvider):
 
     ### Provider specific implementation #####
 
+    async def player_config_entries(self):
+        ''' get the player config entries for this provider (list with key/value pairs)'''
+        return [
+            ("crossfade_duration", 0, "crossfade_duration"),
+            ]
+
     async def player_command(self, player_id, cmd:str, cmd_args=None):
         ''' issue command on player (play, pause, next, previous, stop, power, volume, mute) '''
         count = 0
@@ -151,7 +157,7 @@ class ChromecastProvider(PlayerProvider):
         ''' load queue on player with given queue items '''
         castplayer = self._chromecasts[player_id]
         player = self._players[player_id]
-        queue_items = await self.__create_queue_items(new_tracks[:50])
+        queue_items = await self.__create_queue_items(new_tracks[:50], player_id=player_id)
         queuedata = { 
                 "type": 'QUEUE_LOAD',
                 "repeatMode":  "REPEAT_ALL" if player.repeat_enabled else "REPEAT_OFF",
@@ -169,7 +175,7 @@ class ChromecastProvider(PlayerProvider):
     async def __queue_insert(self, player_id, new_tracks, insert_before=None):
         ''' insert item into the player queue '''
         castplayer = self._chromecasts[player_id]
-        queue_items = await self.__create_queue_items(new_tracks)
+        queue_items = await self.__create_queue_items(new_tracks, player_id=player_id)
         for chunk in chunks(queue_items, 50):
             queuedata = { 
                         "type": 'QUEUE_INSERT',
@@ -203,16 +209,21 @@ class ChromecastProvider(PlayerProvider):
         tracks = self._player_queue[player_id]
         await self.__queue_load(player_id, tracks, queue_index)
 
-    async def __create_queue_items(self, tracks):
+    async def __create_queue_items(self, tracks, player_id):
         ''' create list of CC queue items from tracks '''
         queue_items = []
         for track in tracks:
-            queue_item = await self.__create_queue_item(track)
+            queue_item = await self.__create_queue_item(track, player_id)
             queue_items.append(queue_item)
         return queue_items
 
-    async def __create_queue_item(self, track):
+    async def __create_queue_item(self, track, player_id):
         '''create queue item from track info '''
+        enable_crossfade = self.mass.config['player_settings'][player_id]["crossfade_duration"] > 0
+        if enable_crossfade:
+            uri = 'http://%s:%s/stream_queue?player_id=%s'% (self.mass.player.local_ip, self.mass.config['base']['web']['http_port'], player_id)
+        else:
+            uri = track.uri
         return {
             'autoplay' : True,
             'preloadTime' : 10,
@@ -220,7 +231,7 @@ class ChromecastProvider(PlayerProvider):
             'startTime' : 0,
             'activeTrackIds' : [],
             'media': {
-                'contentId':  track.uri,
+                'contentId':  uri,
                 'customData': {
                     'provider': track.provider, 
                     'uri': track.uri, 
@@ -296,11 +307,14 @@ class ChromecastProvider(PlayerProvider):
         elif uri.startswith('qobuz://') and 'qobuz' in self.mass.music.providers:
             track_id = uri.replace('qobuz://','').replace('.flac','')
             track = await self.mass.music.providers['qobuz'].track(track_id)
-        elif uri.startswith('http') and '/stream' in uri:
+        elif uri.startswith('http') and '/stream_track' in uri:
             params = urllib.parse.parse_qs(uri.split('?')[1])
             track_id = params['track_id'][0]
             provider = params['provider'][0]
             track = await self.mass.music.providers[provider].track(track_id)
+        elif uri.startswith('http') and '/stream_queue' in uri:
+            track = Track()
+            track.name = "Crossfade Queue streaming"
         return track
 
     async def __handle_group_members_update(self, mz, added_player=None, removed_player=None):
index 9aa94263be505111f7924c601720f4f8af8a876f..a50595ad31ca0d2336e8d6f8fd513d21b99761de 100644 (file)
@@ -59,6 +59,10 @@ class LMSProvider(PlayerProvider):
 
     ### Provider specific implementation #####
 
+    async def player_config_entries(self):
+        ''' get the player config entries for this provider (list with key/value pairs)'''
+        return []
+
     async def player_command(self, player_id, cmd:str, cmd_args=None):
         ''' issue command on player (play, pause, next, previous, stop, power, volume, mute) '''
         lms_commands = []
index 63fcd9493c10739700bea1e390b7b93f46afecd7..bbaf659541839f21614c8ef25977066f07066693 100644 (file)
@@ -54,6 +54,7 @@ class PyLMSServer(PlayerProvider):
 
      ### Provider specific implementation #####
 
+    
     async def start_discovery(self):
         transport, protocol = await self.mass.event_loop.create_datagram_endpoint(
             lambda: DiscoveryProtocol(self.mass.web._http_port),
@@ -64,6 +65,12 @@ class PyLMSServer(PlayerProvider):
         finally:
             transport.close()
 
+    async def player_config_entries(self):
+        ''' get the player config entries for this provider (list with key/value pairs)'''
+        return [
+            ("crossfade_duration", 0, "crossfade_duration"),
+            ]
+
     async def player_command(self, player_id, cmd:str, cmd_args=None):
         ''' issue command on player (play, pause, next, previous, stop, power, volume, mute) '''
         if cmd == 'play':
@@ -211,6 +218,7 @@ class PyLMSServer(PlayerProvider):
             ''' handle events from player'''
             if event == "connected":
                 self._lmsplayers[lms_player.player_id] = lms_player
+                lms_player.player_settings = self.mass.config['player_settings'][lms_player.player_id]
             asyncio.create_task(self.__handle_player_event(lms_player.player_id, event, event_data))
 
         try:
@@ -249,6 +257,7 @@ class PyLMSPlayer(object):
         self.send_event = None
         self.stream_host = stream_host
         self.stream_port = stream_port
+        self.player_settings = {}
         self.playback_millis = 0
         self._volume = PyLMSVolume()
         self._device_type = None
@@ -356,12 +365,12 @@ class PyLMSPlayer(object):
         self._volume.volume = new_vol
         self.send_volume()
     
-    def play(self, uri, crossfade=True):
-        # TODO: attach crossfade to a config setting
+    def play(self, uri):
+        enable_crossfade = self.player_settings["crossfade_duration"] > 0
         command = b's'
         autostart = b'3' # we use direct stream for now so let the player do the messy work with buffers
-        transType= b'1' if crossfade else b'0'
-        transDuration = 10 if crossfade else 0
+        transType= b'1' if enable_crossfade else b'0'
+        transDuration = self.player_settings["crossfade_duration"]
         formatbyte = b'f' # fixed to flac
         uri = '/stream' + uri.split('/stream')[1]
         data = self.pack_stream(command, autostart=autostart, flags=0x00, formatbyte=formatbyte, transType=transType, transDuration=transDuration)
index a1e5d37ff401cb39a870c4fae84fd48a99cf42ab..8db5d85ace96db2210fde4c98743def3e0e99b3b 100755 (executable)
@@ -71,6 +71,7 @@ class Web():
         app.add_routes([web.get('/ws', self.websocket_handler)])
         app.add_routes([web.get('/stream_track', self.mass.http_streamer.stream_track)])
         app.add_routes([web.get('/stream_radio', self.mass.http_streamer.stream_radio)])
+        app.add_routes([web.get('/stream_queue', self.mass.http_streamer.stream_queue)])
         app.add_routes([web.get('/api/search', self.search)])
         app.add_routes([web.get('/api/config', self.get_config)])
         app.add_routes([web.post('/api/config', self.save_config)])
index 570b40bb5a421f6a2a13c63ef2473c2de9c70b97..3fd24a1e5cb65037a3ce25a64903059bbd437fa9 100755 (executable)
@@ -54,6 +54,8 @@ var Config = Vue.component('Config', {
                                           item-text="name"
                                           item-value="id" box>
                                         </v-select>
+                                        <v-select v-else-if="conf_item_key[0] == 'max_sample_rate'" v-model="conf.player_settings[key][conf_item_key[0]]" :label="$t('conf.'+conf_item_key[2])" :items="sample_rates" box></v-select>
+                                        <v-slider v-else-if="conf_item_key[0] == 'crossfade_duration'" v-model="conf.player_settings[key][conf_item_key[0]]" :label="$t('conf.'+conf_item_key[2])" min=0 max=10 box thumb-label></v-slider>
                                         <v-text-field v-else v-model="conf.player_settings[key][conf_item_key[0]]" :label="$t('conf.'+conf_item_key[2])" box></v-text-field>
                                   </v-list-tile>
                                   <v-list-tile v-if="!conf.player_settings[key].enabled">
@@ -80,7 +82,8 @@ var Config = Vue.component('Config', {
     return {
       conf: {},
       players: {},
-      active: 0
+      active: 0,
+      sample_rates: [44100, 48000, 88200, 96000, 192000, 384000]
     }
   },
   computed: {