fix memory leaks ?
author    marcelveldt <marcelvanderveldt@MacBook-Silvia.local>
Sat, 19 Oct 2019 15:53:24 +0000 (17:53 +0200)
committer marcelveldt <marcelvanderveldt@MacBook-Silvia.local>
Sat, 19 Oct 2019 15:53:24 +0000 (17:53 +0200)
Fix all possible issues that could cause the memory leak, which seems to occur only on Docker with Python 3.7.4.
The dev machine at Python 3.7.5 does not seem to have this issue.

music_assistant/__init__.py
music_assistant/homeassistant.py
music_assistant/http_streamer.py
music_assistant/playerproviders/squeezebox.py
music_assistant/web.py
music_assistant/web/components/player.vue.js

index a9f2b9202c9484728d97b508fdd4cf9d3b5ffd82..6be460e7075453cec90be059f0492e5efd6da01c 100644 (file)
@@ -59,7 +59,7 @@ class MusicAssistant():
         await self.http_streamer.setup()
         # temp code to chase memory leak
         import subprocess
-        subprocess.call("pip install pympler", shell=True)
+        subprocess.call("pip install mem_top", shell=True)
         self.event_loop.create_task(self.print_memory())
 
     def handle_exception(self, loop, context):
@@ -85,17 +85,8 @@ class MusicAssistant():
         ''' remove callback from our event listeners '''
         self.event_listeners.pop(cb_id, None)
 
-    @run_periodic(30)
+    @run_periodic(60)
+    # TEMP: Check for any memory leaks
     async def print_memory(self):
-        
-        from pympler import muppy, summary
-        
-        all_objects = muppy.get_objects()
-        sum1 = summary.summarize(all_objects)
-        # Prints out a summary of the large objects
-        summary.print_(sum1)
-        # Get references to certain types of objects such as dataframe
-        # dataframes = [ao for ao in all_objects if isinstance(ao, pd.DataFrame)]
-        # for d in dataframes:
-        #     print(d.columns.values)
-        #     print(len(d))
+        from mem_top import mem_top
+        print(mem_top())
index 1ee5fcaa25b2892fc74b674285ae82063be9f1d8..94bdae784964c48644cef044a6531b162ad36106 100644 (file)
@@ -22,12 +22,20 @@ from .cache import use_cache
 CONF_KEY = 'homeassistant'
 CONF_PUBLISH_PLAYERS = "publish_players"
 EVENT_HASS_CHANGED = "hass entity changed"
-CONFIG_ENTRIES = [
+
+### auto detect hassio for auto config ####
+if os.path.isfile('/data/options.json'):
+    IS_HASSIO = True
+    CONFIG_ENTRIES = [
+        (CONF_ENABLED, False, CONF_ENABLED)]
+else:
+    IS_HASSIO = False
+    CONFIG_ENTRIES = [
         (CONF_ENABLED, False, CONF_ENABLED),
         (CONF_URL, 'localhost', 'hass_url'), 
         (CONF_TOKEN, '<password>', 'hass_token'),
-        (CONF_PUBLISH_PLAYERS, True, 'hass_publish')
-        ]
+        (CONF_PUBLISH_PLAYERS, True, 'hass_publish')]
+    
 
 class HomeAssistant():
     '''
@@ -47,18 +55,22 @@ class HomeAssistant():
         # load/create/update config
         config = self.mass.config.create_module_config(CONF_KEY, CONFIG_ENTRIES)
         self.enabled = config[CONF_ENABLED]
-        if self.enabled and (not config[CONF_URL] or 
-                not config[CONF_TOKEN]):
+        if (self.enabled and not IS_HASSIO and not 
+                (config[CONF_URL] or config[CONF_TOKEN])):
             LOGGER.warning("Invalid configuration for Home Assistant")
             self.enabled = False
         self._token = config[CONF_TOKEN]
-        url = config[CONF_URL]
-        if url.startswith('https://'):
-            self._use_ssl = True
-            self._host = url.replace('https://','').split('/')[0]
-        else:
+        if IS_HASSIO:
             self._use_ssl = False
-            self._host = url.replace('http://','').split('/')[0]
+            self._host = 'hassio/homeassistant'
+        else:
+            url = config[CONF_URL]
+            if url.startswith('https://'):
+                self._use_ssl = True
+                self._host = url.replace('https://','').split('/')[0]
+            else:
+                self._use_ssl = False
+                self._host = url.replace('http://','').split('/')[0]
         if self.enabled:
             LOGGER.info('Homeassistant integration is enabled')
 
index 20d8646db2ca034eeaf5e0f522ba3b26129d91a4..fefc97a93731ea68fe7e80b353d79edbd05c60d5 100755 (executable)
@@ -71,11 +71,13 @@ class HTTPStreamer():
         try:
             while True:
                 chunk = await buf_queue.get()
-                if not chunk:
+                if chunk:
+                    await resp.write(chunk)
+                    buf_queue.task_done()
+                    del chunk
+                else:
                     buf_queue.task_done()
                     break
-                await resp.write(chunk)
-                buf_queue.task_done()
         except (asyncio.CancelledError, asyncio.TimeoutError):
             LOGGER.debug("stream interrupted")
             cancelled.set()
@@ -86,20 +88,31 @@ class HTTPStreamer():
         return resp
     
     async def __stream_single(self, player, queue_item, buffer, cancelled):
-        ''' start streaming single track from provider '''
-        LOGGER.debug("stream single track started for track %s on player %s" % (queue_item.name, player.name))
+        ''' start streaming single queue track '''
+        LOGGER.debug("stream single queue track started for track %s on player %s" % (queue_item.name, player.name))
         audio_stream = self.__get_audio_stream(player, queue_item, cancelled)
         async for is_last_chunk, audio_chunk in audio_stream:
+            if cancelled.is_set():
+                # http session ended
+                # we must consume the data to prevent hanging subprocess instances
+                continue
+            # put chunk in buffer
             asyncio.run_coroutine_threadsafe(
                     buffer.put(audio_chunk), 
                     self.mass.event_loop)
-        # indicate EOF if no more data
-        asyncio.run_coroutine_threadsafe(
-                buffer.put(b''), 
-                self.mass.event_loop)
+            # this should be garbage collected but just in case...
+            del audio_chunk
+            # wait for the queue to consume the data
+            while not cancelled.is_set() and buffer.qsize() > 5:
+                await asyncio.sleep(0.5)
+        # all chunks received: streaming finished
         if cancelled.is_set():
             LOGGER.debug("stream single track interrupted for track %s on player %s" % (queue_item.name, player.name))
         else:
+            # indicate EOF if no more data
+            asyncio.run_coroutine_threadsafe(
+                    buffer.put(b''), 
+                    self.mass.event_loop)
             LOGGER.debug("stream single track finished for track %s on player %s" % (queue_item.name, player.name))
 
     async def __stream_queue(self, player, buffer, cancelled):
@@ -108,8 +121,6 @@ class HTTPStreamer():
         fade_length = try_parse_int(player.settings["crossfade_duration"])
         if not sample_rate or sample_rate < 44100 or sample_rate > 384000:
             sample_rate = 96000
-        elif player.player_provider == 'web':
-            sample_rate = 41100
         if fade_length:
             fade_bytes = int(sample_rate * 4 * 2 * fade_length)
         else:
@@ -123,24 +134,20 @@ class HTTPStreamer():
             stdout=subprocess.PIPE, stdin=subprocess.PIPE)
 
         def fill_buffer():
-            sample_size = int(sample_rate * 4 * 2)
+            chunk_size = int(sample_rate * 4 * 2)
             while sox_proc.returncode == None:
-                chunk = sox_proc.stdout.read(sample_size)
-                if not chunk:
-                    # no more data
-                    break
-                if not cancelled.is_set():
+                chunk = sox_proc.stdout.read(chunk_size)
+                if chunk and not cancelled.is_set():
                     asyncio.run_coroutine_threadsafe(
-                        buffer.put(chunk), 
-                        self.mass.event_loop)
+                        buffer.put(chunk), self.mass.event_loop)
+                del chunk
             # indicate EOF if no more data
             if not cancelled.is_set():
                 asyncio.run_coroutine_threadsafe(
-                        buffer.put(b''), 
-                        self.mass.event_loop)
+                        buffer.put(b''),  self.mass.event_loop)
+        # start fill buffer task in background
         threading.Thread(target=fill_buffer).start()
         
-
         LOGGER.info("Start Queue Stream for player %s " %(player.name))
         is_start = True
         last_fadeout_data = b''
@@ -166,7 +173,8 @@ class HTTPStreamer():
             bytes_written = 0
             # handle incoming audio chunks
             async for is_last_chunk, chunk in self.__get_audio_stream(
-                    player, queue_track, cancelled, chunksize=fade_bytes, resample=sample_rate):
+                    player, queue_track, cancelled, chunksize=fade_bytes, 
+                    resample=sample_rate):
                 cur_chunk += 1
 
                 ### HANDLE FIRST PART OF TRACK
@@ -174,8 +182,10 @@ class HTTPStreamer():
                     # no fadeout_part available so just pass it to the output directly
                     sox_proc.stdin.write(chunk)
                     bytes_written += len(chunk)
+                    del chunk
                 elif cur_chunk == 1 and last_fadeout_data:
                     prev_chunk = chunk
+                    del chunk
                 ### HANDLE CROSSFADE OF PREVIOUS TRACK FADE_OUT AND THIS TRACK FADE_IN
                 elif cur_chunk == 2 and last_fadeout_data:
                     # combine the first 2 chunks and strip off silence
@@ -198,6 +208,7 @@ class HTTPStreamer():
                     sox_proc.stdin.write(remaining_bytes)
                     bytes_written += len(remaining_bytes)
                     del remaining_bytes
+                    del chunk
                     prev_chunk = None # needed to prevent this chunk being sent again
                 ### HANDLE LAST PART OF TRACK
                 elif prev_chunk and is_last_chunk:
@@ -212,6 +223,7 @@ class HTTPStreamer():
                         sox_proc.stdin.write(last_part)
                         bytes_written += len(last_part)
                         del last_part
+                        del chunk
                     else:
                         # handle crossfading support
                         if len(last_part) < fade_bytes:
@@ -232,6 +244,7 @@ class HTTPStreamer():
                             bytes_written += len(remaining_bytes)
                             del last_part
                             del remaining_bytes
+                            del chunk
                 ### MIDDLE PARTS OF TRACK
                 else:
                     # middle part of the track
@@ -248,7 +261,7 @@ class HTTPStreamer():
                 # break out the loop if the http session is cancelled
                 break
             else:
-                # WIP: update actual duration to the queue for more accurate now playing info
+                # update actual duration to the queue for more accurate now playing info
                 accurate_duration = bytes_written / int(sample_rate * 4 * 2)
                 queue_track.duration = accurate_duration
                 LOGGER.debug("Finished Streaming queue track: %s (%s) on player %s" % (queue_track.item_id, queue_track.name, player.name))
@@ -260,6 +273,7 @@ class HTTPStreamer():
         if last_fadeout_data and not cancelled.is_set():
             sox_proc.stdin.write(last_fadeout_data)
             del last_fadeout_data
+        ### END OF QUEUE STREAM
         sox_proc.stdin.close()
         sox_proc.terminate()
         LOGGER.info("streaming of queue for player %s completed" % player.name)
@@ -316,27 +330,35 @@ class HTTPStreamer():
         while True:
             # read exactly buffersize of data
             if cancelled.is_set():
-                process.kill()
+                # http session ended
+                # send terminate and pick up left over bytes
+                process.terminate()
+            # try to read as much data as possible from stdout
             data = process.stdout.read(chunksize)
             if not data:
-                # last bytes received
+                # no more data
+                # yield the remaining buffer (possibly empty) as the last chunk
                 yield (True, buf)
                 bytes_sent += len(buf)
+                del buf
                 break
             elif len(buf) + len(data) >= chunksize:
+                # enough data to send a chunk
                 new_data = buf + data
                 chunk = new_data[:chunksize]
                 yield (False, chunk)
                 bytes_sent += len(chunk)
                 buf = new_data[chunksize:]
                 del chunk
+                del data
+                del new_data
             else:
                 buf += data
-        del buf
+                del data
         # fire event that streaming has ended
         asyncio.run_coroutine_threadsafe(
                 self.mass.signal_event(EVENT_STREAM_ENDED, queue_item), self.mass.event_loop)
-        # send task to background to analyse the audio
+        # send task to main event loop to analyse the audio
         self.mass.event_loop.call_soon_threadsafe(
                 asyncio.ensure_future, self.__analyze_audio(queue_item))
 
index 53a5bab94d711799e9a6156b2889e8bc3b568484..9599a8d47735403698da521ed983e8ac54001de4 100644 (file)
@@ -70,6 +70,7 @@ class PySqueezeProvider(PlayerProvider):
                     break
                 # handle incoming data from socket
                 buffer = buffer + data
+                del data
                 if len(buffer) > 8:
                     operation, length = buffer[:4], buffer[4:8]
                     plen = struct.unpack('!I', length)[0] + 8
index 10bed3433767e80cb21747ad4ab6a2c3cae75cdc..aeaaad825a7fc4c5a23f0735ccefe8f414eb62e6 100755 (executable)
@@ -229,7 +229,10 @@ class Web():
             # register callback for internal events
             async def send_event(msg, msg_details):
                 ws_msg = {"message": msg, "message_details": msg_details }
-                await ws.send_json(ws_msg, dumps=json_serializer)
+                try:
+                    await ws.send_json(ws_msg, dumps=json_serializer)
+                except (AssertionError, asyncio.CancelledError):
+                    await self.mass.remove_event_listener(cb_id)
             cb_id = await self.mass.add_event_listener(send_event)
             # process incoming messages
             async for msg in ws:
index c526f845fe484c220061a12d9f0a5fec495fd97a..cd55244290fe72947041a194d6f2ed000ef2f59e 100755 (executable)
@@ -279,6 +279,8 @@ Vue.component("player", {
         }
     },
     createAudioPlayer(data) {
+      if (navigator.userAgent.includes("WebKit"))
+        return // streaming flac not supported on webkit ?!
       if (localStorage.getItem('audio_player_id'))
         // get player id from local storage
         this.audioPlayerId = localStorage.getItem('audio_player_id');
@@ -306,21 +308,8 @@ Vue.component("player", {
       // add event handlers
       this.audioPlayer.addEventListener("canplaythrough", event => {
         /* the audio is now playable; play it if permissions allow */
-        console.log("canplaythrough")
         this.audioPlayer.play();
       });
-      this.audioPlayer.addEventListener("canplay", event => {
-        /* the audio is now playable; play it if permissions allow */
-        console.log("canplay");
-        //this.audioPlayer.play();
-        //msg_details['cur_uri'] = this.audioPlayer.src;
-        //this.ws.send(JSON.stringify({message:'webplayer state', message_details: msg_details}));
-      });
-      this.audioPlayer.addEventListener("emptied", event => {
-        /* the audio is now playable; play it if permissions allow */
-        console.log("emptied");
-        //this.audioPlayer.play();
-      });
       const timeupdateHandler = (event) => {
         // currenTime of player updated, sent state (throttled at 1 sec)
         msg_details['cur_time'] = Math.round(this.audioPlayer.currentTime);