finished refactor
authormarcelveldt <marcelvanderveldt@MacBook-Silvia.local>
Sat, 12 Oct 2019 15:45:47 +0000 (17:45 +0200)
committermarcelveldt <marcelvanderveldt@MacBook-Silvia.local>
Sat, 12 Oct 2019 15:45:47 +0000 (17:45 +0200)
16 files changed:
music_assistant/__init__.py
music_assistant/homeassistant.py
music_assistant/http_streamer.py
music_assistant/metadata.py
music_assistant/models/__init__.py
music_assistant/models/player.py
music_assistant/models/player_queue.py
music_assistant/models/playerprovider.py
music_assistant/models/playerstate.py [new file with mode: 0755]
music_assistant/musicproviders/qobuz.py
music_assistant/musicproviders/spotify.py
music_assistant/player_manager.py
music_assistant/playerproviders/chromecast.py
music_assistant/playerproviders/lms.py
music_assistant/playerproviders/pylms.py
music_assistant/web.py

index 984e33b8c89df986c52874f0387b120bbcdd768b..23907716cdcb5a3304af12e95412d9a4f50cd813 100644 (file)
@@ -1,8 +1,6 @@
 #!/usr/bin/env python3
 # -*- coding:utf-8 -*-
 
-# import os, sys; sys.path.append(os.path.dirname(os.path.realpath(__file__)))
-
 import sys
 import asyncio
 from concurrent.futures import ThreadPoolExecutor
@@ -15,9 +13,6 @@ import slugify as unicode_slug
 import uuid
 import json
 import time
-# import stackimpact
-
-# __package__ = 'music_assistant'
 
 from .database import Database
 from .utils import run_periodic, LOGGER
@@ -32,7 +27,7 @@ from .web import setup as web_setup
 def handle_exception(loop, context):
     # context["message"] will always be there; but context["exception"] may not
     msg = context.get("exception", context["message"])
-    LOGGER.error(f"Caught exception: {msg}")
+    LOGGER.exception(f"Caught exception: {msg}")
 
 class MusicAssistant():
 
@@ -41,9 +36,10 @@ class MusicAssistant():
         self.datapath = datapath
         self.parse_config()
         self.event_loop = asyncio.get_event_loop()
+        self.event_loop.set_debug(True)
         self.bg_executor = ThreadPoolExecutor()
         self.event_loop.set_default_executor(self.bg_executor)
-        self.event_loop.set_exception_handler(handle_exception)
+        #self.event_loop.set_exception_handler(handle_exception)
         self.event_listeners = {}
 
         # init database and metadata modules
@@ -61,41 +57,33 @@ class MusicAssistant():
         self.player = PlayerManager(self)
         self.http_streamer = HTTPStreamer(self)
 
-        # agent = stackimpact.start(
-        #     agent_key = '4a00b6f2c7da20f692807d204ab3760318978ba3',
-        #     app_name = 'MusicAssistant')
-        # print("profiler started...")
-
         # start the event loop
         try:
             self.event_loop.run_forever()
         except (KeyboardInterrupt, SystemExit):
             LOGGER.info('Exit requested!')
-            self.signal_event("system_shutdown")
+            self.event_loop.create_task(self.signal_event("system_shutdown"))
             self.event_loop.stop()
             self.save_config()
             time.sleep(5)
             self.event_loop.close()
             LOGGER.info('Shutdown complete.')
 
-    def signal_event(self, msg, msg_details=None):
+    async def signal_event(self, msg, msg_details=None):
         ''' signal (systemwide) event '''
         LOGGER.debug("Event: %s - %s" %(msg, msg_details))
         listeners = list(self.event_listeners.values())
         for callback, eventfilter in listeners:
             if not eventfilter or eventfilter in msg:
-                if not asyncio.iscoroutinefunction(callback):
-                    callback(msg, msg_details)
-                else:
-                    self.event_loop.create_task(callback(msg, msg_details))
+                self.event_loop.create_task(callback(msg, msg_details))
 
-    def add_event_listener(self, cb, eventfilter=None):
+    async def add_event_listener(self, cb, eventfilter=None):
         ''' add callback to our event listeners '''
         cb_id = str(uuid.uuid4())
         self.event_listeners[cb_id] = (cb, eventfilter)
         return cb_id
 
-    def remove_event_listener(self, cb_id):
+    async def remove_event_listener(self, cb_id):
         ''' remove callback from our event listeners '''
         self.event_listeners.pop(cb_id, None)
 
index ae30d1f61c6a2b2585ee079402fe0c243fe66152..9f76f1cec0129eab0fe707601706f0579908f29c 100644 (file)
@@ -68,12 +68,17 @@ class HomeAssistant():
         else:
             self._use_ssl = False
             self._host = url.replace('http://','').split('/')[0]
-        self.http_session = aiohttp.ClientSession(loop=mass.event_loop, connector=aiohttp.TCPConnector(verify_ssl=False))
         self.__send_ws = None
         self.__last_id = 10
         LOGGER.info('Homeassistant integration is enabled')
+        self.mass.event_loop.create_task(self.setup())
+
+    async def setup(self):
+        ''' perform async setup '''
+        self.http_session = aiohttp.ClientSession(
+                loop=self.mass.event_loop, connector=aiohttp.TCPConnector(verify_ssl=False))
         mass.event_loop.create_task(self.__hass_websocket())
-        self.mass.add_event_listener(self.mass_event, "player updated")
+        await self.mass.add_event_listener(self.mass_event, "player updated")
         mass.event_loop.create_task(self.__get_sources())
 
     async def get_state(self, entity_id, attribute='state', register_listener=None):
index 0c5a34338a5802f5685cde841c9b085200aa0c47..9fcf371703af60a0230aa55bbb77222f523d07d3 100755 (executable)
@@ -14,7 +14,7 @@ import pyloudnorm as pyln
 import aiohttp
 from .utils import LOGGER, try_parse_int, get_ip, run_async_background_task, run_periodic, get_folder_size
 from .models.media_types import TrackQuality, MediaType
-from .models.player import PlayerState
+from .models.playerstate import PlayerState
 
 
 class HTTPStreamer():
@@ -55,7 +55,6 @@ class HTTPStreamer():
                 run_async_background_task(
                     self.mass.bg_executor, 
                     self.__stream_queue, player, queue, cancelled)
-                await asyncio.sleep(2)
             try:
                 while True:
                     chunk = await queue.get()
@@ -104,21 +103,33 @@ class HTTPStreamer():
                 chunk = await sox_proc.stdout.read(256000)
                 if not chunk:
                     break
-                await buffer.put(chunk)
-            await buffer.put(b'') # indicate EOF
+                asyncio.run_coroutine_threadsafe(
+                    buffer.put(chunk), 
+                    self.mass.event_loop)
+            # indicate EOF if no more data
+            asyncio.run_coroutine_threadsafe(
+                    buffer.put(b''), 
+                    self.mass.event_loop)
         asyncio.create_task(fill_buffer())
 
-        LOGGER.info("Start Queue Stream for player %s" %(player.name))
+        LOGGER.info("Start Queue Stream for player %s " %(player.name))
+        is_start = True
         last_fadeout_data = b''
-        # report start of queue playback so we can calculate current track/duration etc.
-        # self.mass.event_loop.create_task(self.mass.player.player_queue_stream_update(player_id, queue_index, True))
         while True:
             # get the (next) track in queue
-            queue_track = player.queue.next_item
-            LOGGER.info("got queue track %s" % queue_track.name)
+            if is_start:
+                # report start of queue playback so we can calculate current track/duration etc.
+                queue_track = asyncio.run_coroutine_threadsafe(
+                    player.queue.start_queue_stream(), 
+                    self.mass.event_loop).result()
+                is_start = False
+            else:
+                queue_track = player.queue.next_item
             if not queue_track:
+                LOGGER.warning("no (more) tracks left in queue")
                 break
-            LOGGER.debug("Start Streaming queue track: %s (%s) on player %s" % (queue_track.item_id, queue_track.name, player.name))
+            LOGGER.info("Start Streaming queue track: %s (%s) on player %s" % (queue_track.item_id, queue_track.name, player.name))
+            LOGGER.info(player.state)
             fade_in_part = b''
             cur_chunk = 0
             prev_chunk = None
@@ -196,7 +207,7 @@ class HTTPStreamer():
                 # wait for the queue to consume the data
                 # this prevents that the entire track is sitting in memory
                 # and it helps a bit in the quest to follow where we are in the queue
-                while buffer.qsize() > 1 and not cancelled.is_set():
+                while buffer.qsize() > 2 and not cancelled.is_set():
                     await asyncio.sleep(1)
             # end of the track reached
             if cancelled.is_set():
@@ -207,13 +218,8 @@ class HTTPStreamer():
                 # WIP: update actual duration to the queue for more accurate now playing info
                 accurate_duration = bytes_written / int(sample_rate * 4 * 2)
                 queue_track.duration = accurate_duration
-                #self.mass.player.providers[player.player_provider]._player_queue[player_id][queue_index] = queue_track
-                # move to next queue index
-                #queue_index += 1
-                #self.mass.event_loop.create_task(self.mass.player.player_queue_stream_update(player_id, queue_index, False))
                 LOGGER.info("Finished Streaming queue track: %s (%s) on player %s" % (queue_track.item_id, queue_track.name, player.name))
                 LOGGER.info("bytes written: %s - duration: %s" % (bytes_written, accurate_duration))
-            break
         # end of queue reached, pass last fadeout bits to final output
         if last_fadeout_data and not cancelled.is_set():
             sox_proc.stdin.write(last_fadeout_data)
@@ -255,14 +261,13 @@ class HTTPStreamer():
         streamdetails["provider"] = queue_item.provider
         streamdetails["track_id"] = queue_item.item_id
         streamdetails["player_id"] = player.player_id
-        self.mass.signal_event('streaming_started', streamdetails)
+        asyncio.run_coroutine_threadsafe(
+                self.mass.signal_event('streaming_started', streamdetails), self.mass.event_loop)
         # yield chunks from stdout
         # we keep 1 chunk behind to detect end of stream properly
         prev_chunk = b''
         bytes_sent = 0
         while not process.stdout.at_eof():
-            if cancelled.is_set():
-                process.terminate()
             try:
                 chunk = await process.stdout.readexactly(chunksize)
             except asyncio.streams.IncompleteReadError:
@@ -279,6 +284,10 @@ class HTTPStreamer():
             bytes_sent += len(prev_chunk)
         await process.wait()
         if cancelled.is_set():
+            try:
+                process.terminate()
+            except ProcessLookupError:
+                pass
             LOGGER.warning("__get_audio_stream for track_id %s interrupted - bytes_sent: %s" % (queue_item.item_id, bytes_sent))
         else:
             LOGGER.info("__get_audio_stream for track_id %s completed- bytes_sent: %s" % (queue_item.item_id, bytes_sent))
@@ -289,9 +298,13 @@ class HTTPStreamer():
             bytes_per_second = streamdetails["sample_rate"] * (streamdetails["bit_depth"]/8) * 2
         seconds_streamed = int(bytes_sent/bytes_per_second)
         streamdetails["seconds"] = seconds_streamed
-        self.mass.signal_event('streaming_ended', streamdetails)
+        asyncio.run_coroutine_threadsafe(
+                self.mass.signal_event('streaming_ended', streamdetails), 
+                self.mass.event_loop)
         # send task to background to analyse the audio
-        self.mass.event_loop.create_task(self.__analyze_audio(queue_item.item_id, queue_item.provider))
+        asyncio.run_coroutine_threadsafe(
+            self.__analyze_audio(queue_item.item_id, queue_item.provider), 
+            self.mass.event_loop)
 
     async def __get_player_sox_options(self, player, queue_item):
         ''' get player specific sox effect options '''
index 0f0878903b7e549eb6a7b838a514c7e4f2a2fec5..2e0c14b14cb4c14618885bc7e59fdaf60ccb3f24 100755 (executable)
@@ -61,7 +61,12 @@ class MusicBrainz():
     def __init__(self, event_loop, cache):
         self.event_loop = event_loop
         self.cache = cache
-        self.http_session = aiohttp.ClientSession(loop=event_loop, connector=aiohttp.TCPConnector(verify_ssl=False))
+        self.event_loop.create_task(self.setup())
+
+    async def setup(self):
+        ''' perform async setup '''
+        self.http_session = aiohttp.ClientSession(
+                loop=self.event_loop, connector=aiohttp.TCPConnector(verify_ssl=False))
         self.throttler = Throttler(rate_limit=1, period=1)
 
     async def search_artist_by_album(self, artistname, albumname=None, album_upc=None):
@@ -131,13 +136,17 @@ class MusicBrainz():
                     result = None
                 return result
 
-
 class FanartTv():
 
     def __init__(self, event_loop, cache):
         self.event_loop = event_loop
         self.cache = cache
-        self.http_session = aiohttp.ClientSession(loop=event_loop, connector=aiohttp.TCPConnector(verify_ssl=False))
+        self.event_loop.create_task(self.setup())
+
+    async def setup(self):
+        ''' perform async setup '''
+        self.http_session = aiohttp.ClientSession(
+                loop=self.event_loop, connector=aiohttp.TCPConnector(verify_ssl=False))
         self.throttler = Throttler(rate_limit=1, period=1)
 
     async def artist_images(self, mb_artist_id):
index 05b722196101bd5e797976ba6766a1cf61014769..3518fe877c84d7597e9cb0af706a38f4bafd8100 100644 (file)
@@ -1,5 +1,6 @@
 from .media_types import *
-from .musicprovider import *
-from .player_queue import *
-from .player import *
-from .playerprovider import *
\ No newline at end of file
+from .musicprovider import MusicProvider
+from .player_queue import QueueItem, PlayerQueue
+from .player import Player
+from .playerstate import PlayerState
+from .playerprovider import PlayerProvider
\ No newline at end of file
index 4ae2c4f64c7476009b556b4e1dafb1418acc9322..e7d1b3b2cd3a6c23968048c8ba6621754b9b256d 100755 (executable)
@@ -10,14 +10,9 @@ from ..constants import CONF_ENABLED
 from ..cache import use_cache
 from .media_types import Track, MediaType
 from .player_queue import PlayerQueue, QueueItem
+from .playerstate import PlayerState
 
 
-class PlayerState(str, Enum):
-    Off = "off"
-    Stopped = "stopped"
-    Paused = "paused"
-    Playing = "playing"
-
 class Player():
     ''' representation of a player '''
 
@@ -198,7 +193,7 @@ class Player():
             return hass_state != 'off'
         # mute as power
         elif self.settings.get('mute_as_power'):
-            return self.muted
+            return not self.muted
         else:
             return self._powered
 
@@ -214,11 +209,10 @@ class Player():
         ''' [PROTECTED] cur_time (player's elapsed time) property of this player '''
         # handle group player
         if self.group_parent:
-            group_player = self.mass.bg_executor.submit(asyncio.run, 
-                self.mass.player.get_player(self.group_parent)).result()
+            group_player = self.mass.player.get_player_sync(self.group_parent)
             if group_player:
                 return group_player.cur_time
-        return self._cur_time
+        return self.queue.cur_item_time
 
     @cur_time.setter
     def cur_time(self, cur_time:int):
@@ -232,8 +226,7 @@ class Player():
         ''' [PROTECTED] cur_uri (uri loaded in player) property of this player '''
         # handle group player
         if self.group_parent:
-            group_player = self.mass.bg_executor.submit(asyncio.run, 
-                self.mass.player.get_player(self.group_parent)).result()
+            group_player = self.mass.player.get_player_sync(self.group_parent)
             if group_player:
                 return group_player.cur_uri
         return self._cur_uri
@@ -310,15 +303,6 @@ class Player():
             return []
         return [item for item in self.mass.player.players if item.group_parent == self.player_id]
 
-    @property
-    def settings(self):
-        ''' [PROTECTED] get the player config settings '''
-        player_settings = self.mass.config['player_settings'].get(self.player_id)
-        if not player_settings:
-            player_settings = self.mass.bg_executor.submit(asyncio.run, 
-                self.__update_player_settings()).result()
-        return player_settings
-
     @property
     def enabled(self):
         ''' [PROTECTED] player enabled config setting '''
@@ -329,8 +313,7 @@ class Player():
         ''' [PROTECTED] player's queue '''
         # handle group player
         if self.group_parent:
-            group_player = self.mass.bg_executor.submit(asyncio.run, 
-                self.mass.player.get_player(self.group_parent)).result()
+            group_player = self.mass.player.get_player_sync(self.group_parent)
             if group_player:
                 return group_player.queue
         return self._queue
@@ -344,7 +327,7 @@ class Player():
         ''' [PROTECTED] send stop command to player '''
         if self.group_parent:
             # redirect playback related commands to parent player
-            group_player = await self.mass.player.get(self.group_parent)
+            group_player = await self.mass.player.get_player(self.group_parent)
             if group_player:
                 return await group_player.stop()
         else:
@@ -501,6 +484,8 @@ class Player():
     async def volume_up(self):
         ''' [PROTECTED] send volume up command to player '''
         new_level = self.volume_level + 1
+        if new_level > 100:
+            new_level = 100
         return await self.volume_set(new_level)
 
     async def volume_down(self):
@@ -516,12 +501,17 @@ class Player():
 
     async def update(self):
         ''' [PROTECTED] signal player updated '''
-        await self.__update_player_settings()
-        LOGGER.info("player updated: %s" % self.name)
-        self.mass.signal_event('player changed', self)
+        await self.queue.update()
+        LOGGER.debug("player updated: %s" % self.name)
+        await self.mass.signal_event('player changed', self)
     
-    async def __update_player_settings(self):
+    @property
+    def settings(self):
         ''' [PROTECTED] get (or create) player config settings '''
+        player_settings = self.mass.config['player_settings'].get(self.player_id,{})
+        if player_settings:
+            return player_settings
+        # generate config for the player
         config_entries = [ # default config entries for a player
             ("enabled", True, "player_enabled"),
             ("name", "", "player_name"),
@@ -533,7 +523,7 @@ class Player():
             ("crossfade_duration", 0, "crossfade_duration"),
         ]
         # append player specific settings
-        config_entries += await self.mass.player.providers[self._prov_id].get_player_config_entries()
+        config_entries += self.mass.player.providers[self._prov_id].player_config_entries
         if self.is_group or not self.group_parent:
             config_entries += [ # play on power on setting
                 ("play_power_on", False, "player_power_play"),
@@ -543,7 +533,6 @@ class Player():
             config_entries += [("hass_power_entity", "", "hass_player_power"),
                             ("hass_power_entity_source", "", "hass_player_source"),
                             ("hass_volume_entity", "", "hass_player_volume")]
-        player_settings = self.mass.config['player_settings'].get(self.player_id,{})
         for key, def_value, desc in config_entries:
             if not key in player_settings:
                 if (isinstance(def_value, str) and def_value.startswith('<')):
@@ -554,8 +543,7 @@ class Player():
         self.mass.config['player_settings'][self.player_id]['__desc__'] = config_entries
         return player_settings
     
-    @property
-    def __dict__(self):
+    def to_dict(self):
         ''' instance attributes as dict so it can be serialized to json '''
         return {
             "player_id": self.player_id,
index ccf495d8425ef354ae1cb354eb5b92ff63996d65..de7d555cddb1da651dca987f0be2f0fb5a5243c6 100755 (executable)
@@ -10,6 +10,7 @@ import uuid
 from ..utils import LOGGER
 from ..constants import CONF_ENABLED
 from .media_types import Track, TrackQuality
+from .playerstate import PlayerState
 
 
 class QueueItem(Track):
@@ -21,8 +22,8 @@ class QueueItem(Track):
         self.queue_item_id = str(uuid.uuid4())
         # if existing media_item given, load those values
         if media_item:
-            for attribute, value in media_item.__dict__.items():
-                setattr(self, attribute, value)
+            for key, value in media_item.__dict__.items():
+                setattr(self, key, value)
 
 class PlayerQueue():
     ''' 
@@ -37,7 +38,9 @@ class PlayerQueue():
         self._items = []
         self._shuffle_enabled = True
         self._repeat_enabled = False
-        self._cur_index = None
+        self._cur_index = 0
+        self._cur_item_time = 0
+        self._last_index = 0 
 
     @property
     def shuffle_enabled(self):
@@ -49,7 +52,7 @@ class PlayerQueue():
 
     @property
     def crossfade_enabled(self):
-        return self._player.settings['crossfade_duration']
+        return self._player.settings.get('crossfade_duration', 0) > 0
 
     @property
     def gapless_enabled(self):
@@ -58,17 +61,21 @@ class PlayerQueue():
     @property
     def cur_index(self):
         ''' match current uri with queue items to determine queue index '''
-        for index, queue_item in enumerate(self.items):
-            if queue_item.uri == self._player.cur_uri:
-                return index
         return self._cur_index
 
     @property
     def cur_item(self):
-        if self.cur_index == None:
+        if self.cur_index == None or not self.items or len(self.items) < self.cur_index:
             return None
-        return self.mass.bg_executor.submit(asyncio.run,self.get_item(self.cur_index)).result()
+        return self.items[self.cur_index]
 
+    @property
+    def cur_item_time(self):
+        if self.use_queue_stream:
+            return self._cur_item_time
+        else:
+            return self._player._cur_time
+    
     @property
     def next_index(self):
         ''' 
@@ -94,8 +101,9 @@ class PlayerQueue():
         ''' 
             return the next item in the queue
         '''
-        return self.mass.bg_executor.submit(
-                asyncio.run, self.get_item(self.next_index)).result()
+        if self.next_index != None:
+            return self.items[self.next_index]
+        return None
     
     @property
     def items(self):
@@ -170,7 +178,7 @@ class PlayerQueue():
         if not len(self.items) > index:
             return
         if self.use_queue_stream:
-            self._cur_index = index -1
+            self._cur_index = index
             queue_stream_uri = 'http://%s:%s/stream/%s'% (
                         self.mass.web.local_ip, self.mass.web.http_port, self._player.player_id)
             return await self._player.cmd_play_uri(queue_stream_uri)
@@ -184,7 +192,7 @@ class PlayerQueue():
         if self._shuffle_enabled:
             queue_items = await self.__shuffle_items(queue_items)
         self._items = queue_items
-        self._cur_index = None
+        self._cur_index = 0
         if self.use_queue_stream or not self._player.supports_queue:
             return await self.play_index(0)
         else:
@@ -223,6 +231,41 @@ class PlayerQueue():
         if self._player.supports_queue:
             return await self._player.cmd_queue_append(queue_items)
 
+    async def update(self):
+        ''' update queue details, called when player updates '''
+        if self.use_queue_stream and self._player.state == PlayerState.Playing:
+            # determine queue index and cur_time for queue stream
+            # player is playing a constant stream of the queue so we need to do this the hard way
+            cur_time_queue = self._player._cur_time
+            total_time = 0
+            track_time = 0
+            if self.items:
+                queue_index = self._last_index # holds the last starting position
+                queue_track = None
+                while True:
+                    queue_track = self.items[queue_index]
+                    if cur_time_queue > (queue_track.duration + total_time):
+                        total_time += queue_track.duration
+                        queue_index += 1
+                    else:
+                        track_time = cur_time_queue - total_time
+                        break
+                self._cur_index = queue_index
+                self._cur_item_time = track_time
+        elif not self.use_queue_stream:
+            # normal queue based approach
+            cur_index = 0
+            for index, queue_item in enumerate(self.items):
+                if queue_item.uri == self._player.cur_uri:
+                    cur_index = index
+                    break
+            self._cur_index = cur_index
+
+    async def start_queue_stream(self):
+        ''' called by the queue streamer when it starts playing the queue stream '''
+        self._last_index = self.cur_index
+        return await self.get_item(self.cur_index)
+
     async def __shuffle_items(self, queue_items):
         ''' shuffle a list of tracks '''
         # for now we use default python random function
index 2b45955f92291e29229256e85fa4a1dac0a73f97..0dce5c332489ce7c9c087ab7cebdab0faf410978 100755 (executable)
@@ -24,19 +24,16 @@ class PlayerProvider():
         self.mass = mass
         self.name = 'My great Musicplayer provider' # display name
         self.prov_id = 'my_provider' # used as id
+        self.player_config_entries = [] # player specific config entries
 
     ### Common methods and properties ####
 
-    async def get_player_config_entries(self):
-        ''' [CAN OVERRIDE] get the player-specific config entries for this provider (list with key/value pairs)'''
-        return []
 
     @property
     def players(self):
         ''' return all players for this provider '''
-        return self.mass.bg_executor.submit(asyncio.run, 
-                self.mass.player.get_provider_players(self.prov_id)).result()
-    
+        return [item for item in self.mass.player.players if item.player_provider == self.prov_id]
+
     async def get_player(self, player_id:str):
         ''' return player by id '''
         return await self.mass.player.get_player(player_id)
diff --git a/music_assistant/models/playerstate.py b/music_assistant/models/playerstate.py
new file mode 100755 (executable)
index 0000000..3433611
--- /dev/null
@@ -0,0 +1,10 @@
+#!/usr/bin/env python3
+# -*- coding:utf-8 -*-
+
+from enum import Enum
+
+class PlayerState(str, Enum):
+    Off = "off"
+    Stopped = "stopped"
+    Paused = "paused"
+    Playing = "playing"
index 40bab64d7900253d267c2f178e5ca01ec13f91db..0e4ba3cf5b0df079bcd59b1d874a3401b2e145fb 100644 (file)
@@ -44,15 +44,20 @@ class QobuzProvider(MusicProvider):
         self.prov_id = 'qobuz'
         self.mass = mass
         self.cache = mass.cache
-        self.http_session = aiohttp.ClientSession(loop=mass.event_loop, connector=aiohttp.TCPConnector(verify_ssl=False))
         self.__username = username
         self.__password = password
         self.__user_auth_info = None
         self.__logged_in = False
-        self.throttler = Throttler(rate_limit=2, period=1)
-        mass.add_event_listener(self.mass_event, 'streaming_started')
-        mass.add_event_listener(self.mass_event, 'streaming_ended')
+        self.mass.event_loop.create_task(self.setup())
 
+    async def setup(self):
+        ''' perform async setup '''
+        self.http_session = aiohttp.ClientSession(
+                loop=self.mass.event_loop, connector=aiohttp.TCPConnector(verify_ssl=False))
+        self.throttler = Throttler(rate_limit=2, period=1)
+        await self.mass.add_event_listener(self.mass_event, 'streaming_started')
+        await self.mass.add_event_listener(self.mass_event, 'streaming_ended')
+    
     async def search(self, searchstring, media_types=List[MediaType], limit=5):
         ''' perform search on the provider '''
         result = {
index 43a56fc8c15efee353f6e0decc1427d0ecd9a4a9..50f4001d8d02143f07c26352ddad0ae13c894fa8 100644 (file)
@@ -45,11 +45,16 @@ class SpotifyProvider(MusicProvider):
         self._cur_user = None
         self.mass = mass
         self.cache = mass.cache
-        self.http_session = aiohttp.ClientSession(loop=mass.event_loop, connector=aiohttp.TCPConnector(verify_ssl=False))
-        self.throttler = Throttler(rate_limit=1, period=1)
         self._username = username
         self._password = password
         self.__auth_token = {}
+        self.mass.event_loop.create_task(self.setup())
+
+    async def setup(self):
+        ''' perform async setup '''
+        self.http_session = aiohttp.ClientSession(
+                loop=self.mass.event_loop, connector=aiohttp.TCPConnector(verify_ssl=False))
+        self.throttler = Throttler(rate_limit=1, period=1)
 
     async def search(self, searchstring, media_types=List[MediaType], limit=5):
         ''' perform search on the provider '''
index de6026d859bd1bb120694b4c106a4d0d95469089..324c4823e98bf4915296720f094bde23c2ea2c91 100755 (executable)
@@ -13,7 +13,7 @@ import importlib
 from .utils import run_periodic, LOGGER, try_parse_int, try_parse_float, get_ip, run_async_background_task
 from .models.media_types import MediaType, TrackQuality
 from .models.player_queue import QueueItem
-from .models.player import PlayerState
+from .models.playerstate import PlayerState
 
 BASE_DIR = os.path.dirname(os.path.abspath(__file__))
 MODULES_PATH = os.path.join(BASE_DIR, "playerproviders" )
@@ -29,38 +29,31 @@ class PlayerManager():
         self._players = {}
         # dynamically load provider modules
         self.load_providers()
-
+    
     @property
     def players(self):
-        ''' all players as property '''
-        return self.mass.bg_executor.submit(asyncio.run, 
-                self.get_players()).result()
-    
-    async def get_players(self):
-        ''' return all players as a list '''
-        items = list(self._players.values())
-        items.sort(key=lambda x: x.name, reverse=False)
-        return items
+        ''' return list of all players '''
+        return self._players.values()
 
     async def get_player(self, player_id):
         ''' return player by id '''
         return self._players.get(player_id, None)
 
-    async def get_provider_players(self, player_provider):
-        ''' return all players for given provider_id '''
-        return [item for item in self._players.values() if item.player_provider == player_provider] 
+    def get_player_sync(self, player_id):
+        ''' return player by id (non async) '''
+        return self._players.get(player_id, None)
 
     async def add_player(self, player):
         ''' register a new player '''
         self._players[player.player_id] = player
-        self.mass.signal_event('player added', player)
+        await self.mass.signal_event('player added', player)
         # TODO: turn on player if it was previously turned on ?
         return player
 
     async def remove_player(self, player_id):
         ''' handle a player remove '''
         self._players.pop(player_id, None)
-        self.mass.signal_event('player removed', player_id)
+        await self.mass.signal_event('player removed', player_id)
 
     async def trigger_update(self, player_id):
         ''' manually trigger update for a player '''
index 7441b6c47ebdf06d5d747ca264baf35a6bc2e06a..cf1434db060eb1bd071dd7ce976ffe2733a72e15 100644 (file)
@@ -13,6 +13,7 @@ import types
 from ..utils import run_periodic, LOGGER, try_parse_int
 from ..models.playerprovider import PlayerProvider
 from ..models.player import Player, PlayerState
+from ..models.playerstate import PlayerState
 from ..models.player_queue import QueueItem, PlayerQueue
 from ..constants import CONF_ENABLED, CONF_HOSTNAME, CONF_PORT
 
@@ -180,14 +181,9 @@ class ChromecastProvider(PlayerProvider):
         self.prov_id = 'chromecast'
         self.name = 'Chromecast'
         self._discovery_running = False
+        self.player_config_entries = [("gapless_enabled", False, "gapless_enabled")]
         self.mass.event_loop.create_task(self.__periodic_chromecast_discovery())
 
-    async def get_player_config_entries(self):
-        ''' get the player config entries for this provider (list with key/value pairs)'''
-        return [
-            ("gapless_enabled", False, "gapless_enabled")
-            ]
-
     async def __handle_player_state(self, chromecast, caststatus=None, mediastatus=None):
         ''' handle a player state message from the socket '''
         player_id = str(chromecast.uuid)
@@ -198,15 +194,25 @@ class ChromecastProvider(PlayerProvider):
             player.muted = caststatus.volume_muted
             player.volume_level = caststatus.volume_level * 100
         if mediastatus:
-            # chromecast does not support power on/of so we only set state
             if mediastatus.player_state in ['PLAYING', 'BUFFERING']:
                 player.state = PlayerState.Playing
+                player.powered = True
             elif mediastatus.player_state == 'PAUSED':
                 player.state = PlayerState.Paused
             else:
                 player.state = PlayerState.Stopped
             player.cur_uri = mediastatus.content_id
             player.cur_time = mediastatus.adjusted_current_time
+            # create update/poll task for the current time
+            async def poll_task():
+                player.poll_task = True
+                while player.state == PlayerState.Playing:
+                    player.cur_time = mediastatus.adjusted_current_time
+                    await asyncio.sleep(5)
+                player.poll_task = False
+            if not player.poll_task and player.state == PlayerState.Playing:
+                self.mass.event_loop.create_task(poll_task())
+            asyncio.run_coroutine_threadsafe(player.update(), self.mass.event_loop)
 
     async def __handle_group_members_update(self, mz, added_player=None, removed_player=None):
         ''' callback when cast group members update '''
@@ -286,6 +292,7 @@ class ChromecastProvider(PlayerProvider):
         listenerMedia = StatusMediaListener(chromecast, self.__handle_player_state, self.mass.event_loop)
         chromecast.media_controller.register_status_listener(listenerMedia)
         player = ChromecastPlayer(self.mass, player_id, self.prov_id)
+        player.poll_task = False
         if chromecast.cast_type == 'group':
             player.is_group = True
             mz = MultizoneController(chromecast.uuid)
index ad5b5e38382aa23285afa5f8c011d0082490bce8..cc1b55fd1e2e65745fce99ba704bf60efa3c46bf 100644 (file)
@@ -44,15 +44,12 @@ class LMSProvider(PlayerProvider):
     ''' support for Logitech Media Server '''
 
     def __init__(self, mass, hostname, port):
+        super().__init__(mass)
         self.prov_id = 'lms'
         self.name = 'Logitech Media Server'
-        self.icon = ''
-        self.mass = mass
-        self._players = {}
         self._host = hostname
         self._port = port
         self.last_msg_received = 0
-        self.supported_musicproviders = ['qobuz', 'file', 'spotify', 'http']
         self.http_session = aiohttp.ClientSession(loop=mass.event_loop)
         # we use a combi of active polling and subscriptions because the cometd implementation of LMS is somewhat unreliable
         asyncio.ensure_future(self.__lms_events())
@@ -60,9 +57,6 @@ class LMSProvider(PlayerProvider):
 
     ### Provider specific implementation #####
 
-    async def player_config_entries(self):
-        ''' get the player config entries for this provider (list with key/value pairs)'''
-        return []
 
     async def player_command(self, player_id, cmd:str, cmd_args=None):
         ''' issue command on player (play, pause, next, previous, stop, power, volume, mute) '''
index 0a83f5336c560b1e1aeeeb554540888683a1c2f4..78ad61a7bd7a7bddbe01814d74dd63ac637f3c59 100644 (file)
@@ -35,9 +35,9 @@ class PyLMSServer(PlayerProvider):
     ''' Python implementation of SlimProto server '''
 
     def __init__(self, mass):
+        super().__init__(mass)
         self.prov_id = 'pylms'
         self.name = 'Logitech Media Server Emulation'
-        self.mass = mass
         self._lmsplayers = {}
         self.buffer = b''
         self.last_msg_received = 0
index 739e894d2f6a96a4d0fc0f7780726cf95875fd4d..83bac0639146c228465269ccd32f31fdc2997850 100755 (executable)
@@ -17,14 +17,30 @@ from .utils import run_periodic, LOGGER, run_async_background_task, get_ip
 #json_serializer = partial(json.dumps, default=lambda x: x.__dict__)
 
 def json_serializer(obj):
-    # if isinstance(obj, list):
-    #     lst = []
-    #     for item in obj:
-    #         json_obj = json.dumps(item, skipkeys=True, default=lambda x: x.__dict__)
-    #         lst.append(json_obj)
-    #     return '[' + ','.join(lst) + ']'
-    return json.dumps(obj, skipkeys=True, default=lambda x: x.__dict__)
 
+    def get_val(val):
+        if isinstance(val, (int, str, bool, float)):
+            return val
+        elif isinstance(val, list):
+            new_list = []
+            for item in val:
+                new_list.append( get_val(item))
+            return new_list
+        elif hasattr(val, 'to_dict'):
+            return get_val(val.to_dict())
+        elif isinstance(val, dict):
+            new_dict = {}
+            for key, value in val.items():
+                new_dict[key] = get_val(value)
+            return new_dict
+        elif hasattr(val, '__dict__'):
+            new_dict = {}
+            for key, value in val.__dict__.items():
+                new_dict[key] = get_val(value)
+            return new_dict
+        
+    obj = get_val(obj)
+    return json.dumps(obj, skipkeys=True)
 
 def setup(mass):
     ''' setup the module and read/apply config'''
@@ -70,14 +86,15 @@ class Web():
         self._ssl_cert = ssl_cert
         self._ssl_key = ssl_key
         self._cert_fqdn_host = cert_fqdn_host
-        self.http_session = aiohttp.ClientSession()
-        mass.event_loop.create_task(self.setup_web())
+        self.mass.event_loop.create_task(self.setup())
 
     def stop(self):
         asyncio.create_task(self.runner.cleanup())
         asyncio.create_task(self.http_session.close())
 
-    async def setup_web(self):
+    async def setup(self):
+        ''' perform async setup '''
+        self.http_session = aiohttp.ClientSession()
         app = web.Application()
         app.add_routes([web.get('/jsonrpc.js', self.json_rpc)])
         app.add_routes([web.post('/jsonrpc.js', self.json_rpc)])
@@ -89,6 +106,7 @@ class Web():
         app.add_routes([web.get('/api/config', self.get_config)])
         app.add_routes([web.post('/api/config', self.save_config)])
         app.add_routes([web.get('/api/players', self.players)])
+        app.add_routes([web.get('/api/players/{player_id}', self.player)])
         app.add_routes([web.get('/api/players/{player_id}/queue', self.player_queue)])
         app.add_routes([web.get('/api/players/{player_id}/cmd/{cmd}', self.player_command)])
         app.add_routes([web.get('/api/players/{player_id}/cmd/{cmd}/{cmd_args}', self.player_command)])
@@ -194,7 +212,15 @@ class Web():
 
     async def players(self, request):
         ''' get all players '''
-        return web.json_response(self.mass.player.players, dumps=json_serializer)
+        players = list(self.mass.player.players)
+        players.sort(key=lambda x: x.name, reverse=False)
+        return web.json_response(players, dumps=json_serializer)
+
+    async def player(self, request):
+        ''' get single player '''
+        player_id = request.match_info.get('player_id')
+        player = await self.mass.player.get_player(player_id)
+        return web.json_response(player, dumps=json_serializer)
 
     async def player_command(self, request):
         ''' issue player command'''
@@ -237,7 +263,7 @@ class Web():
         # queue_items = [item.__dict__ for item in queue_items]
         # print(queue_items)
         # result = queue_items[offset:limit]
-        return web.json_response(player.queue.items, dumps=json_serializer) 
+        return web.json_response(player.queue.items[offset:limit], dumps=json_serializer) 
     
     async def index(self, request):  
         return web.FileResponse("./web/index.html")
@@ -253,14 +279,16 @@ class Web():
             async def send_event(msg, msg_details):
                 ws_msg = {"message": msg, "message_details": msg_details }
                 await ws.send_json(ws_msg, dumps=json_serializer)
-            cb_id = self.mass.add_event_listener(send_event)
+            cb_id = await self.mass.add_event_listener(send_event)
             # process incoming messages
             async for msg in ws:
                 if msg.type != aiohttp.WSMsgType.TEXT:
                     continue
                 # for now we only use WS for (simple) player commands
                 if msg.data == 'players':
-                    ws_msg = {'message': 'players', 'message_details': self.mass.player.players}
+                    players = list(self.mass.player.players)
+                    players.sort(key=lambda x: x.name, reverse=False)
+                    ws_msg = {'message': 'players', 'message_details': players}
                     await ws.send_json(ws_msg, dumps=json_serializer)
                 elif msg.data.startswith('players') and '/cmd/' in msg.data:
                     # players/{player_id}/cmd/{cmd} or players/{player_id}/cmd/{cmd}/{cmd_args}
@@ -277,7 +305,7 @@ class Web():
         except Exception as exc:
             LOGGER.exception(exc)
         finally:
-            self.mass.remove_event_listener(cb_id)
+            await self.mass.remove_event_listener(cb_id)
         LOGGER.debug('websocket connection closed')
         return ws
 
@@ -303,7 +331,7 @@ class Web():
                     self.mass.config[key] = new_config[key]
         if config_changed:
             self.mass.save_config()
-            self.mass.signal_event('config_changed')
+            await self.mass.signal_event('config_changed')
         return web.Response(text='success')
 
     async def json_rpc(self, request):