some code cleanup
authormarcelveldt <marcelvanderveldt@MacBook-Silvia.local>
Fri, 25 Oct 2019 11:39:11 +0000 (13:39 +0200)
committermarcelveldt <marcelvanderveldt@MacBook-Silvia.local>
Fri, 25 Oct 2019 11:39:11 +0000 (13:39 +0200)
16 files changed:
music_assistant/__init__.py
music_assistant/constants.py
music_assistant/homeassistant.py
music_assistant/http_streamer.py
music_assistant/models/musicprovider.py
music_assistant/models/player.py
music_assistant/models/player_queue.py
music_assistant/music_manager.py
music_assistant/musicproviders/qobuz.py
music_assistant/musicproviders/spotify.py
music_assistant/playerproviders/chromecast.py
music_assistant/playerproviders/sonos.py
music_assistant/playerproviders/squeezebox.py
music_assistant/playerproviders/webplayer.py
music_assistant/utils.py
music_assistant/web.py

index f1bf784c5742ac44d390f9363bdfe6a35859f90c..d42c2fe71f78956882bae551fba570e0de287d1d 100644 (file)
@@ -83,20 +83,16 @@ class MusicAssistant():
         ''' remove callback from our event listeners '''
         self.event_listeners.pop(cb_id, None)
 
-    def create_task(self, corofcn, wait_for_result=False, ignore_exception=None):
-        ''' helper to create a new task on the main event loop '''
+    def run_task(self, corofcn, wait_for_result=False, ignore_exception=None):
+        ''' helper to run a task on the main event loop from another thread '''
         if threading.current_thread() is threading.main_thread():
-            if wait_for_result:
-                raise Exception("can not wait for result in main event loop!")
-            return self.event_loop.create_task(corofcn)
-        else:
-            # threadsafe
-            future = asyncio.run_coroutine_threadsafe(corofcn, self.event_loop)
-            if wait_for_result:
-                try:
-                    return future.result()
-                except Exception as exc:
-                    if ignore_exception and isinstance(exc, ignore_exception):
-                        return None
-                    raise exc
-            return future
+            raise Exception("Can not be called from main event loop!")
+        future = asyncio.run_coroutine_threadsafe(corofcn, self.event_loop)
+        if wait_for_result:
+            try:
+                return future.result()
+            except Exception as exc:
+                if ignore_exception and isinstance(exc, ignore_exception):
+                    return None
+                raise exc
+        return future
index 9c438c89077448652da3354799e15f93a617cd14..4edf53a7d402b37f633035345ee65410424b47c6 100755 (executable)
@@ -25,3 +25,5 @@ EVENT_CONFIG_CHANGED = "config changed"
 EVENT_PLAYBACK_STARTED = "playback started"
 EVENT_PLAYBACK_STOPPED = "playback stopped"
 EVENT_HASS_ENTITY_CHANGED = "hass entity changed"
+EVENT_MUSIC_SYNC_STARTED = "music sync started"
+EVENT_MUSIC_SYNC_COMPLETED = "music sync completed"
index 6c6cdc822d26feee5e23f94b162b744efc15c1fa..6508639c88254f890604ad07354f7ea8e744ef70 100644 (file)
@@ -14,7 +14,7 @@ from aiocometd import Client, ConnectionType, Extension
 import copy
 import slugify as slug
 import json
-from .utils import run_periodic, LOGGER, parse_track_title, try_parse_int
+from .utils import run_periodic, LOGGER, IS_HASSIO, parse_track_title, try_parse_int
 from .models.media_types import Track
 from .constants import CONF_ENABLED, CONF_URL, CONF_TOKEN, EVENT_PLAYER_CHANGED, EVENT_PLAYER_ADDED, EVENT_HASS_ENTITY_CHANGED
 from .cache import use_cache
@@ -23,13 +23,11 @@ CONF_KEY = 'homeassistant'
 CONF_PUBLISH_PLAYERS = "publish_players"
 
 ### auto detect hassio for auto config ####
-if os.path.isfile('/data/options.json'):
-    IS_HASSIO = True
+if IS_HASSIO:
     CONFIG_ENTRIES = [
         (CONF_ENABLED, False, CONF_ENABLED),
         (CONF_PUBLISH_PLAYERS, True, 'hass_publish')]
 else:
-    IS_HASSIO = False
     CONFIG_ENTRIES = [
         (CONF_ENABLED, False, CONF_ENABLED),
         (CONF_URL, 'localhost', 'hass_url'), 
@@ -81,10 +79,10 @@ class HomeAssistant():
             return
         self.http_session = aiohttp.ClientSession(
                 loop=self.mass.event_loop, connector=aiohttp.TCPConnector())
-        self.mass.create_task(self.__hass_websocket())
+        self.mass.event_loop.create_task(self.__hass_websocket())
         await self.mass.add_event_listener(self.mass_event, EVENT_PLAYER_CHANGED)
         await self.mass.add_event_listener(self.mass_event, EVENT_PLAYER_ADDED)
-        self.mass.create_task(self.__get_sources())
+        self.mass.event_loop.create_task(self.__get_sources())
 
     async def get_state_async(self, entity_id, attribute='state'):
         ''' get state of a hass entity (async)'''
@@ -105,7 +103,7 @@ class HomeAssistant():
             else:
                 return state_obj
         else:
-            self.mass.create_task(self.__request_state(entity_id))
+            self.mass.event_loop.create_task(self.__request_state(entity_id))
             return None
 
     async def __request_state(self, entity_id):
@@ -113,8 +111,7 @@ class HomeAssistant():
         state_obj = await self.__get_data('states/%s' % entity_id)
         if 'state' in state_obj:
             self._tracked_entities[entity_id] = state_obj
-            self.mass.create_task(
-                self.mass.signal_event(EVENT_HASS_ENTITY_CHANGED, state_obj))
+            await self.mass.signal_event(EVENT_HASS_ENTITY_CHANGED, state_obj)
     
     async def mass_event(self, msg, msg_details):
         ''' received event from mass '''
@@ -126,7 +123,7 @@ class HomeAssistant():
         if event_type == 'state_changed':
             if event_data['entity_id'] in self._tracked_entities:
                 self._tracked_entities[event_data['entity_id']] = event_data['new_state']
-                self.mass.create_task(
+                self.mass.event_loop.create_task(
                     self.mass.signal_event(EVENT_HASS_ENTITY_CHANGED, event_data))
         elif event_type == 'call_service' and event_data['domain'] == 'media_player':
             await self.__handle_player_command(event_data['service'], event_data['service_data'])
index 412bc6ec662629f0163a70277e9f9cf405c8fc16..ca19b1930ec03d306c199d7bea5d3cfdaeea26ba 100755 (executable)
@@ -14,6 +14,7 @@ import io
 import aiohttp
 import subprocess
 import gc
+import shlex
 
 from .constants import EVENT_STREAM_STARTED, EVENT_STREAM_ENDED
 from .utils import LOGGER, try_parse_int, get_ip, run_async_background_task, run_periodic, get_folder_size
@@ -31,9 +32,7 @@ class HTTPStreamer():
 
     async def setup(self):
         ''' async initialize of module '''
-        pass
-        # self.mass.create_task(
-        #         asyncio.start_server(self.sockets_streamer, '0.0.0.0', 8093))
+        pass # we have nothing to initialize
         
     async def stream(self, http_request):
         ''' 
@@ -78,7 +77,7 @@ class HTTPStreamer():
                 # we must consume the data to prevent hanging subprocess instances
                 continue
             # put chunk in buffer
-            self.mass.create_task(
+            self.mass.run_task(
                     buffer.write(audio_chunk), wait_for_result=True, 
                         ignore_exception=(BrokenPipeError,ConnectionResetError))
         # all chunks received: streaming finished
@@ -86,7 +85,7 @@ class HTTPStreamer():
             LOGGER.debug("stream single track interrupted for track %s on player %s" % (queue_item.name, player.name))
         else:
             # indicate EOF if no more data
-            self.mass.create_task(
+            self.mass.run_task(
                     buffer.write_eof(), wait_for_result=True, 
                         ignore_exception=(BrokenPipeError,ConnectionResetError))
             LOGGER.debug("stream single track finished for track %s on player %s" % (queue_item.name, player.name))
@@ -105,7 +104,8 @@ class HTTPStreamer():
         pcm_args = 'raw -b 32 -c 2 -e signed-integer -r %s' % sample_rate
         args = 'sox -t %s - -t flac -C 0 -' % pcm_args
         # start sox process
-        sox_proc = subprocess.Popen(args, shell=True, 
+        args = shlex.split(args)
+        sox_proc = subprocess.Popen(args, shell=False, 
             stdout=subprocess.PIPE, stdin=subprocess.PIPE)
 
         def fill_buffer():
@@ -115,12 +115,12 @@ class HTTPStreamer():
                 if not chunk:
                     break
                 if chunk and not cancelled.is_set():
-                    self.mass.create_task(buffer.write(chunk), 
+                    self.mass.run_task(buffer.write(chunk), 
                         wait_for_result=True, ignore_exception=(BrokenPipeError,ConnectionResetError))
                 del chunk
             # indicate EOF if no more data
             if not cancelled.is_set():
-                self.mass.create_task(buffer.write_eof(), 
+                self.mass.run_task(buffer.write_eof(), 
                     wait_for_result=True, ignore_exception=(BrokenPipeError,ConnectionResetError))
         # start fill buffer task in background
         fill_buffer_thread = threading.Thread(target=fill_buffer)
@@ -270,7 +270,7 @@ class HTTPStreamer():
         # sort by quality and check track availability
         for prov_media in sorted(queue_item.provider_ids, 
                 key=operator.itemgetter('quality'), reverse=True):
-            streamdetails = self.mass.create_task(
+            streamdetails = self.mass.run_task(
                     self.mass.music.providers[prov_media['provider']].get_stream_details(prov_media['item_id']), 
                     wait_for_result=True)
             if streamdetails:
@@ -298,16 +298,22 @@ class HTTPStreamer():
         if streamdetails["content_type"] == 'aac':
             # support for AAC created with ffmpeg in between
             args = 'ffmpeg -v quiet -i "%s" -f flac - | sox -t flac - -t %s - %s' % (streamdetails["path"], outputfmt, sox_options)
-        elif streamdetails['type'] == 'url':
+            process = subprocess.Popen(args, shell=True, stdout=subprocess.PIPE)
+        elif streamdetails['type'] in ['url', 'file']:
             args = 'sox -t %s "%s" -t %s - %s' % (streamdetails["content_type"], 
                     streamdetails["path"], outputfmt, sox_options)
+            args = shlex.split(args)
+            process = subprocess.Popen(args, shell=False, stdout=subprocess.PIPE)
         elif streamdetails['type'] == 'executable':
             args = '%s | sox -t %s - -t %s - %s' % (streamdetails["path"], 
                     streamdetails["content_type"], outputfmt, sox_options)
-        # start sox process
-        process = subprocess.Popen(args, shell=True, stdout=subprocess.PIPE)
+            process = subprocess.Popen(args, shell=True, stdout=subprocess.PIPE)
+        else:
+            LOGGER.warning(f"no streaming options for {queue_item.name}")
+            yield (True, b'')
+            return
         # fire event that streaming has started for this track
-        self.mass.create_task(
+        self.mass.run_task(
                 self.mass.signal_event(EVENT_STREAM_STARTED, streamdetails))
         # yield chunks from stdout
         # we keep 1 chunk behind to detect end of stream properly
@@ -328,7 +334,7 @@ class HTTPStreamer():
                 bytes_sent += len(chunk)
                 yield (False, chunk)
         # fire event that streaming has ended
-        self.mass.create_task(self.mass.signal_event(EVENT_STREAM_ENDED, streamdetails))
+        self.mass.run_task(self.mass.signal_event(EVENT_STREAM_ENDED, streamdetails))
         # send task to background to analyse the audio
         if queue_item.media_type == MediaType.Track:
             self.mass.event_loop.run_in_executor(None, self.__analyze_audio, streamdetails)
@@ -337,7 +343,7 @@ class HTTPStreamer():
         ''' get player specific sox effect options '''
         sox_options = []
         # volume normalisation
-        gain_correct = self.mass.create_task(
+        gain_correct = self.mass.run_task(
                 self.mass.players.get_gain_correct(
                     player.player_id, streamdetails["item_id"], streamdetails["provider"]), 
                 wait_for_result=True)
@@ -364,7 +370,7 @@ class HTTPStreamer():
         if item_key in self.analyze_jobs:
             return # prevent multiple analyze jobs for same track
         self.analyze_jobs[item_key] = True
-        track_loudness = self.mass.create_task(self.mass.db.get_track_loudness(
+        track_loudness = self.mass.run_task(self.mass.db.get_track_loudness(
                 streamdetails["item_id"], streamdetails["provider"]), wait_for_result=True)
         if track_loudness == None:
             # only when needed we do the analyze stuff
@@ -380,7 +386,7 @@ class HTTPStreamer():
             meter = pyloudnorm.Meter(rate) # create BS.1770 meter
             loudness = meter.integrated_loudness(data) # measure loudness
             del data
-            self.mass.create_task(
+            self.mass.run_task(
                 self.mass.db.set_track_loudness(streamdetails["item_id"], streamdetails["provider"], loudness))
             del audio_data
             LOGGER.debug("Integrated loudness of track %s is: %s" %(item_key, loudness))
@@ -391,18 +397,21 @@ class HTTPStreamer():
         # create fade-in part
         fadeinfile = MemoryTempfile(fallback=True).NamedTemporaryFile(buffering=0)
         args = 'sox --ignore-length -t %s - -t %s %s fade t %s' % (pcm_args, pcm_args, fadeinfile.name, fade_length)
-        process = subprocess.Popen(args, shell=True, stdin=subprocess.PIPE)
+        args = shlex.split(args)
+        process = subprocess.Popen(args, shell=False, stdin=subprocess.PIPE)
         process.communicate(fade_in_part)
         # create fade-out part
         fadeoutfile = MemoryTempfile(fallback=True).NamedTemporaryFile(buffering=0)
         args = 'sox --ignore-length -t %s - -t %s %s reverse fade t %s reverse' % (pcm_args, pcm_args, fadeoutfile.name, fade_length)
-        process = subprocess.Popen(args, shell=True,
+        args = shlex.split(args)
+        process = subprocess.Popen(args, shell=False,
                 stdout=subprocess.PIPE, stdin=subprocess.PIPE)
         process.communicate(fade_out_part)
         # create crossfade using sox and some temp files
         # TODO: figure out how to make this less complex and without the tempfiles
         args = 'sox -m -v 1.0 -t %s %s -v 1.0 -t %s %s -t %s -' % (pcm_args, fadeoutfile.name, pcm_args, fadeinfile.name, pcm_args)
-        process = subprocess.Popen(args, shell=True,
+        args = shlex.split(args)
+        process = subprocess.Popen(args, shell=False,
                 stdout=subprocess.PIPE, stdin=subprocess.PIPE)
         crossfade_part, stderr = process.communicate()
         fadeinfile.close()
index 0ed1376bbc0ea56ed0c30b9314db991c2a93c0d3..67192c7480edf8a31f94d67467e681882123887c 100755 (executable)
@@ -278,27 +278,32 @@ class MusicProvider():
 
     async def search(self, searchstring, media_types=List[MediaType], limit=5):
         ''' perform search on the provider '''
-        raise NotImplementedError
+        return {
+            "artists": [],
+            "albums": [],
+            "tracks": [],
+            "playlists": []
+        }
     
     async def get_library_artists(self) -> List[Artist]:
         ''' retrieve library artists from the provider '''
-        raise NotImplementedError
+        return []
     
     async def get_library_albums(self) -> List[Album]:
         ''' retrieve library albums from the provider '''
-        raise NotImplementedError
+        return []
 
     async def get_library_tracks(self) -> List[Track]:
         ''' retrieve library tracks from the provider '''
-        raise NotImplementedError
+        return []
 
     async def get_playlists(self) -> List[Playlist]:
         ''' retrieve library/subscribed playlists from the provider '''
-        raise NotImplementedError
+        return []
 
     async def get_radios(self) -> List[Radio]:
         ''' retrieve library/subscribed radio stations from the provider '''
-        raise NotImplementedError
+        return []
 
     async def get_artist(self, prov_item_id) -> Artist:
         ''' get full artist details by id '''
@@ -356,68 +361,7 @@ class MusicProvider():
         ''' return the content type for the given track when it will be streamed'''
         raise NotImplementedError
     
-    async def get_stream(self, track_id):
-        ''' get audio stream for a track '''
-        raise NotImplementedError
-    
-
-class PlayerProvider():
-    ''' 
-        Model for a Playerprovider
-        Common methods usable for every provider
-        Provider specific __get methods shoud be overriden in the provider specific implementation
-    '''
-    name = 'My great Musicplayer provider' # display name
-    prov_id = 'my_provider' # used as id
-    icon = ''
-
-    def __init__(self, mass):
-        self.mass = mass
-
-    ### Common methods and properties ####
-
-    async def players(self):
-        ''' return all players for this provider '''
-        return await self.mass.provider_players(self.prov_id)
-    
-    async def get_player(self, player_id):
-        ''' return player by id '''
-        return await self.mass.get_player(player_id)
-
-    async def add_player(self, player_id, name='', is_group=False):
-        ''' register a new player '''
-        return await self.mass.players.add_player(player_id, 
-                self.prov_id, name=name, is_group=is_group)
-
-    async def remove_player(self, player_id):
-        ''' remove a player '''
-        return await self.mass.players.remove_player(player_id)
-
-    ### Provider specific implementation #####
-
-    async def player_config_entries(self):
-        ''' get the player config entries for this provider (list with key/value pairs)'''
-        return [
-            (CONF_ENABLED, True, CONF_ENABLED)
-            ]
-
-    async def play_media(self, player_id, media_items:List[Track], queue_opt='play'):
-        ''' 
-            play media on a player
-            params:
-            - player_id: id of the player
-            - media_items: List of Tracks to play, each Track will contain uri attribute (e.g. spotify:track:1234 or http://pathtostream)
-            - queue_opt: 
-                replace: replace whatever is currently playing with this media
-                next: the given media will be played after the currently playing track
-                add: add to the end of the queue
-                play: keep existing queue but play the given item(s) now first
-        '''
-        raise NotImplementedError
-
-    async def player_command(self, player_id, cmd:str, cmd_args=None):
-        ''' issue command on player (play, pause, next, previous, stop, power, volume, mute) '''
+    async def get_stream_details(self, track_id):
+        ''' get streamdetails for a track '''
         raise NotImplementedError
-
-
-
+    
\ No newline at end of file
index 9ee63171e45adcf198697183563805761d1b2699..be284b976e3346db2589ff6250306a65d1d5cba1 100755 (executable)
@@ -147,7 +147,7 @@ class Player():
         ''' [PROTECTED] set (real) name of this player '''
         if name != self._name:
             self._name = name
-            self.mass.create_task(self.update())
+            self.mass.event_loop.create_task(self.update())
 
     @property
     def is_group(self):
@@ -177,25 +177,25 @@ class Player():
         ''' [PROTECTED] set group_childs property of this player '''
         if group_childs != self._group_childs:
             self._group_childs = group_childs
-            self.mass.create_task(self.update())
+            self.mass.event_loop.create_task(self.update())
             for child_player_id in group_childs:
-                self.mass.create_task(
+                self.mass.event_loop.create_task(
                     self.mass.players.trigger_update(child_player_id))
 
     def add_group_child(self, child_player_id):
         ''' add player as child to this group player '''
         if not child_player_id in self._group_childs:
             self._group_childs.append(child_player_id)
-            self.mass.create_task(self.update())
-            self.mass.create_task(
+            self.mass.event_loop.create_task(self.update())
+            self.mass.event_loop.create_task(
                     self.mass.players.trigger_update(child_player_id))
 
     def remove_group_child(self, child_player_id):
         ''' remove player as child from this group player '''
         if child_player_id in self._group_childs:
             self._group_childs.remove(child_player_id)
-            self.mass.create_task(self.update())
-            self.mass.create_task(
+            self.mass.event_loop.create_task(self.update())
+            self.mass.event_loop.create_task(
                 self.mass.players.trigger_update(child_player_id))
 
     @property
@@ -215,7 +215,7 @@ class Player():
         ''' [PROTECTED] set state property of this player '''
         if state != self._state:
             self._state = state
-            self.mass.create_task(self.update(update_queue=True))
+            self.mass.event_loop.create_task(self.update(update_queue=True))
 
     @property
     def powered(self):
@@ -242,7 +242,7 @@ class Player():
         ''' [PROTECTED] set (real) power state for this player '''
         if powered != self._powered:
             self._powered = powered
-            self.mass.create_task(self.update())
+            self.mass.event_loop.create_task(self.update())
 
     @property
     def cur_time(self):
@@ -262,7 +262,7 @@ class Player():
         if cur_time != self._cur_time:
             self._cur_time = cur_time
             self._media_position_updated_at = time.time()
-            self.mass.create_task(self.update(update_queue=True))
+            self.mass.event_loop.create_task(self.update(update_queue=True))
 
     @property
     def media_position_updated_at(self):
@@ -284,7 +284,7 @@ class Player():
         ''' [PROTECTED] set cur_uri (uri loaded in player) property of this player '''
         if cur_uri != self._cur_uri:
             self._cur_uri = cur_uri
-            self.mass.create_task(self.update(update_queue=True))
+            self.mass.event_loop.create_task(self.update(update_queue=True))
 
     @property
     def volume_level(self):
@@ -316,10 +316,10 @@ class Player():
         volume_level = try_parse_int(volume_level)
         if volume_level != self._volume_level:
             self._volume_level = volume_level
-            self.mass.create_task(self.update())
+            self.mass.event_loop.create_task(self.update())
             # trigger update on group player
             for group_parent_id in self.group_parents:
-                self.mass.create_task(
+                self.mass.event_loop.create_task(
                         self.mass.players.trigger_update(group_parent_id))
 
     @property
@@ -333,7 +333,7 @@ class Player():
         is_muted = try_parse_bool(is_muted)
         if is_muted != self._muted:
             self._muted = is_muted
-            self.mass.create_task(self.update())
+            self.mass.event_loop.create_task(self.update())
 
     @property
     def enabled(self):
index ed90f345fb3400c19624d101b197b40b6262d1ad..a04f26127f7470fa1e66b626308ae42dd097d387 100755 (executable)
@@ -151,11 +151,11 @@ class PlayerQueue():
             # shuffle requested
             self._shuffle_enabled = True
             await self.load(self._items)
-            self.mass.create_task(self._player.update())
+            self.mass.event_loop.create_task(self._player.update())
         elif self._shuffle_enabled and not enable_shuffle:
             self._shuffle_enabled = False
             # TODO: Unshuffle the list ?
-            self.mass.create_task(self._player.update())
+            self.mass.event_loop.create_task(self._player.update())
     
     async def next(self):
         ''' request next track in queue '''
@@ -301,11 +301,9 @@ class PlayerQueue():
             # account for track changing state so trigger track change after 1 second
             if self._last_track and self._last_track.streamdetails:
                 self._last_track.streamdetails["seconds_played"] = self._last_item_time
-                self.mass.create_task(
-                    self.mass.signal_event(EVENT_PLAYBACK_STOPPED, self._last_track.streamdetails))
+                await self.mass.signal_event(EVENT_PLAYBACK_STOPPED, self._last_track.streamdetails)
             if new_track and new_track.streamdetails:
-                self.mass.create_task(
-                    self.mass.signal_event(EVENT_PLAYBACK_STARTED, new_track.streamdetails))
+                await self.mass.signal_event(EVENT_PLAYBACK_STARTED, new_track.streamdetails)
                 self._last_track = new_track
             self.mass.event_loop.run_in_executor(None, self.__save_to_file)
         if self._last_player_state != self._player.state:
index 79a7df00b776c1e6e8a030040b0bd66b542a041f..641eab9a1812ea5855feb46c8016e40cc273fd3e 100755 (executable)
@@ -7,9 +7,9 @@ import toolz
 import operator
 import os
 
-from .utils import run_periodic, LOGGER, try_supported, load_provider_modules
+from .utils import run_periodic, LOGGER, load_provider_modules
 from .models.media_types import MediaType, Track, Artist, Album, Playlist, Radio
-from .constants import CONF_KEY_MUSICPROVIDERS
+from .constants import CONF_KEY_MUSICPROVIDERS, EVENT_MUSIC_SYNC_STARTED, EVENT_MUSIC_SYNC_COMPLETED
 
 
 class MusicManager():
@@ -27,7 +27,7 @@ class MusicManager():
         for prov in self.providers.values():
             await prov.setup()
         # schedule sync task
-        self.mass.create_task(self.sync_music_providers())
+        self.mass.event_loop.create_task(self.sync_music_providers())
 
     async def item(self, item_id, media_type:MediaType, provider='database', lazy=True):
         ''' get single music item by id and media type'''
@@ -254,21 +254,25 @@ class MusicManager():
         # actually add the tracks to the playlist on the provider
         await self.providers[playlist_prov['provider']].add_playlist_tracks(playlist_prov['item_id'], track_ids_to_add)
         # schedule sync
-        self.mass.create_task(self.sync_playlist_tracks(playlist.item_id, playlist_prov['provider'], playlist_prov['item_id']))
+        self.mass.event_loop.create_task(self.sync_playlist_tracks(playlist.item_id, playlist_prov['provider'], playlist_prov['item_id']))
 
     @run_periodic(3600)
     async def sync_music_providers(self):
         ''' periodic sync of all music providers '''
         if self.sync_running:
             return
+        LOGGER.info("Music provider sync started")
         for prov_id in self.providers.keys():
             self.sync_running = prov_id
+            await self.mass.signal_event(EVENT_MUSIC_SYNC_STARTED, prov_id)
             # sync library items for each provider (if supported)
-            await try_supported(self.sync_library_artists(prov_id))
-            await try_supported(self.sync_library_albums(prov_id))
-            await try_supported(self.sync_library_tracks(prov_id))
-            await try_supported(self.sync_playlists(prov_id))
-            await try_supported(self.sync_radios(prov_id))
+            await self.sync_library_artists(prov_id)
+            await self.sync_library_albums(prov_id)
+            await self.sync_library_tracks(prov_id)
+            await self.sync_playlists(prov_id)
+            await self.sync_radios(prov_id)
+        LOGGER.info("Music provider sync completed")
+        await self.mass.signal_event(EVENT_MUSIC_SYNC_COMPLETED, None)
         self.sync_running = None
         
     async def sync_library_artists(self, prov_id):
index b1c850daca498a32adecef6702e634fa709f054f..9fae439f94e01d90ce0d9faff62d405b78f49b79 100644 (file)
@@ -332,8 +332,7 @@ class QobuzProvider(MusicProvider):
         ''' parse qobuz album object to generic layout '''
         album = Album()
         if not album_obj.get('id') or not album_obj["streamable"] or not album_obj["displayable"]:
-            # some safety checks
-            LOGGER.warning("invalid/unavailable album found: %s" % album_obj.get('id'))
+            # do not return unavailable items
             return None
         album.item_id = album_obj['id']
         album.provider = self.prov_id
@@ -378,8 +377,7 @@ class QobuzProvider(MusicProvider):
         ''' parse qobuz track object to generic layout '''
         track = Track()
         if not track_obj.get('id') or not track_obj["streamable"] or not track_obj["displayable"]:
-            # some safety checks
-            LOGGER.warning("invalid/unavailable track found: %s - %s" % (track_obj.get('id'), track_obj.get('name')))
+            # do not return unavailable items
             return None
         track.item_id = track_obj['id']
         track.provider = self.prov_id
index 866f05268f1ea90d9e3885ba92fb2e54776d20bd..0e2a266235cd0291953cdd0a56282ffdc93a662a 100644 (file)
@@ -8,11 +8,10 @@ import sys
 import time
 import concurrent
 from asyncio_throttle import Throttler
-import json
 import aiohttp
 
 from ..cache import use_cache
-from ..utils import run_periodic, LOGGER, parse_track_title
+from ..utils import run_periodic, LOGGER, parse_track_title, json
 from ..app_vars import get_app_var
 from ..models import MusicProvider, MediaType, TrackQuality, AlbumType, Artist, Album, Track, Playlist
 from ..constants import CONF_USERNAME, CONF_PASSWORD, CONF_ENABLED, CONF_TYPE_PASSWORD
@@ -246,8 +245,10 @@ class SpotifyProvider(MusicProvider):
 
     async def get_stream_details(self, track_id):
         ''' return the content details for the given track when it will be streamed'''
+        # make sure there is a valid token in cache
+        await self.get_token()
         spotty = self.get_spotty_binary()
-        spotty_exec = "%s -n temp -u %s -p %s --pass-through --single-track %s" %(spotty, self._username, self._password, track_id)
+        spotty_exec = '%s -n temp -c "%s" --pass-through --single-track %s' %(spotty, self.mass.datapath, track_id)
         return {
             "type": "executable",
             "path": spotty_exec,
@@ -328,7 +329,7 @@ class SpotifyProvider(MusicProvider):
         if 'track' in track_obj:
             track_obj = track_obj['track']
         if track_obj['is_local'] or not track_obj['id'] or not track_obj['is_playable']:
-            LOGGER.warning("invalid/unavailable track found: %s - %s" % (track_obj.get('id'), track_obj.get('name')))
+            # do not return unavailable items
             return None
         track = Track()
         track.item_id = track_obj['id']
@@ -388,42 +389,8 @@ class SpotifyProvider(MusicProvider):
         tokeninfo = {}
         if not self._username or not self._password:
             return tokeninfo
-        # try with spotipy-token module first, fallback to spotty
-        try:
-            import spotify_token as st
-            data = st.start_session(self._username, self._password)
-            if data and len(data) == 2:
-                tokeninfo = {"accessToken": data[0], "expiresIn": data[1] - int(time.time()), "expiresAt":data[1] }
-        except Exception as exc:
-            LOGGER.debug(exc)
-        if not tokeninfo:
-            # fallback to spotty approach
-            import subprocess
-            scopes = [
-                "user-read-playback-state",
-                "user-read-currently-playing",
-                "user-modify-playback-state",
-                "playlist-read-private",
-                "playlist-read-collaborative",
-                "playlist-modify-public",
-                "playlist-modify-private",
-                "user-follow-modify",
-                "user-follow-read",
-                "user-library-read",
-                "user-library-modify",
-                "user-read-private",
-                "user-read-email",
-                "user-read-birthdate",
-                "user-top-read"]
-            scope = ",".join(scopes)
-            args = [self.get_spotty_binary(), "-t", "--client-id", get_app_var(2), "--scope", scope, "-n", "temp-spotty", "-u", self._username, "-p", self._password, "--disable-discovery"]
-            spotty = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
-            stdout, stderr = spotty.communicate()
-            result = json.loads(stdout)
-            # transform token info to spotipy compatible format
-            if result and "accessToken" in result:
-                tokeninfo = result
-                tokeninfo['expiresAt'] = tokeninfo['expiresIn'] + int(time.time())
+        # retrieve token with spotty
+        tokeninfo = await self.mass.event_loop.run_in_executor(None, self.__get_token)
         if tokeninfo:
             self.__auth_token = tokeninfo
             self.sp_user = await self.__get_data("me")
@@ -433,6 +400,44 @@ class SpotifyProvider(MusicProvider):
             raise Exception("Can't get Spotify token for user %s" % self._username)
         return tokeninfo
 
+    def __get_token(self):
+        ''' get spotify auth token with spotty bin '''
+        # get token with spotty
+        scopes = [
+            "user-read-playback-state",
+            "user-read-currently-playing",
+            "user-modify-playback-state",
+            "playlist-read-private",
+            "playlist-read-collaborative",
+            "playlist-modify-public",
+            "playlist-modify-private",
+            "user-follow-modify",
+            "user-follow-read",
+            "user-library-read",
+            "user-library-modify",
+            "user-read-private",
+            "user-read-email",
+            "user-read-birthdate",
+            "user-top-read"]
+        scope = ",".join(scopes)
+        args = [self.get_spotty_binary(), "-t",
+            "--client-id", get_app_var(2), 
+            "--scope", scope, 
+            "-n", "temp-spotty", 
+            "-u", self._username, 
+            "-p", self._password,
+            "-c", self.mass.datapath,
+            "--disable-discovery"]
+        import subprocess
+        spotty = subprocess.Popen(args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT)
+        stdout, stderr = spotty.communicate()
+        result = json.loads(stdout)
+        # transform token info to spotipy compatible format
+        if result and "accessToken" in result:
+            tokeninfo = result
+            tokeninfo['expiresAt'] = tokeninfo['expiresIn'] + int(time.time())
+        return tokeninfo
+
     async def __get_all_items(self, endpoint, params={}, limit=0, offset=0, cache_checksum=None):
         ''' get all items from a paged list '''
         if not cache_checksum:
index e2a01da664581a79664b8482969731cdc6831932..eb5ad69e53c2f8fffe4d837a46bcacf5f3dfa1ac 100644 (file)
@@ -213,7 +213,7 @@ class ChromecastPlayer(Player):
             self.cur_uri = mediastatus.content_id
             self.cur_time = mediastatus.adjusted_current_time
         if self._state == PlayerState.Playing and self.__cc_report_progress_task == None:
-            self.__cc_report_progress_task = self.mass.create_task(self.__report_progress())
+            self.__cc_report_progress_task = self.mass.event_loop.create_task(self.__report_progress())
 
 class ChromecastProvider(PlayerProvider):
     ''' support for ChromeCast Audio '''
@@ -228,7 +228,7 @@ class ChromecastProvider(PlayerProvider):
 
     async def setup(self):
         ''' perform async setup '''
-        self.mass.create_task(
+        self.mass.event_loop.create_task(
                 self.__periodic_chromecast_discovery())
 
     async def __handle_group_members_update(self, mz, added_player=None, removed_player=None):
@@ -259,7 +259,7 @@ class ChromecastProvider(PlayerProvider):
             if not player.cc.socket_client or not player.cc.socket_client.is_connected:
                 # cleanup cast object
                 del player.cc
-                self.mass.create_task(self.remove_player(player.player_id))
+                self.mass.run_task(self.remove_player(player.player_id))
         # search for available chromecasts
         from pychromecast.discovery import start_discovery, stop_discovery
         def discovered_callback(name):
@@ -302,7 +302,7 @@ class ChromecastProvider(PlayerProvider):
         chromecast.register_status_listener(status_listener)
         chromecast.media_controller.register_status_listener(status_listener)
         player.cc.wait()
-        self.mass.create_task(self.add_player(player))
+        self.mass.run_task(self.add_player(player))
         if player.mz:
             player.mz.update_members()
 
@@ -319,11 +319,11 @@ class StatusListener:
         self.player_id = player_id
     def new_cast_status(self, status):
         ''' chromecast status changed (like volume etc.)'''
-        self.mass.create_task(
+        self.mass.run_task(
                 self.__handle_callback(caststatus=status))
     def new_media_status(self, status):
         ''' mediacontroller has new state '''
-        self.mass.create_task(
+        self.mass.run_task(
                 self.__handle_callback(mediastatus=status))
     def new_connection_status(self, status):
         ''' will be called when the connection changes '''
index ba658f70fd31ffe3d2be13372b6b783c625d52b5..b77f32bec124f4dacc0d8620db73727eee5c8201 100644 (file)
@@ -114,7 +114,7 @@ class SonosPlayer(Player):
             await asyncio.sleep(1)
         self.__sonos_report_progress_task = None
     
-    def _update_state(self, event=None):
+    async def update_state(self, event=None):
         ''' update state, triggerer by event '''
         if event:
             variables = event.variables
@@ -141,7 +141,7 @@ class SonosPlayer(Player):
         rel_time = self.__timespan_secs(position_info.get("RelTime"))
         self.cur_time = rel_time
         if self._state == PlayerState.Playing and self.__sonos_report_progress_task == None:
-            self.__sonos_report_progress_task = self.mass.create_task(self.__report_progress())
+            self.__sonos_report_progress_task = self.mass.event_loop.create_task(self.__report_progress())
 
     @staticmethod
     def __convert_state(sonos_state):
@@ -173,7 +173,7 @@ class SonosProvider(PlayerProvider):
 
     async def setup(self):
         ''' perform async setup '''
-        self.mass.create_task(
+        self.mass.event_loop.create_task(
                 self.__periodic_discovery())
 
     @run_periodic(1800)
@@ -196,7 +196,7 @@ class SonosProvider(PlayerProvider):
         # remove any disconnected players...
         for player in self.players:
             if not player.is_group and not player.soco.uid in new_device_ids:
-                self.mass.create_task(self.remove_player(player.player_id))
+                self.mass.run_task(self.remove_player(player.player_id))
         # process new players
         for device in discovered_devices:
             if device.uid not in cur_player_ids and device.is_visible:
@@ -219,13 +219,13 @@ class SonosProvider(PlayerProvider):
         player._media_position_updated_at = None
         # handle subscriptions to events
         def subscribe(service, action):
-            queue = _ProcessSonosEventQueue(action)
+            queue = _ProcessSonosEventQueue(self.mass, action)
             sub = service.subscribe(auto_renew=True, event_queue=queue)
             player._subscriptions.append(sub)
-        subscribe(soco_device.avTransport, player._update_state)
-        subscribe(soco_device.renderingControl, player._update_state)
+        subscribe(soco_device.avTransport, player.update_state)
+        subscribe(soco_device.renderingControl, player.update_state)
         subscribe(soco_device.zoneGroupTopology, self.__topology_changed)
-        self.mass.create_task(self.add_player(player))
+        self.mass.run_task(self.add_player(player))
         return player
 
     def __process_groups(self, sonos_groups):
@@ -242,7 +242,7 @@ class SonosProvider(PlayerProvider):
             group_player.name = group.label
             group_player.group_childs = [item.uid for item in group.members]
             
-    def __topology_changed(self, event=None):
+    async def __topology_changed(self, event=None):
         ''' 
             received topology changed event 
             from one of the sonos players
@@ -253,13 +253,14 @@ class SonosProvider(PlayerProvider):
 class _ProcessSonosEventQueue:
     """Queue like object for dispatching sonos events."""
 
-    def __init__(self, handler):
+    def __init__(self, mass, handler):
         """Initialize Sonos event queue."""
         self._handler = handler
+        self.mass = mass
 
     def put(self, item, block=True, timeout=None):
         """Process event."""
         try:
-            self._handler(item)
+            self.mass.run_task(self._handler(item), wait_for_result=True)
         except Exception as ex:
             LOGGER.warning("Error calling %s: %s", self._handler, ex)
\ No newline at end of file
index e60d1f2c5c09045c1047cf3e37f49df8a10b6fa5..cf012eb9ce3e07fb5012ec81b46b497bac9a91e1 100644 (file)
@@ -41,10 +41,10 @@ class PySqueezeProvider(PlayerProvider):
     async def setup(self):
         ''' async initialize of module '''
         # start slimproto server
-        self.mass.create_task(
+        self.mass.event_loop.create_task(
                 asyncio.start_server(self.__handle_socket_client, '0.0.0.0', 3483))
         # setup discovery
-        self.mass.create_task(self.start_discovery())
+        self.mass.event_loop.create_task(self.start_discovery())
 
     async def start_discovery(self):
         transport, protocol = await self.mass.event_loop.create_datagram_endpoint(
@@ -84,7 +84,7 @@ class PySqueezeProvider(PlayerProvider):
                             player_id = str(device_mac).lower()
                             device_type = devices.get(dev_id, 'unknown device')
                             player = PySqueezePlayer(self.mass, player_id, self.prov_id, device_type, writer)
-                            self.mass.create_task(self.mass.players.add_player(player))
+                            self.mass.event_loop.create_task(self.mass.players.add_player(player))
                         elif player != None:
                             player.process_msg(operation, packet)
                     
@@ -122,8 +122,8 @@ class PySqueezePlayer(Player):
         self.send_frame(b"setd", struct.pack("B", 4))
 
         # TODO: remember last volume and power state
-        self.mass.create_task(self.volume_set(40))
-        self.mass.create_task(self.power_off())
+        self.mass.event_loop.create_task(self.volume_set(40))
+        self.mass.event_loop.create_task(self.power_off())
         self._heartbeat_task = asyncio.create_task(self.__send_heartbeat())
 
     async def cmd_stop(self):
@@ -243,7 +243,7 @@ class PySqueezePlayer(Player):
         ''' send command to Squeeze player'''
         packet = struct.pack('!H', len(data) + 4) + command + data
         self._writer.write(packet)
-        self.mass.create_task(self._writer.drain())
+        self.mass.event_loop.create_task(self._writer.drain())
 
     def send_version(self):
         self.send_frame(b'vers', b'7.8')
@@ -318,7 +318,7 @@ class PySqueezePlayer(Player):
         LOGGER.debug("Decoder Ready for next track")
         next_item = self.queue.next_item
         if next_item:
-            self.mass.create_task(
+            self.mass.event_loop.create_task(
                 self.__send_play(next_item.uri))
 
     def stat_STMe(self, data):
index a079b1d2ffa37c339554bb6d33c350f63eddf024..6af0fb7deba5e5c01a0e0765ff82263d96f8c9d9 100644 (file)
@@ -50,7 +50,7 @@ class WebPlayerProvider(PlayerProvider):
         ''' async initialize of module '''
         await self.mass.add_event_listener(self.handle_mass_event, EVENT_WEBPLAYER_STATE)
         await self.mass.add_event_listener(self.handle_mass_event, EVENT_WEBPLAYER_REGISTER)
-        self.mass.create_task(self.check_players())
+        self.mass.event_loop.create_task(self.check_players())
 
     async def handle_mass_event(self, msg, msg_details):
         ''' received event for the webplayer component '''
index 90a2baee66bb74c03de9915c6b1e0196c95f58a6..18232ef59ef139e77a4d0d3ae32811ccd388d668 100755 (executable)
@@ -15,6 +15,7 @@ LOGGER = logging.getLogger('music_assistant')
 
 from .constants import CONF_KEY_MUSICPROVIDERS, CONF_ENABLED
 
+IS_HASSIO = os.path.isfile('/data/options.json')
 
 def run_periodic(period):
     def scheduler(fcn):
@@ -25,15 +26,6 @@ def run_periodic(period):
         return wrapper
     return scheduler
 
-async def try_supported(task):
-    ''' try to execute a task and pass NotImplementedError Exception '''
-    ret = None
-    try:
-        ret = await task
-    except NotImplementedError:
-        pass
-    return ret
-
 def filename_from_string(string):
     ''' create filename from unsafe string '''
     keepcharacters = (' ','.','_')
index 685b846f4510678592ee2ce5a635d66165f5be17..5abe5c7f7be803c270fc000f8dd61a6f016d9aa6 100755 (executable)
@@ -10,9 +10,10 @@ import ssl
 import concurrent
 import threading
 from .models.media_types import MediaItem, MediaType, media_type_from_string
-from .utils import run_periodic, LOGGER, run_async_background_task, get_ip, json_serializer
+from .utils import run_periodic, LOGGER, IS_HASSIO, run_async_background_task, get_ip, json_serializer
 
 CONF_KEY = 'web'
+
 CONFIG_ENTRIES = [
         ('http_port', 8095, 'web_http_port'),
         ('https_port', 8096, 'web_https_port'),
@@ -28,17 +29,18 @@ class Web():
         self.mass = mass
         # load/create/update config
         config = self.mass.config.create_module_config(CONF_KEY, CONFIG_ENTRIES)
+        enable_ssl = config['ssl_certificate'] and config['ssl_key']
         if config['ssl_certificate'] and not os.path.isfile(
                 config['ssl_certificate']):
-            raise FileNotFoundError(
-                "SSL certificate file not found: %s" % config['ssl_certificate'])
+            enable_ssl = False
+            LOGGER.warning("SSL certificate file not found: %s" % config['ssl_certificate'])
         if config['ssl_key'] and not os.path.isfile(config['ssl_key']):
-            raise FileNotFoundError(
-                "SSL certificate key file not found: %s" % config['ssl_key'])
-        self.local_ip = get_ip()
+            enable_ssl = False
+            LOGGER.warning( "SSL certificate key file not found: %s" % config['ssl_key'])
         self.http_port = config['http_port']
         self.https_port = config['https_port']
-        self._enable_ssl = config['ssl_certificate'] and config['ssl_key']
+        self._enable_ssl = enable_ssl
+        self.local_ip = get_ip()
         self.config = config
 
     async def setup(self):
@@ -74,11 +76,25 @@ class Web():
         await self.runner.setup()
         http_site = web.TCPSite(self.runner, '0.0.0.0', self.http_port)
         await http_site.start()
+        LOGGER.info("Started HTTP webserver on port %s" % self.http_port)
         if self._enable_ssl:
             ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
             ssl_context.load_cert_chain(self.config['ssl_certificate'], self.config['ssl_key'])
-            https_site = web.TCPSite(self.runner, '0.0.0.0', self.https_port, ssl_context=ssl_context)
+            https_site = web.TCPSite(self.runner, '0.0.0.0', self.config['https_port'], ssl_context=ssl_context)
             await https_site.start()
+            LOGGER.info("Started HTTPS webserver on port %s" % self.config['https_port'])
+        if IS_HASSIO:
+            # host additional http port for hassio ingress
+            headers = {"X-HASSIO-KEY": os.environ["HASSIO_TOKEN"]}
+            url = "http://hassio/addons/self/info"
+            async with aiohttp.ClientSession().get(url, headers=headers, verify_ssl=False) as response:
+                result = await response.json()
+                ingress_port = int(result["ingress_port"])
+                ingress_site = web.TCPSite(self.runner, '0.0.0.0', ingress_port)
+                await ingress_site.start()
+                LOGGER.info("Started INGRESS webserver on port %s" % ingress_port)
+
+
 
     async def get_items(self, request):
         ''' get multiple library items'''