speed improvements
authormarcelveldt <marcelvanderveldt@MacBook-Pro.local>
Tue, 21 May 2019 22:22:59 +0000 (00:22 +0200)
committermarcelveldt <marcelvanderveldt@MacBook-Pro.local>
Tue, 21 May 2019 22:22:59 +0000 (00:22 +0200)
music_assistant/database.py
music_assistant/main.py
music_assistant/modules/music.py
music_assistant/modules/musicproviders/qobuz.py
music_assistant/modules/musicproviders/spotify.py
music_assistant/modules/player.py
music_assistant/modules/playerproviders/chromecast.py
music_assistant/modules/web.py
music_assistant/utils.py
music_assistant/web/components/player.vue.js

index 7e1fcd611554b5d539cb71bf8683670338fc208f..3eb48809a4718408c279cc7353811af1fbed76fc 100755 (executable)
@@ -7,6 +7,7 @@ from utils import run_periodic, LOGGER, get_sort_name, try_parse_int
 from models import MediaType, Artist, Album, Track, Playlist
 from typing import List
 import aiosqlite
+import operator
 
 class Database():
 
@@ -184,7 +185,7 @@ class Database():
                 await db.execute(sql_query, (item_id,media_type, provider))
             await db.commit()
     
-    async def artists(self, filter_query=None, limit=100000, offset=0, orderby='name', fulldata=False) -> List[Artist]:
+    async def artists(self, filter_query=None, limit=100000, offset=0, orderby='name', fulldata=False, db=None) -> List[Artist]:
         ''' fetch artist records from table'''
         artists = []
         sql_query = 'SELECT * FROM artists'
@@ -193,23 +194,29 @@ class Database():
         sql_query += ' ORDER BY %s' % orderby
         if limit:
             sql_query += ' LIMIT %d OFFSET %d' %(limit, offset)
-        async with aiosqlite.connect(self.dbfile) as db:
-            async with db.execute(sql_query) as cursor:
-                db_rows = await cursor.fetchall()
-            for db_row in db_rows:
-                artist = Artist()
-                artist.item_id = db_row[0]
-                artist.name = db_row[1]
-                artist.sort_name = db_row[2]
-                artist.provider_ids = await self.__get_prov_ids(artist.item_id, MediaType.Artist, db)
-                artist.in_library = await self.__get_library_providers(artist.item_id, MediaType.Artist, db)
-                artist.external_ids = await self.__get_external_ids(artist.item_id, MediaType.Artist, db)
-                if fulldata:
-                    artist.metadata = await self.__get_metadata(artist.item_id, MediaType.Artist, db)
-                    artist.tags = await self.__get_tags(artist.item_id, MediaType.Artist, db)
-                else:
-                    artist.metadata = await self.__get_metadata(artist.item_id, MediaType.Artist, db, filter_key='image')
-                artists.append(artist)
+        if not db:
+            db = await aiosqlite.connect(self.dbfile)
+            should_close_db = True
+        else:
+            should_close_db = False
+        async with db.execute(sql_query) as cursor:
+            db_rows = await cursor.fetchall()
+        for db_row in db_rows:
+            artist = Artist()
+            artist.item_id = db_row[0]
+            artist.name = db_row[1]
+            artist.sort_name = db_row[2]
+            artist.provider_ids = await self.__get_prov_ids(artist.item_id, MediaType.Artist, db)
+            artist.in_library = await self.__get_library_providers(artist.item_id, MediaType.Artist, db)
+            artist.external_ids = await self.__get_external_ids(artist.item_id, MediaType.Artist, db)
+            if fulldata:
+                artist.metadata = await self.__get_metadata(artist.item_id, MediaType.Artist, db)
+                artist.tags = await self.__get_tags(artist.item_id, MediaType.Artist, db)
+            else:
+                artist.metadata = await self.__get_metadata(artist.item_id, MediaType.Artist, db, filter_key='image')
+            artists.append(artist)
+        if should_close_db:
+            await db.close()
         return artists
 
     async def artist(self, artist_id:int, fulldata=True) -> Artist:
@@ -255,7 +262,7 @@ class Database():
         LOGGER.info('added artist %s (%s) to database: %s' %(artist.name, artist.provider_ids, artist_id))
         return artist_id
     
-    async def albums(self, filter_query=None, limit=100000, offset=0, orderby='name', fulldata=False) -> List[Album]:
+    async def albums(self, filter_query=None, limit=100000, offset=0, orderby='name', fulldata=False, db=None) -> List[Album]:
         ''' fetch all album records from table'''
         albums = []
         sql_query = 'SELECT * FROM albums'
@@ -264,9 +271,13 @@ class Database():
         sql_query += ' ORDER BY %s' % orderby
         if limit:
             sql_query += ' LIMIT %d OFFSET %d' %(limit, offset)
-        async with aiosqlite.connect(self.dbfile) as db:
-            async with db.execute(sql_query) as cursor:
-                db_rows = await cursor.fetchall()
+        if not db:
+            db = await aiosqlite.connect(self.dbfile)
+            should_close_db = True
+        else:
+            should_close_db = False
+        async with db.execute(sql_query) as cursor:
+            db_rows = await cursor.fetchall()
             for db_row in db_rows:
                 album = Album()
                 album.item_id = db_row[0]
@@ -285,12 +296,14 @@ class Database():
                 else:
                     album.metadata = await self.__get_metadata(album.item_id, MediaType.Album, db, filter_key='image')
                 albums.append(album)
+        if should_close_db:
+            await db.close()
         return albums
 
-    async def album(self, album_id:int, fulldata=True) -> Album:
+    async def album(self, album_id:int, fulldata=True, db=None) -> Album:
         ''' get album record by id '''
         album_id = try_parse_int(album_id)
-        albums = await self.albums('WHERE album_id = %s' % album_id, fulldata=fulldata)
+        albums = await self.albums('WHERE album_id = %s' % album_id, fulldata=fulldata, db=db)
         if not albums:
             return None
         return albums[0]
@@ -333,7 +346,7 @@ class Database():
         LOGGER.info('added album %s (%s) to database: %s' %(album.name, album.provider_ids, album_id))
         return album_id
 
-    async def tracks(self, filter_query=None, limit=100000, offset=0, orderby='name', fulldata=False) -> List[Track]:
+    async def tracks(self, filter_query=None, limit=100000, offset=0, orderby='name', fulldata=False, db=None) -> List[Track]:
         ''' fetch all track records from table'''
         tracks = []
         sql_query = 'SELECT * FROM tracks'
@@ -342,25 +355,31 @@ class Database():
         sql_query += ' ORDER BY %s' % orderby
         if limit:
             sql_query += ' LIMIT %d OFFSET %d' %(limit, offset)
-        async with aiosqlite.connect(self.dbfile) as db:
-            async with db.execute(sql_query) as cursor:
-                db_rows = await cursor.fetchall()
-            for db_row in db_rows:
-                track = Track()
-                track.item_id = db_row[0]
-                track.name = db_row[1]
-                track.album = await self.album(db_row[2], fulldata=fulldata)
-                track.duration = db_row[3]
-                track.version = db_row[4]
-                track.disc_number = db_row[5]
-                track.track_number = db_row[6]
-                track.metadata = await self.__get_metadata(track.item_id, MediaType.Track, db)
-                track.tags = await self.__get_tags(track.item_id, MediaType.Track, db)
-                track.provider_ids = await self.__get_prov_ids(track.item_id, MediaType.Track, db)
-                track.in_library = await self.__get_library_providers(track.item_id, MediaType.Track, db)
-                track.artists = await self.__get_track_artists(track.item_id, db, fulldata=fulldata)
-                track.external_ids = await self.__get_external_ids(track.item_id, MediaType.Track, db)
-                tracks.append(track)
+        if not db:
+            db = await aiosqlite.connect(self.dbfile)
+            should_close_db = True
+        else:
+            should_close_db = False
+        async with db.execute(sql_query) as cursor:
+            db_rows = await cursor.fetchall()
+        for db_row in db_rows:
+            track = Track()
+            track.item_id = db_row[0]
+            track.name = db_row[1]
+            track.album = await self.album(db_row[2], fulldata=fulldata, db=db)
+            track.duration = db_row[3]
+            track.version = db_row[4]
+            track.disc_number = db_row[5]
+            track.track_number = db_row[6]
+            track.metadata = await self.__get_metadata(track.item_id, MediaType.Track, db)
+            track.tags = await self.__get_tags(track.item_id, MediaType.Track, db)
+            track.provider_ids = await self.__get_prov_ids(track.item_id, MediaType.Track, db)
+            track.in_library = await self.__get_library_providers(track.item_id, MediaType.Track, db)
+            track.artists = await self.__get_track_artists(track.item_id, db, fulldata=fulldata)
+            track.external_ids = await self.__get_external_ids(track.item_id, MediaType.Track, db)
+            tracks.append(track)
+        if should_close_db:
+            await db.close()
         return tracks
 
     async def track(self, track_id:int, fulldata=True) -> Track:
@@ -429,16 +448,19 @@ class Database():
         ''' get playlist tracks for the given playlist_id '''
         playlist_id = try_parse_int(playlist_id)
         playlist_tracks = []
-        sql_query = 'SELECT track_id, position FROM playlist_tracks WHERE playlist_id = ? ORDER BY %s' % orderby
+        sql_query = 'SELECT track_id, position FROM playlist_tracks WHERE playlist_id = ? ORDER BY track_id'
         if limit:
             sql_query += ' LIMIT %d OFFSET %d' %(limit, offset)
         async with aiosqlite.connect(self.dbfile) as db:
             async with db.execute(sql_query, (playlist_id,)) as cursor:
                 db_rows = await cursor.fetchall()
-        for db_row in db_rows:
-            playlist_track = await self.track(db_row[0], fulldata=fulldata)
-            playlist_track.position = db_row[1]
-            playlist_tracks.append(playlist_track)
+            playlist_track_ids = [str(item[0]) for item in db_rows]
+            sql_query = 'WHERE track_id in (%s)' % ','.join(playlist_track_ids)
+            tracks = await self.tracks(sql_query, orderby='track_id', db=db)
+            for index, track in enumerate(tracks):
+                track.position = db_rows[index][1]
+                playlist_tracks.append(track)
+            playlist_tracks = sorted(playlist_tracks, key=operator.attrgetter(orderby), reverse=False)
         return playlist_tracks
 
     async def add_playlist_track(self, playlist_id:int, track_id, position):
@@ -516,14 +538,8 @@ class Database():
     
     async def __get_track_artists(self, track_id, db, fulldata=False) -> List[Artist]:
         ''' get artists for track '''
-        artists = []
-        sql_query = 'SELECT artist_id FROM track_artists WHERE track_id = ?'
-        async with db.execute(sql_query, (track_id,)) as cursor:
-            db_rows = await cursor.fetchall()
-        for db_row in db_rows:
-            artist = await self.artist(db_row[0], fulldata=fulldata)
-            artists.append(artist)
-        return artists
+        sql_query = 'WHERE artist_id in (SELECT artist_id FROM track_artists WHERE track_id = %s)' % track_id
+        return await self.artists(sql_query, db=db)
     
     async def __add_external_ids(self, item_id, media_type, external_ids, db):
         ''' add or update external_ids'''
index e3e0d8d281ec1129960bc5073eca4d0c0623cb01..fc09a5117eee1c527c44ec402635e1b4d08313c4 100755 (executable)
@@ -30,7 +30,8 @@ class Main():
         self.datapath = datapath
         self.parse_config()
         self.event_loop = asyncio.get_event_loop()
-        self.bg_executor = ThreadPoolExecutor(max_workers=5)
+        self.bg_executor = ThreadPoolExecutor()
+        self.event_loop.set_default_executor(self.bg_executor)
         self.event_listeners = {}
 
         # init database and metadata modules
index 2053457fc880026249c1451ff751bfd7e27c4eff..b8b9ecc70e8aa595372af41d31bde53c22c2dda2 100755 (executable)
@@ -147,20 +147,14 @@ class Music():
             playlist = await self.mass.db.playlist(playlist_id)
         if playlist and playlist.is_editable:
             # database synced playlist, return tracks from db...
-            return await self.mass.db.playlist_tracks(playlist.item_id, offset=offset, limit=limit)
+            return await self.mass.db.playlist_tracks(
+                    playlist.item_id, offset=offset, limit=limit)
         else:
             # return playlist tracks from provider
-            items = []
-            playlist = await self.playlist(playlist_id)
-            for prov_mapping in playlist.provider_ids:
-                prov_id = prov_mapping['provider']
-                prov_item_id = prov_mapping['item_id']
-                prov_obj = self.providers[prov_id]
-                items += await prov_obj.playlist_tracks(prov_item_id, offset=offset, limit=limit)
-                if items:
-                    break
-            items = list(toolz.unique(items, key=operator.attrgetter('item_id')))
-            return items
+            playlist = await self.playlist(playlist_id, provider)
+            prov = playlist.provider_ids[0]
+            return await self.providers[prov['provider']].playlist_tracks(
+                    prov['item_id'], offset=offset, limit=limit)
 
     async def search(self, searchquery, media_types:List[MediaType], limit=10, online=False) -> dict:
         ''' search database or providers '''
index 51ec96352ff9c6ada40258877b8039791ea4fc0d..69a8413022f7b01e6f11d9675e5e8b56c3ed9b41 100644 (file)
@@ -259,13 +259,20 @@ class QobuzProvider(MusicProvider):
     async def get_audio_stream(self, track_id):
         ''' get audio stream for a track '''
         params = {'format_id': 27, 'track_id': track_id, 'intent': 'stream'}
-        streamdetails = await self.__get_data('track/getFileUrl', params, sign_request=True, ignore_cache=True)
-        async with self.http_session.get(streamdetails['url']) as resp:
-            while True:
-                chunk = await resp.content.read(2000000)
-                if not chunk:
-                    break
-                yield chunk
+        # we are called from other thread
+        streamdetails_future = asyncio.run_coroutine_threadsafe(
+            self.__get_data('track/getFileUrl', params, sign_request=True, ignore_cache=True),
+            self.mass.event_loop
+        )
+        streamdetails = streamdetails_future.result()
+        async with aiohttp.ClientSession(loop=asyncio.get_event_loop(), connector=aiohttp.TCPConnector(verify_ssl=False)) as session:
+            async with session.get(streamdetails['url']) as resp:
+                while True:
+                    chunk = await resp.content.read(512000)
+                    if not chunk:
+                        break
+                    yield chunk
+                    await asyncio.sleep(0.1)
         LOGGER.info("end of stream for track_id %s" % track_id)
     
     async def __parse_artist(self, artist_obj):
index 1871b374c7b704a6dba4bd9254b0503483b16996..ef92f9e92153b3e9db681195292b76c7087a77d1 100644 (file)
@@ -252,22 +252,13 @@ class SpotifyProvider(MusicProvider):
         spotty = self.get_spotty_binary()
         args = ['-n', 'temp', '-u', self._username, '-p', self._password, '--pass-through', '--single-track', track_id]
         process = await asyncio.create_subprocess_exec(spotty, *args, stdout=asyncio.subprocess.PIPE)
-        try:
-            while not process.stdout.at_eof():
-                chunk = await process.stdout.read(2000000)
-                if chunk:
-                    yield chunk
-                else:
-                    break
-        except (GeneratorExit, Exception):
-            while True:
-                if not await process.stdout.read(2000000):
-                    break
-            await process.wait()
-            LOGGER.info("stream cancelled for track_id %s" % track_id)
-        else:
-            await process.wait()
-            LOGGER.info("end of stream for track_id %s" % track_id)
+        while not process.stdout.at_eof():
+            chunk = await process.stdout.read(512000)
+            if not chunk:
+                break
+            yield chunk
+            await asyncio.sleep(0.1)
+        LOGGER.info("end of stream for track_id %s" % track_id)
         
     async def __parse_artist(self, artist_obj):
         ''' parse spotify artist object to generic layout '''
index 87529ff4da76b9ac66ee0431dd01933329118e01..d3161507eef608bf7c9b38384f50bae67d7e251a 100755 (executable)
@@ -187,7 +187,8 @@ class Player():
 
     async def trigger_update(self, player_id):
         ''' manually trigger update for a player '''
-        await self.update_player(self._players[player_id])
+        if player_id in self._players:
+            await self.update_player(self._players[player_id])
     
     async def update_player(self, player_details):
         ''' update (or add) player '''
@@ -368,6 +369,18 @@ class Player():
         return await player_prov.player_queue(player_id, offset=offset, limit=limit)
 
     async def get_audio_stream(self, track_id, provider, player_id=None):
+        ''' get audio stream for a track '''
+        queue = asyncio.Queue()
+        run_async_background_task(
+            self.mass.bg_executor, self.__get_audio_stream, queue, track_id, provider, player_id)
+        while True:
+            chunk = await queue.get()
+            if not chunk:
+                break
+            yield chunk
+            queue.task_done()
+
+    async def __get_audio_stream(self, audioqueue, track_id, provider, player_id=None):
         ''' get audio stream from provider and apply additional effects/processing where/if needed'''
         input_content_type = await self.mass.music.providers[provider].get_stream_content_type(track_id)
         cachefile = self.__get_track_cache_filename(track_id, provider)
@@ -391,15 +404,17 @@ class Player():
             LOGGER.info("Running sox with args: %s" % args)
             process = await asyncio.create_subprocess_shell(args,
                     stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE)
-            buffer_task = asyncio.create_task(
-                    self.__fill_audio_buffer(process.stdin, track_id, provider, input_content_type))
-        # yield the chunks from stdout
+            buffer_task = asyncio.get_event_loop().create_task(
+                     self.__fill_audio_buffer(process.stdin, track_id, provider, input_content_type))
+        # put chunks from stdout into queue
         while not process.stdout.at_eof():
-            chunk = await process.stdout.read(2000000)
+            chunk = await process.stdout.read(512000)
+            await audioqueue.put(chunk)
             if not chunk:
                 break
-            yield chunk
+            await asyncio.sleep(0.1)
         await process.wait()
+        await audioqueue.put('') # indicate EOF
         LOGGER.info("streaming of track_id %s completed" % track_id)
 
     async def __get_player_sox_options(self, track_id, provider, player_id):
@@ -412,7 +427,11 @@ class Player():
             max_sample_rate = try_parse_int(self.mass.config['player_settings'][player_id]['max_sample_rate'])
             if max_sample_rate:
                 quality = TrackQuality.LOSSY_MP3
-                track = await self.mass.music.track(track_id, provider)
+                track_future = asyncio.run_coroutine_threadsafe(
+                    self.mass.music.track(track_id, provider),
+                    self.mass.event_loop
+                )
+                track = track_future.result()
                 for item in track.provider_ids:
                     if item['provider'] == provider and item['item_id'] == track_id:
                         quality = item['quality']
@@ -475,9 +494,9 @@ class Player():
             gain_correct = fallback_gain # fallback value
             if os.path.isfile(analysis_file):
                 os.remove(analysis_file)
-                # cachefile = self.__get_track_cache_filename(track_id, provider)
                 # reschedule analyze task to try again
-                # asyncio.create_task(self.__analyze_track_audio(cachefile, track_id, provider))
+                cachefile = self.__get_track_cache_filename(track_id, provider)
+                self.mass.event_loop.create_task(self.__analyze_audio(cachefile, track_id, provider, 'flac'))
         return round(gain_correct,2)
 
     async def __fill_audio_buffer(self, buf, track_id, provider, content_type):
@@ -493,9 +512,8 @@ class Player():
         await buf.drain()
         buf.write_eof()
         fd.close()
-        # successfull completion, send tmpfile to be processed in the background
-        #asyncio.create_task(self.__process_audio(tmpfile, track_id, provider))
-        run_async_background_task(self.mass.bg_executor, self.__analyze_audio, tmpfile, track_id, provider, content_type)
+        # successfull completion, send tmpfile to be processed in the background in main loop
+        self.mass.event_loop.create_task(self.__analyze_audio(tmpfile, track_id, provider, content_type))
         LOGGER.info("fill_audio_buffer complete for track %s" % track_id)
         return
 
index c53d8e48686135ff93ce946d5a608c86614f55eb..5592e9d384e28ff6f281b6e26b5839e0b01f9aea 100644 (file)
@@ -262,7 +262,7 @@ class ChromecastProvider(PlayerProvider):
                 player.state = PlayerState.Stopped
                 player.powered = player.powered
             player.cur_item = await self.__parse_track(mediastatus)
-            player.cur_item_time = try_parse_int(mediastatus.current_time)
+            player.cur_item_time =  chromecast.media_controller.status.adjusted_current_time
         await self.mass.player.update_player(player)
 
     async def __parse_track(self, mediastatus):
index 9930b4d80cfe92e0bd78324a4ca47d6d692ef0b3..e3dbfc24b5d4994d86ba5ef4a30f682a5c940549 100755 (executable)
@@ -269,20 +269,8 @@ class Web():
         resp = web.StreamResponse(status=200,
                                  reason='OK',
                                  headers={'Content-Type': 'audio/flac'})
-        try:
-            await resp.prepare(request)
-            if request.method.upper() == 'HEAD':
-                return resp
-            cancelled = False
+        await resp.prepare(request)
+        if request.method.upper() != 'HEAD':
             async for chunk in self.mass.player.get_audio_stream(track_id, provider, player_id):
-                if cancelled:
-                    continue # just consume all bytes in stream to prevent deadlocks in the subprocess based iterators
-                try:
-                    await resp.write(chunk)
-                except (asyncio.CancelledError, concurrent.futures._base.CancelledError, ConnectionResetError):
-                    LOGGER.error('client disconnect?')
-                    cancelled = True
-            if not cancelled:
-                return resp
-        except AttributeError:
-            LOGGER.error('client disconnect?')
\ No newline at end of file
+                await resp.write(chunk)
+        return resp
\ No newline at end of file
index 7a188fd2b6d80bbb404c14cde71b9cdbc209d1f3..6e56c61525af857183566b06af098dfeeaf64d39 100755 (executable)
@@ -30,13 +30,13 @@ def run_background_task(executor, corofn, *args):
 def run_async_background_task(executor, corofn, *args):
     ''' run async task in background '''
     def run_task(corofn, *args):
-        loop = asyncio.new_event_loop()
-        try:
-            coro = corofn(*args)
-            asyncio.set_event_loop(loop)
-            return loop.run_until_complete(coro)
-        finally:
-            loop.close()
+        LOGGER.info('running %s in background task' % corofn.__name__)
+        new_loop = asyncio.new_event_loop()
+        coro = corofn(*args)
+        res = new_loop.run_until_complete(coro)
+        new_loop.close()
+        LOGGER.info('completed %s in background task' % corofn.__name__)
+        return res
     return asyncio.get_event_loop().run_in_executor(executor, run_task, corofn, *args)
 
 def get_sort_name(name):
index 4dc6e81d8a5c44cb1a1d44d094e8740cceba8bba..ba8e71f92889c4e27b4dd31b180ef6b1e36b43d7 100755 (executable)
@@ -188,6 +188,7 @@ Vue.component("player", {
     },
     playItem(item, queueopt) {
       console.log('playItem: ' + item);
+      this.$globals.loading = true;
       var api_url = 'api/players/' + this.active_player_id + '/play_media/' + item.media_type + '/' + item.item_id + '/' + queueopt;
       axios
       .get(api_url, {
@@ -197,9 +198,11 @@ Vue.component("player", {
       })
       .then(result => {
         console.log(result.data);
+        this.$globals.loading = false;
       })
       .catch(error => {
         console.log("error", error);
+        this.$globals.loading = false;
       });
     },
     switchPlayer (new_player_id) {