import asyncio
import os
-from utils import LOGGER, try_parse_int, get_ip, run_async_background_task
+from utils import LOGGER, try_parse_int, get_ip, run_async_background_task, run_periodic, get_folder_size
from models import TrackQuality
import shutil
import xml.etree.ElementTree as ET
import random
+import base64
AUDIO_TEMP_DIR = "/tmp/audio_tmp"
AUDIO_CACHE_DIR = "/tmp/audio_cache"
self.mass = mass
self.create_config_entries()
self.local_ip = get_ip()
+ self._audio_cache_dir = self.mass.config['base']['http_streamer']['audio_cache_folder']
# create needed temp/cache dirs
- if self.mass.config['base']['http_streamer']['enable_cache'] and not os.path.isdir(AUDIO_CACHE_DIR):
- os.makedirs(AUDIO_CACHE_DIR)
+ if self.mass.config['base']['http_streamer']['enable_cache'] and not os.path.isdir(self._audio_cache_dir):
+ self._audio_cache_dir = self.mass.config['base']['http_streamer']['audio_cache_folder']
+ os.makedirs(self._audio_cache_dir)
if not os.path.isdir(AUDIO_TEMP_DIR):
os.makedirs(AUDIO_TEMP_DIR)
+ mass.event_loop.create_task(self.__cache_cleanup())
def create_config_entries(self):
''' sets the config entries for this module (list with key/value pairs)'''
('target_volume', '-23', 'target_volume_lufs'),
('fallback_gain_correct', '-12', 'fallback_gain_correct'),
('enable_cache', True, 'enable_audio_cache'),
- ('trim_silence', True, 'trim_silence')
+ ('trim_silence', True, 'trim_silence'),
+ ('audio_cache_folder', '/tmp/audio_cache', 'audio_cache_folder'),
+ ('audio_cache_max_size_gb', 20, 'audio_cache_max_size_gb')
]
if not self.mass.config['base'].get('http_streamer'):
self.mass.config['base']['http_streamer'] = {}
LOGGER.info("fill_audio_buffer complete for track %s" % track_id)
return
- @staticmethod
- def __get_track_cache_filename(track_id, provider):
+ def __get_track_cache_filename(self, track_id, provider):
''' get filename for a track to use as cache file '''
- return os.path.join(AUDIO_CACHE_DIR, '%s_%s' %(provider, track_id.split(os.sep)[-1]))
+ filename = '%s%s' %(provider, track_id.split(os.sep)[-1])
+ filename = base64.b64encode(filename.encode()).decode()
+ return os.path.join(self._audio_cache_dir, filename)
+
+ @run_periodic(3600)
+ async def __cache_cleanup(self):
+ ''' calculate size of cache folder and cleanup if needed '''
+ size_limit = self.mass.config['base']['http_streamer']['audio_cache_max_size_gb']
+ total_size_gb = get_folder_size(self._audio_cache_dir)
+ LOGGER.info("current size of cache folder is %s GB" % total_size_gb)
+ if size_limit and total_size_gb > size_limit:
+ LOGGER.info("Cache folder size exceeds threshold, start cleanup...")
+ from pathlib import Path
+ import time
+ days = 14
+ while total_size_gb > size_limit:
+ time_in_secs = time.time() - (days * 24 * 60 * 60)
+ for i in Path(self._audio_cache_dir).iterdir():
+ if i.is_file():
+ if i.stat().st_atime <= time_in_secs:
+ total_size_gb -= i.stat().st_size/float(1<<30)
+ i.unlink()
+ if total_size_gb < size_limit:
+ break
+ days -= 1
+ LOGGER.info("Cache folder size cleanup completed")
+
@staticmethod
def __get_bs1770_binary():
''' get the path to the bs1770 binary for the current OS '''
ws = web.WebSocketResponse()
await ws.prepare(request)
cb_id = None
- # register callback for internal events
- async def send_event(msg, msg_details):
- ws_msg = {"message": msg, "message_details": msg_details }
- try:
+ try:
+ # register callback for internal events
+ async def send_event(msg, msg_details):
+ ws_msg = {"message": msg, "message_details": msg_details }
await ws.send_json(ws_msg, dumps=json_serializer)
- except Exception as exc:
- LOGGER.error(exc)
- await self.mass.remove_event_listener(cb_id)
-
- cb_id = await self.mass.add_event_listener(send_event)
- # process incoming messages
- async for msg in ws:
- if msg.type == aiohttp.WSMsgType.TEXT:
- if msg.data == 'close':
- await self.mass.remove_event_listener(cb_id)
- await ws.close()
- else:
- # for now we only use WS for (simple) player commands
- if msg.data == 'players':
- players = await self.mass.player.players()
- ws_msg = {'message': 'players', 'message_details': players}
- await ws.send_json(ws_msg, dumps=json_serializer)
- elif msg.data.startswith('players') and '/cmd/' in msg.data:
- # players/{player_id}/cmd/{cmd} or players/{player_id}/cmd/{cmd}/{cmd_args}
- msg_data_parts = msg.data.split('/')
- player_id = msg_data_parts[1]
- cmd = msg_data_parts[3]
- cmd_args = msg_data_parts[4] if len(msg_data_parts) == 5 else None
- await self.mass.player.player_command(player_id, cmd, cmd_args)
+ cb_id = await self.mass.add_event_listener(send_event)
+ # process incoming messages
+ async for msg in ws:
+ if msg.type != aiohttp.WSMsgType.TEXT:
+ continue
+ # for now we only use WS for (simple) player commands
+ if msg.data == 'players':
+ players = await self.mass.player.players()
+ ws_msg = {'message': 'players', 'message_details': players}
+ await ws.send_json(ws_msg, dumps=json_serializer)
+ elif msg.data.startswith('players') and '/cmd/' in msg.data:
+ # players/{player_id}/cmd/{cmd} or players/{player_id}/cmd/{cmd}/{cmd_args}
+ msg_data_parts = msg.data.split('/')
+ player_id = msg_data_parts[1]
+ cmd = msg_data_parts[3]
+ cmd_args = msg_data_parts[4] if len(msg_data_parts) == 5 else None
+ await self.mass.player.player_command(player_id, cmd, cmd_args)
+ finally:
+ await self.mass.remove_event_listener(cb_id)
LOGGER.info('websocket connection closed')
return ws
import logging
from concurrent.futures import ThreadPoolExecutor
import socket
+import os
logformat = logging.Formatter('%(asctime)-15s %(levelname)-5s %(name)s.%(module)s -- %(message)s')
consolehandler = logging.StreamHandler()
consolehandler.setFormatter(logformat)
return IP
def get_hostname():
- return socket.gethostname()
\ No newline at end of file
+ return socket.gethostname()
+
+def get_folder_size(folderpath):
+ ''' get folder size in gb'''
+ total_size = 0
+ for dirpath, dirnames, filenames in os.walk(folderpath):
+ for f in filenames:
+ fp = os.path.join(dirpath, f)
+ total_size += os.path.getsize(fp)
+ total_size_gb = total_size/float(1<<30)
+ return total_size_gb
\ No newline at end of file