From: Marcel van der Veldt Date: Tue, 28 May 2019 15:04:03 +0000 (+0200) Subject: fix websocket and audio cache options X-Git-Url: https://git.kitaultman.com/?a=commitdiff_plain;h=87a4b7dc23e290fa3571ef084ec6c22a0a3168d8;p=music-assistant-server.git fix websocket and audio cache options --- diff --git a/music_assistant/modules/http_streamer.py b/music_assistant/modules/http_streamer.py index a1ced4e6..52fd01d3 100755 --- a/music_assistant/modules/http_streamer.py +++ b/music_assistant/modules/http_streamer.py @@ -3,11 +3,12 @@ import asyncio import os -from utils import LOGGER, try_parse_int, get_ip, run_async_background_task +from utils import LOGGER, try_parse_int, get_ip, run_async_background_task, run_periodic, get_folder_size from models import TrackQuality import shutil import xml.etree.ElementTree as ET import random +import base64 AUDIO_TEMP_DIR = "/tmp/audio_tmp" AUDIO_CACHE_DIR = "/tmp/audio_cache" @@ -19,11 +20,14 @@ class HTTPStreamer(): self.mass = mass self.create_config_entries() self.local_ip = get_ip() + self._audio_cache_dir = self.mass.config['base']['http_streamer']['audio_cache_folder'] # create needed temp/cache dirs - if self.mass.config['base']['http_streamer']['enable_cache'] and not os.path.isdir(AUDIO_CACHE_DIR): - os.makedirs(AUDIO_CACHE_DIR) + if self.mass.config['base']['http_streamer']['enable_cache'] and not os.path.isdir(self._audio_cache_dir): + self._audio_cache_dir = self.mass.config['base']['http_streamer']['audio_cache_folder'] + os.makedirs(self._audio_cache_dir) if not os.path.isdir(AUDIO_TEMP_DIR): os.makedirs(AUDIO_TEMP_DIR) + mass.event_loop.create_task(self.__cache_cleanup()) def create_config_entries(self): ''' sets the config entries for this module (list with key/value pairs)''' @@ -32,7 +36,9 @@ class HTTPStreamer(): ('target_volume', '-23', 'target_volume_lufs'), ('fallback_gain_correct', '-12', 'fallback_gain_correct'), ('enable_cache', True, 'enable_audio_cache'), - ('trim_silence', True, 'trim_silence') + ('trim_silence', True, 'trim_silence'), + ('audio_cache_folder', '/tmp/audio_cache', 'audio_cache_folder'), + ('audio_cache_max_size_gb', 20, 'audio_cache_max_size_gb') ] if not self.mass.config['base'].get('http_streamer'): self.mass.config['base']['http_streamer'] = {} @@ -179,11 +185,36 @@ class HTTPStreamer(): LOGGER.info("fill_audio_buffer complete for track %s" % track_id) return - @staticmethod - def __get_track_cache_filename(track_id, provider): + def __get_track_cache_filename(self, track_id, provider): ''' get filename for a track to use as cache file ''' - return os.path.join(AUDIO_CACHE_DIR, '%s_%s' %(provider, track_id.split(os.sep)[-1])) + filename = '%s%s' %(provider, track_id.split(os.sep)[-1]) + filename = base64.b64encode(filename.encode()).decode() + return os.path.join(self._audio_cache_dir, filename) + + @run_periodic(3600) + async def __cache_cleanup(self): + ''' calculate size of cache folder and cleanup if needed ''' + size_limit = self.mass.config['base']['http_streamer']['audio_cache_max_size_gb'] + total_size_gb = get_folder_size(self._audio_cache_dir) + LOGGER.info("current size of cache folder is %s GB" % total_size_gb) + if size_limit and total_size_gb > size_limit: + LOGGER.info("Cache folder size exceeds threshold, start cleanup...") + from pathlib import Path + import time + days = 14 + while total_size_gb > size_limit: + time_in_secs = time.time() - (days * 24 * 60 * 60) + for i in Path(self._audio_cache_dir).iterdir(): + if i.is_file(): + if i.stat().st_atime <= time_in_secs: + total_size_gb -= i.stat().st_size/float(1<<30) + i.unlink() + if total_size_gb < size_limit: + break + days -= 1 + LOGGER.info("Cache folder size cleanup completed") + @staticmethod def __get_bs1770_binary(): ''' get the path to the bs1770 binary for the current OS ''' diff --git a/music_assistant/modules/web.py b/music_assistant/modules/web.py index c5fc2cb7..41686c69 100755 --- a/music_assistant/modules/web.py +++ b/music_assistant/modules/web.py @@ -214,35 +214,30 @@ class Web(): ws = web.WebSocketResponse() await ws.prepare(request) cb_id = None - # register callback for internal events - async def send_event(msg, msg_details): - ws_msg = {"message": msg, "message_details": msg_details } - try: + try: + # register callback for internal events + async def send_event(msg, msg_details): + ws_msg = {"message": msg, "message_details": msg_details } await ws.send_json(ws_msg, dumps=json_serializer) - except Exception as exc: - LOGGER.error(exc) - await self.mass.remove_event_listener(cb_id) - - cb_id = await self.mass.add_event_listener(send_event) - # process incoming messages - async for msg in ws: - if msg.type == aiohttp.WSMsgType.TEXT: - if msg.data == 'close': - await self.mass.remove_event_listener(cb_id) - await ws.close() - else: - # for now we only use WS for (simple) player commands - if msg.data == 'players': - players = await self.mass.player.players() - ws_msg = {'message': 'players', 'message_details': players} - await ws.send_json(ws_msg, dumps=json_serializer) - elif msg.data.startswith('players') and '/cmd/' in msg.data: - # players/{player_id}/cmd/{cmd} or players/{player_id}/cmd/{cmd}/{cmd_args} - msg_data_parts = msg.data.split('/') - player_id = msg_data_parts[1] - cmd = msg_data_parts[3] - cmd_args = msg_data_parts[4] if len(msg_data_parts) == 5 else None - await self.mass.player.player_command(player_id, cmd, cmd_args) + cb_id = await self.mass.add_event_listener(send_event) + # process incoming messages + async for msg in ws: + if msg.type != aiohttp.WSMsgType.TEXT: + continue + # for now we only use WS for (simple) player commands + if msg.data == 'players': + players = await self.mass.player.players() + ws_msg = {'message': 'players', 'message_details': players} + await ws.send_json(ws_msg, dumps=json_serializer) + elif msg.data.startswith('players') and '/cmd/' in msg.data: + # players/{player_id}/cmd/{cmd} or players/{player_id}/cmd/{cmd}/{cmd_args} + msg_data_parts = msg.data.split('/') + player_id = msg_data_parts[1] + cmd = msg_data_parts[3] + cmd_args = msg_data_parts[4] if len(msg_data_parts) == 5 else None + await self.mass.player.player_command(player_id, cmd, cmd_args) + finally: + await self.mass.remove_event_listener(cb_id) LOGGER.info('websocket connection closed') return ws diff --git a/music_assistant/utils.py b/music_assistant/utils.py index 75b90cb1..32098437 100755 --- a/music_assistant/utils.py +++ b/music_assistant/utils.py @@ -5,6 +5,7 @@ import asyncio import logging from concurrent.futures import ThreadPoolExecutor import socket +import os logformat = logging.Formatter('%(asctime)-15s %(levelname)-5s %(name)s.%(module)s -- %(message)s') consolehandler = logging.StreamHandler() consolehandler.setFormatter(logformat) @@ -106,4 +107,14 @@ def get_ip(): return IP def get_hostname(): - return socket.gethostname() \ No newline at end of file + return socket.gethostname() + +def get_folder_size(folderpath): + ''' get folder size in gb''' + total_size = 0 + for dirpath, dirnames, filenames in os.walk(folderpath): + for f in filenames: + fp = os.path.join(dirpath, f) + total_size += os.path.getsize(fp) + total_size_gb = total_size/float(1<<30) + return total_size_gb \ No newline at end of file