From: marcelveldt Date: Sat, 19 Oct 2019 15:53:24 +0000 (+0200) Subject: fix memory leaks ? X-Git-Url: https://git.kitaultman.com/?a=commitdiff_plain;h=54addbc8a7827a4a68b3cdaf40105b5e4f1b413d;p=music-assistant-server.git fix memory leaks ? fix all possible issues that could cause memory leak which seems to only occur on docker python 3.7.4 dev machine at 3.7.5 does not seem to have this issue --- diff --git a/music_assistant/__init__.py b/music_assistant/__init__.py index a9f2b920..6be460e7 100644 --- a/music_assistant/__init__.py +++ b/music_assistant/__init__.py @@ -59,7 +59,7 @@ class MusicAssistant(): await self.http_streamer.setup() # temp code to chase memory leak import subprocess - subprocess.call("pip install pympler", shell=True) + subprocess.call("pip install mem_top", shell=True) self.event_loop.create_task(self.print_memory()) def handle_exception(self, loop, context): @@ -85,17 +85,8 @@ class MusicAssistant(): ''' remove callback from our event listeners ''' self.event_listeners.pop(cb_id, None) - @run_periodic(30) + @run_periodic(60) + # TEMP: Check for any memory leaks async def print_memory(self): - - from pympler import muppy, summary - - all_objects = muppy.get_objects() - sum1 = summary.summarize(all_objects) - # Prints out a summary of the large objects - summary.print_(sum1) - # Get references to certain types of objects such as dataframe - # dataframes = [ao for ao in all_objects if isinstance(ao, pd.DataFrame)] - # for d in dataframes: - # print(d.columns.values) - # print(len(d)) + from mem_top import mem_top + print(mem_top()) diff --git a/music_assistant/homeassistant.py b/music_assistant/homeassistant.py index 1ee5fcaa..94bdae78 100644 --- a/music_assistant/homeassistant.py +++ b/music_assistant/homeassistant.py @@ -22,12 +22,20 @@ from .cache import use_cache CONF_KEY = 'homeassistant' CONF_PUBLISH_PLAYERS = "publish_players" EVENT_HASS_CHANGED = "hass entity changed" -CONFIG_ENTRIES = [ + +### auto detect hassio for auto config #### +if os.path.isfile('/data/options.json'): + IS_HASSIO = True + CONFIG_ENTRIES = [ + (CONF_ENABLED, False, CONF_ENABLED)] +else: + IS_HASSIO = False + CONFIG_ENTRIES = [ (CONF_ENABLED, False, CONF_ENABLED), (CONF_URL, 'localhost', 'hass_url'), (CONF_TOKEN, '', 'hass_token'), - (CONF_PUBLISH_PLAYERS, True, 'hass_publish') - ] + (CONF_PUBLISH_PLAYERS, True, 'hass_publish')] + class HomeAssistant(): ''' @@ -47,18 +55,22 @@ class HomeAssistant(): # load/create/update config config = self.mass.config.create_module_config(CONF_KEY, CONFIG_ENTRIES) self.enabled = config[CONF_ENABLED] - if self.enabled and (not config[CONF_URL] or - not config[CONF_TOKEN]): + if (self.enabled and not IS_HASSIO and not + (config[CONF_URL] or config[CONF_TOKEN])): LOGGER.warning("Invalid configuration for Home Assistant") self.enabled = False self._token = config[CONF_TOKEN] - url = config[CONF_URL] - if url.startswith('https://'): - self._use_ssl = True - self._host = url.replace('https://','').split('/')[0] - else: + if IS_HASSIO: self._use_ssl = False - self._host = url.replace('http://','').split('/')[0] + self._host = 'hassio/homeassistant' + else: + url = config[CONF_URL] + if url.startswith('https://'): + self._use_ssl = True + self._host = url.replace('https://','').split('/')[0] + else: + self._use_ssl = False + self._host = url.replace('http://','').split('/')[0] if self.enabled: LOGGER.info('Homeassistant integration is enabled') diff --git a/music_assistant/http_streamer.py b/music_assistant/http_streamer.py index 20d8646d..fefc97a9 100755 --- a/music_assistant/http_streamer.py +++ b/music_assistant/http_streamer.py @@ -71,11 +71,13 @@ class HTTPStreamer(): try: while True: chunk = await buf_queue.get() - if not chunk: + if chunk: + await resp.write(chunk) + buf_queue.task_done() + del chunk + else: buf_queue.task_done() break - await resp.write(chunk) - buf_queue.task_done() except (asyncio.CancelledError, asyncio.TimeoutError): LOGGER.debug("stream interrupted") cancelled.set() @@ -86,20 +88,31 @@ class HTTPStreamer(): return resp async def __stream_single(self, player, queue_item, buffer, cancelled): - ''' start streaming single track from provider ''' - LOGGER.debug("stream single track started for track %s on player %s" % (queue_item.name, player.name)) + ''' start streaming single queue track ''' + LOGGER.debug("stream single queue track started for track %s on player %s" % (queue_item.name, player.name)) audio_stream = self.__get_audio_stream(player, queue_item, cancelled) async for is_last_chunk, audio_chunk in audio_stream: + if cancelled.is_set(): + # http session ended + # we must consume the data to prevent hanging subprocess instances + continue + # put chunk in buffer asyncio.run_coroutine_threadsafe( buffer.put(audio_chunk), self.mass.event_loop) - # indicate EOF if no more data - asyncio.run_coroutine_threadsafe( - buffer.put(b''), - self.mass.event_loop) + # this should be garbage collected but just in case... + del audio_chunk + # wait for the queue to consume the data + while not cancelled.is_set() and buffer.qsize() > 5: + await asyncio.sleep(0.5) + # all chunks received: streaming finished if cancelled.is_set(): LOGGER.debug("stream single track interrupted for track %s on player %s" % (queue_item.name, player.name)) else: + # indicate EOF if no more data + asyncio.run_coroutine_threadsafe( + buffer.put(b''), + self.mass.event_loop) LOGGER.debug("stream single track finished for track %s on player %s" % (queue_item.name, player.name)) async def __stream_queue(self, player, buffer, cancelled): @@ -108,8 +121,6 @@ class HTTPStreamer(): fade_length = try_parse_int(player.settings["crossfade_duration"]) if not sample_rate or sample_rate < 44100 or sample_rate > 384000: sample_rate = 96000 - elif player.player_provider == 'web': - sample_rate = 41100 if fade_length: fade_bytes = int(sample_rate * 4 * 2 * fade_length) else: @@ -123,24 +134,20 @@ class HTTPStreamer(): stdout=subprocess.PIPE, stdin=subprocess.PIPE) def fill_buffer(): - sample_size = int(sample_rate * 4 * 2) + chunk_size = int(sample_rate * 4 * 2) while sox_proc.returncode == None: - chunk = sox_proc.stdout.read(sample_size) - if not chunk: - # no more data - break - if not cancelled.is_set(): + chunk = sox_proc.stdout.read(chunk_size) + if chunk and not cancelled.is_set(): asyncio.run_coroutine_threadsafe( - buffer.put(chunk), - self.mass.event_loop) + buffer.put(chunk), self.mass.event_loop) + del chunk # indicate EOF if no more data if not cancelled.is_set(): asyncio.run_coroutine_threadsafe( - buffer.put(b''), - self.mass.event_loop) + buffer.put(b''), self.mass.event_loop) + # start fill buffer task in background threading.Thread(target=fill_buffer).start() - LOGGER.info("Start Queue Stream for player %s " %(player.name)) is_start = True last_fadeout_data = b'' @@ -166,7 +173,8 @@ class HTTPStreamer(): bytes_written = 0 # handle incoming audio chunks async for is_last_chunk, chunk in self.__get_audio_stream( - player, queue_track, cancelled, chunksize=fade_bytes, resample=sample_rate): + player, queue_track, cancelled, chunksize=fade_bytes, + resample=sample_rate): cur_chunk += 1 ### HANDLE FIRST PART OF TRACK @@ -174,8 +182,10 @@ class HTTPStreamer(): # no fadeout_part available so just pass it to the output directly sox_proc.stdin.write(chunk) bytes_written += len(chunk) + del chunk elif cur_chunk == 1 and last_fadeout_data: prev_chunk = chunk + del chunk ### HANDLE CROSSFADE OF PREVIOUS TRACK FADE_OUT AND THIS TRACK FADE_IN elif cur_chunk == 2 and last_fadeout_data: # combine the first 2 chunks and strip off silence @@ -198,6 +208,7 @@ class HTTPStreamer(): sox_proc.stdin.write(remaining_bytes) bytes_written += len(remaining_bytes) del remaining_bytes + del chunk prev_chunk = None # needed to prevent this chunk being sent again ### HANDLE LAST PART OF TRACK elif prev_chunk and is_last_chunk: @@ -212,6 +223,7 @@ class HTTPStreamer(): sox_proc.stdin.write(last_part) bytes_written += len(last_part) del last_part + del chunk else: # handle crossfading support if len(last_part) < fade_bytes: @@ -232,6 +244,7 @@ class HTTPStreamer(): bytes_written += len(remaining_bytes) del last_part del remaining_bytes + del chunk ### MIDDLE PARTS OF TRACK else: # middle part of the track @@ -248,7 +261,7 @@ class HTTPStreamer(): # break out the loop if the http session is cancelled break else: - # WIP: update actual duration to the queue for more accurate now playing info + # update actual duration to the queue for more accurate now playing info accurate_duration = bytes_written / int(sample_rate * 4 * 2) queue_track.duration = accurate_duration LOGGER.debug("Finished Streaming queue track: %s (%s) on player %s" % (queue_track.item_id, queue_track.name, player.name)) @@ -260,6 +273,7 @@ class HTTPStreamer(): if last_fadeout_data and not cancelled.is_set(): sox_proc.stdin.write(last_fadeout_data) del last_fadeout_data + ### END OF QUEUE STREAM sox_proc.stdin.close() sox_proc.terminate() LOGGER.info("streaming of queue for player %s completed" % player.name) @@ -316,27 +330,35 @@ class HTTPStreamer(): while True: # read exactly buffersize of data if cancelled.is_set(): - process.kill() + # http session ended + # send terminate and pick up left over bytes + process.terminate() + # try to read as much data as possible from stdin data = process.stdout.read(chunksize) if not data: - # last bytes received + # no more data + # yield (empty) chunk as lust chunk yield (True, buf) bytes_sent += len(buf) + del buf break elif len(buf) + len(data) >= chunksize: + # enough data to send a chunk new_data = buf + data chunk = new_data[:chunksize] yield (False, chunk) bytes_sent += len(chunk) buf = new_data[chunksize:] del chunk + del data + del new_data else: buf += data - del buf + del data # fire event that streaming has ended asyncio.run_coroutine_threadsafe( self.mass.signal_event(EVENT_STREAM_ENDED, queue_item), self.mass.event_loop) - # send task to background to analyse the audio + # send task to main event loop to analyse the audio self.mass.event_loop.call_soon_threadsafe( asyncio.ensure_future, self.__analyze_audio(queue_item)) diff --git a/music_assistant/playerproviders/squeezebox.py b/music_assistant/playerproviders/squeezebox.py index 53a5bab9..9599a8d4 100644 --- a/music_assistant/playerproviders/squeezebox.py +++ b/music_assistant/playerproviders/squeezebox.py @@ -70,6 +70,7 @@ class PySqueezeProvider(PlayerProvider): break # handle incoming data from socket buffer = buffer + data + del data if len(buffer) > 8: operation, length = buffer[:4], buffer[4:8] plen = struct.unpack('!I', length)[0] + 8 diff --git a/music_assistant/web.py b/music_assistant/web.py index 10bed343..aeaaad82 100755 --- a/music_assistant/web.py +++ b/music_assistant/web.py @@ -229,7 +229,10 @@ class Web(): # register callback for internal events async def send_event(msg, msg_details): ws_msg = {"message": msg, "message_details": msg_details } - await ws.send_json(ws_msg, dumps=json_serializer) + try: + await ws.send_json(ws_msg, dumps=json_serializer) + except (AssertionError, asyncio.CancelledError): + await self.mass.remove_event_listener(cb_id) cb_id = await self.mass.add_event_listener(send_event) # process incoming messages async for msg in ws: diff --git a/music_assistant/web/components/player.vue.js b/music_assistant/web/components/player.vue.js index c526f845..cd552442 100755 --- a/music_assistant/web/components/player.vue.js +++ b/music_assistant/web/components/player.vue.js @@ -279,6 +279,8 @@ Vue.component("player", { } }, createAudioPlayer(data) { + if (navigator.userAgent.includes("WebKit")) + return // streaming flac not supported on webkit ?! if (localStorage.getItem('audio_player_id')) // get player id from local storage this.audioPlayerId = localStorage.getItem('audio_player_id'); @@ -306,21 +308,8 @@ Vue.component("player", { // add event handlers this.audioPlayer.addEventListener("canplaythrough", event => { /* the audio is now playable; play it if permissions allow */ - console.log("canplaythrough") this.audioPlayer.play(); }); - this.audioPlayer.addEventListener("canplay", event => { - /* the audio is now playable; play it if permissions allow */ - console.log("canplay"); - //this.audioPlayer.play(); - //msg_details['cur_uri'] = this.audioPlayer.src; - //this.ws.send(JSON.stringify({message:'webplayer state', message_details: msg_details})); - }); - this.audioPlayer.addEventListener("emptied", event => { - /* the audio is now playable; play it if permissions allow */ - console.log("emptied"); - //this.audioPlayer.play(); - }); const timeupdateHandler = (event) => { // currenTime of player updated, sent state (throttled at 1 sec) msg_details['cur_time'] = Math.round(this.audioPlayer.currentTime);