from models import MediaType, Artist, Album, Track, Playlist
from typing import List
import aiosqlite
+import operator
class Database():
await db.execute(sql_query, (item_id,media_type, provider))
await db.commit()
- async def artists(self, filter_query=None, limit=100000, offset=0, orderby='name', fulldata=False) -> List[Artist]:
+ async def artists(self, filter_query=None, limit=100000, offset=0, orderby='name', fulldata=False, db=None) -> List[Artist]:
''' fetch artist records from table'''
artists = []
sql_query = 'SELECT * FROM artists'
sql_query += ' ORDER BY %s' % orderby
if limit:
sql_query += ' LIMIT %d OFFSET %d' %(limit, offset)
- async with aiosqlite.connect(self.dbfile) as db:
- async with db.execute(sql_query) as cursor:
- db_rows = await cursor.fetchall()
- for db_row in db_rows:
- artist = Artist()
- artist.item_id = db_row[0]
- artist.name = db_row[1]
- artist.sort_name = db_row[2]
- artist.provider_ids = await self.__get_prov_ids(artist.item_id, MediaType.Artist, db)
- artist.in_library = await self.__get_library_providers(artist.item_id, MediaType.Artist, db)
- artist.external_ids = await self.__get_external_ids(artist.item_id, MediaType.Artist, db)
- if fulldata:
- artist.metadata = await self.__get_metadata(artist.item_id, MediaType.Artist, db)
- artist.tags = await self.__get_tags(artist.item_id, MediaType.Artist, db)
- else:
- artist.metadata = await self.__get_metadata(artist.item_id, MediaType.Artist, db, filter_key='image')
- artists.append(artist)
+ if not db:
+ db = await aiosqlite.connect(self.dbfile)
+ should_close_db = True
+ else:
+ should_close_db = False
+ async with db.execute(sql_query) as cursor:
+ db_rows = await cursor.fetchall()
+ for db_row in db_rows:
+ artist = Artist()
+ artist.item_id = db_row[0]
+ artist.name = db_row[1]
+ artist.sort_name = db_row[2]
+ artist.provider_ids = await self.__get_prov_ids(artist.item_id, MediaType.Artist, db)
+ artist.in_library = await self.__get_library_providers(artist.item_id, MediaType.Artist, db)
+ artist.external_ids = await self.__get_external_ids(artist.item_id, MediaType.Artist, db)
+ if fulldata:
+ artist.metadata = await self.__get_metadata(artist.item_id, MediaType.Artist, db)
+ artist.tags = await self.__get_tags(artist.item_id, MediaType.Artist, db)
+ else:
+ artist.metadata = await self.__get_metadata(artist.item_id, MediaType.Artist, db, filter_key='image')
+ artists.append(artist)
+ if should_close_db:
+ await db.close()
return artists
async def artist(self, artist_id:int, fulldata=True) -> Artist:
LOGGER.info('added artist %s (%s) to database: %s' %(artist.name, artist.provider_ids, artist_id))
return artist_id
- async def albums(self, filter_query=None, limit=100000, offset=0, orderby='name', fulldata=False) -> List[Album]:
+ async def albums(self, filter_query=None, limit=100000, offset=0, orderby='name', fulldata=False, db=None) -> List[Album]:
''' fetch all album records from table'''
albums = []
sql_query = 'SELECT * FROM albums'
sql_query += ' ORDER BY %s' % orderby
if limit:
sql_query += ' LIMIT %d OFFSET %d' %(limit, offset)
- async with aiosqlite.connect(self.dbfile) as db:
- async with db.execute(sql_query) as cursor:
- db_rows = await cursor.fetchall()
+ if not db:
+ db = await aiosqlite.connect(self.dbfile)
+ should_close_db = True
+ else:
+ should_close_db = False
+ async with db.execute(sql_query) as cursor:
+ db_rows = await cursor.fetchall()
for db_row in db_rows:
album = Album()
album.item_id = db_row[0]
else:
album.metadata = await self.__get_metadata(album.item_id, MediaType.Album, db, filter_key='image')
albums.append(album)
+ if should_close_db:
+ await db.close()
return albums
- async def album(self, album_id:int, fulldata=True) -> Album:
+ async def album(self, album_id:int, fulldata=True, db=None) -> Album:
''' get album record by id '''
album_id = try_parse_int(album_id)
- albums = await self.albums('WHERE album_id = %s' % album_id, fulldata=fulldata)
+ albums = await self.albums('WHERE album_id = %s' % album_id, fulldata=fulldata, db=db)
if not albums:
return None
return albums[0]
LOGGER.info('added album %s (%s) to database: %s' %(album.name, album.provider_ids, album_id))
return album_id
- async def tracks(self, filter_query=None, limit=100000, offset=0, orderby='name', fulldata=False) -> List[Track]:
+ async def tracks(self, filter_query=None, limit=100000, offset=0, orderby='name', fulldata=False, db=None) -> List[Track]:
''' fetch all track records from table'''
tracks = []
sql_query = 'SELECT * FROM tracks'
sql_query += ' ORDER BY %s' % orderby
if limit:
sql_query += ' LIMIT %d OFFSET %d' %(limit, offset)
- async with aiosqlite.connect(self.dbfile) as db:
- async with db.execute(sql_query) as cursor:
- db_rows = await cursor.fetchall()
- for db_row in db_rows:
- track = Track()
- track.item_id = db_row[0]
- track.name = db_row[1]
- track.album = await self.album(db_row[2], fulldata=fulldata)
- track.duration = db_row[3]
- track.version = db_row[4]
- track.disc_number = db_row[5]
- track.track_number = db_row[6]
- track.metadata = await self.__get_metadata(track.item_id, MediaType.Track, db)
- track.tags = await self.__get_tags(track.item_id, MediaType.Track, db)
- track.provider_ids = await self.__get_prov_ids(track.item_id, MediaType.Track, db)
- track.in_library = await self.__get_library_providers(track.item_id, MediaType.Track, db)
- track.artists = await self.__get_track_artists(track.item_id, db, fulldata=fulldata)
- track.external_ids = await self.__get_external_ids(track.item_id, MediaType.Track, db)
- tracks.append(track)
+ if not db:
+ db = await aiosqlite.connect(self.dbfile)
+ should_close_db = True
+ else:
+ should_close_db = False
+ async with db.execute(sql_query) as cursor:
+ db_rows = await cursor.fetchall()
+ for db_row in db_rows:
+ track = Track()
+ track.item_id = db_row[0]
+ track.name = db_row[1]
+ track.album = await self.album(db_row[2], fulldata=fulldata, db=db)
+ track.duration = db_row[3]
+ track.version = db_row[4]
+ track.disc_number = db_row[5]
+ track.track_number = db_row[6]
+ track.metadata = await self.__get_metadata(track.item_id, MediaType.Track, db)
+ track.tags = await self.__get_tags(track.item_id, MediaType.Track, db)
+ track.provider_ids = await self.__get_prov_ids(track.item_id, MediaType.Track, db)
+ track.in_library = await self.__get_library_providers(track.item_id, MediaType.Track, db)
+ track.artists = await self.__get_track_artists(track.item_id, db, fulldata=fulldata)
+ track.external_ids = await self.__get_external_ids(track.item_id, MediaType.Track, db)
+ tracks.append(track)
+ if should_close_db:
+ await db.close()
return tracks
async def track(self, track_id:int, fulldata=True) -> Track:
''' get playlist tracks for the given playlist_id '''
playlist_id = try_parse_int(playlist_id)
playlist_tracks = []
- sql_query = 'SELECT track_id, position FROM playlist_tracks WHERE playlist_id = ? ORDER BY %s' % orderby
+ sql_query = 'SELECT track_id, position FROM playlist_tracks WHERE playlist_id = ? ORDER BY track_id'
if limit:
sql_query += ' LIMIT %d OFFSET %d' %(limit, offset)
async with aiosqlite.connect(self.dbfile) as db:
async with db.execute(sql_query, (playlist_id,)) as cursor:
db_rows = await cursor.fetchall()
- for db_row in db_rows:
- playlist_track = await self.track(db_row[0], fulldata=fulldata)
- playlist_track.position = db_row[1]
- playlist_tracks.append(playlist_track)
+ playlist_track_ids = [str(item[0]) for item in db_rows]
+ sql_query = 'WHERE track_id in (%s)' % ','.join(playlist_track_ids)
+ tracks = await self.tracks(sql_query, orderby='track_id', db=db)
+ for index, track in enumerate(tracks):
+ track.position = db_rows[index][1]
+ playlist_tracks.append(track)
+ playlist_tracks = sorted(playlist_tracks, key=operator.attrgetter(orderby), reverse=False)
return playlist_tracks
async def add_playlist_track(self, playlist_id:int, track_id, position):
async def __get_track_artists(self, track_id, db, fulldata=False) -> List[Artist]:
''' get artists for track '''
- artists = []
- sql_query = 'SELECT artist_id FROM track_artists WHERE track_id = ?'
- async with db.execute(sql_query, (track_id,)) as cursor:
- db_rows = await cursor.fetchall()
- for db_row in db_rows:
- artist = await self.artist(db_row[0], fulldata=fulldata)
- artists.append(artist)
- return artists
+ sql_query = 'WHERE artist_id in (SELECT artist_id FROM track_artists WHERE track_id = %s)' % track_id
+ return await self.artists(sql_query, db=db)
async def __add_external_ids(self, item_id, media_type, external_ids, db):
''' add or update external_ids'''
self.datapath = datapath
self.parse_config()
self.event_loop = asyncio.get_event_loop()
- self.bg_executor = ThreadPoolExecutor(max_workers=5)
+ self.bg_executor = ThreadPoolExecutor()
+ self.event_loop.set_default_executor(self.bg_executor)
self.event_listeners = {}
# init database and metadata modules
playlist = await self.mass.db.playlist(playlist_id)
if playlist and playlist.is_editable:
# database synced playlist, return tracks from db...
- return await self.mass.db.playlist_tracks(playlist.item_id, offset=offset, limit=limit)
+ return await self.mass.db.playlist_tracks(
+ playlist.item_id, offset=offset, limit=limit)
else:
# return playlist tracks from provider
- items = []
- playlist = await self.playlist(playlist_id)
- for prov_mapping in playlist.provider_ids:
- prov_id = prov_mapping['provider']
- prov_item_id = prov_mapping['item_id']
- prov_obj = self.providers[prov_id]
- items += await prov_obj.playlist_tracks(prov_item_id, offset=offset, limit=limit)
- if items:
- break
- items = list(toolz.unique(items, key=operator.attrgetter('item_id')))
- return items
+ playlist = await self.playlist(playlist_id, provider)
+ prov = playlist.provider_ids[0]
+ return await self.providers[prov['provider']].playlist_tracks(
+ prov['item_id'], offset=offset, limit=limit)
async def search(self, searchquery, media_types:List[MediaType], limit=10, online=False) -> dict:
''' search database or providers '''
async def get_audio_stream(self, track_id):
''' get audio stream for a track '''
params = {'format_id': 27, 'track_id': track_id, 'intent': 'stream'}
- streamdetails = await self.__get_data('track/getFileUrl', params, sign_request=True, ignore_cache=True)
- async with self.http_session.get(streamdetails['url']) as resp:
- while True:
- chunk = await resp.content.read(2000000)
- if not chunk:
- break
- yield chunk
+ # we are called from other thread
+ streamdetails_future = asyncio.run_coroutine_threadsafe(
+ self.__get_data('track/getFileUrl', params, sign_request=True, ignore_cache=True),
+ self.mass.event_loop
+ )
+ streamdetails = streamdetails_future.result()
+ async with aiohttp.ClientSession(loop=asyncio.get_event_loop(), connector=aiohttp.TCPConnector(verify_ssl=False)) as session:
+ async with session.get(streamdetails['url']) as resp:
+ while True:
+ chunk = await resp.content.read(512000)
+ if not chunk:
+ break
+ yield chunk
+ await asyncio.sleep(0.1)
LOGGER.info("end of stream for track_id %s" % track_id)
async def __parse_artist(self, artist_obj):
spotty = self.get_spotty_binary()
args = ['-n', 'temp', '-u', self._username, '-p', self._password, '--pass-through', '--single-track', track_id]
process = await asyncio.create_subprocess_exec(spotty, *args, stdout=asyncio.subprocess.PIPE)
- try:
- while not process.stdout.at_eof():
- chunk = await process.stdout.read(2000000)
- if chunk:
- yield chunk
- else:
- break
- except (GeneratorExit, Exception):
- while True:
- if not await process.stdout.read(2000000):
- break
- await process.wait()
- LOGGER.info("stream cancelled for track_id %s" % track_id)
- else:
- await process.wait()
- LOGGER.info("end of stream for track_id %s" % track_id)
+ while not process.stdout.at_eof():
+ chunk = await process.stdout.read(512000)
+ if not chunk:
+ break
+ yield chunk
+ await asyncio.sleep(0.1)
+ LOGGER.info("end of stream for track_id %s" % track_id)
async def __parse_artist(self, artist_obj):
''' parse spotify artist object to generic layout '''
async def trigger_update(self, player_id):
''' manually trigger update for a player '''
- await self.update_player(self._players[player_id])
+ if player_id in self._players:
+ await self.update_player(self._players[player_id])
async def update_player(self, player_details):
''' update (or add) player '''
return await player_prov.player_queue(player_id, offset=offset, limit=limit)
async def get_audio_stream(self, track_id, provider, player_id=None):
+ ''' get audio stream for a track '''
+ queue = asyncio.Queue()
+ run_async_background_task(
+ self.mass.bg_executor, self.__get_audio_stream, queue, track_id, provider, player_id)
+ while True:
+ chunk = await queue.get()
+ if not chunk:
+ break
+ yield chunk
+ queue.task_done()
+
+ async def __get_audio_stream(self, audioqueue, track_id, provider, player_id=None):
''' get audio stream from provider and apply additional effects/processing where/if needed'''
input_content_type = await self.mass.music.providers[provider].get_stream_content_type(track_id)
cachefile = self.__get_track_cache_filename(track_id, provider)
LOGGER.info("Running sox with args: %s" % args)
process = await asyncio.create_subprocess_shell(args,
stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE)
- buffer_task = asyncio.create_task(
- self.__fill_audio_buffer(process.stdin, track_id, provider, input_content_type))
- # yield the chunks from stdout
+ buffer_task = asyncio.get_event_loop().create_task(
+ self.__fill_audio_buffer(process.stdin, track_id, provider, input_content_type))
+ # put chunks from stdout into queue
while not process.stdout.at_eof():
- chunk = await process.stdout.read(2000000)
+ chunk = await process.stdout.read(512000)
+ await audioqueue.put(chunk)
if not chunk:
break
- yield chunk
+ await asyncio.sleep(0.1)
await process.wait()
+ await audioqueue.put('') # indicate EOF
LOGGER.info("streaming of track_id %s completed" % track_id)
async def __get_player_sox_options(self, track_id, provider, player_id):
max_sample_rate = try_parse_int(self.mass.config['player_settings'][player_id]['max_sample_rate'])
if max_sample_rate:
quality = TrackQuality.LOSSY_MP3
- track = await self.mass.music.track(track_id, provider)
+ track_future = asyncio.run_coroutine_threadsafe(
+ self.mass.music.track(track_id, provider),
+ self.mass.event_loop
+ )
+ track = track_future.result()
for item in track.provider_ids:
if item['provider'] == provider and item['item_id'] == track_id:
quality = item['quality']
gain_correct = fallback_gain # fallback value
if os.path.isfile(analysis_file):
os.remove(analysis_file)
- # cachefile = self.__get_track_cache_filename(track_id, provider)
# reschedule analyze task to try again
- # asyncio.create_task(self.__analyze_track_audio(cachefile, track_id, provider))
+ cachefile = self.__get_track_cache_filename(track_id, provider)
+ self.mass.event_loop.create_task(self.__analyze_audio(cachefile, track_id, provider, 'flac'))
return round(gain_correct,2)
async def __fill_audio_buffer(self, buf, track_id, provider, content_type):
await buf.drain()
buf.write_eof()
fd.close()
- # successfull completion, send tmpfile to be processed in the background
- #asyncio.create_task(self.__process_audio(tmpfile, track_id, provider))
- run_async_background_task(self.mass.bg_executor, self.__analyze_audio, tmpfile, track_id, provider, content_type)
+ # successfull completion, send tmpfile to be processed in the background in main loop
+ self.mass.event_loop.create_task(self.__analyze_audio(tmpfile, track_id, provider, content_type))
LOGGER.info("fill_audio_buffer complete for track %s" % track_id)
return
player.state = PlayerState.Stopped
player.powered = player.powered
player.cur_item = await self.__parse_track(mediastatus)
- player.cur_item_time = try_parse_int(mediastatus.current_time)
+ player.cur_item_time = chromecast.media_controller.status.adjusted_current_time
await self.mass.player.update_player(player)
async def __parse_track(self, mediastatus):
resp = web.StreamResponse(status=200,
reason='OK',
headers={'Content-Type': 'audio/flac'})
- try:
- await resp.prepare(request)
- if request.method.upper() == 'HEAD':
- return resp
- cancelled = False
+ await resp.prepare(request)
+ if request.method.upper() != 'HEAD':
async for chunk in self.mass.player.get_audio_stream(track_id, provider, player_id):
- if cancelled:
- continue # just consume all bytes in stream to prevent deadlocks in the subprocess based iterators
- try:
- await resp.write(chunk)
- except (asyncio.CancelledError, concurrent.futures._base.CancelledError, ConnectionResetError):
- LOGGER.error('client disconnect?')
- cancelled = True
- if not cancelled:
- return resp
- except AttributeError:
- LOGGER.error('client disconnect?')
\ No newline at end of file
+ await resp.write(chunk)
+ return resp
\ No newline at end of file
def run_async_background_task(executor, corofn, *args):
''' run async task in background '''
def run_task(corofn, *args):
- loop = asyncio.new_event_loop()
- try:
- coro = corofn(*args)
- asyncio.set_event_loop(loop)
- return loop.run_until_complete(coro)
- finally:
- loop.close()
+ LOGGER.info('running %s in background task' % corofn.__name__)
+ new_loop = asyncio.new_event_loop()
+ coro = corofn(*args)
+ res = new_loop.run_until_complete(coro)
+ new_loop.close()
+ LOGGER.info('completed %s in background task' % corofn.__name__)
+ return res
return asyncio.get_event_loop().run_in_executor(executor, run_task, corofn, *args)
def get_sort_name(name):
},
playItem(item, queueopt) {
console.log('playItem: ' + item);
+ this.$globals.loading = true;
var api_url = 'api/players/' + this.active_player_id + '/play_media/' + item.media_type + '/' + item.item_id + '/' + queueopt;
axios
.get(api_url, {
})
.then(result => {
console.log(result.data);
+ this.$globals.loading = false;
})
.catch(error => {
console.log("error", error);
+ this.$globals.loading = false;
});
},
switchPlayer (new_player_id) {