fix crossfade and database lockups
author: marcelveldt <marcelvanderveldt@MacBook-Pro.local>
Tue, 12 Nov 2019 13:55:46 +0000 (14:55 +0100)
committer: marcelveldt <marcelvanderveldt@MacBook-Pro.local>
Tue, 12 Nov 2019 13:55:46 +0000 (14:55 +0100)
14 files changed:
music_assistant/__init__.py
music_assistant/cache.py
music_assistant/database.py
music_assistant/homeassistant.py
music_assistant/http_streamer.py
music_assistant/models/musicprovider.py
music_assistant/models/player.py
music_assistant/models/player_queue.py
music_assistant/models/playerprovider.py
music_assistant/music_manager.py
music_assistant/musicproviders/file.py
music_assistant/musicproviders/qobuz.py
music_assistant/musicproviders/spotify.py
music_assistant/musicproviders/tunein.py

index 42572f33fe53e438b92a9bf06f76759f6e083e89..105dae8807006ad9d40888369c7b8b0372c4f0d1 100644 (file)
@@ -65,6 +65,8 @@ class MusicAssistant():
         except asyncio.CancelledError:
             LOGGER.info("Application shutdown")
             await self.signal_event("shutdown")
+            await self.db.close()
+            await self.cache.close()
 
     def handle_exception(self, loop, context):
         ''' global exception handler '''
@@ -78,10 +80,7 @@ class MusicAssistant():
         listeners = list(self.event_listeners.values())
         for callback, eventfilter in listeners:
             if not eventfilter or eventfilter in msg:
-                if msg == 'shutdown':
-                    await callback(msg, msg_details)
-                else:
-                    self.event_loop.create_task(callback(msg, msg_details))
+                await callback(msg, msg_details)
 
     async def add_event_listener(self, cb, eventfilter=None):
         ''' add callback to our event listeners '''
index 83dc2e9346222ab089de60582c059ccb00e63384..c62f0a5727ce874222505ab266fe0225568c3ebf 100644 (file)
@@ -29,13 +29,12 @@ class Cache(object):
         """Async initialize of cache module."""
         self._db = await aiosqlite.connect(self._dbfile, timeout=30)
         self._db.row_factory = aiosqlite.Row
-        await self.mass.add_event_listener(self.on_shutdown, "shutdown")
         await self._db.execute("""CREATE TABLE IF NOT EXISTS simplecache(
             id TEXT UNIQUE, expires INTEGER, data TEXT, checksum INTEGER)""")
         await self._db.commit()
         self.mass.event_loop.create_task(self.auto_cleanup())
 
-    async def on_shutdown(self, msg, msg_details):
+    async def close(self):
         """Handle shutdown event, close db connection."""
         await self._db.close()
         LOGGER.info("cache db connection closed")
@@ -80,7 +79,7 @@ class Cache(object):
             (id, expires, data, checksum) VALUES (?, ?, ?, ?)"""
         await self._db.execute(sql_query, (cache_key, expires, data, checksum))
         await self._db.commit()
-
+    
     @run_periodic(3600)
     async def auto_cleanup(self):
         """ (scheduled) auto cleanup task """
@@ -126,39 +125,26 @@ async def cached_iterator(cache, iter_func, cache_key, expires=(86400*30), check
             cache_result.append(item)
         await cache.set(cache_key, cache_result, checksum, expires)
 
-def use_cache(cache_days=14):
+async def cached(cache, cache_key, coro_func, *args, **kwargs):
+    """Helper method to store results of a coroutine in the cache."""
+    cache_result = await cache.get(cache_key)
+    if cache_result is not None:
+        return cache_result
+    result = await coro_func(*args, **kwargs)
+    await cache.set(cache_key, result)
+    return result
+
+def use_cache(cache_days=14, cache_checksum=None):
     """ decorator that can be used to cache a method's result."""
     def wrapper(func):
         @functools.wraps(func)
         async def wrapped(*args, **kwargs):
-            if kwargs.get("ignore_cache"):
-                return await func(*args, **kwargs)
-            cache_checksum = kwargs.get("cache_checksum")
             method_class = args[0]
             method_class_name = method_class.__class__.__name__
             cache_str = "%s.%s" % (method_class_name, func.__name__)
-            # append args to cache identifier
-            for item in args[1:]:
-                if isinstance(item, dict):
-                    for subkey in sorted(list(item.keys())):
-                        subvalue = item[subkey]
-                        cache_str += ".%s%s" % (subkey, subvalue)
-                else:
-                    cache_str += ".%s" % item
-            # append kwargs to cache identifier
-            for key in sorted(list(kwargs.keys())):
-                if key in ["ignore_cache", "cache_checksum"]:
-                    continue
-                value = kwargs[key]
-                if isinstance(value, dict):
-                    for subkey in sorted(list(value.keys())):
-                        subvalue = value[subkey]
-                        cache_str += ".%s%s" % (subkey, subvalue)
-                else:
-                    cache_str += ".%s%s" % (key, value)
+            cache_str += __cache_id_from_args(*args, **kwargs)
             cache_str = cache_str.lower()
-            cachedata = await method_class.cache.get(cache_str,
-                                                     checksum=cache_checksum)
+            cachedata = await method_class.cache.get(cache_str)
             if cachedata is not None:
                 return cachedata
             else:
@@ -172,3 +158,25 @@ def use_cache(cache_days=14):
                 return result
         return wrapped
     return wrapper
+
+def __cache_id_from_args(*args, **kwargs):
+    ''' parse arguments to build cache id '''
+    cache_str = ''
+    # append args to cache identifier
+    for item in args[1:]:
+        if isinstance(item, dict):
+            for subkey in sorted(list(item.keys())):
+                subvalue = item[subkey]
+                cache_str += ".%s%s" % (subkey, subvalue)
+        else:
+            cache_str += ".%s" % item
+    # append kwargs to cache identifier
+    for key in sorted(list(kwargs.keys())):
+        value = kwargs[key]
+        if isinstance(value, dict):
+            for subkey in sorted(list(value.keys())):
+                subvalue = value[subkey]
+                cache_str += ".%s%s" % (subkey, subvalue)
+        else:
+            cache_str += ".%s%s" % (key, value)
+    return cache_str
index beacf770b952daf5ecf6c709b2ae5945dca41c0c..4c46b585a7af2af9c49011b5e49d3c3dd3bf4542 100755 (executable)
@@ -7,11 +7,25 @@ from typing import List
 import aiosqlite
 import operator
 import logging
+import functools
 
 from .utils import run_periodic, LOGGER, get_sort_name, try_parse_int
 from .models.media_types import MediaType, Artist, Album, Track, Playlist, Radio
 
+def commit_guard(func):
+    """ decorator to guard against multiple db writes """
+    async def wrapped(*args, **kwargs):
+        method_class = args[0]
+        while method_class.commit_guard_active:
+            await asyncio.sleep(0.1)
+        method_class.commit_guard_active = True
+        res = await func(*args, **kwargs)
+        method_class.commit_guard_active = False
+        return res
+    return wrapped
+
 class Database():
+    commit_guard_active = False
 
     def __init__(self, mass):
         self.mass = mass
@@ -20,7 +34,7 @@ class Database():
         self._dbfile = os.path.join(mass.datapath, "database.db")
         logging.getLogger('aiosqlite').setLevel(logging.INFO)
 
-    async def on_shutdown(self, msg, msg_details):
+    async def close(self):
         ''' handle shutdown event, close db connection '''
         await self._db.close()
         LOGGER.info("db connection closed")
@@ -29,12 +43,11 @@ class Database():
         ''' init database '''
         self._db = await aiosqlite.connect(self._dbfile)
         self._db.row_factory = aiosqlite.Row
-        await self.mass.add_event_listener(self.on_shutdown, "shutdown")
 
         await self._db.execute('CREATE TABLE IF NOT EXISTS library_items(item_id INTEGER NOT NULL, provider TEXT NOT NULL, media_type INTEGER NOT NULL, UNIQUE(item_id, provider, media_type));')
 
         await self._db.execute('CREATE TABLE IF NOT EXISTS artists(artist_id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL, sort_name TEXT, musicbrainz_id TEXT NOT NULL UNIQUE);')            
-        await self._db.execute('CREATE TABLE IF NOT EXISTS albums(album_id INTEGER PRIMARY KEY AUTOINCREMENT, artist_id INTEGER NOT NULL, name TEXT NOT NULL, albumtype TEXT, year INTEGER, version TEXT, UNIQUE(artist_id, name, version, year, albumtype));')
+        await self._db.execute('CREATE TABLE IF NOT EXISTS albums(album_id INTEGER PRIMARY KEY AUTOINCREMENT, artist_id INTEGER NOT NULL, name TEXT NOT NULL, albumtype TEXT, year INTEGER, version TEXT, UNIQUE(artist_id, name, version, year));')
         await self._db.execute('CREATE TABLE IF NOT EXISTS labels(label_id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT UNIQUE);')
         await self._db.execute('CREATE TABLE IF NOT EXISTS album_labels(album_id INTEGER, label_id INTEGER, UNIQUE(album_id, label_id));')
 
@@ -175,6 +188,7 @@ class Database():
             return item
         return None
 
+    @commit_guard
     async def add_playlist(self, playlist:Playlist):
         ''' add a new playlist record into table'''
         assert(playlist.name)
@@ -187,10 +201,13 @@ class Database():
                 await self._db.execute(sql_query, (playlist.is_editable, playlist.checksum, playlist_id))
             else:
                 # insert playlist
-                sql_query = 'INSERT OR REPLACE INTO playlists (name, owner, is_editable, checksum) VALUES(?,?,?,?);'
-                await self._db.execute(sql_query, (playlist.name, playlist.owner, playlist.is_editable, playlist.checksum))
-                # get id from newly created item (the safe way)
-                async with self._db.execute('SELECT (playlist_id) FROM playlists WHERE name=? AND owner=?;', (playlist.name,playlist.owner)) as cursor:
+                sql_query = 'INSERT INTO playlists (name, owner, is_editable, checksum) VALUES(?,?,?,?);'
+                async with self._db.execute(sql_query, (playlist.name, playlist.owner, playlist.is_editable, playlist.checksum)) as cursor:
+                    last_row_id = cursor.lastrowid
+                    await self._db.commit()
+                # get id from newly created item
+                sql_query = 'SELECT (playlist_id) FROM playlists WHERE ROWID=?'
+                async with self._db.execute(sql_query, (last_row_id,)) as cursor:
                     playlist_id = await cursor.fetchone()
                     playlist_id = playlist_id[0]
                 LOGGER.debug('added playlist %s to database: %s' %(playlist.name, playlist_id))
@@ -201,6 +218,7 @@ class Database():
             await self._db.commit()
         return playlist_id
 
+    @commit_guard
     async def add_radio(self, radio:Radio):
         ''' add a new radio record into table'''
         assert(radio.name)
@@ -210,10 +228,13 @@ class Database():
                 radio_id = result[0]
             else:
                 # insert radio
-                sql_query = 'INSERT OR REPLACE INTO radios (name) VALUES(?);'
-                await self._db.execute(sql_query, (radio.name,))
-                # get id from newly created item (the safe way)
-                async with self._db.execute('SELECT (radio_id) FROM radios WHERE name=?;', (radio.name,)) as cursor:
+                sql_query = 'INSERT INTO radios (name) VALUES(?);'
+                async with self._db.execute(sql_query, (radio.name,)) as cursor:
+                    last_row_id = cursor.lastrowid
+                    await self._db.commit()
+                # get id from newly created item
+                sql_query = 'SELECT (radio_id) FROM radios WHERE ROWID=?'
+                async with self._db.execute(sql_query, (last_row_id,)) as cursor:
                     radio_id = await cursor.fetchone()
                     radio_id = radio_id[0]
                 LOGGER.debug('added radio station %s to database: %s' %(radio.name, radio_id))
@@ -272,6 +293,7 @@ class Database():
             return item
         return None
 
+    @commit_guard
     async def add_artist(self, artist:Artist):
         ''' add a new artist record into table'''
         artist_id = None
@@ -290,7 +312,6 @@ class Database():
             sql_query = 'INSERT INTO artists (name, sort_name, musicbrainz_id) VALUES(?,?,?);'
             async with self._db.execute(sql_query, (artist.name, artist.sort_name, musicbrainz_id)) as cursor:
                 last_row_id = cursor.lastrowid
-                await self._db.commit()
             # get id from (newly created) item
             async with self._db.execute('SELECT artist_id FROM artists WHERE ROWID=?;', (last_row_id,)) as cursor:
                 artist_id = await cursor.fetchone()
@@ -338,38 +359,37 @@ class Database():
             return item
         return None
 
+    @commit_guard
     async def add_album(self, album:Album):
         ''' add a new album record into table'''
+        assert(album.name and album.artist)
         album_id = None
+        assert(album.artist.provider == 'database')
         # always try to grab existing album with external_id
         album_id = await self.__get_item_by_external_id(album)
         # fallback to matching on artist_id, name and version
         if not album_id:
+            # search exact match first
+            sql_query = 'SELECT album_id FROM albums WHERE artist_id=? AND name=? AND version=? AND year=? AND albumtype=?'
+            async with self._db.execute(sql_query, (album.artist.item_id, album.name, album.version, album.year, album.albumtype)) as cursor:
+                album_id = await cursor.fetchone()
+                if album_id:
+                    album_id = album_id['album_id']
+            # fallback to almost exact match
             sql_query = 'SELECT album_id, year, version, albumtype FROM albums WHERE artist_id=? AND name=?'
             async with self._db.execute(sql_query, (album.artist.item_id, album.name)) as cursor:
                 albums = await cursor.fetchall()
-                # search exact match first
                 for result in albums:
-                    if (result['albumtype'] == album.albumtype and 
-                            result['year'] == album.year and 
-                            result['version'] == album.version):
+                    if ((not album.version and result['year'] == album.year) or 
+                            (album.version and result['version'] == album.version)):
                         album_id = result['album_id']
                         break
-                # fallback to almost exact match
-                if not album_id:
-                    for result in albums:
-                        if (result['albumtype'] == album.albumtype and 
-                                (not album.version and result['year'] == album.year) or 
-                                (album.version and result['version'] == album.version)):
-                            album_id = result['album_id']
-                            break
         if not album_id:
             # insert album
-            sql_query = 'INSERT OR REPLACE INTO albums (artist_id, name, albumtype, year, version) VALUES(?,?,?,?,?);'
+            sql_query = 'INSERT INTO albums (artist_id, name, albumtype, year, version) VALUES(?,?,?,?,?);'
             query_params =  (album.artist.item_id, album.name, album.albumtype, album.year, album.version)
             async with self._db.execute(sql_query, query_params) as cursor:
                 last_row_id = cursor.lastrowid
-                await self._db.commit()
             # get id from newly created item
             sql_query = 'SELECT (album_id) FROM albums WHERE ROWID=?'
             async with self._db.execute(sql_query, (last_row_id,)) as cursor:
@@ -428,13 +448,14 @@ class Database():
             return item
         return None
 
+    @commit_guard
     async def add_track(self, track:Track):
         ''' add a new track record into table'''
         assert(track.name and track.album)
         assert(track.album.provider == 'database')
+        assert(track.artists)
         for artist in track.artists:
             assert(artist.provider == 'database')
-        track_id = None
         # always try to grab existing track with external_id
         track_id = await self.__get_item_by_external_id(track)
         # fallback to matching on album_id, name and version
@@ -455,7 +476,6 @@ class Database():
             query_params = (track.name, track.album.item_id, track.duration, track.version)
             async with self._db.execute(sql_query, query_params) as cursor:
                 last_row_id = cursor.lastrowid
-            await self._db.commit()
             # get id from newly created item (the safe way)
             async with self._db.execute('SELECT track_id FROM tracks WHERE ROWID=?', (last_row_id,)) as cursor:
                 track_id = await cursor.fetchone()
index f7ab51b6de85bb3f770b48c953a7e5b0257ced83..3ec5f741f4384e8d6ca3275de80b071e5eb64a8b 100644 (file)
@@ -17,7 +17,6 @@ import json
 from .utils import run_periodic, LOGGER, IS_HASSIO, try_parse_int
 from .models.media_types import Track
 from .constants import CONF_ENABLED, CONF_URL, CONF_TOKEN, EVENT_PLAYER_CHANGED, EVENT_PLAYER_ADDED, EVENT_HASS_ENTITY_CHANGED
-from .cache import use_cache
 
 CONF_KEY = 'homeassistant'
 CONF_PUBLISH_PLAYERS = "publish_players"
index b2ce0e1c6f6197cc4e59cfa2160d323010bd4096..ebc7e9f222215b7e2e3887ab2b77fd6bb4e28594 100755 (executable)
@@ -4,6 +4,7 @@
 import asyncio
 import os
 import operator
+import concurrent
 from aiohttp import web
 import threading
 import urllib
@@ -129,14 +130,16 @@ class HTTPStreamer():
                     self.mass.run_task(buffer.write(chunk),
                                        wait_for_result=True,
                                        ignore_exception=(BrokenPipeError,
-                                                         ConnectionResetError))
+                                                         ConnectionResetError,
+                                                         concurrent.futures._base.CancelledError))
                 del chunk
             # indicate EOF if no more data
             if not cancelled.is_set():
                 self.mass.run_task(buffer.write_eof(),
                                    wait_for_result=True,
                                    ignore_exception=(BrokenPipeError,
-                                                     ConnectionResetError))
+                                                     ConnectionResetError,
+                                                     concurrent.futures._base.CancelledError))
 
         # start fill buffer task in background
         fill_buffer_thread = threading.Thread(target=fill_buffer)
@@ -201,7 +204,6 @@ class HTTPStreamer():
                         # part is too short after the strip action?!
                         # so we just use the full first part
                         first_part = prev_chunk + chunk
-                        LOGGER.warning("Not enough data after strip action: %s", len(first_part))
                     fade_in_part = first_part[:fade_bytes]
                     remaining_bytes = first_part[fade_bytes:]
                     del first_part
@@ -234,8 +236,9 @@ class HTTPStreamer():
                         # part is too short after the strip action
                         # so we just use the entire original data
                         last_part = prev_chunk + chunk
-                        LOGGER.warning("Not enough data for last_part after strip action: %s", len(last_part))
-                    if not player.queue.crossfade_enabled:
+                        if len(last_part) < fade_bytes:
+                            LOGGER.warning("Not enough data for crossfade: %s", len(last_part))
+                    if not player.queue.crossfade_enabled or len(last_part) < fade_bytes:
                         # crossfading is not enabled so just pass the (stripped) audio data
                         sox_proc.stdin.write(last_part)
                         bytes_written += len(last_part)
@@ -301,17 +304,18 @@ class HTTPStreamer():
                            chunksize=128000,
                            resample=None):
         ''' get audio stream from provider and apply additional effects/processing where/if needed'''
-        # get stream details from provider
-        # sort by quality and check track availability
         streamdetails = None
+        # always request the full db track as there might be other qualities available
         full_track = self.mass.run_task(
                 self.mass.music.track(queue_item.item_id, queue_item.provider, lazy=True),
                 wait_for_result=True)
+        # sort by quality and check track availability
         for prov_media in sorted(full_track.provider_ids,
                                  key=operator.itemgetter('quality'),
                                  reverse=True):
             if not prov_media['provider'] in self.mass.music.providers:
                 continue
+            # get stream details from provider
             streamdetails = self.mass.run_task(self.mass.music.providers[
                 prov_media['provider']].get_stream_details(
                     prov_media['item_id']), wait_for_result=True)
@@ -336,14 +340,15 @@ class HTTPStreamer():
             outputfmt = 'raw -b 32 -c 2 -e signed-integer'
             sox_options += ' rate -v %s' % resample
         streamdetails['sox_options'] = sox_options
-        # determine how to proceed based on input file ype
+        # determine how to proceed based on input file type
         if streamdetails["content_type"] == 'aac':
             # support for AAC created with ffmpeg in between
             args = 'ffmpeg -v quiet -i "%s" -f flac - | sox -t flac - -t %s - %s' % (
                 streamdetails["path"], outputfmt, sox_options)
             process = subprocess.Popen(args,
                                        shell=True,
-                                       stdout=subprocess.PIPE)
+                                       stdout=subprocess.PIPE,
+                                       bufsize=chunksize)
         elif streamdetails['type'] in ['url', 'file']:
             args = 'sox -t %s "%s" -t %s - %s' % (
                 streamdetails["content_type"], streamdetails["path"],
@@ -351,16 +356,18 @@ class HTTPStreamer():
             args = shlex.split(args)
             process = subprocess.Popen(args,
                                        shell=False,
-                                       stdout=subprocess.PIPE)
+                                       stdout=subprocess.PIPE,
+                                       bufsize=chunksize)
         elif streamdetails['type'] == 'executable':
             args = '%s | sox -t %s - -t %s - %s' % (
                 streamdetails["path"], streamdetails["content_type"],
                 outputfmt, sox_options)
             process = subprocess.Popen(args,
                                        shell=True,
-                                       stdout=subprocess.PIPE)
+                                       stdout=subprocess.PIPE,
+                                       bufsize=chunksize)
         else:
-            LOGGER.warning(f"no streaming options for {queue_item.name}")
+            LOGGER.warning("no streaming options for %s", queue_item.name)
             yield (True, b'')
             return
         # fire event that streaming has started for this track
@@ -368,8 +375,8 @@ class HTTPStreamer():
             self.mass.signal_event(EVENT_STREAM_STARTED, streamdetails))
         # yield chunks from stdout
         # we keep 1 chunk behind to detect end of stream properly
-        bytes_sent = 0
-        while process.poll() == None:
+        prev_chunk = b''
+        while True:
             if cancelled.is_set():
                 # http session ended
                 # send terminate and pick up left over bytes
@@ -378,12 +385,12 @@ class HTTPStreamer():
             chunk = process.stdout.read(chunksize)
             if len(chunk) < chunksize:
                 # last chunk
-                bytes_sent += len(chunk)
-                yield (True, chunk)
+                yield (True, prev_chunk + chunk)
                 break
             else:
-                bytes_sent += len(chunk)
-                yield (False, chunk)
+                if prev_chunk:
+                    yield (False, prev_chunk)
+                prev_chunk = chunk
         # fire event that streaming has ended
         self.mass.run_task(
             self.mass.signal_event(EVENT_STREAM_ENDED, streamdetails))
@@ -452,7 +459,8 @@ class HTTPStreamer():
                          (item_key, loudness))
         self.analyze_jobs.pop(item_key, None)
 
-    def __crossfade_pcm_parts(self, fade_in_part, fade_out_part, pcm_args,
+    @staticmethod
+    def __crossfade_pcm_parts(fade_in_part, fade_out_part, pcm_args,
                               fade_length):
         ''' crossfade two chunks of audio using sox '''
         # create fade-in part
index 591cb5d0f28030c21aa02b4d2e714f3cad5f297a..0b0d5c1ae3c110276121bd72108ddfbe78ca8184 100755 (executable)
@@ -4,7 +4,7 @@
 import asyncio
 from typing import List
 from ..utils import LOGGER, compare_strings
-from ..cache import use_cache, cached_iterator
+from ..cache import use_cache, cached_iterator, cached
 from ..constants import CONF_ENABLED
 from .media_types import Album, Artist, Track, Playlist, MediaType, Radio
 
@@ -42,7 +42,8 @@ class MusicProvider():
                                                      MediaType.Artist)
         if not item_id:
             # artist not yet in local database so fetch details
-            artist_details = await self.get_artist(prov_item_id)
+            cache_key = f'{self.prov_id}.get_artist.{prov_item_id}'
+            artist_details = await cached(self.cache, cache_key, self.get_artist, prov_item_id )
             if not artist_details:
                 raise Exception('artist not found: %s' % prov_item_id)
             if lazy:
@@ -165,7 +166,8 @@ class MusicProvider():
         if not item_id:
             # album not yet in local database so fetch details
             if not album_details:
-                album_details = await self.get_album(prov_item_id)
+                cache_key = f'{self.prov_id}.get_album.{prov_item_id}'
+                album_details = await cached(self.cache, cache_key, self.get_album, prov_item_id)
             if not album_details:
                 raise Exception('album not found: %s' % prov_item_id)
             if lazy:
@@ -173,6 +175,7 @@ class MusicProvider():
                 album_details.is_lazy = True
                 return album_details
             item_id = await self.add_album(album_details)
+            LOGGER.info("item_id after add_album: %s", item_id)
         return await self.mass.db.album(item_id)
 
     async def add_album(self, album_details) -> int:
@@ -203,7 +206,8 @@ class MusicProvider():
         if not item_id:
             # track not yet in local database so fetch details
             if not track_details:
-                track_details = await self.get_track(prov_item_id)
+                cache_key = f'{self.prov_id}.get_track.{prov_item_id}'
+                track_details = await cached(self.cache, cache_key, self.get_track, prov_item_id)
             if not track_details:
                 raise Exception('track not found: %s' % prov_item_id)
             if lazy:
@@ -224,6 +228,7 @@ class MusicProvider():
             if db_track_artist:
                 track_artists.append(db_track_artist)
         track_details.artists = track_artists
+        # fetch album details
         if not prov_album_id:
             prov_album_id = track_details.album.item_id
         track_details.album = await self.album(prov_album_id, lazy=False)
index c61788ad8f5c253df0c7bfcc7b811bcb91f1d6fd..9789e72f32f4fced3f52faa9354c5b896db9cb2e 100755 (executable)
@@ -9,7 +9,6 @@ import time
 from ..utils import run_periodic, LOGGER, try_parse_int, \
        try_parse_bool, try_parse_float
 from ..constants import EVENT_PLAYER_CHANGED
-from ..cache import use_cache
 from .media_types import Track, MediaType
 from .player_queue import PlayerQueue, QueueItem
 from .playerstate import PlayerState
index e4d2dea334c5117f8b42d576dc745a2549897ffe..e61d00e071c8d5d08ae163e4ccf8dfee5a72c92a 100755 (executable)
@@ -7,7 +7,6 @@ import operator
 import random
 import uuid
 import os
-import pickle
 from enum import Enum
 
 from ..utils import LOGGER, json, filename_from_string, serialize_values
@@ -55,18 +54,12 @@ class PlayerQueue():
         self._last_queue_startindex = 0
         self._next_queue_startindex = 0
         self._last_player_state = PlayerState.Stopped
-        self._save_busy_ = False
         self._last_track = None
-        self.mass.event_loop.create_task(
+        self.mass.run_task(
                 self.mass.add_event_listener(self.on_shutdown, "shutdown"))
         # load previous queue settings from disk
-        self.mass.event_loop.run_in_executor(None, self.__load_from_file)
+        self.mass.run_task(self.__restore_saved_state())
 
-    async def on_shutdown(self, msg, msg_details):
-        """Handle shutdown event, save queue state."""
-        self.__save_to_file()
-        LOGGER.info("queue state saved to file for player %s", self._player.player_id)
-        
     @property
     def shuffle_enabled(self):
         return self._shuffle_enabled
@@ -362,9 +355,8 @@ class PlayerQueue():
             try:
                 await self._player.cmd_queue_update(self._items)
             except NotImplementedError:
-                # not supported by player, use load queue instead
-                LOGGER.debug("cmd_queue_update not supported by player, fallback to cmd_queue_load ")
-                await self._player.cmd_queue_load(self._items)
+                # not supported by player, ignore
+                pass
         self.mass.event_loop.create_task(
             self.mass.signal_event(EVENT_QUEUE_ITEMS_UPDATED, self.to_dict()))
 
@@ -471,42 +463,28 @@ class PlayerQueue():
                 item_index = index
         return item_index
     
-    def __load_from_file(self):
-        ''' try to load the saved queue for this player from file '''
-        player_safe_str = filename_from_string(self._player.player_id)
-        settings_dir = os.path.join(self.mass.datapath, 'queue')
-        player_file = os.path.join(settings_dir, player_safe_str)
-        if os.path.isfile(player_file):
-            try:
-                with open(player_file, 'rb') as f:
-                    data = pickle.load(f)
-                    self._shuffle_enabled = data["shuffle_enabled"]
-                    self._repeat_enabled = data["repeat_enabled"]
-                    self._items = data["items"]
-                    self._cur_index = data["cur_item"]
-                    self._last_queue_startindex = data["last_index"]
-            except Exception as exc:
-                LOGGER.debug("Could not load queue from disk - %s" % str(exc))
+    async def __restore_saved_state(self):
+        ''' try to load the saved queue for this player from cache file '''
+        cache_str = 'queue_%s' % self._player.player_id
+        cache_data = await self.mass.cache.get(cache_str)
+        if cache_data:
+            self._shuffle_enabled = cache_data["shuffle_enabled"]
+            self._repeat_enabled = cache_data["repeat_enabled"]
+            self._items = cache_data["items"]
+            self._cur_index = cache_data["cur_item"]
+            self._next_queue_startindex = cache_data["next_queue_index"]
 
-    def __save_to_file(self):
+    async def on_shutdown(self, msg, msg_details):
+        """Handle shutdown event, save queue state."""
         ''' save current queue settings to file '''
-        if self._save_busy_:
-            return
-        self._save_busy_ = True
-        player_safe_str = filename_from_string(self._player.player_id)
-        settings_dir = os.path.join(self.mass.datapath, 'queue')
-        player_file = os.path.join(settings_dir, player_safe_str)
-        data = {
+        cache_str = 'queue_%s' % self._player.player_id
+        cache_data = {
             "shuffle_enabled": self._shuffle_enabled,
             "repeat_enabled": self._repeat_enabled,
             "items": self._items,
             "cur_item": self._cur_index,
-            "last_index": self._cur_index
+            "next_queue_index": self._next_queue_startindex
         }
-        if not os.path.isdir(settings_dir):
-            os.mkdir(settings_dir)
-        with open(player_file, 'wb') as f:
-            data = pickle.dump(data, f)
-        self._save_busy_ = False
-
-
+        await self.mass.cache.set(cache_str, cache_data)
+        LOGGER.info("queue state saved to file for player %s", self._player.player_id)
+        
\ No newline at end of file
index 60fff45d342d1d2252b643519282b71112fe58ce..24d8704223eb23c672a71eb9de52f55aa88be2ed 100755 (executable)
@@ -6,7 +6,6 @@ from enum import Enum
 from typing import List
 from ..utils import run_periodic, LOGGER
 from ..constants import CONF_ENABLED
-from ..cache import use_cache
 from .player_queue import PlayerQueue
 from .media_types import Track
 from .player import Player
index 4899d2e27af4aecdcedaf67aedf1cb81f5ebb722..85399ea7576b51f01cbd4639f64f71a4cda6f8e4 100755 (executable)
@@ -402,7 +402,10 @@ class MusicManager():
         ]
         cur_db_ids = []
         async for item in music_provider.get_library_albums():
-            db_album = await music_provider.album(item.item_id, lazy=False)
+            
+            db_album = await music_provider.album(item.item_id, album_details=item, lazy=False)
+            if not db_album:
+                LOGGER.error("provider %s album: %s", prov_id, item.__dict__)
             cur_db_ids.append(db_album.item_id)
             if not db_album.item_id in prev_db_ids:
                 await self.mass.db.add_to_library(db_album.item_id,
index b1ae425d1611913c9ad7f0dea37f1d7644ceef42..6d2650758fae831bae29d39ed5730d46cd3c1a2d 100644 (file)
@@ -9,7 +9,6 @@ import time
 import base64
 import taglib
 
-from ..cache import use_cache
 from ..utils import run_periodic, LOGGER, parse_title_and_version
 from ..models import MusicProvider, MediaType, TrackQuality, AlbumType, Artist, Album, Track, Playlist
 from ..constants import CONF_ENABLED
index 7a474b9c83653020a29d0c892720391b0c847723..71cd4edd198eb1f4881c28546b1bb6fc720646e9 100644 (file)
@@ -390,8 +390,6 @@ class QobuzProvider(MusicProvider):
         album.name, album.version = parse_title_and_version(
             album_obj['title'], album_obj.get('version'))
         album.artist = await self.__parse_artist(album_obj['artist'])
-        if not album.artist:
-            raise Exception("No album artist ! %s" % album_obj)
         if album_obj.get('product_type', '') == 'single':
             album.albumtype = AlbumType.Single
         elif album_obj.get(
@@ -440,8 +438,6 @@ class QobuzProvider(MusicProvider):
         if track_obj.get(
                 'performer') and not 'Various ' in track_obj['performer']:
             artist = await self.__parse_artist(track_obj['performer'])
-            if not artist:
-                artist = self.get_artist(track_obj['performer']['id'])
             if artist:
                 track.artists.append(artist)
         if not track.artists:
index 7d1e5d4fbc75a14ad7ec83b6beee33c6fb89a7b2..0bf773958120cfc0f00829509508087688a563e9 100644 (file)
@@ -295,8 +295,6 @@ class SpotifyProvider(MusicProvider):
             album.artist = await self.__parse_artist(artist)
             if album.artist:
                 break
-        if not album.artist:
-            raise Exception("No album artist ! %s" % album_obj)
         if album_obj['album_type'] == 'single':
             album.albumtype = AlbumType.Single
         elif album_obj['album_type'] == 'compilation':
@@ -351,8 +349,6 @@ class SpotifyProvider(MusicProvider):
                 track.external_ids.append({key: value})
         if 'album' in track_obj:
             track.album = await self.__parse_album(track_obj['album'])
-            if not track.album:
-                track.album = await self.get_album(track_obj['album']['id'])
         if track_obj.get('copyright'):
             track.metadata["copyright"] = track_obj['copyright']
         if track_obj.get('explicit'):
index d54a2cafca3fc42a83aafe189d2f787375837c23..d7988a8e17e11306019962a45cca39205e0aa434 100644 (file)
@@ -10,7 +10,6 @@ from asyncio_throttle import Throttler
 import json
 import aiohttp
 
-from ..cache import use_cache
 from ..utils import run_periodic, LOGGER
 from ..models import MusicProvider, MediaType, TrackQuality, Radio
 from ..constants import CONF_USERNAME, CONF_PASSWORD, CONF_ENABLED, CONF_TYPE_PASSWORD
@@ -31,10 +30,9 @@ class TuneInProvider(MusicProvider):
 
     def __init__(self, mass, conf):
         ''' Support for streaming radio provider TuneIn '''
+        super().__init__(mass)
         self.name = PROV_NAME
         self.prov_id = PROV_ID
-        self.mass = mass
-        self.cache = mass.cache
         if not conf[CONF_USERNAME] or not conf[CONF_PASSWORD]:
             raise Exception("Username and password must not be empty")
         self._username = conf[CONF_USERNAME]
@@ -60,7 +58,7 @@ class TuneInProvider(MusicProvider):
     async def get_radios(self):
         ''' get favorited/library radio stations '''
         params = {"c": "presets"}
-        result = await self.__get_data("Browse.ashx", params, ignore_cache=True)
+        result = await self.__get_data("Browse.ashx", params)
         if result and "body" in result:
             for item in result["body"]:
                 # TODO: expand folders
@@ -72,7 +70,7 @@ class TuneInProvider(MusicProvider):
         ''' get radio station details '''
         radio = None
         params = {"c": "composite", "detail": "listing", "id": radio_id}
-        result = await self.__get_data("Describe.ashx", params, ignore_cache=True)
+        result = await self.__get_data("Describe.ashx", params)
         if result and result.get("body") and result["body"][0].get("children"):
             item = result["body"][0]["children"][0]
             radio = await self.__parse_radio(item)
@@ -139,8 +137,7 @@ class TuneInProvider(MusicProvider):
                 }
         return {}
         
-    @use_cache(7)
-    async def __get_data(self, endpoint, params={}, ignore_cache=False, cache_checksum=None):
+    async def __get_data(self, endpoint, params={}):
         ''' get data from api'''
         url = 'https://opml.radiotime.com/%s' % endpoint
         params['render'] = 'json'