From fc9b54c23ceed3c4137c03f45bbdc0e9dbc2a3ae Mon Sep 17 00:00:00 2001 From: Marcel van der Veldt Date: Thu, 6 Jun 2019 14:41:18 +0200 Subject: [PATCH] add support for crossfading - use high quality settings for sox for all stream alterations - allow crossfading on lms and chromecast - now playing needs fixing when using the crossfade stream --- music_assistant/modules/http_streamer.py | 173 +++++++++++++++++- music_assistant/modules/music.py | 5 +- music_assistant/modules/player.py | 6 +- .../modules/playerproviders/chromecast.py | 28 ++- .../modules/playerproviders/lms.py | 4 + .../modules/playerproviders/pylms.py | 17 +- music_assistant/modules/web.py | 1 + music_assistant/web/pages/config.vue.js | 5 +- 8 files changed, 213 insertions(+), 26 deletions(-) diff --git a/music_assistant/modules/http_streamer.py b/music_assistant/modules/http_streamer.py index c8e26209..0ded2873 100755 --- a/music_assistant/modules/http_streamer.py +++ b/music_assistant/modules/http_streamer.py @@ -12,6 +12,10 @@ import base64 import operator from aiohttp import web import threading +import urllib +import math +from memory_tempfile import MemoryTempfile +import tempfile AUDIO_TEMP_DIR = "/tmp/audio_tmp" AUDIO_CACHE_DIR = "/tmp/audio_cache" @@ -74,10 +78,10 @@ class HTTPStreamer(): break await resp.write(chunk) queue.task_done() - LOGGER.info("Finished streaming %s" % track_id) + LOGGER.info("stream_track fininished for %s" % track_id) except asyncio.CancelledError: cancelled.set() - LOGGER.info("Streaming interrupted for %s" % track_id) + LOGGER.info("stream_track interrupted for %s" % track_id) raise asyncio.CancelledError() return resp @@ -123,13 +127,161 @@ class HTTPStreamer(): raise asyncio.CancelledError() return resp + async def stream_queue(self, http_request): + ''' start streaming radio from provider ''' + player_id = http_request.query.get('player_id') + cancelled = threading.Event() + resp = web.StreamResponse(status=200, + reason='OK', + headers={'Content-Type': 'audio/flac'}) + await resp.prepare(http_request) + if http_request.method.upper() != 'HEAD': + # stream audio + queue = asyncio.Queue() + cancelled = threading.Event() + task = run_async_background_task( + self.mass.bg_executor, + self.__stream_queue, player_id, queue, cancelled) + try: + while True: + chunk = await queue.get() + await resp.write(chunk) + queue.task_done() + if not chunk: + break + LOGGER.info("stream_queue fininished for %s" % player_id) + except asyncio.CancelledError: + cancelled.set() + LOGGER.info("stream_queue interrupted for %s" % player_id) + raise asyncio.CancelledError() + return resp + + async def __stream_queue(self, player_id, buffer, cancelled): + ''' start streaming radio from provider ''' + # stream audio with sox + queue_tracks = await self.mass.player.player_queue(player_id, 0, 1000) + sample_rate = self.mass.config['player_settings'][player_id]['max_sample_rate'] + fade_length = self.mass.config['player_settings'][player_id]["crossfade_duration"] + pcm_args = 'raw -b 64 -c 2 -e floating-point -r %s' % sample_rate + args = 'sox -t %s - -t flac -C 2 -' % pcm_args + sox_proc = await asyncio.create_subprocess_shell(args, + stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE) + + async def fill_buffer(): + while not sox_proc.stdout.at_eof(): + chunk = await sox_proc.stdout.read(256000) + if not chunk: + break + await buffer.put(chunk) + await sox_proc.wait() + await buffer.put('') # indicate EOF + LOGGER.info("streaming of queue for player %s completed" % player_id) + asyncio.create_task(fill_buffer()) + + last_fadeout_data = None + for queue_track in queue_tracks: + + while buffer.qsize() > 5 and not cancelled.is_set(): + await asyncio.sleep(1) + if cancelled.is_set(): + break + + params = urllib.parse.parse_qs(queue_track.uri.split('?')[1]) + track_id = params['track_id'][0] + provider = params['provider'][0] + LOGGER.info("Stream queue track: %s - %s" % (track_id, queue_track.name)) + temp_file = await self.__get_pcm_audio(track_id, provider, sample_rate) + + # get fade in part + args = 'sox -t %s %s -t %s - trim 0 %s fade t %s' % (pcm_args, temp_file.name, pcm_args, fade_length, fade_length) + process = await asyncio.create_subprocess_shell(args, + stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE) + fade_in_part, stderr = await process.communicate() + LOGGER.debug("Got %s bytes in memory for fadein_part after sox" % len(fade_in_part)) + if last_fadeout_data: + # perform crossfade with previous fadeout samples + fadeinfile = MemoryTempfile(fallback=True).NamedTemporaryFile(buffering=0) + fadeinfile.write(fade_in_part) + fadeoutfile = MemoryTempfile(fallback=True).NamedTemporaryFile(buffering=0) + fadeoutfile.write(last_fadeout_data) + args = 'sox -m -v 1.0 -t %s %s -v 1.0 -t %s %s -t %s -' % (pcm_args, fadeoutfile.name, pcm_args, fadeinfile.name, pcm_args) + process = await asyncio.create_subprocess_shell(args, + stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE) + crossfade_part, stderr = await process.communicate(fade_in_part) + LOGGER.debug("Got %s bytes in memory for crossfade_part after sox" % len(crossfade_part)) + sox_proc.stdin.write(crossfade_part) + await sox_proc.stdin.drain() + fadeinfile.close() + fadeoutfile.close() + else: + # simply put the fadein part in the final file + sox_proc.stdin.write(fade_in_part) + await sox_proc.stdin.drain() + + # get middle frames (main track without the fade-in and fade-out) + args = 'sox -t %s %s -t %s - trim %s -%s' % (pcm_args, temp_file.name, pcm_args, fade_length, fade_length) + process = await asyncio.create_subprocess_shell(args, + stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE) + middle_part, stderr = await process.communicate() + LOGGER.debug("Got %s bytes in memory for middle_part after sox" % len(middle_part)) + sox_proc.stdin.write(middle_part) + await sox_proc.stdin.drain() + + # get fade out part (all remaining chunks of 1 second) + args = 'sox -t %s %s -t %s - reverse trim 0 %s fade t %s reverse ' % (pcm_args, temp_file.name, pcm_args, fade_length, fade_length) + process = await asyncio.create_subprocess_shell(args, + stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE) + fade_out_part, stderr = await process.communicate() + LOGGER.debug("Got %s bytes in memory for fade_out_part after sox" % len(fade_out_part)) + last_fadeout_data = fade_out_part + # close temp file + temp_file.close() + # end of queue reached, pass last fadeout bits to final output + if last_fadeout_data: + sox_proc.stdin.write(last_fadeout_data) + await sox_proc.stdin.drain() + + async def __get_pcm_audio(self, track_id, provider, sample_rate=96000): + ''' get raw pcm data for a track upsampled to given sample_rate packed as wav ''' + temp_audiofile = MemoryTempfile(fallback=True).NamedTemporaryFile(buffering=0) + cachefile = self.__get_track_cache_filename(track_id, provider) + if self.mass.config['base']['http_streamer']['volume_normalisation']: + gain_correct = await self.__get_track_gain_correct(track_id, provider) + else: + gain_correct = -6 # always need some headroom for upsampling and crossfades + if os.path.isfile(cachefile): + # we have a cache file for this track which we can use + # always convert to 64 bit floating point to do any processing/effects + args = 'sox -t flac "%s" -t wav -c 2 -e floating-point -b 64 - | sox -t wav - -t raw %s vol %s dB rate -v %s' % (cachefile, temp_audiofile.name, gain_correct, sample_rate) + process = await asyncio.create_subprocess_shell(args) + else: + # stream from provider + # always convert to 64 bit floating point to do any processing/effects + input_content_type = await self.mass.music.providers[provider].get_stream_content_type(track_id) + assert(input_content_type) + args = 'sox -t %s - -t wav -c 2 -e floating-point -b 64 - | sox -t wav - -t raw %s vol %s dB rate -v %s' % (input_content_type, temp_audiofile.name, gain_correct, sample_rate) + process = await asyncio.create_subprocess_shell(args, + stdin=asyncio.subprocess.PIPE) + asyncio.get_event_loop().create_task( + self.__fill_audio_buffer(process.stdin, track_id, provider, input_content_type)) + await process.wait() + LOGGER.debug("__get_pcm_audio for track_id %s completed" % track_id) + return temp_audiofile + async def __get_audio_stream(self, audioqueue, track_id, provider, player_id=None, cancelled=None): ''' get audio stream from provider and apply additional effects/processing where/if needed''' cachefile = self.__get_track_cache_filename(track_id, provider) sox_effects = await self.__get_player_sox_options(track_id, provider, player_id, False) + if self.mass.config['base']['http_streamer']['volume_normalisation']: + gain_correct = await self.__get_track_gain_correct(track_id, provider) + sox_effects += ' vol %s dB ' % gain_correct if os.path.isfile(cachefile): # we have a cache file for this track which we can use - args = 'sox -t flac %s -t flac -C 0 - %s' % (cachefile, sox_effects) + if sox_effects.strip(): + # always convert to 64 bit floating point to do any processing/effects + args = 'sox -t flac "%s" -t wav -b 64 -e floating-point - | sox -t wav - -t flac -C 2 - %s' % (cachefile, sox_effects) + else: + args = 'sox -t flac "%s" -t flac -C 2 - %s' % cachefile LOGGER.info("Running sox with args: %s" % args) process = await asyncio.create_subprocess_shell(args, stdout=asyncio.subprocess.PIPE) @@ -138,7 +290,11 @@ class HTTPStreamer(): # stream from provider input_content_type = await self.mass.music.providers[provider].get_stream_content_type(track_id) assert(input_content_type) - args = 'sox -t %s - -t flac -C 0 - %s' % (input_content_type, sox_effects) + if sox_effects.strip(): + # always convert to 64 bit floating point to do any processing/effects + args = 'sox -t %s - -t wav -b 64 -e floating-point - | sox -t wav - -t flac -C 2 - %s' % (input_content_type, sox_effects) + else: + args = 'sox -t %s - -t flac -C 0 -' % (input_content_type) LOGGER.info("Running sox with args: %s" % args) process = await asyncio.create_subprocess_shell(args, stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE) @@ -146,7 +302,7 @@ class HTTPStreamer(): self.__fill_audio_buffer(process.stdin, track_id, provider, input_content_type)) # put chunks from stdout into queue while not process.stdout.at_eof(): - chunk = await process.stdout.read(256000) + chunk = await process.stdout.read(705600) if not chunk: break if not cancelled.is_set(): @@ -156,9 +312,9 @@ class HTTPStreamer(): await process.wait() await audioqueue.put('') # indicate EOF if cancelled.is_set(): - LOGGER.info("streaming of track_id %s interrupted" % track_id) + LOGGER.info("__get_audio_stream for track_id %s interrupted" % track_id) else: - LOGGER.info("streaming of track_id %s completed" % track_id) + LOGGER.info("__get_audio_stream for track_id %s completed" % track_id) async def __get_player_sox_options(self, track_id, provider, player_id, is_radio): ''' get player specific sox options ''' @@ -185,9 +341,6 @@ class HTTPStreamer(): sox_effects += 'rate -v 48000' if player_id and self.mass.config['player_settings'][player_id]['sox_effects']: sox_effects += ' ' + self.mass.config['player_settings'][player_id]['sox_effects'] - if self.mass.config['base']['http_streamer']['volume_normalisation']: - gain_correct = await self.__get_track_gain_correct(track_id, provider) - sox_effects += ' vol %s dB ' % gain_correct return sox_effects async def __analyze_audio(self, tmpfile, track_id, provider, content_type): diff --git a/music_assistant/modules/music.py b/music_assistant/modules/music.py index 3445fb77..04b62f22 100755 --- a/music_assistant/modules/music.py +++ b/music_assistant/modules/music.py @@ -186,7 +186,10 @@ class Music(): for prov in self.providers.values(): prov_results = await prov.search(searchquery, media_types, limit) for item_type, items in prov_results.items(): - result[item_type] += items + if not item_type in result: + result[item_type] = items + else: + result[item_type] += items # filter out duplicates for item_type, items in result.items(): items = list(toolz.unique(items, key=operator.attrgetter('item_id'))) diff --git a/music_assistant/modules/player.py b/music_assistant/modules/player.py index 1928d1d2..33250db9 100755 --- a/music_assistant/modules/player.py +++ b/music_assistant/modules/player.py @@ -182,8 +182,6 @@ class Player(): # handle basic player settings player_details.enabled = player.settings['enabled'] player_details.name = player.settings['name'] if player.settings['name'] else player_details.name - if player.settings['group_parent']: - player_details.group_parent = player.settings['group_parent'] # handle hass integration await self.__update_player_hass_settings(player_details, player.settings) # handle mute as power setting @@ -279,9 +277,11 @@ class Player(): ("mute_as_power", False, "player_mute_power"), ("disable_volume", False, "player_disable_vol"), ("sox_effects", '', "http_streamer_sox_effects"), - ("max_sample_rate", '96000', "max_sample_rate"), + ("max_sample_rate", 96000, "max_sample_rate"), ("force_http_streamer", False, "force_http_streamer") ] + # append provider specific player settings + config_entries += await self.mass.player.providers[player_details.player_provider].player_config_entries() if player_details.is_group: config_entries += [ # group player settings ("apply_group_volume", False, "player_group_vol"), diff --git a/music_assistant/modules/playerproviders/chromecast.py b/music_assistant/modules/playerproviders/chromecast.py index 58d658d1..d383a2e2 100644 --- a/music_assistant/modules/playerproviders/chromecast.py +++ b/music_assistant/modules/playerproviders/chromecast.py @@ -58,6 +58,12 @@ class ChromecastProvider(PlayerProvider): ### Provider specific implementation ##### + async def player_config_entries(self): + ''' get the player config entries for this provider (list with key/value pairs)''' + return [ + ("crossfade_duration", 0, "crossfade_duration"), + ] + async def player_command(self, player_id, cmd:str, cmd_args=None): ''' issue command on player (play, pause, next, previous, stop, power, volume, mute) ''' count = 0 @@ -151,7 +157,7 @@ class ChromecastProvider(PlayerProvider): ''' load queue on player with given queue items ''' castplayer = self._chromecasts[player_id] player = self._players[player_id] - queue_items = await self.__create_queue_items(new_tracks[:50]) + queue_items = await self.__create_queue_items(new_tracks[:50], player_id=player_id) queuedata = { "type": 'QUEUE_LOAD', "repeatMode": "REPEAT_ALL" if player.repeat_enabled else "REPEAT_OFF", @@ -169,7 +175,7 @@ class ChromecastProvider(PlayerProvider): async def __queue_insert(self, player_id, new_tracks, insert_before=None): ''' insert item into the player queue ''' castplayer = self._chromecasts[player_id] - queue_items = await self.__create_queue_items(new_tracks) + queue_items = await self.__create_queue_items(new_tracks, player_id=player_id) for chunk in chunks(queue_items, 50): queuedata = { "type": 'QUEUE_INSERT', @@ -203,16 +209,21 @@ class ChromecastProvider(PlayerProvider): tracks = self._player_queue[player_id] await self.__queue_load(player_id, tracks, queue_index) - async def __create_queue_items(self, tracks): + async def __create_queue_items(self, tracks, player_id): ''' create list of CC queue items from tracks ''' queue_items = [] for track in tracks: - queue_item = await self.__create_queue_item(track) + queue_item = await self.__create_queue_item(track, player_id) queue_items.append(queue_item) return queue_items - async def __create_queue_item(self, track): + async def __create_queue_item(self, track, player_id): '''create queue item from track info ''' + enable_crossfade = self.mass.config['player_settings'][player_id]["crossfade_duration"] > 0 + if enable_crossfade: + uri = 'http://%s:%s/stream_queue?player_id=%s'% (self.mass.player.local_ip, self.mass.config['base']['web']['http_port'], player_id) + else: + uri = track.uri return { 'autoplay' : True, 'preloadTime' : 10, @@ -220,7 +231,7 @@ class ChromecastProvider(PlayerProvider): 'startTime' : 0, 'activeTrackIds' : [], 'media': { - 'contentId': track.uri, + 'contentId': uri, 'customData': { 'provider': track.provider, 'uri': track.uri, @@ -296,11 +307,14 @@ class ChromecastProvider(PlayerProvider): elif uri.startswith('qobuz://') and 'qobuz' in self.mass.music.providers: track_id = uri.replace('qobuz://','').replace('.flac','') track = await self.mass.music.providers['qobuz'].track(track_id) - elif uri.startswith('http') and '/stream' in uri: + elif uri.startswith('http') and '/stream_track' in uri: params = urllib.parse.parse_qs(uri.split('?')[1]) track_id = params['track_id'][0] provider = params['provider'][0] track = await self.mass.music.providers[provider].track(track_id) + elif uri.startswith('http') and '/stream_queue' in uri: + track = Track() + track.name = "Crossfade Queue streaming" return track async def __handle_group_members_update(self, mz, added_player=None, removed_player=None): diff --git a/music_assistant/modules/playerproviders/lms.py b/music_assistant/modules/playerproviders/lms.py index 9aa94263..a50595ad 100644 --- a/music_assistant/modules/playerproviders/lms.py +++ b/music_assistant/modules/playerproviders/lms.py @@ -59,6 +59,10 @@ class LMSProvider(PlayerProvider): ### Provider specific implementation ##### + async def player_config_entries(self): + ''' get the player config entries for this provider (list with key/value pairs)''' + return [] + async def player_command(self, player_id, cmd:str, cmd_args=None): ''' issue command on player (play, pause, next, previous, stop, power, volume, mute) ''' lms_commands = [] diff --git a/music_assistant/modules/playerproviders/pylms.py b/music_assistant/modules/playerproviders/pylms.py index 63fcd949..bbaf6595 100644 --- a/music_assistant/modules/playerproviders/pylms.py +++ b/music_assistant/modules/playerproviders/pylms.py @@ -54,6 +54,7 @@ class PyLMSServer(PlayerProvider): ### Provider specific implementation ##### + async def start_discovery(self): transport, protocol = await self.mass.event_loop.create_datagram_endpoint( lambda: DiscoveryProtocol(self.mass.web._http_port), @@ -64,6 +65,12 @@ class PyLMSServer(PlayerProvider): finally: transport.close() + async def player_config_entries(self): + ''' get the player config entries for this provider (list with key/value pairs)''' + return [ + ("crossfade_duration", 0, "crossfade_duration"), + ] + async def player_command(self, player_id, cmd:str, cmd_args=None): ''' issue command on player (play, pause, next, previous, stop, power, volume, mute) ''' if cmd == 'play': @@ -211,6 +218,7 @@ class PyLMSServer(PlayerProvider): ''' handle events from player''' if event == "connected": self._lmsplayers[lms_player.player_id] = lms_player + lms_player.player_settings = self.mass.config['player_settings'][lms_player.player_id] asyncio.create_task(self.__handle_player_event(lms_player.player_id, event, event_data)) try: @@ -249,6 +257,7 @@ class PyLMSPlayer(object): self.send_event = None self.stream_host = stream_host self.stream_port = stream_port + self.player_settings = {} self.playback_millis = 0 self._volume = PyLMSVolume() self._device_type = None @@ -356,12 +365,12 @@ class PyLMSPlayer(object): self._volume.volume = new_vol self.send_volume() - def play(self, uri, crossfade=True): - # TODO: attach crossfade to a config setting + def play(self, uri): + enable_crossfade = self.player_settings["crossfade_duration"] > 0 command = b's' autostart = b'3' # we use direct stream for now so let the player do the messy work with buffers - transType= b'1' if crossfade else b'0' - transDuration = 10 if crossfade else 0 + transType= b'1' if enable_crossfade else b'0' + transDuration = self.player_settings["crossfade_duration"] formatbyte = b'f' # fixed to flac uri = '/stream' + uri.split('/stream')[1] data = self.pack_stream(command, autostart=autostart, flags=0x00, formatbyte=formatbyte, transType=transType, transDuration=transDuration) diff --git a/music_assistant/modules/web.py b/music_assistant/modules/web.py index a1e5d37f..8db5d85a 100755 --- a/music_assistant/modules/web.py +++ b/music_assistant/modules/web.py @@ -71,6 +71,7 @@ class Web(): app.add_routes([web.get('/ws', self.websocket_handler)]) app.add_routes([web.get('/stream_track', self.mass.http_streamer.stream_track)]) app.add_routes([web.get('/stream_radio', self.mass.http_streamer.stream_radio)]) + app.add_routes([web.get('/stream_queue', self.mass.http_streamer.stream_queue)]) app.add_routes([web.get('/api/search', self.search)]) app.add_routes([web.get('/api/config', self.get_config)]) app.add_routes([web.post('/api/config', self.save_config)]) diff --git a/music_assistant/web/pages/config.vue.js b/music_assistant/web/pages/config.vue.js index 570b40bb..3fd24a1e 100755 --- a/music_assistant/web/pages/config.vue.js +++ b/music_assistant/web/pages/config.vue.js @@ -54,6 +54,8 @@ var Config = Vue.component('Config', { item-text="name" item-value="id" box> + + @@ -80,7 +82,8 @@ var Config = Vue.component('Config', { return { conf: {}, players: {}, - active: 0 + active: 0, + sample_rates: [44100, 48000, 88200, 96000, 192000, 384000] } }, computed: { -- 2.34.1