From 02aa9c2ac7fc48750093c06aec497102034a30df Mon Sep 17 00:00:00 2001 From: marcelveldt Date: Tue, 12 Nov 2019 14:55:46 +0100 Subject: [PATCH] fix crossfade and database lockups --- music_assistant/__init__.py | 7 +-- music_assistant/cache.py | 64 ++++++++++--------- music_assistant/database.py | 76 ++++++++++++++--------- music_assistant/homeassistant.py | 1 - music_assistant/http_streamer.py | 46 ++++++++------ music_assistant/models/musicprovider.py | 13 ++-- music_assistant/models/player.py | 1 - music_assistant/models/player_queue.py | 66 +++++++------------- music_assistant/models/playerprovider.py | 1 - music_assistant/music_manager.py | 5 +- music_assistant/musicproviders/file.py | 1 - music_assistant/musicproviders/qobuz.py | 4 -- music_assistant/musicproviders/spotify.py | 4 -- music_assistant/musicproviders/tunein.py | 11 ++-- 14 files changed, 153 insertions(+), 147 deletions(-) diff --git a/music_assistant/__init__.py b/music_assistant/__init__.py index 42572f33..105dae88 100644 --- a/music_assistant/__init__.py +++ b/music_assistant/__init__.py @@ -65,6 +65,8 @@ class MusicAssistant(): except asyncio.CancelledError: LOGGER.info("Application shutdown") await self.signal_event("shutdown") + await self.db.close() + await self.cache.close() def handle_exception(self, loop, context): ''' global exception handler ''' @@ -78,10 +80,7 @@ class MusicAssistant(): listeners = list(self.event_listeners.values()) for callback, eventfilter in listeners: if not eventfilter or eventfilter in msg: - if msg == 'shutdown': - await callback(msg, msg_details) - else: - self.event_loop.create_task(callback(msg, msg_details)) + await callback(msg, msg_details) async def add_event_listener(self, cb, eventfilter=None): ''' add callback to our event listeners ''' diff --git a/music_assistant/cache.py b/music_assistant/cache.py index 83dc2e93..c62f0a57 100644 --- a/music_assistant/cache.py +++ b/music_assistant/cache.py @@ -29,13 +29,12 @@ class Cache(object): """Async initialize of cache module.""" self._db = await aiosqlite.connect(self._dbfile, timeout=30) self._db.row_factory = aiosqlite.Row - await self.mass.add_event_listener(self.on_shutdown, "shutdown") await self._db.execute("""CREATE TABLE IF NOT EXISTS simplecache( id TEXT UNIQUE, expires INTEGER, data TEXT, checksum INTEGER)""") await self._db.commit() self.mass.event_loop.create_task(self.auto_cleanup()) - async def on_shutdown(self, msg, msg_details): + async def close(self): """Handle shutdown event, close db connection.""" await self._db.close() LOGGER.info("cache db connection closed") @@ -80,7 +79,7 @@ class Cache(object): (id, expires, data, checksum) VALUES (?, ?, ?, ?)""" await self._db.execute(sql_query, (cache_key, expires, data, checksum)) await self._db.commit() - + @run_periodic(3600) async def auto_cleanup(self): """ (scheduled) auto cleanup task """ @@ -126,39 +125,26 @@ async def cached_iterator(cache, iter_func, cache_key, expires=(86400*30), check cache_result.append(item) await cache.set(cache_key, cache_result, checksum, expires) -def use_cache(cache_days=14): +async def cached(cache, cache_key, coro_func, *args, **kwargs): + """Helper method to store results of a coroutine in the cache.""" + cache_result = await cache.get(cache_key) + if cache_result is not None: + return cache_result + result = await coro_func(*args, **kwargs) + await cache.set(cache_key, result) + return result + +def use_cache(cache_days=14, cache_checksum=None): """ decorator that can be used to cache a method's result.""" def wrapper(func): @functools.wraps(func) async def wrapped(*args, **kwargs): - if kwargs.get("ignore_cache"): - return await func(*args, **kwargs) - cache_checksum = kwargs.get("cache_checksum") method_class = args[0] method_class_name = method_class.__class__.__name__ cache_str = "%s.%s" % (method_class_name, func.__name__) - # append args to cache identifier - for item in args[1:]: - if isinstance(item, dict): - for subkey in sorted(list(item.keys())): - subvalue = item[subkey] - cache_str += ".%s%s" % (subkey, subvalue) - else: - cache_str += ".%s" % item - # append kwargs to cache identifier - for key in sorted(list(kwargs.keys())): - if key in ["ignore_cache", "cache_checksum"]: - continue - value = kwargs[key] - if isinstance(value, dict): - for subkey in sorted(list(value.keys())): - subvalue = value[subkey] - cache_str += ".%s%s" % (subkey, subvalue) - else: - cache_str += ".%s%s" % (key, value) + cache_str += __cache_id_from_args(*args, **kwargs) cache_str = cache_str.lower() - cachedata = await method_class.cache.get(cache_str, - checksum=cache_checksum) + cachedata = await method_class.cache.get(cache_str) if cachedata is not None: return cachedata else: @@ -172,3 +158,25 @@ def use_cache(cache_days=14): return result return wrapped return wrapper + +def __cache_id_from_args(*args, **kwargs): + ''' parse arguments to build cache id ''' + cache_str = '' + # append args to cache identifier + for item in args[1:]: + if isinstance(item, dict): + for subkey in sorted(list(item.keys())): + subvalue = item[subkey] + cache_str += ".%s%s" % (subkey, subvalue) + else: + cache_str += ".%s" % item + # append kwargs to cache identifier + for key in sorted(list(kwargs.keys())): + value = kwargs[key] + if isinstance(value, dict): + for subkey in sorted(list(value.keys())): + subvalue = value[subkey] + cache_str += ".%s%s" % (subkey, subvalue) + else: + cache_str += ".%s%s" % (key, value) + return cache_str diff --git a/music_assistant/database.py b/music_assistant/database.py index beacf770..4c46b585 100755 --- a/music_assistant/database.py +++ b/music_assistant/database.py @@ -7,11 +7,25 @@ from typing import List import aiosqlite import operator import logging +import functools from .utils import run_periodic, LOGGER, get_sort_name, try_parse_int from .models.media_types import MediaType, Artist, Album, Track, Playlist, Radio +def commit_guard(func): + """ decorator to guard against multiple db writes """ + async def wrapped(*args, **kwargs): + method_class = args[0] + while method_class.commit_guard_active: + await asyncio.sleep(0.1) + method_class.commit_guard_active = True + res = await func(*args, **kwargs) + method_class.commit_guard_active = False + return res + return wrapped + class Database(): + commit_guard_active = False def __init__(self, mass): self.mass = mass @@ -20,7 +34,7 @@ class Database(): self._dbfile = os.path.join(mass.datapath, "database.db") logging.getLogger('aiosqlite').setLevel(logging.INFO) - async def on_shutdown(self, msg, msg_details): + async def close(self): ''' handle shutdown event, close db connection ''' await self._db.close() LOGGER.info("db connection closed") @@ -29,12 +43,11 @@ class Database(): ''' init database ''' self._db = await aiosqlite.connect(self._dbfile) self._db.row_factory = aiosqlite.Row - await self.mass.add_event_listener(self.on_shutdown, "shutdown") await self._db.execute('CREATE TABLE IF NOT EXISTS library_items(item_id INTEGER NOT NULL, provider TEXT NOT NULL, media_type INTEGER NOT NULL, UNIQUE(item_id, provider, media_type));') await self._db.execute('CREATE TABLE IF NOT EXISTS artists(artist_id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL, sort_name TEXT, musicbrainz_id TEXT NOT NULL UNIQUE);') - await self._db.execute('CREATE TABLE IF NOT EXISTS albums(album_id INTEGER PRIMARY KEY AUTOINCREMENT, artist_id INTEGER NOT NULL, name TEXT NOT NULL, albumtype TEXT, year INTEGER, version TEXT, UNIQUE(artist_id, name, version, year, albumtype));') + await self._db.execute('CREATE TABLE IF NOT EXISTS albums(album_id INTEGER PRIMARY KEY AUTOINCREMENT, artist_id INTEGER NOT NULL, name TEXT NOT NULL, albumtype TEXT, year INTEGER, version TEXT, UNIQUE(artist_id, name, version, year));') await self._db.execute('CREATE TABLE IF NOT EXISTS labels(label_id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT UNIQUE);') await self._db.execute('CREATE TABLE IF NOT EXISTS album_labels(album_id INTEGER, label_id INTEGER, UNIQUE(album_id, label_id));') @@ -175,6 +188,7 @@ class Database(): return item return None + @commit_guard async def add_playlist(self, playlist:Playlist): ''' add a new playlist record into table''' assert(playlist.name) @@ -187,10 +201,13 @@ class Database(): await self._db.execute(sql_query, (playlist.is_editable, playlist.checksum, playlist_id)) else: # insert playlist - sql_query = 'INSERT OR REPLACE INTO playlists (name, owner, is_editable, checksum) VALUES(?,?,?,?);' - await self._db.execute(sql_query, (playlist.name, playlist.owner, playlist.is_editable, playlist.checksum)) - # get id from newly created item (the safe way) - async with self._db.execute('SELECT (playlist_id) FROM playlists WHERE name=? AND owner=?;', (playlist.name,playlist.owner)) as cursor: + sql_query = 'INSERT INTO playlists (name, owner, is_editable, checksum) VALUES(?,?,?,?);' + async with self._db.execute(sql_query, (playlist.name, playlist.owner, playlist.is_editable, playlist.checksum)) as cursor: + last_row_id = cursor.lastrowid + await self._db.commit() + # get id from newly created item + sql_query = 'SELECT (playlist_id) FROM playlists WHERE ROWID=?' + async with self._db.execute(sql_query, (last_row_id,)) as cursor: playlist_id = await cursor.fetchone() playlist_id = playlist_id[0] LOGGER.debug('added playlist %s to database: %s' %(playlist.name, playlist_id)) @@ -201,6 +218,7 @@ class Database(): await self._db.commit() return playlist_id + @commit_guard async def add_radio(self, radio:Radio): ''' add a new radio record into table''' assert(radio.name) @@ -210,10 +228,13 @@ class Database(): radio_id = result[0] else: # insert radio - sql_query = 'INSERT OR REPLACE INTO radios (name) VALUES(?);' - await self._db.execute(sql_query, (radio.name,)) - # get id from newly created item (the safe way) - async with self._db.execute('SELECT (radio_id) FROM radios WHERE name=?;', (radio.name,)) as cursor: + sql_query = 'INSERT INTO radios (name) VALUES(?);' + async with self._db.execute(sql_query, (radio.name,)) as cursor: + last_row_id = cursor.lastrowid + await self._db.commit() + # get id from newly created item + sql_query = 'SELECT (radio_id) FROM radios WHERE ROWID=?' + async with self._db.execute(sql_query, (last_row_id,)) as cursor: radio_id = await cursor.fetchone() radio_id = radio_id[0] LOGGER.debug('added radio station %s to database: %s' %(radio.name, radio_id)) @@ -272,6 +293,7 @@ class Database(): return item return None + @commit_guard async def add_artist(self, artist:Artist): ''' add a new artist record into table''' artist_id = None @@ -290,7 +312,6 @@ class Database(): sql_query = 'INSERT INTO artists (name, sort_name, musicbrainz_id) VALUES(?,?,?);' async with self._db.execute(sql_query, (artist.name, artist.sort_name, musicbrainz_id)) as cursor: last_row_id = cursor.lastrowid - await self._db.commit() # get id from (newly created) item async with self._db.execute('SELECT artist_id FROM artists WHERE ROWID=?;', (last_row_id,)) as cursor: artist_id = await cursor.fetchone() @@ -338,38 +359,37 @@ class Database(): return item return None + @commit_guard async def add_album(self, album:Album): ''' add a new album record into table''' + assert(album.name and album.artist) album_id = None + assert(album.artist.provider == 'database') # always try to grab existing album with external_id album_id = await self.__get_item_by_external_id(album) # fallback to matching on artist_id, name and version if not album_id: + # search exact match first + sql_query = 'SELECT album_id FROM albums WHERE artist_id=? AND name=? AND version=? AND year=? AND albumtype=?' + async with self._db.execute(sql_query, (album.artist.item_id, album.name, album.version, album.year, album.albumtype)) as cursor: + album_id = await cursor.fetchone() + if album_id: + album_id = album_id['album_id'] + # fallback to almost exact match sql_query = 'SELECT album_id, year, version, albumtype FROM albums WHERE artist_id=? AND name=?' async with self._db.execute(sql_query, (album.artist.item_id, album.name)) as cursor: albums = await cursor.fetchall() - # search exact match first for result in albums: - if (result['albumtype'] == album.albumtype and - result['year'] == album.year and - result['version'] == album.version): + if ((not album.version and result['year'] == album.year) or + (album.version and result['version'] == album.version)): album_id = result['album_id'] break - # fallback to almost exact match - if not album_id: - for result in albums: - if (result['albumtype'] == album.albumtype and - (not album.version and result['year'] == album.year) or - (album.version and result['version'] == album.version)): - album_id = result['album_id'] - break if not album_id: # insert album - sql_query = 'INSERT OR REPLACE INTO albums (artist_id, name, albumtype, year, version) VALUES(?,?,?,?,?);' + sql_query = 'INSERT INTO albums (artist_id, name, albumtype, year, version) VALUES(?,?,?,?,?);' query_params = (album.artist.item_id, album.name, album.albumtype, album.year, album.version) async with self._db.execute(sql_query, query_params) as cursor: last_row_id = cursor.lastrowid - await self._db.commit() # get id from newly created item sql_query = 'SELECT (album_id) FROM albums WHERE ROWID=?' async with self._db.execute(sql_query, (last_row_id,)) as cursor: @@ -428,13 +448,14 @@ class Database(): return item return None + @commit_guard async def add_track(self, track:Track): ''' add a new track record into table''' assert(track.name and track.album) assert(track.album.provider == 'database') + assert(track.artists) for artist in track.artists: assert(artist.provider == 'database') - track_id = None # always try to grab existing track with external_id track_id = await self.__get_item_by_external_id(track) # fallback to matching on album_id, name and version @@ -455,7 +476,6 @@ class Database(): query_params = (track.name, track.album.item_id, track.duration, track.version) async with self._db.execute(sql_query, query_params) as cursor: last_row_id = cursor.lastrowid - await self._db.commit() # get id from newly created item (the safe way) async with self._db.execute('SELECT track_id FROM tracks WHERE ROWID=?', (last_row_id,)) as cursor: track_id = await cursor.fetchone() diff --git a/music_assistant/homeassistant.py b/music_assistant/homeassistant.py index f7ab51b6..3ec5f741 100644 --- a/music_assistant/homeassistant.py +++ b/music_assistant/homeassistant.py @@ -17,7 +17,6 @@ import json from .utils import run_periodic, LOGGER, IS_HASSIO, try_parse_int from .models.media_types import Track from .constants import CONF_ENABLED, CONF_URL, CONF_TOKEN, EVENT_PLAYER_CHANGED, EVENT_PLAYER_ADDED, EVENT_HASS_ENTITY_CHANGED -from .cache import use_cache CONF_KEY = 'homeassistant' CONF_PUBLISH_PLAYERS = "publish_players" diff --git a/music_assistant/http_streamer.py b/music_assistant/http_streamer.py index b2ce0e1c..ebc7e9f2 100755 --- a/music_assistant/http_streamer.py +++ b/music_assistant/http_streamer.py @@ -4,6 +4,7 @@ import asyncio import os import operator +import concurrent from aiohttp import web import threading import urllib @@ -129,14 +130,16 @@ class HTTPStreamer(): self.mass.run_task(buffer.write(chunk), wait_for_result=True, ignore_exception=(BrokenPipeError, - ConnectionResetError)) + ConnectionResetError, + concurrent.futures._base.CancelledError)) del chunk # indicate EOF if no more data if not cancelled.is_set(): self.mass.run_task(buffer.write_eof(), wait_for_result=True, ignore_exception=(BrokenPipeError, - ConnectionResetError)) + ConnectionResetError, + concurrent.futures._base.CancelledError)) # start fill buffer task in background fill_buffer_thread = threading.Thread(target=fill_buffer) @@ -201,7 +204,6 @@ class HTTPStreamer(): # part is too short after the strip action?! # so we just use the full first part first_part = prev_chunk + chunk - LOGGER.warning("Not enough data after strip action: %s", len(first_part)) fade_in_part = first_part[:fade_bytes] remaining_bytes = first_part[fade_bytes:] del first_part @@ -234,8 +236,9 @@ class HTTPStreamer(): # part is too short after the strip action # so we just use the entire original data last_part = prev_chunk + chunk - LOGGER.warning("Not enough data for last_part after strip action: %s", len(last_part)) - if not player.queue.crossfade_enabled: + if len(last_part) < fade_bytes: + LOGGER.warning("Not enough data for crossfade: %s", len(last_part)) + if not player.queue.crossfade_enabled or len(last_part) < fade_bytes: # crossfading is not enabled so just pass the (stripped) audio data sox_proc.stdin.write(last_part) bytes_written += len(last_part) @@ -301,17 +304,18 @@ class HTTPStreamer(): chunksize=128000, resample=None): ''' get audio stream from provider and apply additional effects/processing where/if needed''' - # get stream details from provider - # sort by quality and check track availability streamdetails = None + # always request the full db track as there might be other qualities available full_track = self.mass.run_task( self.mass.music.track(queue_item.item_id, queue_item.provider, lazy=True), wait_for_result=True) + # sort by quality and check track availability for prov_media in sorted(full_track.provider_ids, key=operator.itemgetter('quality'), reverse=True): if not prov_media['provider'] in self.mass.music.providers: continue + # get stream details from provider streamdetails = self.mass.run_task(self.mass.music.providers[ prov_media['provider']].get_stream_details( prov_media['item_id']), wait_for_result=True) @@ -336,14 +340,15 @@ class HTTPStreamer(): outputfmt = 'raw -b 32 -c 2 -e signed-integer' sox_options += ' rate -v %s' % resample streamdetails['sox_options'] = sox_options - # determine how to proceed based on input file ype + # determine how to proceed based on input file type if streamdetails["content_type"] == 'aac': # support for AAC created with ffmpeg in between args = 'ffmpeg -v quiet -i "%s" -f flac - | sox -t flac - -t %s - %s' % ( streamdetails["path"], outputfmt, sox_options) process = subprocess.Popen(args, shell=True, - stdout=subprocess.PIPE) + stdout=subprocess.PIPE, + bufsize=chunksize) elif streamdetails['type'] in ['url', 'file']: args = 'sox -t %s "%s" -t %s - %s' % ( streamdetails["content_type"], streamdetails["path"], @@ -351,16 +356,18 @@ class HTTPStreamer(): args = shlex.split(args) process = subprocess.Popen(args, shell=False, - stdout=subprocess.PIPE) + stdout=subprocess.PIPE, + bufsize=chunksize) elif streamdetails['type'] == 'executable': args = '%s | sox -t %s - -t %s - %s' % ( streamdetails["path"], streamdetails["content_type"], outputfmt, sox_options) process = subprocess.Popen(args, shell=True, - stdout=subprocess.PIPE) + stdout=subprocess.PIPE, + bufsize=chunksize) else: - LOGGER.warning(f"no streaming options for {queue_item.name}") + LOGGER.warning("no streaming options for %s", queue_item.name) yield (True, b'') return # fire event that streaming has started for this track @@ -368,8 +375,8 @@ class HTTPStreamer(): self.mass.signal_event(EVENT_STREAM_STARTED, streamdetails)) # yield chunks from stdout # we keep 1 chunk behind to detect end of stream properly - bytes_sent = 0 - while process.poll() == None: + prev_chunk = b'' + while True: if cancelled.is_set(): # http session ended # send terminate and pick up left over bytes @@ -378,12 +385,12 @@ class HTTPStreamer(): chunk = process.stdout.read(chunksize) if len(chunk) < chunksize: # last chunk - bytes_sent += len(chunk) - yield (True, chunk) + yield (True, prev_chunk + chunk) break else: - bytes_sent += len(chunk) - yield (False, chunk) + if prev_chunk: + yield (False, prev_chunk) + prev_chunk = chunk # fire event that streaming has ended self.mass.run_task( self.mass.signal_event(EVENT_STREAM_ENDED, streamdetails)) @@ -452,7 +459,8 @@ class HTTPStreamer(): (item_key, loudness)) self.analyze_jobs.pop(item_key, None) - def __crossfade_pcm_parts(self, fade_in_part, fade_out_part, pcm_args, + @staticmethod + def __crossfade_pcm_parts(fade_in_part, fade_out_part, pcm_args, fade_length): ''' crossfade two chunks of audio using sox ''' # create fade-in part diff --git a/music_assistant/models/musicprovider.py b/music_assistant/models/musicprovider.py index 591cb5d0..0b0d5c1a 100755 --- a/music_assistant/models/musicprovider.py +++ b/music_assistant/models/musicprovider.py @@ -4,7 +4,7 @@ import asyncio from typing import List from ..utils import LOGGER, compare_strings -from ..cache import use_cache, cached_iterator +from ..cache import use_cache, cached_iterator, cached from ..constants import CONF_ENABLED from .media_types import Album, Artist, Track, Playlist, MediaType, Radio @@ -42,7 +42,8 @@ class MusicProvider(): MediaType.Artist) if not item_id: # artist not yet in local database so fetch details - artist_details = await self.get_artist(prov_item_id) + cache_key = f'{self.prov_id}.get_artist.{prov_item_id}' + artist_details = await cached(self.cache, cache_key, self.get_artist, prov_item_id ) if not artist_details: raise Exception('artist not found: %s' % prov_item_id) if lazy: @@ -165,7 +166,8 @@ class MusicProvider(): if not item_id: # album not yet in local database so fetch details if not album_details: - album_details = await self.get_album(prov_item_id) + cache_key = f'{self.prov_id}.get_album.{prov_item_id}' + album_details = await cached(self.cache, cache_key, self.get_album, prov_item_id) if not album_details: raise Exception('album not found: %s' % prov_item_id) if lazy: @@ -173,6 +175,7 @@ class MusicProvider(): album_details.is_lazy = True return album_details item_id = await self.add_album(album_details) + LOGGER.info("item_id after add_album: %s", item_id) return await self.mass.db.album(item_id) async def add_album(self, album_details) -> int: @@ -203,7 +206,8 @@ class MusicProvider(): if not item_id: # track not yet in local database so fetch details if not track_details: - track_details = await self.get_track(prov_item_id) + cache_key = f'{self.prov_id}.get_track.{prov_item_id}' + track_details = await cached(self.cache, cache_key, self.get_track, prov_item_id) if not track_details: raise Exception('track not found: %s' % prov_item_id) if lazy: @@ -224,6 +228,7 @@ class MusicProvider(): if db_track_artist: track_artists.append(db_track_artist) track_details.artists = track_artists + # fetch album details if not prov_album_id: prov_album_id = track_details.album.item_id track_details.album = await self.album(prov_album_id, lazy=False) diff --git a/music_assistant/models/player.py b/music_assistant/models/player.py index c61788ad..9789e72f 100755 --- a/music_assistant/models/player.py +++ b/music_assistant/models/player.py @@ -9,7 +9,6 @@ import time from ..utils import run_periodic, LOGGER, try_parse_int, \ try_parse_bool, try_parse_float from ..constants import EVENT_PLAYER_CHANGED -from ..cache import use_cache from .media_types import Track, MediaType from .player_queue import PlayerQueue, QueueItem from .playerstate import PlayerState diff --git a/music_assistant/models/player_queue.py b/music_assistant/models/player_queue.py index e4d2dea3..e61d00e0 100755 --- a/music_assistant/models/player_queue.py +++ b/music_assistant/models/player_queue.py @@ -7,7 +7,6 @@ import operator import random import uuid import os -import pickle from enum import Enum from ..utils import LOGGER, json, filename_from_string, serialize_values @@ -55,18 +54,12 @@ class PlayerQueue(): self._last_queue_startindex = 0 self._next_queue_startindex = 0 self._last_player_state = PlayerState.Stopped - self._save_busy_ = False self._last_track = None - self.mass.event_loop.create_task( + self.mass.run_task( self.mass.add_event_listener(self.on_shutdown, "shutdown")) # load previous queue settings from disk - self.mass.event_loop.run_in_executor(None, self.__load_from_file) + self.mass.run_task(self.__restore_saved_state()) - async def on_shutdown(self, msg, msg_details): - """Handle shutdown event, save queue state.""" - self.__save_to_file() - LOGGER.info("queue state saved to file for player %s", self._player.player_id) - @property def shuffle_enabled(self): return self._shuffle_enabled @@ -362,9 +355,8 @@ class PlayerQueue(): try: await self._player.cmd_queue_update(self._items) except NotImplementedError: - # not supported by player, use load queue instead - LOGGER.debug("cmd_queue_update not supported by player, fallback to cmd_queue_load ") - await self._player.cmd_queue_load(self._items) + # not supported by player, ignore + pass self.mass.event_loop.create_task( self.mass.signal_event(EVENT_QUEUE_ITEMS_UPDATED, self.to_dict())) @@ -471,42 +463,28 @@ class PlayerQueue(): item_index = index return item_index - def __load_from_file(self): - ''' try to load the saved queue for this player from file ''' - player_safe_str = filename_from_string(self._player.player_id) - settings_dir = os.path.join(self.mass.datapath, 'queue') - player_file = os.path.join(settings_dir, player_safe_str) - if os.path.isfile(player_file): - try: - with open(player_file, 'rb') as f: - data = pickle.load(f) - self._shuffle_enabled = data["shuffle_enabled"] - self._repeat_enabled = data["repeat_enabled"] - self._items = data["items"] - self._cur_index = data["cur_item"] - self._last_queue_startindex = data["last_index"] - except Exception as exc: - LOGGER.debug("Could not load queue from disk - %s" % str(exc)) + async def __restore_saved_state(self): + ''' try to load the saved queue for this player from cache file ''' + cache_str = 'queue_%s' % self._player.player_id + cache_data = await self.mass.cache.get(cache_str) + if cache_data: + self._shuffle_enabled = cache_data["shuffle_enabled"] + self._repeat_enabled = cache_data["repeat_enabled"] + self._items = cache_data["items"] + self._cur_index = cache_data["cur_item"] + self._next_queue_startindex = cache_data["next_queue_index"] - def __save_to_file(self): + async def on_shutdown(self, msg, msg_details): + """Handle shutdown event, save queue state.""" ''' save current queue settings to file ''' - if self._save_busy_: - return - self._save_busy_ = True - player_safe_str = filename_from_string(self._player.player_id) - settings_dir = os.path.join(self.mass.datapath, 'queue') - player_file = os.path.join(settings_dir, player_safe_str) - data = { + cache_str = 'queue_%s' % self._player.player_id + cache_data = { "shuffle_enabled": self._shuffle_enabled, "repeat_enabled": self._repeat_enabled, "items": self._items, "cur_item": self._cur_index, - "last_index": self._cur_index + "next_queue_index": self._next_queue_startindex } - if not os.path.isdir(settings_dir): - os.mkdir(settings_dir) - with open(player_file, 'wb') as f: - data = pickle.dump(data, f) - self._save_busy_ = False - - + await self.mass.cache.set(cache_str, cache_data) + LOGGER.info("queue state saved to file for player %s", self._player.player_id) + \ No newline at end of file diff --git a/music_assistant/models/playerprovider.py b/music_assistant/models/playerprovider.py index 60fff45d..24d87042 100755 --- a/music_assistant/models/playerprovider.py +++ b/music_assistant/models/playerprovider.py @@ -6,7 +6,6 @@ from enum import Enum from typing import List from ..utils import run_periodic, LOGGER from ..constants import CONF_ENABLED -from ..cache import use_cache from .player_queue import PlayerQueue from .media_types import Track from .player import Player diff --git a/music_assistant/music_manager.py b/music_assistant/music_manager.py index 4899d2e2..85399ea7 100755 --- a/music_assistant/music_manager.py +++ b/music_assistant/music_manager.py @@ -402,7 +402,10 @@ class MusicManager(): ] cur_db_ids = [] async for item in music_provider.get_library_albums(): - db_album = await music_provider.album(item.item_id, lazy=False) + + db_album = await music_provider.album(item.item_id, album_details=item, lazy=False) + if not db_album: + LOGGER.error("provider %s album: %s", prov_id, item.__dict__) cur_db_ids.append(db_album.item_id) if not db_album.item_id in prev_db_ids: await self.mass.db.add_to_library(db_album.item_id, diff --git a/music_assistant/musicproviders/file.py b/music_assistant/musicproviders/file.py index b1ae425d..6d265075 100644 --- a/music_assistant/musicproviders/file.py +++ b/music_assistant/musicproviders/file.py @@ -9,7 +9,6 @@ import time import base64 import taglib -from ..cache import use_cache from ..utils import run_periodic, LOGGER, parse_title_and_version from ..models import MusicProvider, MediaType, TrackQuality, AlbumType, Artist, Album, Track, Playlist from ..constants import CONF_ENABLED diff --git a/music_assistant/musicproviders/qobuz.py b/music_assistant/musicproviders/qobuz.py index 7a474b9c..71cd4edd 100644 --- a/music_assistant/musicproviders/qobuz.py +++ b/music_assistant/musicproviders/qobuz.py @@ -390,8 +390,6 @@ class QobuzProvider(MusicProvider): album.name, album.version = parse_title_and_version( album_obj['title'], album_obj.get('version')) album.artist = await self.__parse_artist(album_obj['artist']) - if not album.artist: - raise Exception("No album artist ! %s" % album_obj) if album_obj.get('product_type', '') == 'single': album.albumtype = AlbumType.Single elif album_obj.get( @@ -440,8 +438,6 @@ class QobuzProvider(MusicProvider): if track_obj.get( 'performer') and not 'Various ' in track_obj['performer']: artist = await self.__parse_artist(track_obj['performer']) - if not artist: - artist = self.get_artist(track_obj['performer']['id']) if artist: track.artists.append(artist) if not track.artists: diff --git a/music_assistant/musicproviders/spotify.py b/music_assistant/musicproviders/spotify.py index 7d1e5d4f..0bf77395 100644 --- a/music_assistant/musicproviders/spotify.py +++ b/music_assistant/musicproviders/spotify.py @@ -295,8 +295,6 @@ class SpotifyProvider(MusicProvider): album.artist = await self.__parse_artist(artist) if album.artist: break - if not album.artist: - raise Exception("No album artist ! %s" % album_obj) if album_obj['album_type'] == 'single': album.albumtype = AlbumType.Single elif album_obj['album_type'] == 'compilation': @@ -351,8 +349,6 @@ class SpotifyProvider(MusicProvider): track.external_ids.append({key: value}) if 'album' in track_obj: track.album = await self.__parse_album(track_obj['album']) - if not track.album: - track.album = await self.get_album(track_obj['album']['id']) if track_obj.get('copyright'): track.metadata["copyright"] = track_obj['copyright'] if track_obj.get('explicit'): diff --git a/music_assistant/musicproviders/tunein.py b/music_assistant/musicproviders/tunein.py index d54a2caf..d7988a8e 100644 --- a/music_assistant/musicproviders/tunein.py +++ b/music_assistant/musicproviders/tunein.py @@ -10,7 +10,6 @@ from asyncio_throttle import Throttler import json import aiohttp -from ..cache import use_cache from ..utils import run_periodic, LOGGER from ..models import MusicProvider, MediaType, TrackQuality, Radio from ..constants import CONF_USERNAME, CONF_PASSWORD, CONF_ENABLED, CONF_TYPE_PASSWORD @@ -31,10 +30,9 @@ class TuneInProvider(MusicProvider): def __init__(self, mass, conf): ''' Support for streaming radio provider TuneIn ''' + super().__init__(mass) self.name = PROV_NAME self.prov_id = PROV_ID - self.mass = mass - self.cache = mass.cache if not conf[CONF_USERNAME] or not conf[CONF_PASSWORD]: raise Exception("Username and password must not be empty") self._username = conf[CONF_USERNAME] @@ -60,7 +58,7 @@ class TuneInProvider(MusicProvider): async def get_radios(self): ''' get favorited/library radio stations ''' params = {"c": "presets"} - result = await self.__get_data("Browse.ashx", params, ignore_cache=True) + result = await self.__get_data("Browse.ashx", params) if result and "body" in result: for item in result["body"]: # TODO: expand folders @@ -72,7 +70,7 @@ class TuneInProvider(MusicProvider): ''' get radio station details ''' radio = None params = {"c": "composite", "detail": "listing", "id": radio_id} - result = await self.__get_data("Describe.ashx", params, ignore_cache=True) + result = await self.__get_data("Describe.ashx", params) if result and result.get("body") and result["body"][0].get("children"): item = result["body"][0]["children"][0] radio = await self.__parse_radio(item) @@ -139,8 +137,7 @@ class TuneInProvider(MusicProvider): } return {} - @use_cache(7) - async def __get_data(self, endpoint, params={}, ignore_cache=False, cache_checksum=None): + async def __get_data(self, endpoint, params={}): ''' get data from api''' url = 'https://opml.radiotime.com/%s' % endpoint params['render'] = 'json' -- 2.34.1