import asyncio
import logging
-from concurrent.futures import ThreadPoolExecutor
import socket
import importlib
import os
import json
LOGGER = logging.getLogger('music_assistant')
-from .constants import CONF_KEY_MUSICPROVIDERS, CONF_ENABLED
+from .constants import CONF_KEY_MUSICPROVIDERS, CONF_KEY_PLAYERPROVIDERS, CONF_ENABLED
IS_HASSIO = os.path.isfile('/data/options.json')
def run_async_background_task(executor, corofn, *args):
''' run async task in background '''
def run_task(corofn, *args):
- LOGGER.debug('running %s in background task' % corofn.__name__)
+ LOGGER.debug('running %s in background task', corofn.__name__)
new_loop = asyncio.new_event_loop()
asyncio.set_event_loop(new_loop)
coro = corofn(*args)
res = new_loop.run_until_complete(coro)
new_loop.close()
- LOGGER.debug('completed %s in background task' % corofn.__name__)
+ LOGGER.debug('completed %s in background task', corofn.__name__)
return res
return asyncio.get_event_loop().run_in_executor(executor, run_task, corofn, *args)
yield items
else:
for item in items:
- yield items
+ yield item
def try_parse_float(possible_float):
try:
LOGGER.debug("Could not load json from file %s - %s" % (jsonfile, str(exc)))
return None
-def load_provider_modules(mass, prov_type=CONF_KEY_MUSICPROVIDERS):
+async def load_provider_modules(mass, provider_modules, prov_type=CONF_KEY_MUSICPROVIDERS):
''' dynamically load music/player providers '''
- provider_modules = {}
base_dir = os.path.dirname(os.path.abspath(__file__))
modules_path = os.path.join(base_dir, prov_type )
+ # load modules
for item in os.listdir(modules_path):
if (os.path.isfile(os.path.join(modules_path, item)) and not item.startswith("_") and
item.endswith('.py') and not item.startswith('.')):
module_name = item.replace(".py","")
- prov_mod = load_provider_module(mass, module_name, prov_type)
- if prov_mod:
- provider_modules[prov_mod.prov_id] = prov_mod
- return provider_modules
+ if module_name not in provider_modules:
+ prov_mod = await load_provider_module(mass, module_name, prov_type)
+ if prov_mod:
+ provider_modules[module_name] = prov_mod
+ # unload modules (if needed)
+ removed_modules = []
+ for prov_id, prov in provider_modules.items():
+ if not mass.config[prov_type][prov_id][CONF_ENABLED]:
+ removed_modules.append(prov_id)
+ if hasattr(prov, 'http_session'):
+ await prov.http_session.close()
+ if prov_type == CONF_KEY_PLAYERPROVIDERS:
+ for player in prov.players:
+ await mass.players.remove_player(player.player_id)
+ for prov_id in removed_modules:
+ provider_modules.pop(prov_id, None)
+ LOGGER.info('Unloaded %s module', prov_id)
-
-def load_provider_module(mass, module_name, prov_type):
+async def load_provider_module(mass, module_name, prov_type):
''' dynamically load music/player provider '''
- LOGGER.debug("Loading provider module %s" % module_name)
try:
prov_mod = importlib.import_module(f".{module_name}",
f"music_assistant.{prov_type}")
prov_conf_entries = prov_mod.CONFIG_ENTRIES
- prov_id = prov_mod.PROV_ID
+ prov_id = module_name
+ prov_name = prov_mod.PROV_NAME
+ prov_class = prov_mod.PROV_CLASS
# get/create config for the module
prov_config = mass.config.create_module_config(
prov_id, prov_conf_entries, prov_type)
if prov_config[CONF_ENABLED]:
- prov_mod_cls = getattr(prov_mod, prov_mod.PROV_CLASS)
- provider = prov_mod_cls(mass, prov_config)
- LOGGER.info("Successfully initialized module %s" % provider.name)
+ prov_mod_cls = getattr(prov_mod, prov_class)
+ provider = prov_mod_cls(mass)
+ provider.prov_id = prov_id
+ provider.name = prov_name
+ await provider.setup(prov_config)
+ LOGGER.info("Successfully initialized module %s", provider.name)
return provider
else:
return None
except Exception as exc:
- LOGGER.exception("Error loading module %s: %s" %(module_name, exc))
+ LOGGER.exception("Error loading module %s: %s", module_name, exc)
import threading
from .models.media_types import MediaItem, MediaType, media_type_from_string
from .utils import run_periodic, LOGGER, IS_HASSIO, run_async_background_task, get_ip, json_serializer
+from .constants import CONF_KEY_PLAYERSETTINGS, CONF_KEY_MUSICPROVIDERS, CONF_KEY_PLAYERPROVIDERS
CONF_KEY = 'web'
if IS_HASSIO:
# on hassio we use ingress
- CONFIG_ENTRIES = []
+ CONFIG_ENTRIES = [('https_port', 8096, 'web_https_port'),
+ ('ssl_certificate', '', 'web_ssl_cert'),
+ ('ssl_key', '', 'web_ssl_key'),
+ ('cert_fqdn_host', '', 'cert_fqdn_host')]
else:
- CONFIG_ENTRIES = [
- ('http_port', 8095, 'web_http_port'),
- ('https_port', 8096, 'web_https_port'),
- ('ssl_certificate', '', 'web_ssl_cert'),
- ('ssl_key', '', 'web_ssl_key'),
- ('cert_fqdn_host', '', 'cert_fqdn_host')
- ]
+ CONFIG_ENTRIES = [('http_port', 8095, 'web_http_port'),
+ ('https_port', 8096, 'web_https_port'),
+ ('ssl_certificate', '', 'web_ssl_cert'),
+ ('ssl_key', '', 'web_ssl_key'),
+ ('cert_fqdn_host', '', 'cert_fqdn_host')]
+
class ClassRouteTableDef(web.RouteTableDef):
def __repr__(self) -> str:
return "<ClassRouteTableDef count={}>".format(len(self._items))
- def route(self,
- method: str,
- path: str,
- **kwargs):
+ def route(self, method: str, path: str, **kwargs):
def inner(handler):
handler.route_info = (method, path, kwargs)
return handler
+
return inner
def add_class_routes(self, instance) -> None:
def predicate(member) -> bool:
return all((inspect.iscoroutinefunction(member),
hasattr(member, "route_info")))
+
for _, handler in inspect.getmembers(instance, predicate):
method, path, kwargs = handler.route_info
super().route(method, path, **kwargs)(handler)
+
+
routes = ClassRouteTableDef()
+
class Web():
""" webserver and json/websocket api """
runner = None
-
+
def __init__(self, mass):
self.mass = mass
# load/create/update config
- config = self.mass.config.create_module_config(CONF_KEY, CONFIG_ENTRIES)
+ config = self.mass.config.create_module_config(CONF_KEY,
+ CONFIG_ENTRIES)
self.local_ip = get_ip()
self.config = config
if IS_HASSIO:
- # retrieve ingress port
+ # retrieve ingress http port
import requests
- response = requests.get(
- "http://hassio/addons/self/info",
- headers = {"X-HASSIO-KEY": os.environ["HASSIO_TOKEN"]}).json()
+ url = 'http://hassio/addons/self/info'
+ headers = { "X-HASSIO-KEY":os.environ["HASSIO_TOKEN"] }
+ response = requests.get(url, headers=headers).json()
self.http_port = response["data"]["ingress_port"]
- self.https_port = 0
- self._enable_ssl = False
else:
# use settings from config
self.http_port = config['http_port']
- enable_ssl = config['ssl_certificate'] and config['ssl_key']
- if config['ssl_certificate'] and not os.path.isfile(
- config['ssl_certificate']):
- enable_ssl = False
- LOGGER.warning("SSL certificate file not found: %s", config['ssl_certificate'])
- if config['ssl_key'] and not os.path.isfile(config['ssl_key']):
- enable_ssl = False
- LOGGER.warning( "SSL certificate key file not found: %s", config['ssl_key'])
- self.https_port = config['https_port']
- self._enable_ssl = enable_ssl
+ enable_ssl = config['ssl_certificate'] and config['ssl_key']
+ if config['ssl_certificate'] and not os.path.isfile(
+ config['ssl_certificate']):
+ enable_ssl = False
+ LOGGER.warning("SSL certificate file not found: %s",
+ config['ssl_certificate'])
+ if config['ssl_key'] and not os.path.isfile(config['ssl_key']):
+ enable_ssl = False
+ LOGGER.warning("SSL certificate key file not found: %s",
+ config['ssl_key'])
+ self.https_port = config['https_port']
+ self._enable_ssl = enable_ssl
async def setup(self):
""" perform async setup """
app = web.Application()
app.add_routes(routes)
app.add_routes([
- web.get('/stream/{player_id}', self.mass.http_streamer.stream, allow_head=False),
- web.get('/stream/{player_id}/{queue_item_id}', self.mass.http_streamer.stream, allow_head=False),
+ web.get('/stream/{player_id}',
+ self.mass.http_streamer.stream,
+ allow_head=False),
+ web.get('/stream/{player_id}/{queue_item_id}',
+ self.mass.http_streamer.stream,
+ allow_head=False),
web.get('/', self.index),
web.get('/jsonrpc.js', self.json_rpc),
web.post('/jsonrpc.js', self.json_rpc),
web.get('/ws', self.websocket_handler)
])
- webdir = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'web/')
+ webdir = os.path.join(os.path.dirname(os.path.abspath(__file__)),
+ 'web/')
app.router.add_static("/", webdir)
-
+
# Add CORS support to all routes
- cors = aiohttp_cors.setup( app,
+ cors = aiohttp_cors.setup(
+ app,
defaults={
- "*": aiohttp_cors.ResourceOptions(
+ "*":
+ aiohttp_cors.ResourceOptions(
allow_credentials=True,
expose_headers="*",
allow_headers="*",
LOGGER.info("Started HTTP webserver on port %s", self.http_port)
if self._enable_ssl:
ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
- ssl_context.load_cert_chain(self.config['ssl_certificate'], self.config['ssl_key'])
- https_site = web.TCPSite(self.runner, '0.0.0.0', self.config['https_port'], ssl_context=ssl_context)
+ ssl_context.load_cert_chain(self.config['ssl_certificate'],
+ self.config['ssl_key'])
+ https_site = web.TCPSite(self.runner,
+ '0.0.0.0',
+ self.config['https_port'],
+ ssl_context=ssl_context)
await https_site.start()
- LOGGER.info("Started HTTPS webserver on port %s", self.config['https_port'])
+ LOGGER.info("Started HTTPS webserver on port %s",
+ self.config['https_port'])
async def index(self, request):
- index_file = os.path.join(
- os.path.dirname(os.path.abspath(__file__)), 'web/index.html')
+ index_file = os.path.join(os.path.dirname(os.path.abspath(__file__)),
+ 'web/index.html')
return web.FileResponse(index_file)
@routes.get('/api/library/artists')
"""Get all library artists."""
orderby = request.query.get('orderby', 'name')
provider_filter = request.rel_url.query.get('provider')
- iterator = self.mass.music.library_artists(orderby=orderby, provider_filter=provider_filter)
+ iterator = self.mass.music.library_artists(
+ orderby=orderby, provider_filter=provider_filter)
return await self.__stream_json(request, iterator)
@routes.get('/api/library/albums')
"""Get all library albums."""
orderby = request.query.get('orderby', 'name')
provider_filter = request.rel_url.query.get('provider')
- iterator = self.mass.music.library_albums(orderby=orderby, provider_filter=provider_filter)
+ iterator = self.mass.music.library_albums(
+ orderby=orderby, provider_filter=provider_filter)
return await self.__stream_json(request, iterator)
@routes.get('/api/library/tracks')
"""Get all library tracks."""
orderby = request.query.get('orderby', 'name')
provider_filter = request.rel_url.query.get('provider')
- iterator = self.mass.music.library_tracks(orderby=orderby, provider_filter=provider_filter)
+ iterator = self.mass.music.library_tracks(
+ orderby=orderby, provider_filter=provider_filter)
return await self.__stream_json(request, iterator)
@routes.get('/api/library/radios')
"""Get all library radios."""
orderby = request.query.get('orderby', 'name')
provider_filter = request.rel_url.query.get('provider')
- iterator = self.mass.music.library_radios(orderby=orderby, provider_filter=provider_filter)
+ iterator = self.mass.music.library_radios(
+ orderby=orderby, provider_filter=provider_filter)
return await self.__stream_json(request, iterator)
@routes.get('/api/library/playlists')
"""Get all library playlists."""
orderby = request.query.get('orderby', 'name')
provider_filter = request.rel_url.query.get('provider')
- iterator = self.mass.music.library_playlists(orderby=orderby, provider_filter=provider_filter)
+ iterator = self.mass.music.library_playlists(
+ orderby=orderby, provider_filter=provider_filter)
return await self.__stream_json(request, iterator)
@routes.put('/api/library')
return web.Response(text='invalid item or provider', status=501)
result = await self.mass.music.artist(item_id, provider, lazy=lazy)
return web.json_response(result, dumps=json_serializer)
-
+
@routes.get('/api/albums/{item_id}')
async def album(self, request):
""" get full album details"""
media_id = request.match_info.get('media_id')
provider = request.rel_url.query.get('provider')
if (media_id is None or provider is None):
- return web.Response(text='invalid media_id or provider', status=501)
+ return web.Response(text='invalid media_id or provider',
+ status=501)
size = int(request.rel_url.query.get('size', 0))
img_file = await self.mass.music.get_image_thumb(
- media_id, media_type, provider, size)
+ media_id, media_type, provider, size)
if not img_file or not os.path.isfile(img_file):
return web.Response(status=404)
- headers = {'Cache-Control': 'max-age=86400, public', 'Pragma': 'public'}
+ headers = {
+ 'Cache-Control': 'max-age=86400, public',
+ 'Pragma': 'public'
+ }
return web.FileResponse(img_file, headers=headers)
@routes.get('/api/artists/{item_id}/toptracks')
if not media_types_query or "radios" in media_types_query:
media_types.append(MediaType.Radio)
# get results from database
- result = await self.mass.music.search(searchquery, media_types, limit=limit, online=online)
+ result = await self.mass.music.search(searchquery,
+ media_types,
+ limit=limit,
+ online=online)
return web.json_response(result, dumps=json_serializer)
@routes.get('/api/players')
result = await player_cmd()
else:
return web.Response(text='invalid command', status=501)
- return web.json_response(result, dumps=json_serializer)
-
+ return web.json_response(result, dumps=json_serializer)
+
@routes.post('/api/players/{player_id}/play_media/{queue_opt}')
async def player_play_media(self, request):
""" issue player play_media command"""
queue_opt = request.match_info.get('queue_opt', 'play')
body = await request.json()
media_items = await self.__media_items_from_body(body)
- result = await self.mass.players.play_media(player_id, media_items, queue_opt)
+ result = await self.mass.players.play_media(player_id, media_items,
+ queue_opt)
return web.json_response(result, dumps=json_serializer)
-
+
@routes.get('/api/players/{player_id}/queue/items/{queue_item}')
async def player_queue_item(self, request):
""" return item (by index or queue item id) from the player's queue """
except ValueError:
queue_item = await player.queue.by_item_id(item_id)
return web.json_response(queue_item, dumps=json_serializer)
-
+
@routes.get('/api/players/{player_id}/queue/items')
async def player_queue_items(self, request):
""" return the items in the player's queue """
player_id = request.match_info.get('player_id')
player = await self.mass.players.get_player(player_id)
+
async def queue_tracks_iter():
for item in player.queue.items:
yield item
+
return await self.__stream_json(request, queue_tracks_iter())
-
+
@routes.get('/api/players/{player_id}/queue')
async def player_queue(self, request):
""" return the player queue details """
conf_key = request.match_info.get('key')
conf_subkey = request.match_info.get('subkey')
new_values = await request.json()
- LOGGER.debug(f'save config called for {conf_key}/{conf_subkey} - new value: {new_values}')
+ LOGGER.debug(
+ f'save config called for {conf_key}/{conf_subkey} - new value: {new_values}'
+ )
cur_values = self.mass.config[conf_key][conf_subkey]
- result = {"success": True, "restart_required": False, "settings_changed": False}
+ result = {
+ "success": True,
+ "restart_required": False,
+ "settings_changed": False
+ }
if cur_values != new_values:
# config changed
result["settings_changed"] = True
self.mass.config[conf_key][conf_subkey] = new_values
- if conf_key == "player_settings":
- # player settings don't require restart, force update of player
+ if conf_key == CONF_KEY_PLAYERSETTINGS:
+ # player settings: force update of player
self.mass.event_loop.create_task(
self.mass.players.trigger_update(conf_subkey))
+ elif conf_key == CONF_KEY_MUSICPROVIDERS:
+ # (re)load music provider modules
+ self.mass.event_loop.create_task(
+ self.mass.music.load_modules())
+ elif conf_key == CONF_KEY_PLAYERPROVIDERS:
+ # (re)load player provider modules
+ self.mass.event_loop.create_task(
+ self.mass.players.load_modules())
else:
- # TODO: allow some settings without restart ?
+ # other settings need restart
result["restart_required"] = True
self.mass.config.save()
return web.json_response(result)
try:
ws = web.WebSocketResponse()
await ws.prepare(request)
+
# register callback for internal events
async def send_event(msg, msg_details):
- ws_msg = {"message": msg, "message_details": msg_details }
+ ws_msg = {"message": msg, "message_details": msg_details}
try:
await ws.send_json(ws_msg)
except (AssertionError, asyncio.CancelledError):
await self.mass.remove_event_listener(cb_id)
+
cb_id = await self.mass.add_event_listener(send_event)
# process incoming messages
async for msg in ws:
if msg.type == aiohttp.WSMsgType.ERROR:
LOGGER.debug('ws connection closed with exception %s' %
- ws.exception())
+ ws.exception())
elif msg.type != aiohttp.WSMsgType.TEXT:
LOGGER.warning(msg.data)
else:
data = msg.json()
# echo the websocket message on event bus
# can be picked up by other modules, e.g. the webplayer
- await self.mass.signal_event(data['message'], data['message_details'])
+ await self.mass.signal_event(data['message'],
+ data['message_details'])
except (Exception, AssertionError, asyncio.CancelledError) as exc:
LOGGER.warning("Websocket disconnected - %s" % str(exc))
finally:
else:
return web.Response(text='command not supported')
return web.Response(text='success')
-
+
async def __media_items_from_body(self, data):
"""Helper to turn posted body data into media items."""
if not isinstance(data, list):
data = [data]
media_items = []
for item in data:
- media_item = await self.mass.music.item(
- item['item_id'], item['media_type'], item['provider'], lazy=True)
+ media_item = await self.mass.music.item(item['item_id'],
+ item['media_type'],
+ item['provider'],
+ lazy=True)
media_items.append(media_item)
return media_items
-
+
async def __stream_json(self, request, iterator):
""" stream items from async iterator as json object """
resp = web.StreamResponse(status=200,