fix websocket and audio cache options
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Tue, 28 May 2019 15:04:03 +0000 (17:04 +0200)
committerMarcel van der Veldt <m.vanderveldt@outlook.com>
Tue, 28 May 2019 15:04:03 +0000 (17:04 +0200)
music_assistant/modules/http_streamer.py
music_assistant/modules/web.py
music_assistant/utils.py

index a1ced4e6eceaa92d6c738623798abe3564399443..52fd01d3e22ca0521c953f96199099d95240ff06 100755 (executable)
@@ -3,11 +3,12 @@
 
 import asyncio
 import os
-from utils import LOGGER, try_parse_int, get_ip, run_async_background_task
+from utils import LOGGER, try_parse_int, get_ip, run_async_background_task, run_periodic, get_folder_size
 from models import TrackQuality
 import shutil
 import xml.etree.ElementTree as ET
 import random
+import base64
 
 AUDIO_TEMP_DIR = "/tmp/audio_tmp"
 AUDIO_CACHE_DIR = "/tmp/audio_cache"
@@ -19,11 +20,14 @@ class HTTPStreamer():
         self.mass = mass
         self.create_config_entries()
         self.local_ip = get_ip()
+        self._audio_cache_dir = self.mass.config['base']['http_streamer']['audio_cache_folder']
         # create needed temp/cache dirs
-        if self.mass.config['base']['http_streamer']['enable_cache'] and not os.path.isdir(AUDIO_CACHE_DIR):
-            os.makedirs(AUDIO_CACHE_DIR)
+        if self.mass.config['base']['http_streamer']['enable_cache'] and not os.path.isdir(self._audio_cache_dir):
+            self._audio_cache_dir = self.mass.config['base']['http_streamer']['audio_cache_folder']
+            os.makedirs(self._audio_cache_dir)
         if not os.path.isdir(AUDIO_TEMP_DIR):
             os.makedirs(AUDIO_TEMP_DIR)
+        mass.event_loop.create_task(self.__cache_cleanup())
 
     def create_config_entries(self):
         ''' sets the config entries for this module (list with key/value pairs)'''
@@ -32,7 +36,9 @@ class HTTPStreamer():
             ('target_volume', '-23', 'target_volume_lufs'),
             ('fallback_gain_correct', '-12', 'fallback_gain_correct'),
             ('enable_cache', True, 'enable_audio_cache'),
-            ('trim_silence', True, 'trim_silence')
+            ('trim_silence', True, 'trim_silence'),
+            ('audio_cache_folder', '/tmp/audio_cache', 'audio_cache_folder'),
+            ('audio_cache_max_size_gb', 20, 'audio_cache_max_size_gb')
             ]
         if not self.mass.config['base'].get('http_streamer'):
             self.mass.config['base']['http_streamer'] = {}
@@ -179,11 +185,36 @@ class HTTPStreamer():
         LOGGER.info("fill_audio_buffer complete for track %s" % track_id)
         return
 
-    @staticmethod
-    def __get_track_cache_filename(track_id, provider):
+    def __get_track_cache_filename(self, track_id, provider):
         ''' get filename for a track to use as cache file '''
-        return os.path.join(AUDIO_CACHE_DIR, '%s_%s' %(provider, track_id.split(os.sep)[-1]))
+        filename = '%s%s' %(provider, track_id.split(os.sep)[-1])
+        filename = base64.b64encode(filename.encode()).decode()
+        return os.path.join(self._audio_cache_dir, filename)
+
+    @run_periodic(3600)
+    async def __cache_cleanup(self):
+        ''' calculate size of cache folder and cleanup if needed '''
+        size_limit = self.mass.config['base']['http_streamer']['audio_cache_max_size_gb']
+        total_size_gb = get_folder_size(self._audio_cache_dir)
+        LOGGER.info("current size of cache folder is %s GB" % total_size_gb)
+        if size_limit and total_size_gb > size_limit:
+            LOGGER.info("Cache folder size exceeds threshold, start cleanup...")
+            from pathlib import Path
+            import time
+            days = 14
+            while total_size_gb > size_limit:
+                time_in_secs = time.time() - (days * 24 * 60 * 60)
+                for i in Path(self._audio_cache_dir).iterdir():
+                    if i.is_file():
+                        if i.stat().st_atime <= time_in_secs:
+                            total_size_gb -= i.stat().st_size/float(1<<30)
+                            i.unlink()
+                    if total_size_gb < size_limit:
+                        break
+                days -= 1
+            LOGGER.info("Cache folder size cleanup completed")
 
+    
     @staticmethod
     def __get_bs1770_binary():
         ''' get the path to the bs1770 binary for the current OS '''
index c5fc2cb76ce57afa3e87929479bd5df4d22a5485..41686c69f333825076ce099852134798ab23f5c5 100755 (executable)
@@ -214,35 +214,30 @@ class Web():
         ws = web.WebSocketResponse()
         await ws.prepare(request)
         cb_id = None
-        # register callback for internal events
-        async def send_event(msg, msg_details):
-            ws_msg = {"message": msg, "message_details": msg_details }
-            try:
+        try:
+            # register callback for internal events
+            async def send_event(msg, msg_details):
+                ws_msg = {"message": msg, "message_details": msg_details }
                 await ws.send_json(ws_msg, dumps=json_serializer)
-            except Exception as exc:
-                LOGGER.error(exc)
-                await self.mass.remove_event_listener(cb_id)
-
-        cb_id = await self.mass.add_event_listener(send_event)
-        # process incoming messages
-        async for msg in ws:
-            if msg.type == aiohttp.WSMsgType.TEXT:
-                if msg.data == 'close':
-                    await self.mass.remove_event_listener(cb_id)
-                    await ws.close()
-                else:
-                    # for now we only use WS for (simple) player commands
-                    if msg.data == 'players':
-                        players = await self.mass.player.players()
-                        ws_msg = {'message': 'players', 'message_details': players}
-                        await ws.send_json(ws_msg, dumps=json_serializer)
-                    elif msg.data.startswith('players') and '/cmd/' in msg.data:
-                        # players/{player_id}/cmd/{cmd} or players/{player_id}/cmd/{cmd}/{cmd_args}
-                        msg_data_parts = msg.data.split('/')
-                        player_id = msg_data_parts[1]
-                        cmd = msg_data_parts[3]
-                        cmd_args = msg_data_parts[4] if len(msg_data_parts) == 5 else None
-                        await self.mass.player.player_command(player_id, cmd, cmd_args)
+            cb_id = await self.mass.add_event_listener(send_event)
+            # process incoming messages
+            async for msg in ws:
+                if msg.type != aiohttp.WSMsgType.TEXT:
+                    continue
+                # for now we only use WS for (simple) player commands
+                if msg.data == 'players':
+                    players = await self.mass.player.players()
+                    ws_msg = {'message': 'players', 'message_details': players}
+                    await ws.send_json(ws_msg, dumps=json_serializer)
+                elif msg.data.startswith('players') and '/cmd/' in msg.data:
+                    # players/{player_id}/cmd/{cmd} or players/{player_id}/cmd/{cmd}/{cmd_args}
+                    msg_data_parts = msg.data.split('/')
+                    player_id = msg_data_parts[1]
+                    cmd = msg_data_parts[3]
+                    cmd_args = msg_data_parts[4] if len(msg_data_parts) == 5 else None
+                    await self.mass.player.player_command(player_id, cmd, cmd_args)
+        finally:
+            await self.mass.remove_event_listener(cb_id)
         LOGGER.info('websocket connection closed')
         return ws
 
index 75b90cb1c878ab51cc18fb9107166d5125f44ff5..32098437090f070b348c98006ac091bc45b671b2 100755 (executable)
@@ -5,6 +5,7 @@ import asyncio
 import logging
 from concurrent.futures import ThreadPoolExecutor
 import socket
+import os
 logformat = logging.Formatter('%(asctime)-15s %(levelname)-5s %(name)s.%(module)s -- %(message)s')
 consolehandler = logging.StreamHandler()
 consolehandler.setFormatter(logformat)
@@ -106,4 +107,14 @@ def get_ip():
     return IP
 
 def get_hostname():
-    return socket.gethostname()
\ No newline at end of file
+    return socket.gethostname()
+
+def get_folder_size(folderpath):
+    ''' get folder size in gb'''
+    total_size = 0
+    for dirpath, dirnames, filenames in os.walk(folderpath):
+        for f in filenames:
+            fp = os.path.join(dirpath, f)
+            total_size += os.path.getsize(fp)
+    total_size_gb = total_size/float(1<<30)
+    return total_size_gb
\ No newline at end of file