From: marcelveldt Date: Sun, 19 May 2019 15:29:01 +0000 (+0200) Subject: improve internal streamer and audio processing X-Git-Url: https://git.kitaultman.com/?a=commitdiff_plain;h=cfeced235206e8af28ac8160f1743825e9594cf2;p=music-assistant-server.git improve internal streamer and audio processing --- diff --git a/music_assistant/models.py b/music_assistant/models.py index 456b10b9..5f0a5348 100755 --- a/music_assistant/models.py +++ b/music_assistant/models.py @@ -138,7 +138,6 @@ class MusicProvider(): name = 'My great Music provider' # display name prov_id = 'my_provider' # used as id icon = '' - audio_fmt = 'flac' # the audio format used by this provider when streaming def __init__(self, mass): self.mass = mass @@ -445,6 +444,15 @@ class MusicProvider(): async def remove_playlist_tracks(self, prov_playlist_id, prov_track_ids): ''' remove track(s) from playlist ''' raise NotImplementedError + + async def get_stream_content_type(self, track_id): + ''' return the content type for the given track when it will be streamed''' + raise NotImplementedError + + async def get_stream(self, track_id): + ''' get audio stream for a track ''' + raise NotImplementedError + class PlayerState(str, Enum): Off = "off" diff --git a/music_assistant/modules/musicproviders/file.py b/music_assistant/modules/musicproviders/file.py index 2a480ca2..7d883c40 100644 --- a/music_assistant/modules/musicproviders/file.py +++ b/music_assistant/modules/musicproviders/file.py @@ -229,18 +229,9 @@ class FileProvider(MusicProvider): tracks += await self.get_album_tracks(album.item_id) return tracks[:10] - async def get_stream_details(self, track_id): - ''' returns the stream details for the given track ''' - track = await self.track(track_id) - import socket - host = socket.gethostbyname(socket.gethostname()) - return { - 'mime_type': 'audio/flac', - 'duration': track.duration, - 'sampling_rate': 44100, - 'bit_depth': 16, - 'url': 'http://%s/stream/file/%s' % (host, track_id) - } + async def get_stream_content_type(self, track_id): + ''' return the content type for the given track when it will be streamed''' + return track_id.split('.')[-1] async def get_stream(self, track_id): ''' get audio stream for a track ''' diff --git a/music_assistant/modules/musicproviders/qobuz.py b/music_assistant/modules/musicproviders/qobuz.py index 70a83b8e..fc12348f 100644 --- a/music_assistant/modules/musicproviders/qobuz.py +++ b/music_assistant/modules/musicproviders/qobuz.py @@ -40,7 +40,6 @@ class QobuzProvider(MusicProvider): def __init__(self, mass, username, password): self.name = 'Qobuz' self.prov_id = 'qobuz' - self.audio_fmt = 'flac' self._cur_user = None self.mass = mass self.cache = mass.cache @@ -253,6 +252,10 @@ class QobuzProvider(MusicProvider): params = {'playlist_id': prov_playlist_id, 'track_ids': ",".join(playlist_track_ids)} return await self.__get_data('playlist/deleteTracks', params) + async def get_stream_content_type(self, track_id): + ''' return the content type for the given track when it will be streamed''' + return 'flac' #TODO handle other file formats on qobuz? + async def get_audio_stream(self, track_id): ''' get audio stream for a track ''' params = {'format_id': 27, 'track_id': track_id, 'intent': 'stream'} diff --git a/music_assistant/modules/musicproviders/spotify.py b/music_assistant/modules/musicproviders/spotify.py index 49d06538..1871b374 100644 --- a/music_assistant/modules/musicproviders/spotify.py +++ b/music_assistant/modules/musicproviders/spotify.py @@ -39,7 +39,6 @@ class SpotifyProvider(MusicProvider): def __init__(self, mass, username, password): self.name = 'Spotify' self.prov_id = 'spotify' - self.audio_fmt = 'ogg' self._cur_user = None self.mass = mass self.cache = mass.cache @@ -243,6 +242,10 @@ class SpotifyProvider(MusicProvider): opts["offset"] = {"uri": offset_uri } return await self.__put_data('me/player/play', {"device_id": device_id}, opts) + async def get_stream_content_type(self, track_id): + ''' return the content type for the given track when it will be streamed''' + return 'ogg' + async def get_audio_stream(self, track_id): ''' get audio stream for a track ''' import subprocess diff --git a/music_assistant/modules/player.py b/music_assistant/modules/player.py index b3493ab4..48d740ee 100755 --- a/music_assistant/modules/player.py +++ b/music_assistant/modules/player.py @@ -3,7 +3,7 @@ import asyncio import os -from utils import run_periodic, LOGGER, try_parse_int, try_parse_float, get_ip +from utils import run_periodic, LOGGER, try_parse_int, try_parse_float, get_ip, run_async_background_task import aiohttp from difflib import SequenceMatcher as Matcher from models import MediaType, PlayerState, MusicPlayer @@ -348,123 +348,126 @@ class Player(): async def get_audio_stream(self, track_id, provider): ''' get audio stream from provider and apply additional effects/processing where needed''' - input_audio_fmt = self.mass.music.providers[provider].audio_fmt + input_content_type = await self.mass.music.providers[provider].get_stream_content_type(track_id) cachefile = self.__get_track_cache_filename(track_id, provider) gain_correct = await self.__get_track_gain_correct(track_id, provider) - sox_effects=['vol', str(gain_correct), 'dB' ] + LOGGER.info("apply gain correction of %s" % gain_correct) + sox_effects='vol %s dB' % gain_correct if os.path.isfile(cachefile): - # we have a temp file for this track which we can use - args = ['-t', input_audio_fmt, cachefile, '-t', 'flac', '-', *sox_effects] + # we have a cache file for this track which we can use + args = ['-t', 'flac', cachefile, '-t', 'flac', '-C', '0', '-', *sox_effects.split(' ')] process = await asyncio.create_subprocess_exec('sox', *args, stdout=asyncio.subprocess.PIPE) buffer_task = None else: # stream from provider - args = ['-t', input_audio_fmt, '-', '-t', 'flac', '-', *sox_effects] + args = ['-t', input_content_type, '-', '-t', 'flac', '-C', '0', '-', *sox_effects.split(' ')] process = await asyncio.create_subprocess_exec('sox', *args, stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE) buffer_task = asyncio.create_task( - self.__fill_audio_buffer(process.stdin, track_id, provider)) - try: - # yield the chunks from stdout - while not process.stdout.at_eof(): - chunk = await process.stdout.read(2000000) - if not chunk: - break - yield chunk - except (asyncio.CancelledError, concurrent.futures._base.CancelledError): - # client disconnected so cleanup - #if buffer_task: - # buffer_task.cancel() - # Could not figure out how to reliably close process without deadlocks - # so instead just read all data for a clean exit - while True: - if not await process.stdout.read(2000000): - break - await process.wait() - LOGGER.info("streaming of track_id %s aborted (client disconnect ?)" % track_id) - raise asyncio.CancelledError() - except Exception as exc: - LOGGER.error(exc) - else: - await process.wait() - LOGGER.info("streaming of track_id %s completed" % track_id) + self.__fill_audio_buffer(process.stdin, track_id, provider, input_content_type)) + # yield the chunks from stdout + while not process.stdout.at_eof(): + chunk = await process.stdout.read(2000000) + if not chunk: + break + yield chunk + await process.wait() + LOGGER.info("streaming of track_id %s completed" % track_id) - async def __analyze_track_audio(self, musicfile, track_id, provider): + async def __analyze_audio(self, tmpfile, track_id, provider, content_type): ''' analyze track audio, for now we only calculate EBU R128 loudness ''' - import platform - analyse_dir = os.path.join(self.mass.datapath, 'analyse_info') - analysis_file = os.path.join(analyse_dir, "%s_%s.xml" %(provider, track_id.split(os.sep)[-1])) - if not os.path.isdir(analyse_dir): - os.makedirs(analyse_dir) - bs1770_binary = None - if platform.system() == "Windows": - bs1770_binary = os.path.join(os.path.dirname(__file__), "bs1770gain", "win64", "bs1770gain") - elif platform.system() == "Darwin": - # macos binary is x86_64 intel - bs1770_binary = os.path.join(os.path.dirname(__file__), "bs1770gain", "osx", "bs1770gain") - elif platform.system() == "Linux": - architecture = platform.machine() - if architecture.startswith('AMD64') or architecture.startswith('x86_64'): - bs1770_binary = os.path.join(os.path.dirname(__file__), "bs1770gain", "linux64", "bs1770gain") - # TODO: build armhf binary - cmd = '%s %s --loglevel quiet --xml --ebu -f %s' % (bs1770_binary, musicfile, analysis_file) - process = await asyncio.create_subprocess_shell(cmd) - await process.wait() + LOGGER.info('Start analyzing file %s' % tmpfile) + cachefile = self.__get_track_cache_filename(track_id, provider) + strip_silence = True # TODO: attach config setting + if not os.path.isfile(cachefile): + # not needed to do processing if there already is a cachedfile + bs1770_binary = self.__get_bs1770_binary() + if bs1770_binary: + # calculate integrated r128 loudness with bs1770 + analyse_dir = os.path.join(self.mass.datapath, 'analyse_info') + analysis_file = os.path.join(analyse_dir, "%s_%s.xml" %(provider, track_id.split(os.sep)[-1])) + if not os.path.isfile(analysis_file): + if not os.path.isdir(analyse_dir): + os.makedirs(analyse_dir) + cmd = '%s %s --loglevel quiet --xml --ebu -f %s' % (bs1770_binary, tmpfile, analysis_file) + process = await asyncio.create_subprocess_shell(cmd) + await process.wait() + # use sox to store cache file (optionally strip silence from start and end) + if strip_silence: + cmd = 'sox -t %s %s -t flac -C 5 %s silence 1 0.1 1%% reverse silence 1 0.1 1%% reverse' %(content_type, tmpfile, cachefile) + else: + # cachefile is always stored as flac + cmd = 'sox -t %s %s -t flac -C 5 %s' %(content_type, tmpfile, cachefile) + process = await asyncio.create_subprocess_shell(cmd) + await process.wait() + # always clean up temp file + if os.path.isfile(tmpfile): + os.remove(tmpfile) + LOGGER.info('Fininished analyzing file %s' % tmpfile) async def __get_track_gain_correct(self, track_id, provider): ''' get the gain correction that should be applied to a track ''' target_gain = -23 fallback_gain = -14 # fallback if no analyse info is available analysis_file = os.path.join(self.mass.datapath, 'analyse_info', "%s_%s.xml" %(provider, track_id.split(os.sep)[-1])) + if not os.path.isfile(analysis_file): + return fallback_gain try: # read audio analysis if available tree = ET.parse(analysis_file) trackinfo = tree.getroot().find("album").find("track") track_lufs = trackinfo.find('integrated').get('lufs') gain_correct = target_gain - float(track_lufs) - LOGGER.info("apply gain correction of %s" % gain_correct) - except Exception: + except Exception as exc: + LOGGER.error('could not retrieve track gain - %s' % exc) gain_correct = fallback_gain # fallback value if os.path.isfile(analysis_file): os.remove(analysis_file) - cachefile = self.__get_track_cache_filename(track_id, provider) + # cachefile = self.__get_track_cache_filename(track_id, provider) # reschedule analyze task to try again - asyncio.create_task(self.__analyze_track_audio(cachefile, track_id, provider)) - return gain_correct + # asyncio.create_task(self.__analyze_track_audio(cachefile, track_id, provider)) + return round(gain_correct,2) - async def __fill_audio_buffer(self, buf, track_id, provider): + async def __fill_audio_buffer(self, buf, track_id, provider, content_type): ''' get audio data from provider and write to buffer''' # fill the buffer with audio data # a tempfile is created so we can do audio analysis - try: - tmpfile = os.path.join(AUDIO_TEMP_DIR, '%s%s%s.tmp' % (random.randint(0, 999), track_id, random.randint(0, 999))) - finalfile = self.__get_track_cache_filename(track_id, provider) - fd = open(tmpfile, 'wb') - async for chunk in self.mass.music.providers[provider].get_audio_stream(track_id): - buf.write(chunk) - await buf.drain() - fd.write(chunk) + tmpfile = os.path.join(AUDIO_TEMP_DIR, '%s%s%s.tmp' % (random.randint(0, 999), track_id, random.randint(0, 999))) + fd = open(tmpfile, 'wb') + async for chunk in self.mass.music.providers[provider].get_audio_stream(track_id): + buf.write(chunk) await buf.drain() - buf.write_eof() - fd.close() - except Exception as exc: - LOGGER.error(exc) - else: - # successfull completion - if os.path.isfile(tmpfile) and not os.path.isfile(finalfile): - shutil.move(tmpfile, finalfile) - asyncio.create_task(self.__analyze_track_audio(finalfile, track_id, provider)) - LOGGER.info("fill_audio_buffer complete for track %s" % track_id) - finally: - # always clean up temp file - if os.path.isfile(tmpfile): - of.remove(tmpfile) + fd.write(chunk) + await buf.drain() + buf.write_eof() + fd.close() + # successfull completion, send tmpfile to be processed in the background + #asyncio.create_task(self.__process_audio(tmpfile, track_id, provider)) + run_async_background_task(self.mass.bg_executor, self.__analyze_audio, tmpfile, track_id, provider, content_type) + LOGGER.info("fill_audio_buffer complete for track %s" % track_id) + return @staticmethod def __get_track_cache_filename(track_id, provider): ''' get filename for a track to use as cache file ''' return os.path.join(AUDIO_CACHE_DIR, '%s_%s' %(provider, track_id.split(os.sep)[-1])) + @staticmethod + def __get_bs1770_binary(): + ''' get the path to the bs1770 binary for the current OS ''' + import platform + bs1770_binary = None + if platform.system() == "Windows": + bs1770_binary = os.path.join(os.path.dirname(__file__), "bs1770gain", "win64", "bs1770gain") + elif platform.system() == "Darwin": + # macos binary is x86_64 intel + bs1770_binary = os.path.join(os.path.dirname(__file__), "bs1770gain", "osx", "bs1770gain") + elif platform.system() == "Linux": + architecture = platform.machine() + if architecture.startswith('AMD64') or architecture.startswith('x86_64'): + bs1770_binary = os.path.join(os.path.dirname(__file__), "bs1770gain", "linux64", "bs1770gain") + # TODO: build armhf binary + return bs1770_binary def load_providers(self): ''' dynamically load providers ''' diff --git a/music_assistant/modules/playerproviders/chromecast.py b/music_assistant/modules/playerproviders/chromecast.py index f381fa5a..4423e683 100644 --- a/music_assistant/modules/playerproviders/chromecast.py +++ b/music_assistant/modules/playerproviders/chromecast.py @@ -59,7 +59,10 @@ class ChromecastProvider(PlayerProvider): async def player_command(self, player_id, cmd:str, cmd_args=None): ''' issue command on player (play, pause, next, previous, stop, power, volume, mute) ''' if cmd == 'play': - self._chromecasts[player_id].media_controller.play() + if self._chromecasts[player_id].media_controller.status.media_session_id: + self._chromecasts[player_id].media_controller.play() + else: + await self.__resume_queue(player_id) elif cmd == 'pause': self._chromecasts[player_id].media_controller.pause() elif cmd == 'stop': @@ -67,20 +70,17 @@ class ChromecastProvider(PlayerProvider): elif cmd == 'next': self._chromecasts[player_id].media_controller.queue_next() elif cmd == 'previous': - self._chromecasts[player_id].media_controller.queue_previous() + self._chromecasts[player_id].media_controller.queue_prev() elif cmd == 'power' and cmd_args == 'off': - self._players[player_id].powered = False # power is not supported - await self.mass.player.update_player(self._players[player_id]) + self._chromecasts[player_id].quit_app() # power is not supported so send quit app instead elif cmd == 'power': - self._players[player_id].powered = True # power is not supported + self._chromecasts[player_id].media_controller.launch() elif cmd == 'volume': self._chromecasts[player_id].set_volume(try_parse_int(cmd_args)/100) elif cmd == 'mute' and cmd_args == 'off': self._chromecasts[player_id].set_volume_muted(False) elif cmd == 'mute': self._chromecasts[player_id].set_volume_muted(True) - elif cmd == 'power': - pass # power is not supported on chromecast async def player_queue(self, player_id, offset=0, limit=50): ''' return the items in the player's queue ''' @@ -153,7 +153,7 @@ class ChromecastProvider(PlayerProvider): startindex = 0 elif queue_opt == 'next': # play the new items after the current playing item (insert before current next item) - castplayer.queue = new_queue_items + castplayer.queue[cur_queue_index:] + plcastplayerayer.queue[:cur_queue_index] + castplayer.queue = new_queue_items + castplayer.queue[cur_queue_index:] + castplayer.queue[:cur_queue_index] startindex = cur_queue_index else: # overwrite the whole queue with new item(s) @@ -169,11 +169,12 @@ class ChromecastProvider(PlayerProvider): "items": castplayer.queue[:10] } await self.__send_player_queue(receiver_ctrl, media_controller, queuedata) + await asyncio.sleep(1) # append the rest of the items in the queue in chunks for chunk in chunks(castplayer.queue[10:], 100): - await asyncio.sleep(1) queuedata = { "type": 'QUEUE_INSERT', "items": chunk } await self.__send_player_queue(receiver_ctrl, media_controller, queuedata) + await asyncio.sleep(0.1) elif queue_opt == 'add': # existing queue is playing: simply append items to the end of the queue (in small chunks) castplayer.queue = castplayer.queue + new_queue_items @@ -181,7 +182,7 @@ class ChromecastProvider(PlayerProvider): for chunk in chunks(new_queue_items, 100): queuedata = { "type": 'QUEUE_INSERT', "items": chunk } await self.__send_player_queue(receiver_ctrl, media_controller, queuedata) - await asyncio.sleep(1) + await asyncio.sleep(0.1) elif queue_opt == 'next': # play the new items after the current playing item (insert before current next item) player.queue = castplayer.queue[:cur_queue_index] + new_queue_items + castplayer.queue[cur_queue_index:] @@ -194,6 +195,34 @@ class ChromecastProvider(PlayerProvider): ### Provider specific (helper) methods ##### + async def __resume_queue(self, player_id): + ''' resume queue play after power off ''' + player = self._players[player_id] + castplayer = self._chromecasts[player_id] + media_controller = castplayer.media_controller + receiver_ctrl = media_controller._socket_client.receiver_controller + startindex = 0 + if player.cur_item and player.cur_item.name: + for index, item in enumerate(castplayer.queue): + if item['media']['metadata']['title'] == player.cur_item.name: + startindex = index + break + queuedata = { + "type": 'QUEUE_LOAD', + "repeatMode": "REPEAT_ALL" if player.repeat_enabled else "REPEAT_OFF", + "shuffle": player.shuffle_enabled, + "queueType": "PLAYLIST", + "startIndex": startindex, # Item index to play after this request or keep same item if undefined + "items": castplayer.queue[:10] + } + await self.__send_player_queue(receiver_ctrl, media_controller, queuedata) + await asyncio.sleep(1) + # append the rest of the items in the queue in chunks + for chunk in chunks(castplayer.queue[10:], 100): + await asyncio.sleep(0.1) + queuedata = { "type": 'QUEUE_INSERT', "items": chunk } + await self.__send_player_queue(receiver_ctrl, media_controller, queuedata) + async def __send_player_queue(self, receiver_ctrl, media_controller, queuedata): '''send new data to the CC queue''' def app_launched_callback(): @@ -215,7 +244,7 @@ class ChromecastProvider(PlayerProvider): if caststatus: player.muted = caststatus.volume_muted player.volume_level = caststatus.volume_level * 100 - player.powered = not caststatus.is_stand_by + player.powered = chromecast.media_controller.status.media_session_id != None if mediastatus: if mediastatus.player_state in ['PLAYING', 'BUFFERING']: player.state = PlayerState.Playing diff --git a/music_assistant/modules/web.py b/music_assistant/modules/web.py index 75d38dc8..cb1e9700 100755 --- a/music_assistant/modules/web.py +++ b/music_assistant/modules/web.py @@ -11,6 +11,7 @@ from models import MediaType, media_type_from_string from functools import partial json_serializer = partial(json.dumps, default=lambda x: x.__dict__) import ssl +import concurrent def setup(mass): ''' setup the module and read/apply config''' @@ -267,7 +268,17 @@ class Web(): resp = web.StreamResponse(status=200, reason='OK', headers={'Content-Type': 'audio/flac'}) + if request.method.upper() == 'HEAD': + return resp await resp.prepare(request) + cancelled = False async for chunk in self.mass.player.get_audio_stream(track_id, provider): - await resp.write(chunk) - return resp \ No newline at end of file + if cancelled: + continue # just consume all bytes in stream to prevent deadlocks in the subprocess based iterators + try: + await resp.write(chunk) + except (asyncio.CancelledError, concurrent.futures._base.CancelledError, ConnectionResetError): + LOGGER.error('client disconnect?') + cancelled = True + if not cancelled: + return resp \ No newline at end of file