From: Marcel van der Veldt Date: Thu, 6 Jun 2019 15:23:43 +0000 (+0200) Subject: Update http_streamer.py X-Git-Url: https://git.kitaultman.com/?a=commitdiff_plain;h=58c4f3ae4bf0b4523a86282b435cb47902def607;p=music-assistant-server.git Update http_streamer.py --- diff --git a/music_assistant/modules/http_streamer.py b/music_assistant/modules/http_streamer.py index 6a241111..67515e2d 100755 --- a/music_assistant/modules/http_streamer.py +++ b/music_assistant/modules/http_streamer.py @@ -156,13 +156,13 @@ class HTTPStreamer(): raise asyncio.CancelledError() return resp - async def __stream_queue(self, player_id, buffer, cancelled): + async def __stream_queue__old_usingfile(self, player_id, buffer, cancelled): ''' start streaming radio from provider ''' # stream audio with sox queue_tracks = await self.mass.player.player_queue(player_id, 0, 1000) sample_rate = self.mass.config['player_settings'][player_id]['max_sample_rate'] fade_length = self.mass.config['player_settings'][player_id]["crossfade_duration"] - pcm_args = 'raw -b 64 -c 2 -e floating-point -r %s' % sample_rate + pcm_args = 'raw -b 32 -c 2 -e signed-integer -r %s' % sample_rate args = 'sox -t %s - -t flac -C 2 -' % pcm_args sox_proc = await asyncio.create_subprocess_shell(args, stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE) @@ -190,10 +190,10 @@ class HTTPStreamer(): track_id = params['track_id'][0] provider = params['provider'][0] LOGGER.info("Stream queue track: %s - %s" % (track_id, queue_track.name)) - temp_file = await self.__get_pcm_audio(track_id, provider, sample_rate) + temp_file = await self.__get_raw_audio(track_id, provider, sample_rate) # get fade in part - args = 'sox -t %s %s -t %s - trim 0 %s fade t %s' % (pcm_args, temp_file.name, pcm_args, fade_length, fade_length) + args = 'sox -t sox %s -t %s - trim 0 %s fade t %s' % (temp_file.name, pcm_args, fade_length, fade_length) process = await asyncio.create_subprocess_shell(args, stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE) fade_in_part, stderr = await process.communicate() @@ -219,7 +219,7 @@ class HTTPStreamer(): await sox_proc.stdin.drain() # get middle frames (main track without the fade-in and fade-out) - args = 'sox -t %s %s -t %s - trim %s -%s' % (pcm_args, temp_file.name, pcm_args, fade_length, fade_length) + args = 'sox -t sox %s -t %s - trim %s -%s' % (temp_file.name, pcm_args, fade_length, fade_length) process = await asyncio.create_subprocess_shell(args, stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE) middle_part, stderr = await process.communicate() @@ -228,7 +228,7 @@ class HTTPStreamer(): await sox_proc.stdin.drain() # get fade out part (all remaining chunks of 1 second) - args = 'sox -t %s %s -t %s - reverse trim 0 %s fade t %s reverse ' % (pcm_args, temp_file.name, pcm_args, fade_length, fade_length) + args = 'sox -t sox %s -t %s - reverse trim 0 %s fade t %s reverse ' % (temp_file.name, pcm_args, fade_length, fade_length) process = await asyncio.create_subprocess_shell(args, stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE) fade_out_part, stderr = await process.communicate() @@ -241,7 +241,7 @@ class HTTPStreamer(): sox_proc.stdin.write(last_fadeout_data) await sox_proc.stdin.drain() - async def __get_pcm_audio(self, track_id, provider, sample_rate=96000): + async def __get_raw_audio__old_usingfile(self, track_id, provider, sample_rate=96000): ''' get raw pcm data for a track upsampled to given sample_rate packed as wav ''' temp_audiofile = MemoryTempfile(fallback=True).NamedTemporaryFile(buffering=0) cachefile = self.__get_track_cache_filename(track_id, provider) @@ -252,14 +252,14 @@ class HTTPStreamer(): if os.path.isfile(cachefile): # we have a cache file for this track which we can use # always convert to 64 bit floating point to do any processing/effects - args = 'sox -V3 -t flac "%s" -t wav -c 2 -e floating-point -b 64 - | sox -V3 -t wav - -t raw %s vol %s dB rate -v %s' % (cachefile, temp_audiofile.name, gain_correct, sample_rate) + args = 'sox -V3 -t flac "%s" -t sox %s vol %s dB rate -v %s' % (cachefile, temp_audiofile.name, gain_correct, sample_rate) process = await asyncio.create_subprocess_shell(args) else: # stream from provider # always convert to 64 bit floating point to do any processing/effects input_content_type = await self.mass.music.providers[provider].get_stream_content_type(track_id) assert(input_content_type) - args = 'sox -V3 -t %s - -t wav -c 2 -e floating-point -b 64 - | sox -V3 -t wav - -t raw %s vol %s dB rate -v %s' % (input_content_type, temp_audiofile.name, gain_correct, sample_rate) + args = 'sox -V3 -t %s - -t sox %s vol %s dB rate -v %s' % (input_content_type, temp_audiofile.name, gain_correct, sample_rate) process = await asyncio.create_subprocess_shell(args, stdin=asyncio.subprocess.PIPE) asyncio.get_event_loop().create_task( @@ -268,6 +268,123 @@ class HTTPStreamer(): LOGGER.debug("__get_pcm_audio for track_id %s completed" % track_id) return temp_audiofile + async def __stream_queue(self, player_id, buffer, cancelled): + ''' start streaming all queue tracks ''' + # TODO: get correct queue index and implement reporting of position + queue_tracks = await self.mass.player.player_queue(player_id, 0, 1000) + sample_rate = self.mass.config['player_settings'][player_id]['max_sample_rate'] + fade_length = self.mass.config['player_settings'][player_id]["crossfade_duration"] + pcm_args = 'raw -b 32 -c 2 -e signed-integer -r %s' % sample_rate + args = 'sox -t %s - -t flac -C 2 -' % pcm_args + sox_proc = await asyncio.create_subprocess_shell(args, + stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE) + + async def fill_buffer(): + while not sox_proc.stdout.at_eof(): + chunk = await sox_proc.stdout.read(256000) + if not chunk: + break + await buffer.put(chunk) + await buffer.put('') # indicate EOF + asyncio.create_task(fill_buffer()) + + last_fadeout_data = None + for queue_track in queue_tracks: + + while buffer.qsize() > 5 and not cancelled.is_set(): + await asyncio.sleep(1) + if cancelled.is_set(): + break + + params = urllib.parse.parse_qs(queue_track.uri.split('?')[1]) + track_id = params['track_id'][0] + provider = params['provider'][0] + LOGGER.info("Stream queue track: %s - %s" % (track_id, queue_track.name)) + audiodata = await self.__get_raw_audio(track_id, provider, sample_rate) + + # get fade in part + args = 'sox -t sox - -t %s - trim 0 %s fade t %s' % (pcm_args, fade_length, fade_length) + process = await asyncio.create_subprocess_shell(args, + stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE) + fade_in_part, stderr = await process.communicate(audiodata[0:15360000]) + LOGGER.debug("Got %s bytes in memory for fadein_part after sox" % len(fade_in_part)) + if last_fadeout_data: + # perform crossfade with previous fadeout samples + fadeinfile = MemoryTempfile(fallback=True).NamedTemporaryFile(buffering=0) + fadeinfile.write(fade_in_part) + fadeoutfile = MemoryTempfile(fallback=True).NamedTemporaryFile(buffering=0) + fadeoutfile.write(last_fadeout_data) + args = 'sox -m -v 1.0 -t %s %s -v 1.0 -t %s %s -t %s -' % (pcm_args, fadeoutfile.name, pcm_args, fadeinfile.name, pcm_args) + process = await asyncio.create_subprocess_shell(args, + stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE) + crossfade_part, stderr = await process.communicate(fade_in_part) + LOGGER.debug("Got %s bytes in memory for crossfade_part after sox" % len(crossfade_part)) + sox_proc.stdin.write(crossfade_part) + await sox_proc.stdin.drain() + fadeinfile.close() + fadeoutfile.close() + del crossfade_part + del fade_in_part + last_fadeout_data = None + else: + # simply put the fadein part in the final file + sox_proc.stdin.write(fade_in_part) + await sox_proc.stdin.drain() + del fade_in_part + + # get middle frames (main track without the fade-in and fade-out) + args = 'sox -t sox - -t %s - trim %s -%s' % (pcm_args, fade_length, fade_length) + process = await asyncio.create_subprocess_shell(args, + stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE) + middle_part, stderr = await process.communicate(audiodata) + LOGGER.debug("Got %s bytes in memory for middle_part after sox" % len(middle_part)) + sox_proc.stdin.write(middle_part) + await sox_proc.stdin.drain() + del middle_part + + # get fade out part + args = 'sox -t sox - -t %s - reverse trim 0 %s fade t %s reverse ' % (pcm_args, fade_length, fade_length) + process = await asyncio.create_subprocess_shell(args, + stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE) + last_fadeout_data, stderr = await process.communicate(audiodata) + LOGGER.debug("Got %s bytes in memory for fade_out_part after sox" % len(last_fadeout_data)) + # cleanup audio data + del audiodata + + # end of queue reached, pass last fadeout bits to final output + if last_fadeout_data: + sox_proc.stdin.write(last_fadeout_data) + await sox_proc.stdin.drain() + sox_proc.stdin.close() + await sox_proc.wait() + LOGGER.info("streaming of queue for player %s completed" % player_id) + + async def __get_raw_audio(self, track_id, provider, sample_rate=96000): + ''' get raw pcm data for a track upsampled to given sample_rate packed as wav ''' + audiodata = b'' + cachefile = self.__get_track_cache_filename(track_id, provider) + if self.mass.config['base']['http_streamer']['volume_normalisation']: + gain_correct = await self.__get_track_gain_correct(track_id, provider) + else: + gain_correct = -6 # always need some headroom for upsampling and crossfades + if os.path.isfile(cachefile): + # we have a cache file for this track which we can use + args = 'sox -t flac "%s" -t sox - vol %s dB rate -v %s' % (cachefile, gain_correct, sample_rate) + process = await asyncio.create_subprocess_shell(args, stdout=asyncio.subprocess.PIPE) + else: + # stream from provider + input_content_type = await self.mass.music.providers[provider].get_stream_content_type(track_id) + assert(input_content_type) + args = 'sox -t %s - -t sox - vol %s dB rate -v %s' % (input_content_type, gain_correct, sample_rate) + process = await asyncio.create_subprocess_shell(args, + stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE) + asyncio.get_event_loop().create_task( + self.__fill_audio_buffer(process.stdin, track_id, provider, input_content_type)) + #await process.wait() + audiodata, stderr = await process.communicate() + LOGGER.debug("__get_pcm_audio for track_id %s completed" % track_id) + return audiodata + async def __get_audio_stream(self, audioqueue, track_id, provider, player_id=None, cancelled=None): ''' get audio stream from provider and apply additional effects/processing where/if needed''' cachefile = self.__get_track_cache_filename(track_id, provider) @@ -278,7 +395,7 @@ class HTTPStreamer(): if os.path.isfile(cachefile): # we have a cache file for this track which we can use if sox_effects.strip(): - args = 'sox -V3 -t flac "%s" -t flac -C 0 - %s' % (cachefile, sox_effects) + args = 'sox -t flac "%s" -t flac -C 0 - %s' % (cachefile, sox_effects) else: args = 'sox -t flac "%s" -t flac -C 0 - %s' % cachefile LOGGER.debug("Running sox with args: %s" % args) @@ -290,7 +407,7 @@ class HTTPStreamer(): input_content_type = await self.mass.music.providers[provider].get_stream_content_type(track_id) assert(input_content_type) if sox_effects.strip(): - args = 'sox -V3 -t %s - -t flac -C 0 - %s' % (input_content_type, sox_effects) + args = 'sox -t %s - -t flac -C 0 - %s' % (input_content_type, sox_effects) else: args = 'sox -t %s - -t flac -C 0 -' % (input_content_type) LOGGER.debug("Running sox with args: %s" % args)