From 1987114ee0b59fcd9e3479a60020bd4e00ad8ce7 Mon Sep 17 00:00:00 2001 From: marcelveldt Date: Wed, 22 May 2019 00:22:59 +0200 Subject: [PATCH] speed improvements --- music_assistant/database.py | 130 ++++++++++-------- music_assistant/main.py | 3 +- music_assistant/modules/music.py | 18 +-- .../modules/musicproviders/qobuz.py | 21 ++- .../modules/musicproviders/spotify.py | 23 +--- music_assistant/modules/player.py | 42 ++++-- .../modules/playerproviders/chromecast.py | 2 +- music_assistant/modules/web.py | 20 +-- music_assistant/utils.py | 14 +- music_assistant/web/components/player.vue.js | 3 + 10 files changed, 147 insertions(+), 129 deletions(-) diff --git a/music_assistant/database.py b/music_assistant/database.py index 7e1fcd61..3eb48809 100755 --- a/music_assistant/database.py +++ b/music_assistant/database.py @@ -7,6 +7,7 @@ from utils import run_periodic, LOGGER, get_sort_name, try_parse_int from models import MediaType, Artist, Album, Track, Playlist from typing import List import aiosqlite +import operator class Database(): @@ -184,7 +185,7 @@ class Database(): await db.execute(sql_query, (item_id,media_type, provider)) await db.commit() - async def artists(self, filter_query=None, limit=100000, offset=0, orderby='name', fulldata=False) -> List[Artist]: + async def artists(self, filter_query=None, limit=100000, offset=0, orderby='name', fulldata=False, db=None) -> List[Artist]: ''' fetch artist records from table''' artists = [] sql_query = 'SELECT * FROM artists' @@ -193,23 +194,29 @@ class Database(): sql_query += ' ORDER BY %s' % orderby if limit: sql_query += ' LIMIT %d OFFSET %d' %(limit, offset) - async with aiosqlite.connect(self.dbfile) as db: - async with db.execute(sql_query) as cursor: - db_rows = await cursor.fetchall() - for db_row in db_rows: - artist = Artist() - artist.item_id = db_row[0] - artist.name = db_row[1] - artist.sort_name = db_row[2] - artist.provider_ids = await self.__get_prov_ids(artist.item_id, MediaType.Artist, db) - artist.in_library = await self.__get_library_providers(artist.item_id, MediaType.Artist, db) - artist.external_ids = await self.__get_external_ids(artist.item_id, MediaType.Artist, db) - if fulldata: - artist.metadata = await self.__get_metadata(artist.item_id, MediaType.Artist, db) - artist.tags = await self.__get_tags(artist.item_id, MediaType.Artist, db) - else: - artist.metadata = await self.__get_metadata(artist.item_id, MediaType.Artist, db, filter_key='image') - artists.append(artist) + if not db: + db = await aiosqlite.connect(self.dbfile) + should_close_db = True + else: + should_close_db = False + async with db.execute(sql_query) as cursor: + db_rows = await cursor.fetchall() + for db_row in db_rows: + artist = Artist() + artist.item_id = db_row[0] + artist.name = db_row[1] + artist.sort_name = db_row[2] + artist.provider_ids = await self.__get_prov_ids(artist.item_id, MediaType.Artist, db) + artist.in_library = await self.__get_library_providers(artist.item_id, MediaType.Artist, db) + artist.external_ids = await self.__get_external_ids(artist.item_id, MediaType.Artist, db) + if fulldata: + artist.metadata = await self.__get_metadata(artist.item_id, MediaType.Artist, db) + artist.tags = await self.__get_tags(artist.item_id, MediaType.Artist, db) + else: + artist.metadata = await self.__get_metadata(artist.item_id, MediaType.Artist, db, filter_key='image') + artists.append(artist) + if should_close_db: + await db.close() return artists async def artist(self, artist_id:int, fulldata=True) -> Artist: @@ -255,7 +262,7 @@ class Database(): LOGGER.info('added artist %s (%s) to database: %s' %(artist.name, artist.provider_ids, artist_id)) return artist_id - async def albums(self, filter_query=None, limit=100000, offset=0, orderby='name', fulldata=False) -> List[Album]: + async def albums(self, filter_query=None, limit=100000, offset=0, orderby='name', fulldata=False, db=None) -> List[Album]: ''' fetch all album records from table''' albums = [] sql_query = 'SELECT * FROM albums' @@ -264,9 +271,13 @@ class Database(): sql_query += ' ORDER BY %s' % orderby if limit: sql_query += ' LIMIT %d OFFSET %d' %(limit, offset) - async with aiosqlite.connect(self.dbfile) as db: - async with db.execute(sql_query) as cursor: - db_rows = await cursor.fetchall() + if not db: + db = await aiosqlite.connect(self.dbfile) + should_close_db = True + else: + should_close_db = False + async with db.execute(sql_query) as cursor: + db_rows = await cursor.fetchall() for db_row in db_rows: album = Album() album.item_id = db_row[0] @@ -285,12 +296,14 @@ class Database(): else: album.metadata = await self.__get_metadata(album.item_id, MediaType.Album, db, filter_key='image') albums.append(album) + if should_close_db: + await db.close() return albums - async def album(self, album_id:int, fulldata=True) -> Album: + async def album(self, album_id:int, fulldata=True, db=None) -> Album: ''' get album record by id ''' album_id = try_parse_int(album_id) - albums = await self.albums('WHERE album_id = %s' % album_id, fulldata=fulldata) + albums = await self.albums('WHERE album_id = %s' % album_id, fulldata=fulldata, db=db) if not albums: return None return albums[0] @@ -333,7 +346,7 @@ class Database(): LOGGER.info('added album %s (%s) to database: %s' %(album.name, album.provider_ids, album_id)) return album_id - async def tracks(self, filter_query=None, limit=100000, offset=0, orderby='name', fulldata=False) -> List[Track]: + async def tracks(self, filter_query=None, limit=100000, offset=0, orderby='name', fulldata=False, db=None) -> List[Track]: ''' fetch all track records from table''' tracks = [] sql_query = 'SELECT * FROM tracks' @@ -342,25 +355,31 @@ class Database(): sql_query += ' ORDER BY %s' % orderby if limit: sql_query += ' LIMIT %d OFFSET %d' %(limit, offset) - async with aiosqlite.connect(self.dbfile) as db: - async with db.execute(sql_query) as cursor: - db_rows = await cursor.fetchall() - for db_row in db_rows: - track = Track() - track.item_id = db_row[0] - track.name = db_row[1] - track.album = await self.album(db_row[2], fulldata=fulldata) - track.duration = db_row[3] - track.version = db_row[4] - track.disc_number = db_row[5] - track.track_number = db_row[6] - track.metadata = await self.__get_metadata(track.item_id, MediaType.Track, db) - track.tags = await self.__get_tags(track.item_id, MediaType.Track, db) - track.provider_ids = await self.__get_prov_ids(track.item_id, MediaType.Track, db) - track.in_library = await self.__get_library_providers(track.item_id, MediaType.Track, db) - track.artists = await self.__get_track_artists(track.item_id, db, fulldata=fulldata) - track.external_ids = await self.__get_external_ids(track.item_id, MediaType.Track, db) - tracks.append(track) + if not db: + db = await aiosqlite.connect(self.dbfile) + should_close_db = True + else: + should_close_db = False + async with db.execute(sql_query) as cursor: + db_rows = await cursor.fetchall() + for db_row in db_rows: + track = Track() + track.item_id = db_row[0] + track.name = db_row[1] + track.album = await self.album(db_row[2], fulldata=fulldata, db=db) + track.duration = db_row[3] + track.version = db_row[4] + track.disc_number = db_row[5] + track.track_number = db_row[6] + track.metadata = await self.__get_metadata(track.item_id, MediaType.Track, db) + track.tags = await self.__get_tags(track.item_id, MediaType.Track, db) + track.provider_ids = await self.__get_prov_ids(track.item_id, MediaType.Track, db) + track.in_library = await self.__get_library_providers(track.item_id, MediaType.Track, db) + track.artists = await self.__get_track_artists(track.item_id, db, fulldata=fulldata) + track.external_ids = await self.__get_external_ids(track.item_id, MediaType.Track, db) + tracks.append(track) + if should_close_db: + await db.close() return tracks async def track(self, track_id:int, fulldata=True) -> Track: @@ -429,16 +448,19 @@ class Database(): ''' get playlist tracks for the given playlist_id ''' playlist_id = try_parse_int(playlist_id) playlist_tracks = [] - sql_query = 'SELECT track_id, position FROM playlist_tracks WHERE playlist_id = ? ORDER BY %s' % orderby + sql_query = 'SELECT track_id, position FROM playlist_tracks WHERE playlist_id = ? ORDER BY track_id' if limit: sql_query += ' LIMIT %d OFFSET %d' %(limit, offset) async with aiosqlite.connect(self.dbfile) as db: async with db.execute(sql_query, (playlist_id,)) as cursor: db_rows = await cursor.fetchall() - for db_row in db_rows: - playlist_track = await self.track(db_row[0], fulldata=fulldata) - playlist_track.position = db_row[1] - playlist_tracks.append(playlist_track) + playlist_track_ids = [str(item[0]) for item in db_rows] + sql_query = 'WHERE track_id in (%s)' % ','.join(playlist_track_ids) + tracks = await self.tracks(sql_query, orderby='track_id', db=db) + for index, track in enumerate(tracks): + track.position = db_rows[index][1] + playlist_tracks.append(track) + playlist_tracks = sorted(playlist_tracks, key=operator.attrgetter(orderby), reverse=False) return playlist_tracks async def add_playlist_track(self, playlist_id:int, track_id, position): @@ -516,14 +538,8 @@ class Database(): async def __get_track_artists(self, track_id, db, fulldata=False) -> List[Artist]: ''' get artists for track ''' - artists = [] - sql_query = 'SELECT artist_id FROM track_artists WHERE track_id = ?' - async with db.execute(sql_query, (track_id,)) as cursor: - db_rows = await cursor.fetchall() - for db_row in db_rows: - artist = await self.artist(db_row[0], fulldata=fulldata) - artists.append(artist) - return artists + sql_query = 'WHERE artist_id in (SELECT artist_id FROM track_artists WHERE track_id = %s)' % track_id + return await self.artists(sql_query, db=db) async def __add_external_ids(self, item_id, media_type, external_ids, db): ''' add or update external_ids''' diff --git a/music_assistant/main.py b/music_assistant/main.py index e3e0d8d2..fc09a511 100755 --- a/music_assistant/main.py +++ b/music_assistant/main.py @@ -30,7 +30,8 @@ class Main(): self.datapath = datapath self.parse_config() self.event_loop = asyncio.get_event_loop() - self.bg_executor = ThreadPoolExecutor(max_workers=5) + self.bg_executor = ThreadPoolExecutor() + self.event_loop.set_default_executor(self.bg_executor) self.event_listeners = {} # init database and metadata modules diff --git a/music_assistant/modules/music.py b/music_assistant/modules/music.py index 2053457f..b8b9ecc7 100755 --- a/music_assistant/modules/music.py +++ b/music_assistant/modules/music.py @@ -147,20 +147,14 @@ class Music(): playlist = await self.mass.db.playlist(playlist_id) if playlist and playlist.is_editable: # database synced playlist, return tracks from db... - return await self.mass.db.playlist_tracks(playlist.item_id, offset=offset, limit=limit) + return await self.mass.db.playlist_tracks( + playlist.item_id, offset=offset, limit=limit) else: # return playlist tracks from provider - items = [] - playlist = await self.playlist(playlist_id) - for prov_mapping in playlist.provider_ids: - prov_id = prov_mapping['provider'] - prov_item_id = prov_mapping['item_id'] - prov_obj = self.providers[prov_id] - items += await prov_obj.playlist_tracks(prov_item_id, offset=offset, limit=limit) - if items: - break - items = list(toolz.unique(items, key=operator.attrgetter('item_id'))) - return items + playlist = await self.playlist(playlist_id, provider) + prov = playlist.provider_ids[0] + return await self.providers[prov['provider']].playlist_tracks( + prov['item_id'], offset=offset, limit=limit) async def search(self, searchquery, media_types:List[MediaType], limit=10, online=False) -> dict: ''' search database or providers ''' diff --git a/music_assistant/modules/musicproviders/qobuz.py b/music_assistant/modules/musicproviders/qobuz.py index 51ec9635..69a84130 100644 --- a/music_assistant/modules/musicproviders/qobuz.py +++ b/music_assistant/modules/musicproviders/qobuz.py @@ -259,13 +259,20 @@ class QobuzProvider(MusicProvider): async def get_audio_stream(self, track_id): ''' get audio stream for a track ''' params = {'format_id': 27, 'track_id': track_id, 'intent': 'stream'} - streamdetails = await self.__get_data('track/getFileUrl', params, sign_request=True, ignore_cache=True) - async with self.http_session.get(streamdetails['url']) as resp: - while True: - chunk = await resp.content.read(2000000) - if not chunk: - break - yield chunk + # we are called from other thread + streamdetails_future = asyncio.run_coroutine_threadsafe( + self.__get_data('track/getFileUrl', params, sign_request=True, ignore_cache=True), + self.mass.event_loop + ) + streamdetails = streamdetails_future.result() + async with aiohttp.ClientSession(loop=asyncio.get_event_loop(), connector=aiohttp.TCPConnector(verify_ssl=False)) as session: + async with session.get(streamdetails['url']) as resp: + while True: + chunk = await resp.content.read(512000) + if not chunk: + break + yield chunk + await asyncio.sleep(0.1) LOGGER.info("end of stream for track_id %s" % track_id) async def __parse_artist(self, artist_obj): diff --git a/music_assistant/modules/musicproviders/spotify.py b/music_assistant/modules/musicproviders/spotify.py index 1871b374..ef92f9e9 100644 --- a/music_assistant/modules/musicproviders/spotify.py +++ b/music_assistant/modules/musicproviders/spotify.py @@ -252,22 +252,13 @@ class SpotifyProvider(MusicProvider): spotty = self.get_spotty_binary() args = ['-n', 'temp', '-u', self._username, '-p', self._password, '--pass-through', '--single-track', track_id] process = await asyncio.create_subprocess_exec(spotty, *args, stdout=asyncio.subprocess.PIPE) - try: - while not process.stdout.at_eof(): - chunk = await process.stdout.read(2000000) - if chunk: - yield chunk - else: - break - except (GeneratorExit, Exception): - while True: - if not await process.stdout.read(2000000): - break - await process.wait() - LOGGER.info("stream cancelled for track_id %s" % track_id) - else: - await process.wait() - LOGGER.info("end of stream for track_id %s" % track_id) + while not process.stdout.at_eof(): + chunk = await process.stdout.read(512000) + if not chunk: + break + yield chunk + await asyncio.sleep(0.1) + LOGGER.info("end of stream for track_id %s" % track_id) async def __parse_artist(self, artist_obj): ''' parse spotify artist object to generic layout ''' diff --git a/music_assistant/modules/player.py b/music_assistant/modules/player.py index 87529ff4..d3161507 100755 --- a/music_assistant/modules/player.py +++ b/music_assistant/modules/player.py @@ -187,7 +187,8 @@ class Player(): async def trigger_update(self, player_id): ''' manually trigger update for a player ''' - await self.update_player(self._players[player_id]) + if player_id in self._players: + await self.update_player(self._players[player_id]) async def update_player(self, player_details): ''' update (or add) player ''' @@ -368,6 +369,18 @@ class Player(): return await player_prov.player_queue(player_id, offset=offset, limit=limit) async def get_audio_stream(self, track_id, provider, player_id=None): + ''' get audio stream for a track ''' + queue = asyncio.Queue() + run_async_background_task( + self.mass.bg_executor, self.__get_audio_stream, queue, track_id, provider, player_id) + while True: + chunk = await queue.get() + if not chunk: + break + yield chunk + queue.task_done() + + async def __get_audio_stream(self, audioqueue, track_id, provider, player_id=None): ''' get audio stream from provider and apply additional effects/processing where/if needed''' input_content_type = await self.mass.music.providers[provider].get_stream_content_type(track_id) cachefile = self.__get_track_cache_filename(track_id, provider) @@ -391,15 +404,17 @@ class Player(): LOGGER.info("Running sox with args: %s" % args) process = await asyncio.create_subprocess_shell(args, stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE) - buffer_task = asyncio.create_task( - self.__fill_audio_buffer(process.stdin, track_id, provider, input_content_type)) - # yield the chunks from stdout + buffer_task = asyncio.get_event_loop().create_task( + self.__fill_audio_buffer(process.stdin, track_id, provider, input_content_type)) + # put chunks from stdout into queue while not process.stdout.at_eof(): - chunk = await process.stdout.read(2000000) + chunk = await process.stdout.read(512000) + await audioqueue.put(chunk) if not chunk: break - yield chunk + await asyncio.sleep(0.1) await process.wait() + await audioqueue.put('') # indicate EOF LOGGER.info("streaming of track_id %s completed" % track_id) async def __get_player_sox_options(self, track_id, provider, player_id): @@ -412,7 +427,11 @@ class Player(): max_sample_rate = try_parse_int(self.mass.config['player_settings'][player_id]['max_sample_rate']) if max_sample_rate: quality = TrackQuality.LOSSY_MP3 - track = await self.mass.music.track(track_id, provider) + track_future = asyncio.run_coroutine_threadsafe( + self.mass.music.track(track_id, provider), + self.mass.event_loop + ) + track = track_future.result() for item in track.provider_ids: if item['provider'] == provider and item['item_id'] == track_id: quality = item['quality'] @@ -475,9 +494,9 @@ class Player(): gain_correct = fallback_gain # fallback value if os.path.isfile(analysis_file): os.remove(analysis_file) - # cachefile = self.__get_track_cache_filename(track_id, provider) # reschedule analyze task to try again - # asyncio.create_task(self.__analyze_track_audio(cachefile, track_id, provider)) + cachefile = self.__get_track_cache_filename(track_id, provider) + self.mass.event_loop.create_task(self.__analyze_audio(cachefile, track_id, provider, 'flac')) return round(gain_correct,2) async def __fill_audio_buffer(self, buf, track_id, provider, content_type): @@ -493,9 +512,8 @@ class Player(): await buf.drain() buf.write_eof() fd.close() - # successfull completion, send tmpfile to be processed in the background - #asyncio.create_task(self.__process_audio(tmpfile, track_id, provider)) - run_async_background_task(self.mass.bg_executor, self.__analyze_audio, tmpfile, track_id, provider, content_type) + # successfull completion, send tmpfile to be processed in the background in main loop + self.mass.event_loop.create_task(self.__analyze_audio(tmpfile, track_id, provider, content_type)) LOGGER.info("fill_audio_buffer complete for track %s" % track_id) return diff --git a/music_assistant/modules/playerproviders/chromecast.py b/music_assistant/modules/playerproviders/chromecast.py index c53d8e48..5592e9d3 100644 --- a/music_assistant/modules/playerproviders/chromecast.py +++ b/music_assistant/modules/playerproviders/chromecast.py @@ -262,7 +262,7 @@ class ChromecastProvider(PlayerProvider): player.state = PlayerState.Stopped player.powered = player.powered player.cur_item = await self.__parse_track(mediastatus) - player.cur_item_time = try_parse_int(mediastatus.current_time) + player.cur_item_time = chromecast.media_controller.status.adjusted_current_time await self.mass.player.update_player(player) async def __parse_track(self, mediastatus): diff --git a/music_assistant/modules/web.py b/music_assistant/modules/web.py index 9930b4d8..e3dbfc24 100755 --- a/music_assistant/modules/web.py +++ b/music_assistant/modules/web.py @@ -269,20 +269,8 @@ class Web(): resp = web.StreamResponse(status=200, reason='OK', headers={'Content-Type': 'audio/flac'}) - try: - await resp.prepare(request) - if request.method.upper() == 'HEAD': - return resp - cancelled = False + await resp.prepare(request) + if request.method.upper() != 'HEAD': async for chunk in self.mass.player.get_audio_stream(track_id, provider, player_id): - if cancelled: - continue # just consume all bytes in stream to prevent deadlocks in the subprocess based iterators - try: - await resp.write(chunk) - except (asyncio.CancelledError, concurrent.futures._base.CancelledError, ConnectionResetError): - LOGGER.error('client disconnect?') - cancelled = True - if not cancelled: - return resp - except AttributeError: - LOGGER.error('client disconnect?') \ No newline at end of file + await resp.write(chunk) + return resp \ No newline at end of file diff --git a/music_assistant/utils.py b/music_assistant/utils.py index 7a188fd2..6e56c615 100755 --- a/music_assistant/utils.py +++ b/music_assistant/utils.py @@ -30,13 +30,13 @@ def run_background_task(executor, corofn, *args): def run_async_background_task(executor, corofn, *args): ''' run async task in background ''' def run_task(corofn, *args): - loop = asyncio.new_event_loop() - try: - coro = corofn(*args) - asyncio.set_event_loop(loop) - return loop.run_until_complete(coro) - finally: - loop.close() + LOGGER.info('running %s in background task' % corofn.__name__) + new_loop = asyncio.new_event_loop() + coro = corofn(*args) + res = new_loop.run_until_complete(coro) + new_loop.close() + LOGGER.info('completed %s in background task' % corofn.__name__) + return res return asyncio.get_event_loop().run_in_executor(executor, run_task, corofn, *args) def get_sort_name(name): diff --git a/music_assistant/web/components/player.vue.js b/music_assistant/web/components/player.vue.js index 4dc6e81d..ba8e71f9 100755 --- a/music_assistant/web/components/player.vue.js +++ b/music_assistant/web/components/player.vue.js @@ -188,6 +188,7 @@ Vue.component("player", { }, playItem(item, queueopt) { console.log('playItem: ' + item); + this.$globals.loading = true; var api_url = 'api/players/' + this.active_player_id + '/play_media/' + item.media_type + '/' + item.item_id + '/' + queueopt; axios .get(api_url, { @@ -197,9 +198,11 @@ Vue.component("player", { }) .then(result => { console.log(result.data); + this.$globals.loading = false; }) .catch(error => { console.log("error", error); + this.$globals.loading = false; }); }, switchPlayer (new_player_id) { -- 2.34.1