Update http_streamer.py
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Thu, 6 Jun 2019 15:23:43 +0000 (17:23 +0200)
committerMarcel van der Veldt <m.vanderveldt@outlook.com>
Thu, 6 Jun 2019 15:23:43 +0000 (17:23 +0200)
music_assistant/modules/http_streamer.py

index 6a2411117ddb5bec7a81165a67b72e1208168500..67515e2ddef890646f8cd0aeb2240f140618e9c7 100755 (executable)
@@ -156,13 +156,13 @@ class HTTPStreamer():
                 raise asyncio.CancelledError()
         return resp
 
-    async def __stream_queue(self, player_id, buffer, cancelled):
+    async def __stream_queue__old_usingfile(self, player_id, buffer, cancelled):
         ''' start streaming radio from provider '''
         # stream audio with sox
         queue_tracks = await self.mass.player.player_queue(player_id, 0, 1000)
         sample_rate = self.mass.config['player_settings'][player_id]['max_sample_rate']
         fade_length = self.mass.config['player_settings'][player_id]["crossfade_duration"]
-        pcm_args = 'raw -b 64 -c 2 -e floating-point -r %s' % sample_rate
+        pcm_args = 'raw -b 32 -c 2 -e signed-integer -r %s' % sample_rate
         args = 'sox -t %s - -t flac -C 2 -' % pcm_args
         sox_proc = await asyncio.create_subprocess_shell(args, 
                 stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE)
@@ -190,10 +190,10 @@ class HTTPStreamer():
             track_id = params['track_id'][0]
             provider = params['provider'][0]
             LOGGER.info("Stream queue track: %s - %s" % (track_id, queue_track.name))
-            temp_file = await self.__get_pcm_audio(track_id, provider, sample_rate)
+            temp_file = await self.__get_raw_audio(track_id, provider, sample_rate)
             
             # get fade in part
-            args = 'sox -t %s %s -t %s - trim 0 %s fade t %s' % (pcm_args, temp_file.name, pcm_args,  fade_length, fade_length)
+            args = 'sox -t sox %s -t %s - trim 0 %s fade t %s' % (temp_file.name, pcm_args,  fade_length, fade_length)
             process = await asyncio.create_subprocess_shell(args,
                     stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE)
             fade_in_part, stderr = await process.communicate()
@@ -219,7 +219,7 @@ class HTTPStreamer():
                 await sox_proc.stdin.drain()
 
             # get middle frames (main track without the fade-in and fade-out)
-            args = 'sox -t %s %s -t %s - trim %s -%s' % (pcm_args, temp_file.name, pcm_args, fade_length, fade_length)
+            args = 'sox -t sox %s -t %s - trim %s -%s' % (temp_file.name, pcm_args, fade_length, fade_length)
             process = await asyncio.create_subprocess_shell(args,
                     stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE)
             middle_part, stderr = await process.communicate()
@@ -228,7 +228,7 @@ class HTTPStreamer():
             await sox_proc.stdin.drain()
 
             # get fade out part (all remaining chunks of 1 second)
-            args = 'sox -t %s %s -t %s - reverse trim 0 %s fade t %s reverse ' % (pcm_args, temp_file.name, pcm_args, fade_length, fade_length)
+            args = 'sox -t sox %s -t %s - reverse trim 0 %s fade t %s reverse ' % (temp_file.name, pcm_args, fade_length, fade_length)
             process = await asyncio.create_subprocess_shell(args,
                     stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE)
             fade_out_part, stderr = await process.communicate()
@@ -241,7 +241,7 @@ class HTTPStreamer():
             sox_proc.stdin.write(last_fadeout_data)
             await sox_proc.stdin.drain()
 
-    async def __get_pcm_audio(self, track_id, provider, sample_rate=96000):
+    async def __get_raw_audio__old_usingfile(self, track_id, provider, sample_rate=96000):
         ''' get raw pcm data for a track upsampled to given sample_rate packed as wav '''
         temp_audiofile = MemoryTempfile(fallback=True).NamedTemporaryFile(buffering=0)
         cachefile = self.__get_track_cache_filename(track_id, provider)
@@ -252,14 +252,14 @@ class HTTPStreamer():
         if os.path.isfile(cachefile):
             # we have a cache file for this track which we can use
             # always convert to 64 bit floating point to do any processing/effects
-            args = 'sox -V3 -t flac "%s" -t wav -c 2 -e floating-point -b 64 - | sox -V3 -t wav - -t raw %s vol %s dB rate -v %s' % (cachefile, temp_audiofile.name, gain_correct, sample_rate)
+            args = 'sox -V3 -t flac "%s" -t sox %s vol %s dB rate -v %s' % (cachefile, temp_audiofile.name, gain_correct, sample_rate)
             process = await asyncio.create_subprocess_shell(args)
         else:
             # stream from provider
             # always convert to 64 bit floating point to do any processing/effects
             input_content_type = await self.mass.music.providers[provider].get_stream_content_type(track_id)
             assert(input_content_type)
-            args = 'sox -V3 -t %s - -t wav -c 2 -e floating-point -b 64 - | sox -V3 -t wav - -t raw %s vol %s dB rate -v %s' % (input_content_type, temp_audiofile.name, gain_correct, sample_rate)
+            args = 'sox -V3 -t %s - -t sox %s vol %s dB rate -v %s' % (input_content_type, temp_audiofile.name, gain_correct, sample_rate)
             process = await asyncio.create_subprocess_shell(args,
                     stdin=asyncio.subprocess.PIPE)
             asyncio.get_event_loop().create_task(
@@ -268,6 +268,123 @@ class HTTPStreamer():
         LOGGER.debug("__get_pcm_audio for track_id %s completed" % track_id)
         return temp_audiofile
     
+    async def __stream_queue(self, player_id, buffer, cancelled):
+        ''' start streaming all queue tracks '''
+        # TODO: get correct queue index and implement reporting of position
+        queue_tracks = await self.mass.player.player_queue(player_id, 0, 1000)
+        sample_rate = self.mass.config['player_settings'][player_id]['max_sample_rate']
+        fade_length = self.mass.config['player_settings'][player_id]["crossfade_duration"]
+        pcm_args = 'raw -b 32 -c 2 -e signed-integer -r %s' % sample_rate
+        args = 'sox -t %s - -t flac -C 2 -' % pcm_args
+        sox_proc = await asyncio.create_subprocess_shell(args, 
+                stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE)
+
+        async def fill_buffer():
+            while not sox_proc.stdout.at_eof():
+                chunk = await sox_proc.stdout.read(256000)
+                if not chunk:
+                    break
+                await buffer.put(chunk)
+            await buffer.put('') # indicate EOF
+        asyncio.create_task(fill_buffer())
+
+        last_fadeout_data = None
+        for queue_track in queue_tracks:
+            
+            while buffer.qsize() > 5 and not cancelled.is_set():
+                await asyncio.sleep(1)
+            if cancelled.is_set():
+                break
+            
+            params = urllib.parse.parse_qs(queue_track.uri.split('?')[1])
+            track_id = params['track_id'][0]
+            provider = params['provider'][0]
+            LOGGER.info("Stream queue track: %s - %s" % (track_id, queue_track.name))
+            audiodata = await self.__get_raw_audio(track_id, provider, sample_rate)
+            
+            # get fade in part
+            args = 'sox -t sox - -t %s - trim 0 %s fade t %s' % (pcm_args,  fade_length, fade_length)
+            process = await asyncio.create_subprocess_shell(args,
+                    stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE)
+            fade_in_part, stderr = await process.communicate(audiodata[0:15360000])
+            LOGGER.debug("Got %s bytes in memory for fadein_part after sox" % len(fade_in_part))
+            if last_fadeout_data:
+                # perform crossfade with previous fadeout samples
+                fadeinfile = MemoryTempfile(fallback=True).NamedTemporaryFile(buffering=0)
+                fadeinfile.write(fade_in_part)
+                fadeoutfile = MemoryTempfile(fallback=True).NamedTemporaryFile(buffering=0)
+                fadeoutfile.write(last_fadeout_data)
+                args = 'sox -m -v 1.0 -t %s %s -v 1.0 -t %s %s -t %s -' % (pcm_args, fadeoutfile.name, pcm_args, fadeinfile.name, pcm_args)
+                process = await asyncio.create_subprocess_shell(args,
+                        stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE)
+                crossfade_part, stderr = await process.communicate(fade_in_part)
+                LOGGER.debug("Got %s bytes in memory for crossfade_part after sox" % len(crossfade_part))
+                sox_proc.stdin.write(crossfade_part)
+                await sox_proc.stdin.drain()
+                fadeinfile.close()
+                fadeoutfile.close()
+                del crossfade_part
+                del fade_in_part
+                last_fadeout_data = None
+            else:
+                # simply put the fadein part in the final file
+                sox_proc.stdin.write(fade_in_part)
+                await sox_proc.stdin.drain()
+                del fade_in_part
+
+            # get middle frames (main track without the fade-in and fade-out)
+            args = 'sox -t sox - -t %s - trim %s -%s' % (pcm_args, fade_length, fade_length)
+            process = await asyncio.create_subprocess_shell(args,
+                    stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE)
+            middle_part, stderr = await process.communicate(audiodata)
+            LOGGER.debug("Got %s bytes in memory for middle_part after sox" % len(middle_part))
+            sox_proc.stdin.write(middle_part)
+            await sox_proc.stdin.drain()
+            del middle_part
+
+            # get fade out part
+            args = 'sox -t sox - -t %s - reverse trim 0 %s fade t %s reverse ' % (pcm_args, fade_length, fade_length)
+            process = await asyncio.create_subprocess_shell(args,
+                    stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE)
+            last_fadeout_data, stderr = await process.communicate(audiodata)
+            LOGGER.debug("Got %s bytes in memory for fade_out_part after sox" % len(last_fadeout_data))
+            # cleanup audio data
+            del audiodata
+        
+        # end of queue reached, pass last fadeout bits to final output
+        if last_fadeout_data:
+            sox_proc.stdin.write(last_fadeout_data)
+            await sox_proc.stdin.drain()
+        sox_proc.stdin.close()
+        await sox_proc.wait()
+        LOGGER.info("streaming of queue for player %s completed" % player_id)
+
+    async def __get_raw_audio(self, track_id, provider, sample_rate=96000):
+        ''' get raw pcm data for a track upsampled to given sample_rate packed as wav '''
+        audiodata = b''
+        cachefile = self.__get_track_cache_filename(track_id, provider)
+        if self.mass.config['base']['http_streamer']['volume_normalisation']:
+            gain_correct = await self.__get_track_gain_correct(track_id, provider)
+        else:
+            gain_correct = -6 # always need some headroom for upsampling and crossfades
+        if os.path.isfile(cachefile):
+            # we have a cache file for this track which we can use
+            args = 'sox -t flac "%s" -t sox - vol %s dB rate -v %s' % (cachefile, gain_correct, sample_rate)
+            process = await asyncio.create_subprocess_shell(args, stdout=asyncio.subprocess.PIPE)
+        else:
+            # stream from provider
+            input_content_type = await self.mass.music.providers[provider].get_stream_content_type(track_id)
+            assert(input_content_type)
+            args = 'sox -t %s - -t sox - vol %s dB rate -v %s' % (input_content_type, gain_correct, sample_rate)
+            process = await asyncio.create_subprocess_shell(args,
+                    stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE)
+            asyncio.get_event_loop().create_task(
+                     self.__fill_audio_buffer(process.stdin, track_id, provider, input_content_type))
+        #await process.wait()
+        audiodata, stderr = await process.communicate()
+        LOGGER.debug("__get_pcm_audio for track_id %s completed" % track_id)
+        return audiodata
+    
     async def __get_audio_stream(self, audioqueue, track_id, provider, player_id=None, cancelled=None):
         ''' get audio stream from provider and apply additional effects/processing where/if needed'''
         cachefile = self.__get_track_cache_filename(track_id, provider)
@@ -278,7 +395,7 @@ class HTTPStreamer():
         if os.path.isfile(cachefile):
             # we have a cache file for this track which we can use
             if sox_effects.strip():
-                args = 'sox -V3 -t flac "%s" -t flac -C 0 - %s' % (cachefile, sox_effects)
+                args = 'sox -t flac "%s" -t flac -C 0 - %s' % (cachefile, sox_effects)
             else:
                 args = 'sox -t flac "%s" -t flac -C 0 - %s' % cachefile
             LOGGER.debug("Running sox with args: %s" % args)
@@ -290,7 +407,7 @@ class HTTPStreamer():
             input_content_type = await self.mass.music.providers[provider].get_stream_content_type(track_id)
             assert(input_content_type)
             if sox_effects.strip():
-                args = 'sox -V3 -t %s - -t flac -C 0 - %s' % (input_content_type, sox_effects)
+                args = 'sox -t %s - -t flac -C 0 - %s' % (input_content_type, sox_effects)
             else:
                 args = 'sox -t %s - -t flac -C 0 -' % (input_content_type)
             LOGGER.debug("Running sox with args: %s" % args)