From: Marcel van der Veldt Date: Sat, 8 Jun 2019 13:06:07 +0000 (+0200) Subject: improve crossfade streaming a lot X-Git-Url: https://git.kitaultman.com/?a=commitdiff_plain;h=3fa562fb8310b7fd8dbd6129f34ff30703aad26d;p=music-assistant-server.git improve crossfade streaming a lot --- diff --git a/music_assistant/modules/http_streamer.py b/music_assistant/modules/http_streamer.py index 0b2cc95c..5009ed2a 100755 --- a/music_assistant/modules/http_streamer.py +++ b/music_assistant/modules/http_streamer.py @@ -161,140 +161,12 @@ class HTTPStreamer(): raise asyncio.CancelledError() return resp - async def __stream_queue_org(self, player_id, startindex, buffer, cancelled): - ''' start streaming all queue tracks ''' - # TODO: get correct queue index and implement reporting of position - sample_rate = self.mass.config['player_settings'][player_id]['max_sample_rate'] - fade_length = self.mass.config['player_settings'][player_id]["crossfade_duration"] - pcm_args = 'raw -b 32 -c 2 -e signed-integer -r %s' % sample_rate - args = 'sox -t %s - -t flac -C 2 -' % pcm_args - sox_proc = await asyncio.create_subprocess_shell(args, - stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE) - - async def fill_buffer(): - while not sox_proc.stdout.at_eof(): - chunk = await sox_proc.stdout.read(256000) - if not chunk: - break - await buffer.put(chunk) - await buffer.put(b'') # indicate EOF - asyncio.create_task(fill_buffer()) - - queue_index = startindex - last_fadeout_data = None - while True: - # get the (next) track in queue - try: - queue_tracks = await self.mass.player.player_queue(player_id, queue_index, queue_index+1) - queue_track = queue_tracks[0] - except IndexError: - LOGGER.info("queue index out of range or end reached") - break - - params = urllib.parse.parse_qs(queue_track.uri.split('?')[1]) - track_id = params['track_id'][0] - provider = params['provider'][0] - LOGGER.info("Start Streaming queue track: %s - %s" % (track_id, queue_track.name)) - audiodata = await self.__get_raw_audio(track_id, provider, sample_rate) - fade_bytes = int(sample_rate * 4 * 2 * fade_length) - LOGGER.debug("total bytes in audio_data: %s - fade_bytes: %s" % (len(audiodata),fade_bytes)) - - # report start stream of current queue index - self.mass.event_loop.create_task(self.mass.player.player_queue_stream_move(player_id, queue_index)) - queue_index += 1 - - if last_fadeout_data: - # get fade in part - args = 'sox --ignore-length -t %s - -t %s - fade t %s' % (pcm_args, pcm_args, fade_length) - process = await asyncio.create_subprocess_shell(args, - stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE) - fade_in_part, stderr = await process.communicate(audiodata[:fade_bytes]) - LOGGER.debug("Got %s bytes in memory for fadein_part after sox" % len(fade_in_part)) - # perform crossfade with previous fadeout samples - fadeinfile = MemoryTempfile(fallback=True).NamedTemporaryFile(buffering=0) - fadeinfile.write(fade_in_part) - fadeoutfile = MemoryTempfile(fallback=True).NamedTemporaryFile(buffering=0) - fadeoutfile.write(last_fadeout_data) - args = 'sox -m -v 1.0 -t %s %s -v 1.0 -t %s %s -t %s -' % (pcm_args, fadeoutfile.name, pcm_args, fadeinfile.name, pcm_args) - process = await asyncio.create_subprocess_shell(args, - stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE) - crossfade_part, stderr = await process.communicate(fade_in_part) - LOGGER.debug("Got %s bytes in memory for crossfade_part after sox" % len(crossfade_part)) - sox_proc.stdin.write(crossfade_part) - await sox_proc.stdin.drain() - fadeinfile.close() - fadeoutfile.close() - del crossfade_part - del fade_in_part - last_fadeout_data = None - else: - # simply put the fadein part in the final file - sox_proc.stdin.write(audiodata[:fade_bytes]) - await sox_proc.stdin.drain() - - # feed the middle part into the main sox - sox_proc.stdin.write(audiodata[fade_bytes:-fade_bytes]) - await sox_proc.stdin.drain() - - # get fade out part - args = 'sox --ignore-length -t %s - -t %s - reverse fade t %s reverse' % (pcm_args, pcm_args, fade_length) - process = await asyncio.create_subprocess_shell(args, - stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE) - last_fadeout_data, stderr = await process.communicate(audiodata[-fade_bytes:]) - LOGGER.debug("Got %s bytes in memory for fade_out_part after sox" % len(last_fadeout_data)) - # cleanup audio data - del audiodata - - LOGGER.info("Queued Streaming queue track: %s - %s" % (track_id, queue_track.name)) - - # wait for the queue to consume the data - while buffer.qsize() > 1 and not cancelled.is_set(): - await asyncio.sleep(1) - if cancelled.is_set(): - break - LOGGER.info("Finished Streaming queue track: %s - %s" % (track_id, queue_track.name)) - - # end of queue reached, pass last fadeout bits to final output - if last_fadeout_data: - sox_proc.stdin.write(last_fadeout_data) - await sox_proc.stdin.drain() - sox_proc.stdin.close() - await sox_proc.wait() - LOGGER.info("streaming of queue for player %s completed" % player_id) - - async def __get_raw_audio_org(self, track_id, provider, sample_rate=96000): - ''' get raw pcm data for a track upsampled to given sample_rate packed as wav ''' - audiodata = b'' - cachefile = self.__get_track_cache_filename(track_id, provider) - pcm_args = 'raw -b 32 -c 2 -e signed-integer' - if self.mass.config['base']['http_streamer']['volume_normalisation']: - gain_correct = await self.__get_track_gain_correct(track_id, provider) - else: - gain_correct = -6 # always need some headroom for upsampling and crossfades - if os.path.isfile(cachefile): - # we have a cache file for this track which we can use - args = 'sox -t flac "%s" -t %s - vol %s dB rate -v %s' % (cachefile, pcm_args, gain_correct, sample_rate) - process = await asyncio.create_subprocess_shell(args, stdout=asyncio.subprocess.PIPE) - else: - # stream from provider - input_content_type = await self.mass.music.providers[provider].get_stream_content_type(track_id) - assert(input_content_type) - args = 'sox -t %s - -t %s - vol %s dB rate -v %s' % (input_content_type, pcm_args, gain_correct, sample_rate) - process = await asyncio.create_subprocess_shell(args, - stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE) - asyncio.get_event_loop().create_task( - self.__fill_audio_buffer(process.stdin, track_id, provider, input_content_type)) - audiodata, stderr = await process.communicate() - LOGGER.debug("__get_raw_audio for track_id %s completed" % (track_id)) - return audiodata - async def __stream_queue(self, player_id, startindex, buffer, cancelled): ''' start streaming all queue tracks ''' - # TODO: get correct queue index and implement reporting of position sample_rate = self.mass.config['player_settings'][player_id]['max_sample_rate'] fade_length = self.mass.config['player_settings'][player_id]["crossfade_duration"] pcm_args = 'raw -b 32 -c 2 -e signed-integer -r %s' % sample_rate - args = 'sox -t %s - -t flac -C 2 -' % pcm_args + args = 'sox -t %s - -t flac -C 0 -' % pcm_args sox_proc = await asyncio.create_subprocess_shell(args, stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE) @@ -323,49 +195,31 @@ class HTTPStreamer(): provider = params['provider'][0] LOGGER.info("Start Streaming queue track: %s - %s" % (track_id, queue_track.name)) fade_bytes = int(sample_rate * 4 * 2 * fade_length) - cachefile = self.__get_track_cache_filename(track_id, provider) - if os.path.isfile(cachefile): - # get track length from cachefile - args = 'soxi -d "%s"' % cachefile - process = await asyncio.create_subprocess_shell(args, stdout=asyncio.subprocess.PIPE) - stdout, stderr = await process.communicate() - timestr = stdout.split()[0].decode() - hours = int(timestr.split(":")[0]) - minutes = int(timestr.split(":")[1]) - seconds = int(float(timestr.split(":")[2])) - total_chunks = hours*60*60 + minutes*60 + seconds - else: - total_chunks = int(queue_track.duration) - - # report start stream of current queue index - self.mass.event_loop.create_task(self.mass.player.player_queue_stream_move(player_id, queue_index)) - queue_index += 1 fade_in_part = b'' cur_chunk = 0 + prev_chunk = None - async for chunk in self.__get_raw_audio(track_id, provider, sample_rate): + async for is_last_chunk, chunk in self.__get_raw_audio(track_id, provider, sample_rate, fade_bytes): cur_chunk += 1 - - if cur_chunk <= fade_length and not last_fadeout_data: + if cur_chunk == 1 and not last_fadeout_data: # fade-in part but this is the first track so just pass it to the final file sox_proc.stdin.write(chunk) await sox_proc.stdin.drain() - elif (cur_chunk < fade_length) and last_fadeout_data: - # need to have fade_length of chunks for the fade-in data - fade_in_part += chunk - elif fade_in_part and last_fadeout_data: - fade_in_part += chunk - # perform crossfade with previous fadeout samples + elif cur_chunk == 1 and last_fadeout_data: + # create fade-out part args = 'sox --ignore-length -t %s - -t %s - reverse fade t %s reverse' % (pcm_args, pcm_args, fade_length) process = await asyncio.create_subprocess_shell(args, stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE) last_fadeout_data, stderr = await process.communicate(last_fadeout_data) - LOGGER.info("Got %s bytes in memory for fade_out_part after sox" % len(last_fadeout_data)) + LOGGER.debug("Got %s bytes in memory for fade_out_part after sox" % len(last_fadeout_data)) + # create fade-in part args = 'sox --ignore-length -t %s - -t %s - fade t %s' % (pcm_args, pcm_args, fade_length) process = await asyncio.create_subprocess_shell(args, stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE) - fade_in_part, stderr = await process.communicate(fade_in_part) - LOGGER.info("Got %s bytes in memory for fadein_part after sox" % len(fade_in_part)) + fade_in_part, stderr = await process.communicate(chunk) + LOGGER.debug("Got %s bytes in memory for fadein_part after sox" % len(fade_in_part)) + # create crossfade using sox and some temp files + # TODO: figure out how to make this less complex and without the tempfiles fadeinfile = MemoryTempfile(fallback=True).NamedTemporaryFile(buffering=0) fadeinfile.write(fade_in_part) fadeoutfile = MemoryTempfile(fallback=True).NamedTemporaryFile(buffering=0) @@ -374,7 +228,8 @@ class HTTPStreamer(): process = await asyncio.create_subprocess_shell(args, stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE) crossfade_part, stderr = await process.communicate(fade_in_part) - LOGGER.info("Got %s bytes in memory for crossfade_part after sox" % len(crossfade_part)) + LOGGER.debug("Got %s bytes in memory for crossfade_part after sox" % len(crossfade_part)) + # write the crossfade part to the sox player sox_proc.stdin.write(crossfade_part) await sox_proc.stdin.drain() fadeinfile.close() @@ -382,22 +237,36 @@ class HTTPStreamer(): del crossfade_part fade_in_part = None last_fadeout_data = b'' - elif (cur_chunk > fade_length) and (cur_chunk < (total_chunks-fade_length)): - # middle part of the track - sox_proc.stdin.write(chunk) + elif prev_chunk and is_last_chunk: + # last chunk received so create the fadeout with the previous chunk and this chunk + last_part = prev_chunk + chunk + last_fadeout_data = last_part[-fade_bytes:] + bytes_remaining = last_part[:-fade_bytes] + sox_proc.stdin.write(bytes_remaining) await sox_proc.stdin.drain() else: - # fade out part - last_fadeout_data += chunk - - LOGGER.info("Queued Streaming queue track: %s - %s" % (track_id, queue_track.name)) - - #wait for the queue to consume the data - while buffer.qsize() > 1 and not cancelled.is_set(): - await asyncio.sleep(1) - if cancelled.is_set(): - break + # middle part of the track + # keep previous chunk in memory so we have enough samples to perform the crossfade + if prev_chunk: + sox_proc.stdin.write(prev_chunk) + await sox_proc.stdin.drain() + prev_chunk = chunk + else: + prev_chunk = chunk + # wait for the queue to consume the data + # this prevents that the entire track is sitting in memory + # and it helps a bit in the quest to follow where we are in the queue + while buffer.qsize() > 1 and not cancelled.is_set(): + await asyncio.sleep(1) + # break out the loop if the http session is cancelled + if cancelled.is_set(): + break + if cur_chunk == 1: + # report start stream of current queue index + self.mass.event_loop.create_task(self.mass.player.player_queue_stream_move(player_id, queue_index)) + # end of the track reached LOGGER.info("Finished Streaming queue track: %s - %s" % (track_id, queue_track.name)) + queue_index += 1 # end of queue reached, pass last fadeout bits to final output if last_fadeout_data: @@ -407,7 +276,7 @@ class HTTPStreamer(): await sox_proc.wait() LOGGER.info("streaming of queue for player %s completed" % player_id) - async def __get_raw_audio(self, track_id, provider, sample_rate=96000): + async def __get_raw_audio(self, track_id, provider, sample_rate, chunksize): ''' get raw pcm data for a track upsampled to given sample_rate packed as wav ''' cachefile = self.__get_track_cache_filename(track_id, provider) pcm_args = 'raw -b 32 -c 2 -e signed-integer' @@ -429,7 +298,8 @@ class HTTPStreamer(): asyncio.get_event_loop().create_task( self.__fill_audio_buffer(process.stdin, track_id, provider, input_content_type)) # put chunks from stdout into queue - chunksize = int(sample_rate * (32/8) * 2) # 1 second + # we keep 1 chunk behind to detect end of stream properly + prev_chunk = None while not process.stdout.at_eof(): try: chunk = await process.stdout.readexactly(chunksize) @@ -437,7 +307,11 @@ class HTTPStreamer(): chunk = await process.stdout.read(chunksize) if not chunk: break - yield chunk + if prev_chunk: + yield (False, prev_chunk) + prev_chunk = chunk + # yield last chunk + yield (True, prev_chunk) await process.wait() LOGGER.info("__get_raw_audio for track_id %s completed" % (track_id)) diff --git a/music_assistant/modules/playerproviders/chromecast.py b/music_assistant/modules/playerproviders/chromecast.py index da6e7986..08af0b7d 100644 --- a/music_assistant/modules/playerproviders/chromecast.py +++ b/music_assistant/modules/playerproviders/chromecast.py @@ -318,7 +318,7 @@ class ChromecastProvider(PlayerProvider): cur_queue_index = player.cur_queue_index player.cur_item = self._player_queue[player_id][cur_queue_index] cur_time = mediastatus.adjusted_current_time - while cur_time > player.cur_item.duration: + while cur_time > player.cur_item.duration-10: cur_queue_index -=1 prev_track = self._player_queue[player_id][cur_queue_index] cur_time -= prev_track.duration