improve crossfade streaming a lot
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Sat, 8 Jun 2019 13:06:07 +0000 (15:06 +0200)
committerMarcel van der Veldt <m.vanderveldt@outlook.com>
Sat, 8 Jun 2019 13:06:07 +0000 (15:06 +0200)
music_assistant/modules/http_streamer.py
music_assistant/modules/playerproviders/chromecast.py

index 0b2cc95c2dd486a9b276e7dd5cb6c53985f4c148..5009ed2a60efd21450644666513d3b1afe501d13 100755 (executable)
@@ -161,140 +161,12 @@ class HTTPStreamer():
                 raise asyncio.CancelledError()
         return resp
 
-    async def __stream_queue_org(self, player_id, startindex, buffer, cancelled):
-        ''' start streaming all queue tracks '''
-        # TODO: get correct queue index and implement reporting of position
-        sample_rate = self.mass.config['player_settings'][player_id]['max_sample_rate']
-        fade_length = self.mass.config['player_settings'][player_id]["crossfade_duration"]
-        pcm_args = 'raw -b 32 -c 2 -e signed-integer -r %s' % sample_rate
-        args = 'sox -t %s - -t flac -C 2 -' % pcm_args
-        sox_proc = await asyncio.create_subprocess_shell(args, 
-                stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE)
-
-        async def fill_buffer():
-            while not sox_proc.stdout.at_eof():
-                chunk = await sox_proc.stdout.read(256000)
-                if not chunk:
-                    break
-                await buffer.put(chunk)
-            await buffer.put(b'') # indicate EOF
-        asyncio.create_task(fill_buffer())
-
-        queue_index = startindex
-        last_fadeout_data = None
-        while True:
-            # get the (next) track in queue
-            try:
-                queue_tracks = await self.mass.player.player_queue(player_id, queue_index, queue_index+1)
-                queue_track = queue_tracks[0]
-            except IndexError:
-                LOGGER.info("queue index out of range or end reached")
-                break
-
-            params = urllib.parse.parse_qs(queue_track.uri.split('?')[1])
-            track_id = params['track_id'][0]
-            provider = params['provider'][0]
-            LOGGER.info("Start Streaming queue track: %s - %s" % (track_id, queue_track.name))
-            audiodata = await self.__get_raw_audio(track_id, provider, sample_rate)
-            fade_bytes = int(sample_rate * 4 * 2 * fade_length)
-            LOGGER.debug("total bytes in audio_data: %s - fade_bytes: %s" % (len(audiodata),fade_bytes))
-
-            # report start stream of current queue index
-            self.mass.event_loop.create_task(self.mass.player.player_queue_stream_move(player_id, queue_index))
-            queue_index += 1
-
-            if last_fadeout_data:
-                # get fade in part
-                args = 'sox --ignore-length -t %s - -t %s - fade t %s' % (pcm_args, pcm_args, fade_length)
-                process = await asyncio.create_subprocess_shell(args,
-                        stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE)
-                fade_in_part, stderr = await process.communicate(audiodata[:fade_bytes])
-                LOGGER.debug("Got %s bytes in memory for fadein_part after sox" % len(fade_in_part))
-                # perform crossfade with previous fadeout samples
-                fadeinfile = MemoryTempfile(fallback=True).NamedTemporaryFile(buffering=0)
-                fadeinfile.write(fade_in_part)
-                fadeoutfile = MemoryTempfile(fallback=True).NamedTemporaryFile(buffering=0)
-                fadeoutfile.write(last_fadeout_data)
-                args = 'sox -m -v 1.0 -t %s %s -v 1.0 -t %s %s -t %s -' % (pcm_args, fadeoutfile.name, pcm_args, fadeinfile.name, pcm_args)
-                process = await asyncio.create_subprocess_shell(args,
-                        stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE)
-                crossfade_part, stderr = await process.communicate(fade_in_part)
-                LOGGER.debug("Got %s bytes in memory for crossfade_part after sox" % len(crossfade_part))
-                sox_proc.stdin.write(crossfade_part)
-                await sox_proc.stdin.drain()
-                fadeinfile.close()
-                fadeoutfile.close()
-                del crossfade_part
-                del fade_in_part
-                last_fadeout_data = None
-            else:
-                # simply put the fadein part in the final file
-                sox_proc.stdin.write(audiodata[:fade_bytes])
-                await sox_proc.stdin.drain()
-
-            # feed the middle part into the main sox
-            sox_proc.stdin.write(audiodata[fade_bytes:-fade_bytes])
-            await sox_proc.stdin.drain()
-
-            # get fade out part
-            args = 'sox --ignore-length -t %s - -t %s - reverse fade t %s reverse' % (pcm_args, pcm_args, fade_length)
-            process = await asyncio.create_subprocess_shell(args,
-                    stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE)
-            last_fadeout_data, stderr = await process.communicate(audiodata[-fade_bytes:])
-            LOGGER.debug("Got %s bytes in memory for fade_out_part after sox" % len(last_fadeout_data))
-            # cleanup audio data
-            del audiodata
-
-            LOGGER.info("Queued Streaming queue track: %s - %s" % (track_id, queue_track.name))
-
-            # wait for the queue to consume the data
-            while buffer.qsize() > 1 and not cancelled.is_set():
-                await asyncio.sleep(1)
-            if cancelled.is_set():
-                break
-            LOGGER.info("Finished Streaming queue track: %s - %s" % (track_id, queue_track.name))
-        
-        # end of queue reached, pass last fadeout bits to final output
-        if last_fadeout_data:
-            sox_proc.stdin.write(last_fadeout_data)
-            await sox_proc.stdin.drain()
-        sox_proc.stdin.close()
-        await sox_proc.wait()
-        LOGGER.info("streaming of queue for player %s completed" % player_id)
-
-    async def __get_raw_audio_org(self, track_id, provider, sample_rate=96000):
-        ''' get raw pcm data for a track upsampled to given sample_rate packed as wav '''
-        audiodata = b''
-        cachefile = self.__get_track_cache_filename(track_id, provider)
-        pcm_args = 'raw -b 32 -c 2 -e signed-integer'
-        if self.mass.config['base']['http_streamer']['volume_normalisation']:
-            gain_correct = await self.__get_track_gain_correct(track_id, provider)
-        else:
-            gain_correct = -6 # always need some headroom for upsampling and crossfades
-        if os.path.isfile(cachefile):
-            # we have a cache file for this track which we can use
-            args = 'sox -t flac "%s" -t %s - vol %s dB rate -v %s' % (cachefile, pcm_args, gain_correct, sample_rate)
-            process = await asyncio.create_subprocess_shell(args, stdout=asyncio.subprocess.PIPE)
-        else:
-            # stream from provider
-            input_content_type = await self.mass.music.providers[provider].get_stream_content_type(track_id)
-            assert(input_content_type)
-            args = 'sox -t %s - -t %s - vol %s dB rate -v %s' % (input_content_type, pcm_args, gain_correct, sample_rate)
-            process = await asyncio.create_subprocess_shell(args,
-                    stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE)
-            asyncio.get_event_loop().create_task(
-                     self.__fill_audio_buffer(process.stdin, track_id, provider, input_content_type))
-        audiodata, stderr = await process.communicate()
-        LOGGER.debug("__get_raw_audio for track_id %s completed" % (track_id))
-        return audiodata
-    
     async def __stream_queue(self, player_id, startindex, buffer, cancelled):
         ''' start streaming all queue tracks '''
-        # TODO: get correct queue index and implement reporting of position
         sample_rate = self.mass.config['player_settings'][player_id]['max_sample_rate']
         fade_length = self.mass.config['player_settings'][player_id]["crossfade_duration"]
         pcm_args = 'raw -b 32 -c 2 -e signed-integer -r %s' % sample_rate
-        args = 'sox -t %s - -t flac -C 2 -' % pcm_args
+        args = 'sox -t %s - -t flac -C 0 -' % pcm_args
         sox_proc = await asyncio.create_subprocess_shell(args, 
                 stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE)
 
@@ -323,49 +195,31 @@ class HTTPStreamer():
             provider = params['provider'][0]
             LOGGER.info("Start Streaming queue track: %s - %s" % (track_id, queue_track.name))
             fade_bytes = int(sample_rate * 4 * 2 * fade_length)
-            cachefile = self.__get_track_cache_filename(track_id, provider)
-            if os.path.isfile(cachefile):
-                # get track length from cachefile
-                args = 'soxi -d "%s"' % cachefile
-                process = await asyncio.create_subprocess_shell(args, stdout=asyncio.subprocess.PIPE)
-                stdout, stderr = await process.communicate()
-                timestr = stdout.split()[0].decode()
-                hours = int(timestr.split(":")[0])
-                minutes = int(timestr.split(":")[1])
-                seconds = int(float(timestr.split(":")[2]))
-                total_chunks = hours*60*60 + minutes*60 + seconds
-            else:
-                total_chunks = int(queue_track.duration)
-            
-            # report start stream of current queue index
-            self.mass.event_loop.create_task(self.mass.player.player_queue_stream_move(player_id, queue_index))
-            queue_index += 1
             fade_in_part = b''
             cur_chunk = 0
+            prev_chunk = None
             
-            async for chunk in self.__get_raw_audio(track_id, provider, sample_rate):
+            async for is_last_chunk, chunk in self.__get_raw_audio(track_id, provider, sample_rate, fade_bytes):
                 cur_chunk += 1
-
-                if cur_chunk <= fade_length and not last_fadeout_data:
+                if cur_chunk == 1 and not last_fadeout_data:
                     # fade-in part but this is the first track so just pass it to the final file
                     sox_proc.stdin.write(chunk)
                     await sox_proc.stdin.drain()
-                elif (cur_chunk < fade_length) and last_fadeout_data:
-                    # need to have fade_length of chunks for the fade-in data
-                    fade_in_part += chunk
-                elif fade_in_part and last_fadeout_data:
-                    fade_in_part += chunk
-                    # perform crossfade with previous fadeout samples
+                elif cur_chunk == 1 and last_fadeout_data:
+                    # create fade-out part
                     args = 'sox --ignore-length -t %s - -t %s - reverse fade t %s reverse' % (pcm_args, pcm_args, fade_length)
                     process = await asyncio.create_subprocess_shell(args,
                             stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE)
                     last_fadeout_data, stderr = await process.communicate(last_fadeout_data)
-                    LOGGER.info("Got %s bytes in memory for fade_out_part after sox" % len(last_fadeout_data))
+                    LOGGER.debug("Got %s bytes in memory for fade_out_part after sox" % len(last_fadeout_data))
+                    # create fade-in part
                     args = 'sox --ignore-length -t %s - -t %s - fade t %s' % (pcm_args, pcm_args, fade_length)
                     process = await asyncio.create_subprocess_shell(args,
                             stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE)
-                    fade_in_part, stderr = await process.communicate(fade_in_part)
-                    LOGGER.info("Got %s bytes in memory for fadein_part after sox" % len(fade_in_part))
+                    fade_in_part, stderr = await process.communicate(chunk)
+                    LOGGER.debug("Got %s bytes in memory for fadein_part after sox" % len(fade_in_part))
+                    # create crossfade using sox and some temp files
+                    # TODO: figure out how to make this less complex and without the tempfiles
                     fadeinfile = MemoryTempfile(fallback=True).NamedTemporaryFile(buffering=0)
                     fadeinfile.write(fade_in_part)
                     fadeoutfile = MemoryTempfile(fallback=True).NamedTemporaryFile(buffering=0)
@@ -374,7 +228,8 @@ class HTTPStreamer():
                     process = await asyncio.create_subprocess_shell(args,
                             stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE)
                     crossfade_part, stderr = await process.communicate(fade_in_part)
-                    LOGGER.info("Got %s bytes in memory for crossfade_part after sox" % len(crossfade_part))
+                    LOGGER.debug("Got %s bytes in memory for crossfade_part after sox" % len(crossfade_part))
+                    # write the crossfade part to the sox player
                     sox_proc.stdin.write(crossfade_part)
                     await sox_proc.stdin.drain()
                     fadeinfile.close()
@@ -382,22 +237,36 @@ class HTTPStreamer():
                     del crossfade_part
                     fade_in_part = None
                     last_fadeout_data = b''
-                elif (cur_chunk > fade_length) and (cur_chunk < (total_chunks-fade_length)):
-                    # middle part of the track
-                    sox_proc.stdin.write(chunk)
+                elif prev_chunk and is_last_chunk:
+                    # last chunk received so create the fadeout with the previous chunk and this chunk
+                    last_part = prev_chunk + chunk
+                    last_fadeout_data = last_part[-fade_bytes:]
+                    bytes_remaining = last_part[:-fade_bytes]
+                    sox_proc.stdin.write(bytes_remaining)
                     await sox_proc.stdin.drain()
                 else:
-                    # fade out part
-                    last_fadeout_data += chunk
-
-            LOGGER.info("Queued Streaming queue track: %s - %s" % (track_id, queue_track.name))
-
-            #wait for the queue to consume the data
-            while buffer.qsize() > 1 and not cancelled.is_set():
-                await asyncio.sleep(1)
-            if cancelled.is_set():
-                break
+                    # middle part of the track
+                    # keep previous chunk in memory so we have enough samples to perform the crossfade
+                    if prev_chunk:
+                        sox_proc.stdin.write(prev_chunk)
+                        await sox_proc.stdin.drain()
+                        prev_chunk = chunk
+                    else:
+                        prev_chunk = chunk
+                # wait for the queue to consume the data
+                # this prevents that the entire track is sitting in memory
+                # and it helps a bit in the quest to follow where we are in the queue
+                while buffer.qsize() > 1 and not cancelled.is_set():
+                    await asyncio.sleep(1)
+                # break out the loop if the http session is cancelled
+                if cancelled.is_set():
+                    break
+                if cur_chunk == 1:
+                    # report start stream of current queue index
+                    self.mass.event_loop.create_task(self.mass.player.player_queue_stream_move(player_id, queue_index))
+            # end of the track reached
             LOGGER.info("Finished Streaming queue track: %s - %s" % (track_id, queue_track.name))
+            queue_index += 1
         
         # end of queue reached, pass last fadeout bits to final output
         if last_fadeout_data:
@@ -407,7 +276,7 @@ class HTTPStreamer():
         await sox_proc.wait()
         LOGGER.info("streaming of queue for player %s completed" % player_id)
 
-    async def __get_raw_audio(self, track_id, provider, sample_rate=96000):
+    async def __get_raw_audio(self, track_id, provider, sample_rate, chunksize):
         ''' get raw pcm data for a track upsampled to given sample_rate packed as wav '''
         cachefile = self.__get_track_cache_filename(track_id, provider)
         pcm_args = 'raw -b 32 -c 2 -e signed-integer'
@@ -429,7 +298,8 @@ class HTTPStreamer():
             asyncio.get_event_loop().create_task(
                      self.__fill_audio_buffer(process.stdin, track_id, provider, input_content_type))
         # put chunks from stdout into queue
-        chunksize = int(sample_rate * (32/8) * 2) # 1 second
+        # we keep 1 chunk behind to detect end of stream properly
+        prev_chunk = None
         while not process.stdout.at_eof():
             try:
                 chunk = await process.stdout.readexactly(chunksize)
@@ -437,7 +307,11 @@ class HTTPStreamer():
                 chunk = await process.stdout.read(chunksize)
             if not chunk:
                 break
-            yield chunk
+            if prev_chunk:
+                yield (False, prev_chunk)
+            prev_chunk = chunk
+        # yield last chunk
+        yield (True, prev_chunk)
         await process.wait()
         LOGGER.info("__get_raw_audio for track_id %s completed" % (track_id))
     
index da6e79861f44851838f8fcc6069fc5a88e1f16f3..08af0b7d38dac62876b273f18d4a0b189661db1d 100644 (file)
@@ -318,7 +318,7 @@ class ChromecastProvider(PlayerProvider):
                 cur_queue_index = player.cur_queue_index
                 player.cur_item = self._player_queue[player_id][cur_queue_index]
                 cur_time = mediastatus.adjusted_current_time
-                while cur_time > player.cur_item.duration:
+                while cur_time > player.cur_item.duration-10:
                     cur_queue_index -=1
                     prev_track = self._player_queue[player_id][cur_queue_index]
                     cur_time -= prev_track.duration