improve internal streamer and audio processing
authormarcelveldt <marcelvanderveldt@MacBook-Pro.local>
Sun, 19 May 2019 15:29:01 +0000 (17:29 +0200)
committermarcelveldt <marcelvanderveldt@MacBook-Pro.local>
Sun, 19 May 2019 15:29:01 +0000 (17:29 +0200)
music_assistant/models.py
music_assistant/modules/musicproviders/file.py
music_assistant/modules/musicproviders/qobuz.py
music_assistant/modules/musicproviders/spotify.py
music_assistant/modules/player.py
music_assistant/modules/playerproviders/chromecast.py
music_assistant/modules/web.py

index 456b10b9131f33323584df2f407a0d68b8290f91..5f0a534871891c821d24dda4378139a73cab00ff 100755 (executable)
@@ -138,7 +138,6 @@ class MusicProvider():
     name = 'My great Music provider' # display name
     prov_id = 'my_provider' # used as id
     icon = ''
-    audio_fmt = 'flac' # the audio format used by this provider when streaming
 
     def __init__(self, mass):
         self.mass = mass
@@ -445,6 +444,15 @@ class MusicProvider():
     async def remove_playlist_tracks(self, prov_playlist_id, prov_track_ids):
         ''' remove track(s) from playlist '''
         raise NotImplementedError
+
+    async def get_stream_content_type(self, track_id):
+        ''' return the content type for the given track when it will be streamed'''
+        raise NotImplementedError
+    
+    async def get_stream(self, track_id):
+        ''' get audio stream for a track '''
+        raise NotImplementedError
+    
     
 class PlayerState(str, Enum):
     Off = "off"
index 2a480ca21b71b658274da8d6ede31c321dce402b..7d883c40a5ff78e9bb7965ddd3bd130375561964 100644 (file)
@@ -229,18 +229,9 @@ class FileProvider(MusicProvider):
             tracks += await self.get_album_tracks(album.item_id)
         return tracks[:10]
 
-    async def get_stream_details(self, track_id):
-        ''' returns the stream details for the given track '''
-        track = await self.track(track_id)
-        import socket
-        host = socket.gethostbyname(socket.gethostname())
-        return {
-            'mime_type': 'audio/flac',
-            'duration': track.duration,
-            'sampling_rate': 44100,
-            'bit_depth': 16,
-            'url': 'http://%s/stream/file/%s' % (host, track_id)
-        }
+    async def get_stream_content_type(self, track_id):
+        ''' return the content type for the given track when it will be streamed'''
+        return track_id.split('.')[-1]
     
     async def get_stream(self, track_id):
         ''' get audio stream for a track '''
index 70a83b8ebc3edac183cc24b55a1549569825109b..fc12348fdbc736a2b9c518efa133479c1187a223 100644 (file)
@@ -40,7 +40,6 @@ class QobuzProvider(MusicProvider):
     def __init__(self, mass, username, password):
         self.name = 'Qobuz'
         self.prov_id = 'qobuz'
-        self.audio_fmt = 'flac'
         self._cur_user = None
         self.mass = mass
         self.cache = mass.cache
@@ -253,6 +252,10 @@ class QobuzProvider(MusicProvider):
         params = {'playlist_id': prov_playlist_id, 'track_ids': ",".join(playlist_track_ids)}
         return await self.__get_data('playlist/deleteTracks', params)
     
+    async def get_stream_content_type(self, track_id):
+        ''' return the content type for the given track when it will be streamed'''
+        return 'flac' #TODO handle other file formats on qobuz?
+
     async def get_audio_stream(self, track_id):
         ''' get audio stream for a track '''
         params = {'format_id': 27, 'track_id': track_id, 'intent': 'stream'}
index 49d06538c1ec2ded93eb827875b8f39234dbe1a6..1871b374c7b704a6dba4bd9254b0503483b16996 100644 (file)
@@ -39,7 +39,6 @@ class SpotifyProvider(MusicProvider):
     def __init__(self, mass, username, password):
         self.name = 'Spotify'
         self.prov_id = 'spotify'
-        self.audio_fmt = 'ogg'
         self._cur_user = None
         self.mass = mass
         self.cache = mass.cache
@@ -243,6 +242,10 @@ class SpotifyProvider(MusicProvider):
             opts["offset"] = {"uri": offset_uri }
         return await self.__put_data('me/player/play', {"device_id": device_id}, opts)
     
+    async def get_stream_content_type(self, track_id):
+        ''' return the content type for the given track when it will be streamed'''
+        return 'ogg'
+
     async def get_audio_stream(self, track_id):
         ''' get audio stream for a track '''
         import subprocess
index b3493ab4c1940cf085ee2fc7315d3c4fa26a8506..48d740eef80ac4918cf897de4915dadebabe2871 100755 (executable)
@@ -3,7 +3,7 @@
 
 import asyncio
 import os
-from utils import run_periodic, LOGGER, try_parse_int, try_parse_float, get_ip
+from utils import run_periodic, LOGGER, try_parse_int, try_parse_float, get_ip, run_async_background_task
 import aiohttp
 from difflib import SequenceMatcher as Matcher
 from models import MediaType, PlayerState, MusicPlayer
@@ -348,123 +348,126 @@ class Player():
 
     async def get_audio_stream(self, track_id, provider):
         ''' get audio stream from provider and apply additional effects/processing where needed'''
-        input_audio_fmt = self.mass.music.providers[provider].audio_fmt
+        input_content_type = await self.mass.music.providers[provider].get_stream_content_type(track_id)
         cachefile = self.__get_track_cache_filename(track_id, provider)
         gain_correct = await self.__get_track_gain_correct(track_id, provider)
-        sox_effects=['vol', str(gain_correct), 'dB'    ]    
+        LOGGER.info("apply gain correction of %s" % gain_correct)
+        sox_effects='vol %s dB' % gain_correct
         if os.path.isfile(cachefile):
-            # we have a temp file for this track which we can use
-            args = ['-t', input_audio_fmt, cachefile, '-t', 'flac', '-', *sox_effects]
+            # we have a cache file for this track which we can use
+            args = ['-t', 'flac', cachefile, '-t', 'flac', '-C', '0', '-', *sox_effects.split(' ')]
             process = await asyncio.create_subprocess_exec('sox', *args, 
                     stdout=asyncio.subprocess.PIPE)
             buffer_task = None
         else:
             # stream from provider
-            args = ['-t', input_audio_fmt, '-', '-t', 'flac', '-', *sox_effects]
+            args = ['-t', input_content_type, '-', '-t', 'flac', '-C', '0', '-', *sox_effects.split(' ')]
             process = await asyncio.create_subprocess_exec('sox', *args, 
                     stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE)
             buffer_task = asyncio.create_task(
-                    self.__fill_audio_buffer(process.stdin, track_id, provider))
-        try:
-            # yield the chunks from stdout
-            while not process.stdout.at_eof():
-                chunk = await process.stdout.read(2000000)
-                if not chunk:
-                    break
-                yield chunk
-        except (asyncio.CancelledError, concurrent.futures._base.CancelledError):
-            # client disconnected so cleanup
-            #if buffer_task:
-            #    buffer_task.cancel()
-            # Could not figure out how to reliably close process without deadlocks 
-            # so instead just read all data for a clean exit
-            while True:
-                if not await process.stdout.read(2000000):
-                    break
-            await process.wait()
-            LOGGER.info("streaming of track_id %s aborted (client disconnect ?)" % track_id)
-            raise asyncio.CancelledError()
-        except Exception as exc:
-            LOGGER.error(exc)
-        else:
-            await process.wait()
-            LOGGER.info("streaming of track_id %s completed" % track_id)
+                    self.__fill_audio_buffer(process.stdin, track_id, provider, input_content_type))
+        # yield the chunks from stdout
+        while not process.stdout.at_eof():
+            chunk = await process.stdout.read(2000000)
+            if not chunk:
+                break
+            yield chunk
+        await process.wait()
+        LOGGER.info("streaming of track_id %s completed" % track_id)
 
-    async def __analyze_track_audio(self, musicfile, track_id, provider):
+    async def __analyze_audio(self, tmpfile, track_id, provider, content_type):
         ''' analyze track audio, for now we only calculate EBU R128 loudness '''
-        import platform
-        analyse_dir = os.path.join(self.mass.datapath, 'analyse_info')
-        analysis_file = os.path.join(analyse_dir, "%s_%s.xml" %(provider, track_id.split(os.sep)[-1]))
-        if not os.path.isdir(analyse_dir):
-            os.makedirs(analyse_dir)
-        bs1770_binary = None
-        if platform.system() == "Windows":
-            bs1770_binary = os.path.join(os.path.dirname(__file__), "bs1770gain", "win64", "bs1770gain")
-        elif platform.system() == "Darwin":
-            # macos binary is x86_64 intel
-            bs1770_binary = os.path.join(os.path.dirname(__file__), "bs1770gain", "osx", "bs1770gain")
-        elif platform.system() == "Linux":
-            architecture = platform.machine()
-            if architecture.startswith('AMD64') or architecture.startswith('x86_64'):
-                bs1770_binary = os.path.join(os.path.dirname(__file__), "bs1770gain", "linux64", "bs1770gain")
-            # TODO: build armhf binary
-        cmd = '%s %s --loglevel quiet --xml --ebu -f %s' % (bs1770_binary, musicfile, analysis_file)
-        process = await asyncio.create_subprocess_shell(cmd)
-        await process.wait()
+        LOGGER.info('Start analyzing file %s' % tmpfile)
+        cachefile = self.__get_track_cache_filename(track_id, provider)
+        strip_silence = True # TODO: attach config setting
+        if not os.path.isfile(cachefile):
+            # not needed to do processing if there already is a cachedfile
+            bs1770_binary = self.__get_bs1770_binary()
+            if bs1770_binary:
+                # calculate integrated r128 loudness with bs1770
+                analyse_dir = os.path.join(self.mass.datapath, 'analyse_info')
+                analysis_file = os.path.join(analyse_dir, "%s_%s.xml" %(provider, track_id.split(os.sep)[-1]))
+                if not os.path.isfile(analysis_file):
+                    if not os.path.isdir(analyse_dir):
+                        os.makedirs(analyse_dir)
+                    cmd = '%s %s --loglevel quiet --xml --ebu -f %s' % (bs1770_binary, tmpfile, analysis_file)
+                    process = await asyncio.create_subprocess_shell(cmd)
+                    await process.wait()
+            # use sox to store cache file (optionally strip silence from start and end)
+            if strip_silence:
+                cmd = 'sox -t %s %s -t flac -C 5 %s silence 1 0.1 1%% reverse silence 1 0.1 1%% reverse' %(content_type, tmpfile, cachefile)
+            else:
+                # cachefile is always stored as flac 
+                cmd = 'sox -t %s %s -t flac -C 5 %s' %(content_type, tmpfile, cachefile)
+            process = await asyncio.create_subprocess_shell(cmd)
+            await process.wait()
+        # always clean up temp file
+        if os.path.isfile(tmpfile):
+            os.remove(tmpfile)
+        LOGGER.info('Fininished analyzing file %s' % tmpfile)
     
     async def __get_track_gain_correct(self, track_id, provider):
         ''' get the gain correction that should be applied to a track '''
         target_gain = -23
         fallback_gain = -14 # fallback if no analyse info is available
         analysis_file = os.path.join(self.mass.datapath, 'analyse_info', "%s_%s.xml" %(provider, track_id.split(os.sep)[-1]))
+        if not os.path.isfile(analysis_file):
+            return fallback_gain
         try: # read audio analysis if available
             tree = ET.parse(analysis_file)
             trackinfo = tree.getroot().find("album").find("track")
             track_lufs = trackinfo.find('integrated').get('lufs')
             gain_correct = target_gain - float(track_lufs)
-            LOGGER.info("apply gain correction of %s" % gain_correct)
-        except Exception:
+        except Exception as exc:
+            LOGGER.error('could not retrieve track gain - %s' % exc)
             gain_correct = fallback_gain # fallback value
             if os.path.isfile(analysis_file):
                 os.remove(analysis_file)
-                cachefile = self.__get_track_cache_filename(track_id, provider)
+                cachefile = self.__get_track_cache_filename(track_id, provider)
                 # reschedule analyze task to try again
-                asyncio.create_task(self.__analyze_track_audio(cachefile, track_id, provider))
-        return gain_correct
+                asyncio.create_task(self.__analyze_track_audio(cachefile, track_id, provider))
+        return round(gain_correct,2)
 
-    async def __fill_audio_buffer(self, buf, track_id, provider):
+    async def __fill_audio_buffer(self, buf, track_id, provider, content_type):
         ''' get audio data from provider and write to buffer'''
         # fill the buffer with audio data
         # a tempfile is created so we can do audio analysis
-        try:
-            tmpfile = os.path.join(AUDIO_TEMP_DIR, '%s%s%s.tmp' % (random.randint(0, 999), track_id, random.randint(0, 999)))
-            finalfile = self.__get_track_cache_filename(track_id, provider)
-            fd = open(tmpfile, 'wb')
-            async for chunk in self.mass.music.providers[provider].get_audio_stream(track_id):
-                buf.write(chunk)
-                await buf.drain()
-                fd.write(chunk)
+        tmpfile = os.path.join(AUDIO_TEMP_DIR, '%s%s%s.tmp' % (random.randint(0, 999), track_id, random.randint(0, 999)))
+        fd = open(tmpfile, 'wb')
+        async for chunk in self.mass.music.providers[provider].get_audio_stream(track_id):
+            buf.write(chunk)
             await buf.drain()
-            buf.write_eof()
-            fd.close()
-        except Exception as exc:
-            LOGGER.error(exc)
-        else:
-            # successfull completion
-            if os.path.isfile(tmpfile) and not os.path.isfile(finalfile):
-                shutil.move(tmpfile, finalfile)
-            asyncio.create_task(self.__analyze_track_audio(finalfile, track_id, provider))
-            LOGGER.info("fill_audio_buffer complete for track %s" % track_id)
-        finally:
-            # always clean up temp file
-            if os.path.isfile(tmpfile):
-                of.remove(tmpfile)
+            fd.write(chunk)
+        await buf.drain()
+        buf.write_eof()
+        fd.close()
+        # successfull completion, send tmpfile to be processed in the background
+        #asyncio.create_task(self.__process_audio(tmpfile, track_id, provider))
+        run_async_background_task(self.mass.bg_executor, self.__analyze_audio, tmpfile, track_id, provider, content_type)
+        LOGGER.info("fill_audio_buffer complete for track %s" % track_id)
+        return
 
     @staticmethod
     def __get_track_cache_filename(track_id, provider):
         ''' get filename for a track to use as cache file '''
         return os.path.join(AUDIO_CACHE_DIR, '%s_%s' %(provider, track_id.split(os.sep)[-1]))
 
+    @staticmethod
+    def __get_bs1770_binary():
+        ''' get the path to the bs1770 binary for the current OS '''
+        import platform
+        bs1770_binary = None
+        if platform.system() == "Windows":
+            bs1770_binary = os.path.join(os.path.dirname(__file__), "bs1770gain", "win64", "bs1770gain")
+        elif platform.system() == "Darwin":
+            # macos binary is x86_64 intel
+            bs1770_binary = os.path.join(os.path.dirname(__file__), "bs1770gain", "osx", "bs1770gain")
+        elif platform.system() == "Linux":
+            architecture = platform.machine()
+            if architecture.startswith('AMD64') or architecture.startswith('x86_64'):
+                bs1770_binary = os.path.join(os.path.dirname(__file__), "bs1770gain", "linux64", "bs1770gain")
+            # TODO: build armhf binary
+        return bs1770_binary
 
     def load_providers(self):
         ''' dynamically load providers '''
index f381fa5a307a3c9d2b8b55526349e5130f5ed5b6..4423e683c768d3722e86410313d60f1abc008b06 100644 (file)
@@ -59,7 +59,10 @@ class ChromecastProvider(PlayerProvider):
     async def player_command(self, player_id, cmd:str, cmd_args=None):
         ''' issue command on player (play, pause, next, previous, stop, power, volume, mute) '''
         if cmd == 'play':
-            self._chromecasts[player_id].media_controller.play()
+            if self._chromecasts[player_id].media_controller.status.media_session_id:
+                self._chromecasts[player_id].media_controller.play()
+            else:
+                await self.__resume_queue(player_id)
         elif cmd == 'pause':
             self._chromecasts[player_id].media_controller.pause()
         elif cmd == 'stop':
@@ -67,20 +70,17 @@ class ChromecastProvider(PlayerProvider):
         elif cmd == 'next':
             self._chromecasts[player_id].media_controller.queue_next()
         elif cmd == 'previous':
-            self._chromecasts[player_id].media_controller.queue_previous()
+            self._chromecasts[player_id].media_controller.queue_prev()
         elif cmd == 'power' and cmd_args == 'off':
-            self._players[player_id].powered = False # power is not supported
-            await self.mass.player.update_player(self._players[player_id])
+            self._chromecasts[player_id].quit_app() # power is not supported so send quit app instead
         elif cmd == 'power':
-            self._players[player_id].powered = True # power is not supported
+            self._chromecasts[player_id].media_controller.launch()
         elif cmd == 'volume':
             self._chromecasts[player_id].set_volume(try_parse_int(cmd_args)/100)
         elif cmd == 'mute' and cmd_args == 'off':
             self._chromecasts[player_id].set_volume_muted(False)
         elif cmd == 'mute':
             self._chromecasts[player_id].set_volume_muted(True)
-        elif cmd == 'power':
-            pass # power is not supported on chromecast
 
     async def player_queue(self, player_id, offset=0, limit=50):
         ''' return the items in the player's queue '''
@@ -153,7 +153,7 @@ class ChromecastProvider(PlayerProvider):
                 startindex = 0
             elif queue_opt == 'next':
                 # play the new items after the current playing item (insert before current next item)
-                castplayer.queue = new_queue_items + castplayer.queue[cur_queue_index:] + plcastplayerayer.queue[:cur_queue_index]
+                castplayer.queue = new_queue_items + castplayer.queue[cur_queue_index:] + castplayer.queue[:cur_queue_index]
                 startindex = cur_queue_index
             else:
                 # overwrite the whole queue with new item(s)
@@ -169,11 +169,12 @@ class ChromecastProvider(PlayerProvider):
                     "items": castplayer.queue[:10]
             }
             await self.__send_player_queue(receiver_ctrl, media_controller, queuedata)
+            await asyncio.sleep(1)
             # append the rest of the items in the queue in chunks
             for chunk in chunks(castplayer.queue[10:], 100):
-                await asyncio.sleep(1)
                 queuedata = { "type": 'QUEUE_INSERT', "items": chunk }
                 await self.__send_player_queue(receiver_ctrl, media_controller, queuedata)
+                await asyncio.sleep(0.1)
         elif queue_opt == 'add':
             # existing queue is playing: simply append items to the end of the queue (in small chunks)
             castplayer.queue = castplayer.queue + new_queue_items
@@ -181,7 +182,7 @@ class ChromecastProvider(PlayerProvider):
             for chunk in chunks(new_queue_items, 100):
                 queuedata = { "type": 'QUEUE_INSERT', "items": chunk }
                 await self.__send_player_queue(receiver_ctrl, media_controller, queuedata)
-                await asyncio.sleep(1)
+                await asyncio.sleep(0.1)
         elif queue_opt == 'next':
             # play the new items after the current playing item (insert before current next item)
             player.queue = castplayer.queue[:cur_queue_index] + new_queue_items + castplayer.queue[cur_queue_index:]
@@ -194,6 +195,34 @@ class ChromecastProvider(PlayerProvider):
             
     ### Provider specific (helper) methods #####
 
+    async def __resume_queue(self, player_id):
+        ''' resume queue play after power off '''
+        player = self._players[player_id]
+        castplayer = self._chromecasts[player_id]
+        media_controller = castplayer.media_controller
+        receiver_ctrl = media_controller._socket_client.receiver_controller
+        startindex = 0
+        if player.cur_item and player.cur_item.name:
+            for index, item in enumerate(castplayer.queue):
+                if item['media']['metadata']['title'] == player.cur_item.name:
+                    startindex = index
+                    break
+        queuedata = { 
+                "type": 'QUEUE_LOAD',
+                "repeatMode":  "REPEAT_ALL" if player.repeat_enabled else "REPEAT_OFF",
+                "shuffle": player.shuffle_enabled,
+                "queueType": "PLAYLIST",
+                "startIndex":    startindex,    # Item index to play after this request or keep same item if undefined
+                "items": castplayer.queue[:10]
+        }
+        await self.__send_player_queue(receiver_ctrl, media_controller, queuedata)
+        await asyncio.sleep(1)
+        # append the rest of the items in the queue in chunks
+        for chunk in chunks(castplayer.queue[10:], 100):
+            await asyncio.sleep(0.1)
+            queuedata = { "type": 'QUEUE_INSERT', "items": chunk }
+            await self.__send_player_queue(receiver_ctrl, media_controller, queuedata)
+        
     async def __send_player_queue(self, receiver_ctrl, media_controller, queuedata):
         '''send new data to the CC queue'''
         def app_launched_callback():
@@ -215,7 +244,7 @@ class ChromecastProvider(PlayerProvider):
         if caststatus:
             player.muted = caststatus.volume_muted
             player.volume_level = caststatus.volume_level * 100
-            player.powered = not caststatus.is_stand_by
+            player.powered = chromecast.media_controller.status.media_session_id != None
         if mediastatus:
             if mediastatus.player_state in ['PLAYING', 'BUFFERING']:
                 player.state = PlayerState.Playing
index 75d38dc8d8eaaeaa740e26144808673dcd853b30..cb1e9700e8e688498e019dce8235358ecd8f1d54 100755 (executable)
@@ -11,6 +11,7 @@ from models import MediaType, media_type_from_string
 from functools import partial
 json_serializer = partial(json.dumps, default=lambda x: x.__dict__)
 import ssl
+import concurrent
 
 def setup(mass):
     ''' setup the module and read/apply config'''
@@ -267,7 +268,17 @@ class Web():
         resp = web.StreamResponse(status=200,
                                  reason='OK',
                                  headers={'Content-Type': 'audio/flac'})
+        if request.method.upper() == 'HEAD':
+            return resp
         await resp.prepare(request)
+        cancelled = False
         async for chunk in self.mass.player.get_audio_stream(track_id, provider):
-            await resp.write(chunk)
-        return resp
\ No newline at end of file
+            if cancelled:
+                continue # just consume all bytes in stream to prevent deadlocks in the subprocess based iterators
+            try:
+                await resp.write(chunk)
+            except (asyncio.CancelledError, concurrent.futures._base.CancelledError, ConnectionResetError):
+                LOGGER.error('client disconnect?')
+                cancelled = True
+        if not cancelled:
+            return resp
\ No newline at end of file