fix playback reporting stuff
authormarcelveldt <marcelvanderveldt@MacBook-Silvia.local>
Thu, 17 Oct 2019 22:04:15 +0000 (00:04 +0200)
committermarcelveldt <marcelvanderveldt@MacBook-Silvia.local>
Thu, 17 Oct 2019 22:04:15 +0000 (00:04 +0200)
fix chromecast stability due to threading

music_assistant/constants.py
music_assistant/http_streamer.py
music_assistant/models/player_queue.py
music_assistant/musicproviders/qobuz.py
music_assistant/playerproviders/chromecast.py
music_assistant/web.py

index a83bf0f99d62cc12ed1cd2d742ef1a8c3732a049..d848e47a040785fc9a7e4817afc349496e2f143e 100755 (executable)
@@ -20,3 +20,5 @@ EVENT_PLAYER_CHANGED = "player changed"
 EVENT_STREAM_STARTED = "streaming started"
 EVENT_STREAM_ENDED = "streaming ended"
 EVENT_CONFIG_CHANGED = "config changed"
+EVENT_PLAYBACK_STARTED = "playback started"
+EVENT_PLAYBACK_STOPPED = "playback stopped"
index 9f5b6fd461a573b6ff1dddfc4283dddece26ab51..1c9638d37da5d652057c5a60c4dd85e47372bfec 100755 (executable)
@@ -296,29 +296,29 @@ class HTTPStreamer():
             yield (True, b'')
             return
         # get sox effects and resample options
-        sox_effects = await self.__get_player_sox_options(player, queue_item)
+        sox_options = await self.__get_player_sox_options(player, queue_item)
         outputfmt = 'flac -C 0'
         if resample:
             outputfmt = 'raw -b 32 -c 2 -e signed-integer'
-            sox_effects += ' rate -v %s' % resample
+            sox_options += ' rate -v %s' % resample
+        streamdetails['sox_options'] = sox_options
         # determine how to proceed based on input file ype
         if streamdetails["content_type"] == 'aac':
             # support for AAC created with ffmpeg in between
-            args = 'ffmpeg -v quiet -i "%s" -f flac - | sox -t flac - -t %s - %s' % (streamdetails["path"], outputfmt, sox_effects)
+            args = 'ffmpeg -v quiet -i "%s" -f flac - | sox -t flac - -t %s - %s' % (streamdetails["path"], outputfmt, sox_options)
         elif streamdetails['type'] == 'url':
             args = 'sox -t %s "%s" -t %s - %s' % (streamdetails["content_type"], 
-                    streamdetails["path"], outputfmt, sox_effects)
+                    streamdetails["path"], outputfmt, sox_options)
         elif streamdetails['type'] == 'executable':
             args = '%s | sox -t %s - -t %s - %s' % (streamdetails["path"], 
-                    streamdetails["content_type"], outputfmt, sox_effects)
+                    streamdetails["content_type"], outputfmt, sox_options)
         # start sox process
         # we use normal subprocess instead of asyncio because of bug with executor
         # this should be fixed with python 3.8
         process = subprocess.Popen(args, shell=True, stdout=subprocess.PIPE)
-        # fire event that streaming has started for this track (needed by some streaming providers)
+        # fire event that streaming has started for this track
         asyncio.run_coroutine_threadsafe(
-                self.mass.signal_event(EVENT_STREAM_STARTED, 
-                streamdetails), self.mass.event_loop)
+                self.mass.signal_event(EVENT_STREAM_STARTED, queue_item), self.mass.event_loop)
         # yield chunks from stdout
         # we keep 1 chunk behind to detect end of stream properly
         bytes_sent = 0
@@ -343,44 +343,37 @@ class HTTPStreamer():
             else:
                 buf += data
         del buf
-        # fire event that streaming has ended for this track (needed by some streaming providers)
-        if resample:
-            bytes_per_second = resample * (32/8) * 2
-            bytes_per_second = (resample * 32 * 2) / 8
-            seconds_streamed = int(bytes_sent/bytes_per_second)
-        else:
-            seconds_streamed = queue_item.duration
-        streamdetails["seconds"] = seconds_streamed
+        # fire event that streaming has ended
         asyncio.run_coroutine_threadsafe(
-                self.mass.signal_event(EVENT_STREAM_ENDED, streamdetails), self.mass.event_loop)
+                self.mass.signal_event(EVENT_STREAM_ENDED, queue_item), self.mass.event_loop)
         # send task to background to analyse the audio
         self.mass.event_loop.call_soon_threadsafe(
                 asyncio.ensure_future, self.__analyze_audio(queue_item))
 
     async def __get_player_sox_options(self, player, queue_item):
         ''' get player specific sox effect options '''
-        sox_effects = []
+        sox_options = []
         # volume normalisation
         gain_correct = asyncio.run_coroutine_threadsafe(
                 self.mass.players.get_gain_correct(
                     player.player_id, queue_item.item_id, queue_item.provider), 
                 self.mass.event_loop).result()
         if gain_correct != 0:
-            sox_effects.append('vol %s dB ' % gain_correct)
+            sox_options.append('vol %s dB ' % gain_correct)
         # downsample if needed
         if player.settings['max_sample_rate']:
             max_sample_rate = try_parse_int(player.settings['max_sample_rate'])
             if max_sample_rate:
                 quality = queue_item.quality
                 if quality > TrackQuality.FLAC_LOSSLESS_HI_RES_3 and max_sample_rate == 192000:
-                    sox_effects.append('rate -v 192000')
+                    sox_options.append('rate -v 192000')
                 elif quality > TrackQuality.FLAC_LOSSLESS_HI_RES_2 and max_sample_rate == 96000:
-                    sox_effects.append('rate -v 96000')
+                    sox_options.append('rate -v 96000')
                 elif quality > TrackQuality.FLAC_LOSSLESS_HI_RES_1 and max_sample_rate == 48000:
-                    sox_effects.append('rate -v 48000')
-        if player.settings.get('sox_effects'):
-            sox_effects.append(player.settings['sox_effects'])
-        return " ".join(sox_effects)
+                    sox_options.append('rate -v 48000')
+        if player.settings.get('sox_options'):
+            sox_options.append(player.settings['sox_options'])
+        return " ".join(sox_options)
         
     async def __analyze_audio(self, queue_item):
         ''' analyze track audio, for now we only calculate EBU R128 loudness '''
@@ -422,7 +415,6 @@ class HTTPStreamer():
         args = 'sox --ignore-length -t %s - -t %s %s fade t %s' % (pcm_args, pcm_args, fadeinfile.name, fade_length)
         process = subprocess.Popen(args, shell=True, stdin=subprocess.PIPE)
         process.communicate(fade_in_part)
-        fadeinfile.close()
         # create fade-out part
         fadeoutfile = MemoryTempfile(fallback=True).NamedTemporaryFile(buffering=0)
         args = 'sox --ignore-length -t %s - -t %s %s reverse fade t %s reverse' % (pcm_args, pcm_args, fadeoutfile.name, fade_length)
index be2df942ff41d5639e91346695c71aa3aba02fae..ab75dbf462543f8f5e46778a5bd7819acbb0910e 100755 (executable)
@@ -10,7 +10,7 @@ import os
 import pickle
 
 from ..utils import LOGGER, json, filename_from_string
-from ..constants import CONF_ENABLED
+from ..constants import CONF_ENABLED, EVENT_PLAYBACK_STARTED, EVENT_PLAYBACK_STOPPED
 from .media_types import Track, TrackQuality
 from .playerstate import PlayerState
 
@@ -42,13 +42,14 @@ class PlayerQueue():
         self._repeat_enabled = False
         self._cur_index = 0
         self._cur_item_time = 0
-        self._last_cur_item_time = 0
-        self._last_index = 0
+        self._last_item_time = 0
+        self._last_queue_startindex = 0
         self._last_player_state = PlayerState.Stopped
         self._save_busy_ = False
+        self._last_track = None
         # load previous queue settings from disk
-        self.__load_from_file()
-
+        self.mass.event_loop.create_task(self.__load_from_file())
+        
     @property
     def shuffle_enabled(self):
         return self._shuffle_enabled
@@ -78,10 +79,7 @@ class PlayerQueue():
 
     @property
     def cur_item_time(self):
-        if self.use_queue_stream:
-            return self._cur_item_time
-        else:
-            return self._player._cur_time
+        return self._cur_item_time
     
     @property
     def next_index(self):
@@ -174,9 +172,12 @@ class PlayerQueue():
         ''' resume previous queue '''
         if self.items:
             prev_index = self.cur_index
-            await self.load(self._items)
-            if prev_index:
+            if self.use_queue_stream:
                 await self.play_index(prev_index)
+            else:
+                # at this point we don't know if the queue is synced with the player
+                # so just to be safe we only send the queue_items from the index
+                await self.load(self._items[prev_index:])
         else:
             LOGGER.warning("resume queue requested for %s but queue is empty" % self._player.name)
     
@@ -185,7 +186,7 @@ class PlayerQueue():
         if not len(self.items) > index:
             return
         if self.use_queue_stream:
-            self._cur_index = index
+            self._last_queue_startindex = index
             queue_stream_uri = 'http://%s:%s/stream/%s'% (
                         self.mass.web.local_ip, self.mass.web.http_port, self._player.player_id)
             return await self._player.cmd_play_uri(queue_stream_uri)
@@ -199,11 +200,10 @@ class PlayerQueue():
         if self._shuffle_enabled:
             queue_items = await self.__shuffle_items(queue_items)
         self._items = queue_items
-        self._cur_index = 0
         if self.use_queue_stream or not self._player.supports_queue:
-            return await self.play_index(0)
+            await self.play_index(0)
         else:
-            return await self._player.cmd_queue_load(queue_items)
+            await self._player.cmd_queue_load(queue_items)
 
     async def insert(self, queue_items:List[QueueItem], offset=0):
         ''' 
@@ -248,81 +248,95 @@ class PlayerQueue():
     async def update(self):
         ''' update queue details, called when player updates '''
         cur_index = self._cur_index
-        # determine queue index and cur_time for queue stream
+        track_time = self._cur_item_time
+        # handle queue stream
         if self.use_queue_stream and self._player.state == PlayerState.Playing:
-            # player is playing a constant stream of the queue so we need to do this the hard way
-            cur_time_queue = self._player._cur_time
-            total_time = 0
-            track_time = 0
-            if self.items and len(self.items) > self._last_index:
-                queue_index = self._last_index # holds the last starting position
-                queue_track = None
-                while len(self.items) > queue_index:
-                    queue_track = self.items[queue_index]
-                    if cur_time_queue > (queue_track.duration + total_time):
-                        total_time += queue_track.duration
-                        queue_index += 1
-                    else:
-                        track_time = cur_time_queue - total_time
-                        break
-                cur_index = queue_index
-                self._cur_item_time = track_time
+            cur_index, track_time = await self.__get_queue_stream_index()
         # normal queue based approach
         elif not self.use_queue_stream:
-            if 'queue_item_id' in self._player.cur_uri:
-                queue_item_id = self._player.cur_uri.split('queue_item_id=')[1]
+            track_time = self._player._cur_time
             for index, queue_item in enumerate(self.items):
                 if queue_item.uri == self._player.cur_uri:
                     cur_index = index
                     break
         # process new index
-        await self.__update_index(cur_index)
+        await self.__process_queue_update(cur_index, track_time)
 
     async def start_queue_stream(self):
         ''' called by the queue streamer when it starts playing the queue stream '''
-        self._last_index = self.cur_index
-        return await self.get_item(self.cur_index)
+        return await self.get_item(self._last_queue_startindex)
 
-    async def __update_index(self, new_index):
+    async def __get_queue_stream_index(self):
+        # player is playing a constant stream of the queue so we need to do this the hard way
+        queue_index = 0
+        cur_time_queue = self._player._cur_time
+        total_time = 0
+        track_time = 0
+        if self.items and len(self.items) > self._last_queue_startindex:
+            queue_index = self._last_queue_startindex # holds the last starting position
+            queue_track = None
+            while len(self.items) > queue_index:
+                queue_track = self.items[queue_index]
+                if cur_time_queue > (queue_track.duration + total_time):
+                    total_time += queue_track.duration
+                    queue_index += 1
+                else:
+                    track_time = cur_time_queue - total_time
+                    break
+        return queue_index, track_time
+        
+    async def __process_queue_update(self, new_index, track_time):
         ''' compare the queue index to determine if playback changed '''
-        if new_index != self._last_index:
-            LOGGER.info("new track loaded in queue")
-            self._cur_index = new_index
-        elif (self._last_player_state == PlayerState.Stopped and 
-                self._player.state == PlayerState.Playing and
-                self.cur_item):
-            LOGGER.info("Player %s started playing %s" % self.cur_item.name)
-        elif (self._last_player_state == PlayerState.Playing and 
-                self._player.state == PlayerState.Stopped and
-                self.cur_item):
-            LOGGER.info("Player %s stopped playing %s" % self.cur_item.name)
-        # always update timestamp
-        self._last_cur_item_time = self.cur_item_time
-
+        new_track = await self.get_item(new_index)
+        if (not self._last_track and new_track) or self._last_track != new_track:
+            # queue track updated
+            # account for track changing state so trigger track change after 1 second
+            if self._last_track:
+                self._last_track.seconds_played = self._last_item_time
+                self.mass.event_loop.create_task(
+                    self.mass.signal_event(EVENT_PLAYBACK_STOPPED, self._last_track))
+            if new_track:
+                self.mass.event_loop.create_task(
+                    self.mass.signal_event(EVENT_PLAYBACK_STARTED, new_track))
+            self._last_track = new_track
+            await self.__save_to_file()
+        if self._last_player_state != self._player.state:
+            self._last_player_state = self._player.state
+            if (self._player.cur_time == 0 and 
+                self._player.state in [PlayerState.Stopped, PlayerState.Off]):
+                # player stopped playing
+                self._last_queue_startindex = self.cur_index
+        # update vars
+        if track_time > 2:
+            # account for track changing state so keep this a few seconds behind
+            self._last_item_time = track_time
+        self._cur_item_time = track_time
+        self._cur_index = new_index
+        
     async def __shuffle_items(self, queue_items):
         ''' shuffle a list of tracks '''
         # for now we use default python random function
         # can be extended with some more magic last last_played and stuff
         return random.sample(queue_items, len(queue_items))
 
-    def __load_from_file(self):
+    async def __load_from_file(self):
         ''' try to load the saved queue for this player from file '''
         player_safe_str = filename_from_string(self._player.player_id)
         settings_dir = os.path.join(self.mass.datapath, 'queue')
         player_file = os.path.join(settings_dir, player_safe_str)
         if os.path.isfile(player_file):
             try:
-                with open(player_file) as f:
+                with open(player_file, 'rb') as f:
                     data = pickle.load(f)
                     self._shuffle_enabled = data["shuffle_enabled"]
                     self._repeat_enabled = data["repeat_enabled"]
                     self._items = data["items"]
                     self._cur_index = data["cur_item"]
-                    self._last_index = data["last_index"]
+                    self._last_queue_startindex = data["last_index"]
             except Exception as exc:
                 LOGGER.debug("Could not load queue from disk - %s" % str(exc))
 
-    def __save_to_file(self):
+    async def __save_to_file(self):
         ''' save current queue settings to file '''
         if self._save_busy_:
             return
@@ -335,12 +349,12 @@ class PlayerQueue():
             "repeat_enabled": self._repeat_enabled,
             "items": self._items,
             "cur_item": self._cur_index,
-            "last_index": self._last_index
+            "last_index": self._cur_index
         }
         if not os.path.isdir(settings_dir):
             os.mkdir(settings_dir)
-        with open(player_file, 'w+') as f:
-            pickle.dump(data, f)
+        with open(player_file, 'wb') as f:
+            data = pickle.dump(data, f)
         self._save_busy_ = False
 
 
index 8979a4468d32aa7445d0af21c070069c512033f3..7d98cd6700a75f47aa2d6b28ea71f4640b6e4960 100644 (file)
@@ -17,7 +17,7 @@ from ..app_vars import get_app_var
 from ..models import MusicProvider, MediaType, TrackQuality, \
         AlbumType, Artist, Album, Track, Playlist
 from ..constants import CONF_USERNAME, CONF_PASSWORD, CONF_ENABLED, \
-        CONF_TYPE_PASSWORD, EVENT_STREAM_STARTED, EVENT_STREAM_ENDED
+        CONF_TYPE_PASSWORD, EVENT_PLAYBACK_STARTED, EVENT_PLAYBACK_STOPPED
 
 PROV_ID = 'qobuz'
 PROV_NAME = 'Qobuz'
@@ -50,8 +50,8 @@ class QobuzProvider(MusicProvider):
         self.http_session = aiohttp.ClientSession(
                 loop=self.mass.event_loop, connector=aiohttp.TCPConnector())
         self.throttler = Throttler(rate_limit=2, period=1)
-        await self.mass.add_event_listener(self.mass_event, EVENT_STREAM_STARTED)
-        await self.mass.add_event_listener(self.mass_event, EVENT_STREAM_ENDED)
+        await self.mass.add_event_listener(self.mass_event, EVENT_PLAYBACK_STARTED)
+        await self.mass.add_event_listener(self.mass_event, EVENT_PLAYBACK_STOPPED)
     
     async def search(self, searchstring, media_types=List[MediaType], limit=5):
         ''' perform search on the provider '''
@@ -278,30 +278,28 @@ class QobuzProvider(MusicProvider):
     async def mass_event(self, msg, msg_details):
         ''' received event from mass '''
         # TODO: need to figure out if the streamed track is purchased
-        if msg == "streaming_started" and msg_details['provider'] == self.prov_id:
+        if msg == EVENT_PLAYBACK_STARTED and msg_details.provider == self.prov_id:
             # report streaming started to qobuz
-            LOGGER.debug("streaming_started %s" % msg_details["track_id"])
             device_id = self.__user_auth_info["user"]["device"]["id"]
             credential_id = self.__user_auth_info["user"]["credential"]["id"]
             user_id = self.__user_auth_info["user"]["id"]
-            format_id = msg_details["details"]["format_id"]
+            format_id = msg_details.streamdetails["details"]["format_id"]
             timestamp = int(time.time())
             events=[{"online": True, "sample": False, "intent": "stream", "device_id": device_id, 
-                "track_id": msg_details["track_id"], "purchase": False, "date": timestamp,
+                "track_id": msg_details.item_id, "purchase": False, "date": timestamp,
                 "credential_id": credential_id, "user_id": user_id, "local": False, "format_id":format_id}]
             await self.__post_data("track/reportStreamingStart", data=events)
-        elif msg == "streaming_ended" and msg_details['provider'] == self.prov_id:
+        elif msg == EVENT_PLAYBACK_STOPPED and msg_details.provider == self.prov_id:
             # report streaming ended to qobuz
-            LOGGER.debug("streaming_ended %s - seconds played: %s" %(msg_details["track_id"], msg_details["seconds"]) )
             device_id = self.__user_auth_info["user"]["device"]["id"]
             credential_id = self.__user_auth_info["user"]["credential"]["id"]
             user_id = self.__user_auth_info["user"]["id"]
-            format_id = msg_details["details"]["format_id"]
+            format_id = msg_details.streamdetails["details"]["format_id"]
             timestamp = int(time.time())
             events=[{"online": True, "sample": False, "intent": "stream", "device_id": device_id, 
-                "track_id": msg_details["track_id"], "purchase": False, "date": timestamp, "duration": msg_details["seconds"],
+                "track_id": msg_details.item_id, "purchase": False, "date": timestamp, "duration": int(msg_details.seconds_played),
                 "credential_id": credential_id, "user_id": user_id, "local": False, "format_id":format_id}]
-            await self.__post_data("track/reportStreamingStart", data=events)
+            await self.__post_data("track/reportStreamingEnd", data=events)
     
     async def __parse_artist(self, artist_obj):
         ''' parse qobuz artist object to generic layout '''
index 79a19ec10ca7294a1dcc2a0292f56a122f71a6cc..a0e57463019ce3a2fa4bd27e83da76d94193f8e6 100644 (file)
@@ -34,12 +34,15 @@ class ChromecastPlayer(Player):
 
     async def try_chromecast_command(self, cmd:types.MethodType, *args, **kwargs):
         ''' guard for disconnected socket client '''
-        try:
-            cmd(*args, **kwargs)
-        except (pychromecast.error.NotConnected, AttributeError):
-            LOGGER.warning("Chromecast %s is not connected!" % self.name)
-        except Exception as exc:
-            LOGGER.warning(exc)
+        def _try_chromecast_command(_cmd:types.MethodType, *_args, **_kwargs):
+            try:
+                _cmd(*_args, **_kwargs)
+            except (pychromecast.error.NotConnected, AttributeError):
+                LOGGER.warning("Chromecast %s is not connected!" % self.name)
+            except Exception as exc:
+                LOGGER.warning(exc)
+        return self.mass.event_loop.call_soon_threadsafe(
+            _try_chromecast_command, cmd, *args, **kwargs)
     
     async def cmd_stop(self):
         ''' send stop command to player '''
@@ -175,22 +178,31 @@ class ChromecastPlayer(Player):
         else:
             send_queue()
 
+    def __update_group_members(self):
+        ''' update group members '''
+        if not self.mz:
+            return
+        try:
+            self.mz.update_members()
+        except Exception as exc:
+            LOGGER.exception(exc)
+
     async def handle_player_state(self, caststatus=None, 
             mediastatus=None, connection_status=None):
         ''' handle a player state message from the socket '''
         # handle connection status
         if connection_status:
             if self.mz and connection_status.status == CONNECTION_STATUS_CONNECTED:
-                return self.mz.update_members()
+                self.mass.event_loop.call_soon_threadsafe(self.__update_group_members)
             elif connection_status.status == CONNECTION_STATUS_DISCONNECTED:
                 # schedule a new scan which will handle group parent changes
-                return self.mass.event_loop.create_task(
+                self.mass.event_loop.create_task(
                     self.mass.players.providers[self.player_provider].start_chromecast_discovery())
         # handle generic cast status
         if caststatus:
-            self.name = self.cc.name
             self.muted = caststatus.volume_muted
             self.volume_level = caststatus.volume_level * 100
+        self.name = self.cc.name
         # handle media status
         if mediastatus:
             if mediastatus.player_state in ['PLAYING', 'BUFFERING']:
@@ -211,8 +223,6 @@ class ChromecastPlayer(Player):
                 self.poll_task = False
             if not self.poll_task and self.state == PlayerState.Playing:
                 self.mass.event_loop.create_task(poll_task())
-            # we are called from socket client thread so do this threadsafe!
-            #asyncio.run_coroutine_threadsafe(self.update(), self.mass.event_loop)
 
 class ChromecastProvider(PlayerProvider):
     ''' support for ChromeCast Audio '''
@@ -236,7 +246,7 @@ class ChromecastProvider(PlayerProvider):
             player = await self.get_player(added_player)
             group_player = await self.get_player(str(mz._uuid))
             if player and group_player:
-                player.group_parent = str(mz._uuid)
+                player.group_parent = group_player.player_id
                 LOGGER.debug("player %s added to group %s" %(player.name, group_player.name))
         elif removed_player:
             player = await self.get_player(added_player)
@@ -248,6 +258,7 @@ class ChromecastProvider(PlayerProvider):
             for member in mz.members:
                 player = await self.get_player(member)
                 if player:
+                    LOGGER.debug("player %s added to group %s" %(player.name, str(mz._uuid)))
                     player.group_parent = str(mz._uuid)
     
     @run_periodic(1800)
index 7da9aedc2745e4a1d26210e61713298255d3b8c5..e855e9d8707e8fccca5fdc887221172afab6f703 100755 (executable)
@@ -255,6 +255,8 @@ class Web():
                         result = await player_cmd(cmd_args)
                     elif player_cmd:
                         result = await player_cmd()
+        except (Exception, AssertionError) as exc:
+            LOGGER.warning("Websocket disconnected - %s" % str(exc))
         finally:
             await self.mass.remove_event_listener(cb_id)
         LOGGER.debug('websocket connection closed')