COPY . /tmp
RUN pip wheel uvloop cchardet aiodns brotlipy \
&& pip wheel -r /tmp/requirements.txt \
- # Include frontend-app in the source files
- && curl -L https://github.com/music-assistant/app/archive/master.tar.gz | tar xz \
- && mv app-master/build /tmp/music_assistant/web/static \
&& pip wheel /tmp
#### FINAL IMAGE
default=default_data_dir,
help="Directory that contains the MusicAssistant configuration",
)
+ parser.add_argument(
+ "-p",
+ "--port",
+ metavar="port",
+ default=8095,
+ help="TCP port on which the server should be run.",
+ )
parser.add_argument(
"--debug",
action="store_true",
logging.getLogger("asyncio").setLevel(logging.WARNING)
logging.getLogger("aiosqlite").setLevel(logging.INFO)
- mass = MusicAssistant(data_dir, args.debug)
+ mass = MusicAssistant(data_dir, args.debug, int(args.port))
def on_shutdown(loop):
logger.info("shutdown requested!")
"""All constants for Music Assistant."""
-__version__ = "0.0.72"
+__version__ = "0.0.73"
REQUIRED_PYTHON_VER = "3.7"
# configuration keys/attributes
from music_assistant.constants import __version__ as app_version
from music_assistant.helpers.encryption import encrypt_string
from music_assistant.helpers.typing import MusicAssistantType
+from music_assistant.helpers.util import get_hostname
async def check_migrations(mass: MusicAssistantType):
mass.config.stored_config["jwt_key"] = encrypt_string(str(uuid.uuid4()))
if "initialized" not in mass.config.stored_config:
mass.config.stored_config["initialized"] = False
+ if "friendly_name" not in mass.config.stored_config:
+ mass.config.stored_config["friendly_name"] = get_hostname()
mass.config.save()
# create default db tables (if needed)
return json_response(data)
-def api_route(ws_cmd_path):
+def api_route(ws_cmd_path, ws_require_auth=True):
"""Decorate a function as websocket command."""
def decorate(func):
func.ws_cmd_path = ws_cmd_path
+ func.ws_require_auth = ws_require_auth
return func
return decorate
self._player_queues = {}
self._poll_ticks = 0
self._controls = {}
- # self.mass.add_event_listener(
- # self.__handle_websocket_player_control_event,
- # [
- # EVENT_REGISTER_PLAYER_CONTROL,
- # EVENT_UNREGISTER_PLAYER_CONTROL,
- # EVENT_PLAYER_CONTROL_UPDATED,
- # ],
- # )
async def async_setup(self):
"""Async initialize of module."""
async def async_trigger_player_update(self, player_id: str):
"""Trigger update of an existing player.."""
player = self.get_player(player_id)
- if player:
- await self._player_states[player.player_id].async_update(player)
+ player_state = self.get_player_state(player_id)
+ if player and player_state:
+ await player_state.async_update(player)
@api_route("players/controls/:control_id/register")
async def async_register_player_control(
continue
queue_item = QueueItem.from_track(track)
# generate uri for this queue item
- queue_item.uri = "%s/stream/queue/%s/%s" % (
- self.mass.web.url,
+ queue_item.uri = "%s/queue/%s/%s" % (
+ self.mass.web.stream_url,
player_id,
queue_item.queue_item_id,
)
"""
queue_item = QueueItem(item_id=uri, provider="uri", name=uri)
# generate uri for this queue item
- queue_item.uri = "%s/stream/%s/%s" % (
- self.mass.web.url,
+ queue_item.uri = "%s/%s/%s" % (
+ self.mass.web.stream_url,
player_id,
queue_item.queue_item_id,
)
player_state = self.get_player_state(player_id)
if not player_state:
return
- new_level = player_state.volume_level + 1
+ if player_state.volume_level <= 10 or player_state.volume_level >= 90:
+ step_size = 2
+ else:
+ step_size = 5
+ new_level = player_state.volume_level + step_size
if new_level > 100:
new_level = 100
return await self.async_cmd_volume_set(player_id, new_level)
player_state = self.get_player_state(player_id)
if not player_state:
return
- new_level = player_state.volume_level - 1
+ if player_state.volume_level <= 10 or player_state.volume_level >= 90:
+ step_size = 2
+ else:
+ step_size = 5
+ new_level = player_state.volume_level - step_size
if new_level < 0:
new_level = 0
return await self.async_cmd_volume_set(player_id, new_level)
)
from music_assistant.helpers.process import AsyncProcess
from music_assistant.helpers.typing import MusicAssistantType
-from music_assistant.helpers.util import create_tempfile, get_ip, try_parse_int
+from music_assistant.helpers.util import create_tempfile, get_ip
from music_assistant.models.streamdetails import ContentType, StreamDetails, StreamType
LOGGER = logging.getLogger("stream_manager")
output_format: SoxOutputFormat = SoxOutputFormat.FLAC,
resample: Optional[int] = None,
gain_db_adjust: Optional[float] = None,
- chunk_size: int = 1024000,
+ chunk_size: int = 512000,
) -> AsyncGenerator[Tuple[bool, bytes], None]:
"""Get the sox manipulated audio data for the given streamdetails."""
# collect all args for sox
async def async_queue_stream_flac(self, player_id) -> AsyncGenerator[bytes, None]:
"""Stream the PlayerQueue's tracks as constant feed in flac format."""
- chunk_size = 512000
-
player_conf = self.mass.config.get_player_config(player_id)
sample_rate = player_conf.get(CONF_MAX_SAMPLE_RATE, 96000)
+ chunk_size = sample_rate * 2 * 10
args = [
"sox",
gain_correct = await self.mass.players.async_get_gain_correct(
player_id, streamdetails.item_id, streamdetails.provider
)
+ streamdetails.gain_correct = gain_correct
+
LOGGER.debug(
"Start Streaming queue track: %s (%s) for player %s",
queue_track.item_id,
gain_correct = await self.mass.players.async_get_gain_correct(
player_id, streamdetails.item_id, streamdetails.provider
)
+ streamdetails.gain_correct = gain_correct
+
# start streaming
LOGGER.debug("Start streaming %s (%s)", queue_item_id, queue_item.name)
async for _, audio_chunk in self.async_get_sox_stream(
if stream_type == StreamType.URL:
async with self.mass.http_session.get(stream_path) as response:
- while True:
- chunk = await response.content.read(chunk_size)
- if not chunk:
- break
+ async for chunk, _ in response.content.iter_chunks():
yield chunk
if needs_analyze and len(audio_data) < 100000000:
audio_data += chunk
if needs_analyze and audio_data:
self.mass.add_job(self.__analyze_audio, streamdetails, audio_data)
- def __get_player_sox_options(
- self, player_id: str, streamdetails: StreamDetails
- ) -> str:
- """Get player specific sox effect options."""
- sox_options = []
- player_conf = self.mass.config.get_player_config(player_id)
- # volume normalisation
- gain_correct = self.mass.add_job(
- self.mass.players.async_get_gain_correct(
- player_id, streamdetails.item_id, streamdetails.provider
- )
- ).result()
- if gain_correct != 0:
- sox_options.append("vol %s dB " % gain_correct)
- # downsample if needed
- if player_conf["max_sample_rate"]:
- max_sample_rate = try_parse_int(player_conf["max_sample_rate"])
- if max_sample_rate < streamdetails.sample_rate:
- sox_options.append(f"rate -v {max_sample_rate}")
- if player_conf.get("sox_options"):
- sox_options.append(player_conf["sox_options"])
- return " ".join(sox_options)
-
def __analyze_audio(self, streamdetails, audio_data) -> None:
"""Analyze track audio, for now we only calculate EBU R128 loudness."""
item_key = "%s%s" % (streamdetails.item_id, streamdetails.provider)
from music_assistant.managers.players import PlayerManager
from music_assistant.managers.streams import StreamManager
from music_assistant.models.provider import Provider, ProviderType
-from music_assistant.web import WebServer
-from zeroconf import NonUniqueNameException, ServiceInfo, Zeroconf
+from music_assistant.web.server import WebServer
+from zeroconf import InterfaceChoice, NonUniqueNameException, ServiceInfo, Zeroconf
LOGGER = logging.getLogger("mass")
self._players = PlayerManager(self)
self._streams = StreamManager(self)
# shared zeroconf instance
- self.zeroconf = Zeroconf()
+ self.zeroconf = Zeroconf(interfaces=InterfaceChoice.All)
async def async_start(self):
"""Start running the music assistant server."""
await self._music.async_setup()
await self._players.async_setup()
await self.__async_preload_providers()
- await self.__async_setup_discovery()
+ await self.async_setup_discovery()
await self._web.async_setup()
await self._library.async_setup()
self.loop.create_task(self.__process_background_tasks())
await task
await asyncio.sleep(1)
- async def __async_setup_discovery(self) -> None:
+ async def async_setup_discovery(self) -> None:
"""Make this Music Assistant instance discoverable on the network."""
- zeroconf_type = "_music-assistant._tcp.local."
- discovery_info = await self.web.discovery_info()
- name = discovery_info["id"].lower()
- info = ServiceInfo(
- zeroconf_type,
- name=f"{name}.{zeroconf_type}",
- addresses=[get_ip_pton()],
- port=discovery_info["port"],
- properties=discovery_info,
- )
- LOGGER.debug("Starting Zeroconf broadcast...")
- try:
- self.zeroconf.register_service(info)
- except NonUniqueNameException:
- LOGGER.error(
- "Music Assistant instance with identical name present in the local network"
+
+ def setup_discovery():
+ zeroconf_type = "_music-assistant._tcp.local."
+
+ info = ServiceInfo(
+ zeroconf_type,
+ name=f"{self.web.server_id}.{zeroconf_type}",
+ addresses=[get_ip_pton()],
+ port=self.web.port,
+ properties=self.web.discovery_info,
+ server="musicassistant.local.",
)
+ LOGGER.debug("Starting Zeroconf broadcast...")
+ try:
+ existing = getattr(self, "mass_zc_service_set", None)
+ if existing:
+ self.zeroconf.update_service(info)
+ else:
+ self.zeroconf.register_service(info)
+ setattr(self, "mass_zc_service_set", True)
+ except NonUniqueNameException:
+ LOGGER.error(
+ "Music Assistant instance with identical name present in the local network!"
+ )
+
+ self.add_job(setup_discovery)
async def __async_preload_providers(self):
"""Dynamically load all providermodules."""
def get_stream_url(self) -> str:
"""Return the full stream url for the player's Queue Stream."""
- uri = f"{self.mass.web.url}/stream/queue/{self.queue_id}"
+ uri = f"{self.mass.web.stream_url}/queue/{self.queue_id}"
# we set the checksum just to invalidate cache stuf
uri += f"?checksum={time.time()}"
return uri
ATTR_STATE: self.state.value,
ATTR_AVAILABLE: self.available,
ATTR_CURRENT_URI: self.current_uri,
- ATTR_VOLUME_LEVEL: self.volume_level,
+ ATTR_VOLUME_LEVEL: int(self.volume_level),
ATTR_MUTED: self.muted,
ATTR_IS_GROUP_PLAYER: self.is_group_player,
ATTR_GROUP_CHILDS: self.group_childs,
OGG = "ogg"
FLAC = "flac"
MP3 = "mp3"
- RAW = "raw"
AAC = "aac"
MPEG = "mpeg"
player_id: str = ""
details: Any = None
seconds_played: int = 0
- sox_options: str = None
+ gain_correct: float = 0
def to_dict(
self,
"content_type": self.content_type.value,
"sample_rate": self.sample_rate,
"bit_depth": self.bit_depth,
- "sox_options": self.sox_options,
+ "gain_correct": self.gain_correct,
"seconds_played": self.seconds_played,
}
self._available = False
self._status_listener: Optional[CastStatusListener] = None
self._is_speaker_group = False
- self._throttler = Throttler(rate_limit=1, period=0.25)
+ self._throttler = Throttler(rate_limit=1, period=0.1)
@property
def player_id(self) -> str:
@property
def volume_level(self) -> int:
"""Return volume_level of this player."""
- return int(self.cast_status.volume_level * 100 if self.cast_status else 0)
+ return self.cast_status.volume_level * 100 if self.cast_status else 0
@property
def muted(self) -> bool:
await self.async_get_token()
spotty = self.get_spotty_binary()
spotty_exec = (
- '%s -n temp -c "%s" --pass-through --single-track spotify://track:%s'
+ '%s -n temp -c "%s" -b 320 --pass-through --single-track spotify://track:%s'
% (
spotty,
self.mass.config.data_path,
params["market"] = "from_token"
params["country"] = "from_token"
token = await self.async_get_token()
+ if not token:
+ return None
headers = {"Authorization": "Bearer %s" % token["accessToken"]}
async with self._throttler:
async with self.mass.http_session.get(
params = {}
url = "https://api.spotify.com/v1/%s" % endpoint
token = await self.async_get_token()
+ if not token:
+ return None
headers = {"Authorization": "Bearer %s" % token["accessToken"]}
async with self.mass.http_session.delete(
url, headers=headers, params=params, json=data, verify_ssl=False
params = {}
url = "https://api.spotify.com/v1/%s" % endpoint
token = await self.async_get_token()
+ if not token:
+ return None
headers = {"Authorization": "Bearer %s" % token["accessToken"]}
async with self.mass.http_session.put(
url, headers=headers, params=params, json=data, verify_ssl=False
params = {}
url = "https://api.spotify.com/v1/%s" % endpoint
token = await self.async_get_token()
+ if not token:
+ return None
headers = {"Authorization": "Bearer %s" % token["accessToken"]}
async with self.mass.http_session.post(
url, headers=headers, params=params, json=data, verify_ssl=False
for child_player_id in self.group_childs:
child_player = self.mass.players.get_player(child_player_id)
if child_player:
- queue_stream_uri = f"{self.mass.web.url}/stream/group/{self.player_id}?player_id={child_player_id}"
+ queue_stream_uri = f"{self.mass.web.stream_url}/group/{self.player_id}?player_id={child_player_id}"
await child_player.async_cmd_play_uri(queue_stream_uri)
self.update_state()
self.stream_task = self.mass.add_job(self.async_queue_stream_task())
-"""
-The web module handles serving the frontend and the rest/websocket api's.
-
-API is available with both HTTP json rest endpoints AND WebSockets.
-All MusicAssistant clients communicate with the websockets api.
-For now, we do not yet support SSL/HTTPS directly, to prevent messing with certificates etc.
-The server is intended to be used locally only and not exposed outside.
-Users may use reverse proxy etc. to add ssl themselves.
-"""
-import asyncio
-import datetime
-import logging
-import os
-import uuid
-from base64 import b64encode
-from typing import Any, Awaitable, Optional, Union
-
-import aiohttp_cors
-import jwt
-import ujson
-from aiohttp import web
-from aiohttp.web_request import Request
-from aiohttp_jwt import JWTMiddleware, login_required
-from music_assistant.constants import (
- CONF_KEY_SECURITY,
- CONF_KEY_SECURITY_APP_TOKENS,
- CONF_KEY_SECURITY_LOGIN,
- CONF_PASSWORD,
- CONF_USERNAME,
-)
-from music_assistant.constants import __version__ as MASS_VERSION
-from music_assistant.helpers import repath
-from music_assistant.helpers.encryption import decrypt_string
-from music_assistant.helpers.images import async_get_image_url, async_get_thumb_file
-from music_assistant.helpers.typing import MusicAssistantType
-from music_assistant.helpers.util import get_hostname, get_ip
-from music_assistant.helpers.web import (
- api_route,
- async_json_response,
- json_serializer,
- parse_arguments,
-)
-from music_assistant.models.media_types import ItemMapping, MediaItem
-
-from .json_rpc import json_rpc_endpoint
-from .streams import routes as stream_routes
-from .websocket import WebSocketHandler
-
-LOGGER = logging.getLogger("webserver")
-
-
-class WebServer:
- """Webserver and json/websocket api."""
-
- def __init__(self, mass: MusicAssistantType, port: int):
- """Initialize class."""
- self.jwt_key = None
- self.app = None
- self.mass = mass
- self._port = port
- # load/create/update config
- self._host = get_ip()
- self.config = mass.config.base["web"]
- self._runner = None
- self.api_routes = {}
-
- async def async_setup(self):
- """Perform async setup."""
- self.jwt_key = decrypt_string(self.mass.config.stored_config["jwt_key"])
- jwt_middleware = JWTMiddleware(
- self.jwt_key,
- request_property="user",
- credentials_required=False,
- is_revoked=self.is_token_revoked,
- )
- self.app = web.Application(middlewares=[jwt_middleware])
- self.app["mass"] = self.mass
- self.app["websockets"] = []
- # add all routes routes
- self.app.add_routes(stream_routes)
- if not self.mass.config.stored_config["initialized"]:
- self.app.router.add_post("/setup", self.setup)
- self.app.router.add_post("/login", self.login)
- self.app.router.add_get("/jsonrpc.js", json_rpc_endpoint)
- self.app.router.add_post("/jsonrpc.js", json_rpc_endpoint)
- self.app.router.add_get("/ws", WebSocketHandler)
- self.app.router.add_get("/", self.index)
- self.app.router.add_put("/api/library/{tail:.*}/add", self.handle_api_request)
- self.app.router.add_delete(
- "/api/library/{tail:.*}/remove", self.handle_api_request
- )
- self.app.router.add_put(
- "/api/players/{tail:.*}/play_media", self.handle_api_request
- )
- self.app.router.add_put(
- "/api/players/{tail:.*}/play_uri", self.handle_api_request
- )
- # catch-all for all api routes is handled by our special method
- self.app.router.add_get("/api/{tail:.*}", self.handle_api_request)
-
- # register all methods decorated as api_route
- for cls in [
- self,
- self.mass.music,
- self.mass.players,
- self.mass.config,
- self.mass.library,
- ]:
- self.register_api_routes(cls)
-
- webdir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "static/")
- if os.path.isdir(webdir):
- self.app.router.add_static("/", webdir, append_version=True)
- else:
- # The (minified) build of the frontend(app) is included in the pypi releases
- LOGGER.warning("Loaded without frontend support.")
-
- # Add CORS support to all routes
- cors = aiohttp_cors.setup(
- self.app,
- defaults={
- "*": aiohttp_cors.ResourceOptions(
- allow_credentials=True,
- allow_headers="*",
- )
- },
- )
- for route in list(self.app.router.routes()):
- cors.add(route)
-
- # set custom server header
- async def on_prepare(request, response):
- response.headers[
- "Server"
- ] = f'MusicAssistant/{MASS_VERSION} {response.headers["Server"]}'
-
- self.app.on_response_prepare.append(on_prepare)
- self._runner = web.AppRunner(self.app, access_log=None)
- await self._runner.setup()
- # set host to None to bind to all addresses on both IPv4 and IPv6
- http_site = web.TCPSite(self._runner, host=None, port=self.port)
- await http_site.start()
- LOGGER.info("Started Music Assistant server on %s", self.url)
- self.mass.add_event_listener(self.__async_handle_mass_events)
-
- async def async_stop(self):
- """Stop the webserver."""
- for ws_client in self.app["websockets"]:
- await ws_client.close("server shutdown")
-
- def register_api_route(self, cmd: str, func: Awaitable):
- """Register a command(handler) to the websocket api."""
- pattern = repath.pattern(cmd)
- self.api_routes[pattern] = func
-
- def register_api_routes(self, cls: Any):
- """Register all methods of a class (instance) that are decorated with api_route."""
- for item in dir(cls):
- func = getattr(cls, item)
- if not hasattr(func, "ws_cmd_path"):
- continue
- # method is decorated with our websocket decorator
- self.register_api_route(func.ws_cmd_path, func)
-
- @property
- def host(self):
- """Return the local IP address/host for this Music Assistant instance."""
- return self._host
-
- @property
- def port(self):
- """Return the port for this Music Assistant instance."""
- return self._port
-
- @property
- def url(self):
- """Return the URL for this Music Assistant instance."""
- return f"http://{self.host}:{self.port}"
-
- @property
- def server_id(self):
- """Return the device ID for this Music Assistant Server."""
- return self.mass.config.stored_config["server_id"]
-
- @api_route("info")
- async def discovery_info(self):
- """Return (discovery) info about this instance."""
- return {
- "id": self.server_id,
- "url": self.url,
- "host": self.host,
- "port": self.port,
- "version": MASS_VERSION,
- "friendly_name": get_hostname(),
- "initialized": self.mass.config.stored_config["initialized"],
- }
-
- async def login(self, request: Request):
- """Handle user login by form/json post. Will issue JWT token."""
- form = await request.post()
- try:
- username = form["username"]
- password = form["password"]
- app_id = form.get("app_id")
- except KeyError:
- data = await request.json()
- username = data["username"]
- password = data["password"]
- app_id = data.get("app_id")
- token_info = await self.get_token(username, password, app_id)
- if token_info:
- return web.Response(
- body=json_serializer(token_info), content_type="application/json"
- )
- return web.HTTPUnauthorized(body="Invalid username and/or password provided!")
-
- async def get_token(self, username: str, password: str, app_id: str = "") -> dict:
- """
- Validate given credentials and return JWT token.
-
- If app_id is provided, a long lived token will be issued which can be withdrawn by the user.
- """
- verified = self.mass.config.security.validate_credentials(username, password)
- if verified:
- client_id = str(uuid.uuid4())
- token_info = {
- "username": username,
- "server_id": self.server_id,
- "client_id": client_id,
- "app_id": app_id,
- }
- if app_id:
- token_info["exp"] = datetime.datetime.utcnow() + datetime.timedelta(
- days=365 * 10
- )
- else:
- token_info["exp"] = datetime.datetime.utcnow() + datetime.timedelta(
- hours=8
- )
- token = jwt.encode(token_info, self.jwt_key).decode()
- if app_id:
- self.mass.config.stored_config[CONF_KEY_SECURITY][
- CONF_KEY_SECURITY_APP_TOKENS
- ][client_id] = token_info
- self.mass.config.save()
- token_info["token"] = token
- return token_info
- return None
-
- async def setup(self, request: Request):
- """Handle first-time server setup through onboarding wizard."""
- if self.mass.config.stored_config["initialized"]:
- return web.HTTPUnauthorized()
- form = await request.post()
- username = form["username"]
- password = form["password"]
- # save credentials in config
- self.mass.config.security[CONF_KEY_SECURITY_LOGIN][CONF_USERNAME] = username
- self.mass.config.security[CONF_KEY_SECURITY_LOGIN][CONF_PASSWORD] = password
- self.mass.config.stored_config["initialized"] = True
- self.mass.config.save()
- return web.Response(status=200)
-
- @login_required
- async def handle_api_request(self, request: Request):
- """Handle API route/command."""
- api_path = request.path.replace("/api/", "")
- LOGGER.debug("Handling %s - %s", api_path, request.get("user"))
- try:
- # TODO: parse mediaitems from body if needed
- data = await request.json(loads=ujson.loads)
- except Exception: # pylint: disable=broad-except
- data = {}
- # work out handler for the given path/command
- for key in self.api_routes:
- match = repath.match(key, api_path)
- if match:
- try:
- params = match.groupdict()
- handler = self.mass.web.api_routes[key]
- params = parse_arguments(handler, {**params, **data})
- res = handler(**params)
- if asyncio.iscoroutine(res):
- res = await res
- # return result of command to client
- return await async_json_response(res)
- except Exception as exc: # pylint: disable=broad-except
- return web.Response(status=500, text=str(exc))
- return web.Response(status=404)
-
- async def index(self, request: web.Request):
- """Get the index page, redirect if we do not have a web directory."""
- # pylint: disable=unused-argument
- if not self.mass.config.stored_config["initialized"]:
- return web.FileResponse(
- os.path.join(os.path.dirname(os.path.abspath(__file__)), "setup.html")
- )
- html_app = os.path.join(
- os.path.dirname(os.path.abspath(__file__)), "static/index.html"
- )
- if not os.path.isfile(html_app):
- raise web.HTTPFound("https://music-assistant.github.io/app")
- return web.FileResponse(html_app)
-
- async def __async_handle_mass_events(self, event, event_data):
- """Broadcast events to connected websocket clients."""
- for ws_client in self.app["websockets"]:
- if not ws_client.authenticated:
- continue
- try:
- await ws_client.send(event=event, data=event_data)
- except ConnectionResetError:
- # connection lost to this client, cleanup
- await ws_client.close()
- except Exception as exc: # pylint: disable=broad-except
- # log all other errors but continue sending to all other clients
- LOGGER.exception(exc)
-
- @api_route("images/thumb")
- async def async_get_image_thumb(
- self,
- size: int,
- url: Optional[str] = "",
- item: Union[None, ItemMapping, MediaItem] = None,
- ):
- """Get (resized) thumb image for given URL or media item as base64 encoded string."""
- if not url and item:
- url = await async_get_image_url(
- self.mass, item.item_id, item.provider, item.media_type
- )
- if url:
- img_file = await async_get_thumb_file(self.mass, url, size)
- if img_file:
- with open(img_file, "rb") as _file:
- icon_data = _file.read()
- icon_data = b64encode(icon_data)
- return "data:image/png;base64," + icon_data.decode()
- raise KeyError("Invalid item or url")
-
- @api_route("images/provider-icons/:provider_id?")
- async def async_get_provider_icon(self, provider_id: Optional[str]):
- """Get Provider icon as base64 encoded string."""
- if not provider_id:
- return {
- prov.id: await self.async_get_provider_icon(prov.id)
- for prov in self.mass.get_providers(include_unavailable=True)
- }
- base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
- icon_path = os.path.join(base_dir, "providers", provider_id, "icon.png")
- if os.path.isfile(icon_path):
- with open(icon_path, "rb") as _file:
- icon_data = _file.read()
- icon_data = b64encode(icon_data)
- return "data:image/png;base64," + icon_data.decode()
- raise KeyError("Invalid provider: %s" % provider_id)
-
- def is_token_revoked(self, request: Request, token_info: dict):
- """Return bool is token is revoked."""
- return self.mass.config.security.is_token_revoked(token_info)
+"""Webserver and API handlers/logic."""
-"""JSON RPC API endpoint."""
+"""JSON RPC API endpoint (mostly) compatible with LMS."""
from aiohttp.web import Request, Response
from music_assistant.helpers.web import require_local_subnet
--- /dev/null
+"""
+The web module handles serving the custom websocket api on a custom port (default is 8095).
+
+All MusicAssistant clients communicate locally with this websockets api.
+The server is intended to be used locally only and not exposed outside,
+so it is HTTP only. Secure remote connections will be offered by a remote connect broker.
+"""
+
+import asyncio
+import datetime
+import logging
+import os
+import uuid
+from base64 import b64encode
+from typing import Any, Awaitable, Optional, Union
+
+import aiohttp_cors
+import jwt
+import ujson
+from aiohttp import WSMsgType, web
+from aiohttp.web import WebSocketResponse
+from music_assistant.constants import (
+ CONF_KEY_SECURITY,
+ CONF_KEY_SECURITY_APP_TOKENS,
+ CONF_KEY_SECURITY_LOGIN,
+ CONF_PASSWORD,
+ CONF_USERNAME,
+)
+from music_assistant.constants import __version__ as MASS_VERSION
+from music_assistant.helpers import repath
+from music_assistant.helpers.encryption import decrypt_string
+from music_assistant.helpers.images import async_get_image_url, async_get_thumb_file
+from music_assistant.helpers.typing import MusicAssistantType
+from music_assistant.helpers.util import get_hostname, get_ip
+from music_assistant.helpers.web import api_route, json_serializer, parse_arguments
+from music_assistant.models.media_types import ItemMapping, MediaItem
+
+from .json_rpc import json_rpc_endpoint
+from .streams import routes as stream_routes
+
+LOGGER = logging.getLogger("webserver")
+
+
+class WebServer:
+ """Webserver and json/websocket api."""
+
+ def __init__(self, mass: MusicAssistantType, port: int):
+ """Initialize class."""
+ self.jwt_key = None
+ self.app = None
+ self.mass = mass
+ self._port = port
+ # load/create/update config
+ self._hostname = get_hostname().lower()
+ self._ip_address = get_ip()
+ self.config = mass.config.base["web"]
+ self._runner = None
+ self.api_routes = {}
+ self._discovered_servers = []
+
+ async def async_setup(self):
+ """Perform async setup."""
+ self.jwt_key = decrypt_string(self.mass.config.stored_config["jwt_key"])
+ self.app = web.Application()
+ self.app["mass"] = self.mass
+ self.app["clients"] = []
+ # add all routes
+ self.app.add_routes(stream_routes)
+ self.app.router.add_route("*", "/jsonrpc.js", json_rpc_endpoint)
+ self.app.router.add_get("/ws", self.__async_websocket_handler)
+
+ # register all methods decorated as api_route
+ for cls in [
+ self,
+ self.mass.music,
+ self.mass.players,
+ self.mass.config,
+ self.mass.library,
+ ]:
+ self.register_api_routes(cls)
+
+ # Add server discovery on root/index including CORS support
+ cors = aiohttp_cors.setup(
+ self.app,
+ defaults={
+ "*": aiohttp_cors.ResourceOptions(
+ allow_credentials=True,
+ allow_headers="*",
+ )
+ },
+ )
+ cors.add(self.app.router.add_get("/", self.async_info))
+
+ self._runner = web.AppRunner(self.app, access_log=None)
+ await self._runner.setup()
+ # set host to None to bind to all addresses on both IPv4 and IPv6
+ http_site = web.TCPSite(self._runner, host=None, port=self.port)
+ await http_site.start()
+ LOGGER.info("Started Music Assistant server on port %s", self.port)
+ self.mass.add_event_listener(self.__async_handle_mass_events)
+
+ async def async_stop(self):
+ """Stop the webserver."""
+ for ws_client in self.app["clients"]:
+ await ws_client.close(message=b"server shutdown")
+
+ def register_api_route(self, cmd: str, func: Awaitable):
+ """Register a command(handler) to the websocket api."""
+ pattern = repath.pattern(cmd)
+ self.api_routes[pattern] = func
+
+ def register_api_routes(self, cls: Any):
+ """Register all methods of a class (instance) that are decorated with api_route."""
+ for item in dir(cls):
+ func = getattr(cls, item)
+ if not hasattr(func, "ws_cmd_path"):
+ continue
+ # method is decorated with our websocket decorator
+ self.register_api_route(func.ws_cmd_path, func)
+
+ @property
+ def hostname(self):
+ """Return the hostname for this Music Assistant instance."""
+ return self._hostname
+
+ @property
+ def ip_address(self):
+ """Return the local IP(v4) address for this Music Assistant instance."""
+ return self._ip_address
+
+ @property
+ def port(self):
+ """Return the port for this Music Assistant instance."""
+ return self._port
+
+ @property
+ def stream_url(self):
+ """Return the base stream URL for this Music Assistant instance."""
+ # dns resolving often fails on stream devices so use IP-address
+ return f"http://{self.ip_address}:{self.port}/stream"
+
+ @property
+ def address(self):
+ """Return the API connect address for this Music Assistant instance."""
+ return f"ws://{self.hostname}:{self.port}/ws"
+
+ @property
+ def server_id(self):
+ """Return the device ID for this Music Assistant Server."""
+ return self.mass.config.stored_config["server_id"]
+
+ @property
+ def discovery_info(self):
+ """Return discovery info for this Music Assistant server."""
+ return {
+ "id": self.server_id,
+ "address": self.address,
+ "hostname": self.hostname,
+ "ip_address": self.ip_address,
+ "port": self.port,
+ "version": MASS_VERSION,
+ "friendly_name": self.mass.config.stored_config["friendly_name"],
+ "initialized": self.mass.config.stored_config["initialized"],
+ }
+
+ @api_route("info")
+ async def async_info(self, request: web.Request = None):
+ """Return discovery info on index page."""
+ if request:
+ return web.json_response(self.discovery_info)
+ return self.discovery_info
+
+ @api_route("get_token", False)
+ async def async_get_token(
+ self, username: str, password: str, app_id: str = ""
+ ) -> dict:
+ """
+ Validate given credentials and return JWT token.
+
+ If app_id is provided, a long lived token will be issued which can be withdrawn by the user.
+ """
+ verified = self.mass.config.security.validate_credentials(username, password)
+ if verified:
+ client_id = str(uuid.uuid4())
+ token_info = {
+ "username": username,
+ "server_id": self.server_id,
+ "client_id": client_id,
+ "app_id": app_id,
+ }
+ if app_id:
+ token_info["exp"] = datetime.datetime.utcnow() + datetime.timedelta(
+ days=365 * 10
+ )
+ else:
+ token_info["exp"] = datetime.datetime.utcnow() + datetime.timedelta(
+ hours=8
+ )
+ token = jwt.encode(token_info, self.jwt_key).decode()
+ if app_id:
+ self.mass.config.stored_config[CONF_KEY_SECURITY][
+ CONF_KEY_SECURITY_APP_TOKENS
+ ][client_id] = token_info
+ self.mass.config.save()
+ token_info["token"] = token
+ return token_info
+ raise AuthenticationError("Invalid credentials")
+
+ @api_route("setup", False)
+ async def async_create_user_setup(self, username: str, password: str):
+ """Handle first-time server setup through onboarding wizard."""
+ if self.mass.config.stored_config["initialized"]:
+ raise AuthenticationError("Already initialized")
+ # save credentials in config
+ self.mass.config.security[CONF_KEY_SECURITY_LOGIN][CONF_USERNAME] = username
+ self.mass.config.security[CONF_KEY_SECURITY_LOGIN][CONF_PASSWORD] = password
+ self.mass.config.stored_config["initialized"] = True
+ self.mass.config.save()
+ # fix discovery info
+ await self.mass.async_setup_discovery()
+ for item in self._discovered_servers:
+ if item["id"] == self.server_id:
+ item["initialized"] = True
+ return True
+
+ @api_route("images/thumb")
+ async def async_get_image_thumb(
+ self,
+ size: int,
+ url: Optional[str] = "",
+ item: Union[None, ItemMapping, MediaItem] = None,
+ ):
+ """Get (resized) thumb image for given URL or media item as base64 encoded string."""
+ if not url and item:
+ url = await async_get_image_url(
+ self.mass, item.item_id, item.provider, item.media_type
+ )
+ if url:
+ img_file = await async_get_thumb_file(self.mass, url, size)
+ if img_file:
+ with open(img_file, "rb") as _file:
+ icon_data = _file.read()
+ icon_data = b64encode(icon_data)
+ return "data:image/png;base64," + icon_data.decode()
+ raise KeyError("Invalid item or url")
+
+ @api_route("images/provider-icons/:provider_id?")
+ async def async_get_provider_icon(self, provider_id: Optional[str]):
+ """Get Provider icon as base64 encoded string."""
+ if not provider_id:
+ return {
+ prov.id: await self.async_get_provider_icon(prov.id)
+ for prov in self.mass.get_providers(include_unavailable=True)
+ }
+ base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
+ icon_path = os.path.join(base_dir, "providers", provider_id, "icon.png")
+ if os.path.isfile(icon_path):
+ with open(icon_path, "rb") as _file:
+ icon_data = _file.read()
+ icon_data = b64encode(icon_data)
+ return "data:image/png;base64," + icon_data.decode()
+ raise KeyError("Invalid provider: %s" % provider_id)
+
+ async def __async_websocket_handler(self, request: web.Request):
+ """Handle websocket client."""
+
+ ws_client = WebSocketResponse()
+ ws_client.authenticated = False
+ await ws_client.prepare(request)
+ request.app["clients"].append(ws_client)
+
+ try:
+ # handle incoming messages
+ async for msg in ws_client:
+ if msg.type == WSMsgType.error:
+ LOGGER.warning(
+ "ws connection closed with exception %s", ws_client.exception()
+ )
+ if msg.type != WSMsgType.text:
+ continue
+ if msg.data == "close":
+ await ws_client.close()
+ break
+ # regular message
+ json_msg = msg.json(loads=ujson.loads)
+ if "command" in json_msg and "data" in json_msg:
+ # handle command
+ await self.__async_handle_command(
+ ws_client,
+ json_msg["command"],
+ json_msg["data"],
+ json_msg.get("id"),
+ )
+ elif "event" in json_msg:
+ # handle event
+ await self.__async_handle_event(
+ ws_client, json_msg["event"], json_msg.get("data")
+ )
+ except Exception as exc: # pylint:disable=broad-except
+ # log the error and disconnect client
+ await self.__async_send_json(ws_client, error=str(exc), **json_msg)
+ await ws_client.close(message=str(exc).encode())
+ LOGGER.debug("Error with WS client", exc_info=exc)
+
+ # websocket disconnected
+ request.app["clients"].remove(ws_client)
+ LOGGER.debug("websocket connection closed: %s", request.remote)
+
+ return ws_client
+
+ async def __async_handle_command(
+ self,
+ ws_client: WebSocketResponse,
+ command: str,
+ data: Optional[dict],
+ msg_id: Any = None,
+ ):
+ """Handle websocket command."""
+ res = None
+ if command == "auth":
+ return await self.__async_handle_auth(ws_client, data)
+ # work out handler for the given path/command
+ for key in self.api_routes:
+ match = repath.match(key, command)
+ if match:
+ params = match.groupdict()
+ handler = self.api_routes[key]
+ # check authentication
+ if (
+ getattr(handler, "ws_require_auth", True)
+ and not ws_client.authenticated
+ ):
+ raise AuthenticationError("Not authenticated")
+ if not data:
+ data = {}
+ params = parse_arguments(handler, {**params, **data})
+ res = handler(**params)
+ if asyncio.iscoroutine(res):
+ res = await res
+ # return result of command to client
+ return await self.__async_send_json(
+ ws_client, id=msg_id, result=command, data=res
+ )
+ raise KeyError("Unknown command")
+
+ async def __async_handle_event(
+ self, ws_client: WebSocketResponse, event: str, data: Any
+ ):
+ """Handle event message from ws client."""
+ LOGGER.info("received event %s", event)
+ if ws_client.authenticated:
+ self.mass.signal_event(event, data)
+
+ async def __async_handle_auth(self, ws_client: WebSocketResponse, token: str):
+ """Handle authentication with JWT token."""
+ token_info = jwt.decode(token, self.mass.web.jwt_key)
+ if self.mass.config.security.is_token_revoked(token_info):
+ raise AuthenticationError("Token is revoked")
+ ws_client.authenticated = True
+ # TODO: store token/app_id on ws_client obj and periodiclaly check if token is expired or revoked
+ await self.__async_send_json(ws_client, result="auth", data=token_info)
+
+ async def __async_send_json(self, ws_client: WebSocketResponse, **kwargs):
+ """Send message (back) to websocket client."""
+ await ws_client.send_str(json_serializer(kwargs))
+
+ async def __async_handle_mass_events(self, event: str, event_data: Any):
+ """Broadcast events to connected clients."""
+ for ws_client in self.app["clients"]:
+ if not ws_client.authenticated:
+ continue
+ try:
+ await self.__async_send_json(ws_client, event=event, data=event_data)
+ except ConnectionResetError:
+ # client is already disconnected
+ self.app["clients"].remove(ws_client)
+ except Exception as exc: # pylint: disable=broad-except
+ # log errors and continue sending to all other clients
+ LOGGER.debug("Error while sending message to api client", exc_info=exc)
+
+
+class AuthenticationError(Exception):
+ """Custom Exception for all authentication errors."""
+++ /dev/null
-<!DOCTYPE html>
-<html>
-<head>
- <link href="https://fonts.googleapis.com/css?family=Roboto:100,300,400,500,700,900" rel="stylesheet">
- <link href="https://cdn.jsdelivr.net/npm/@mdi/font@4.x/css/materialdesignicons.min.css" rel="stylesheet">
- <link href="https://cdn.jsdelivr.net/npm/vuetify@2.x/dist/vuetify.min.css" rel="stylesheet">
- <meta name="viewport" content="width=device-width, initial-scale=1, maximum-scale=1, user-scalable=no, minimal-ui">
-</head>
-<body>
- <div id="app">
- <v-app>
- <v-main>
- <v-container fluid fill-height>
- <v-layout align-center justify-center>
- <v-flex sm8>
- <v-card class="elevation-12">
- <v-toolbar dark color="black">
- <v-toolbar-title>Setup MusicAssistant server</v-toolbar-title>
- <v-spacer></v-spacer>
- </v-toolbar>
- <v-card-text>
- <span>In order to use the MusicAssistant server, you must setup a username and password to protect the server.</span>
- <span>When you click submit, the server will be setup and you can login with the created credentials.</span>
- <br /><br />
- <v-form ref="form" v-model="valid" method="post">
- <v-text-field
- @keyup.enter="submit"
- v-model="username"
- prepend-icon="mdi-account"
- name="login"
- label="Username"
- type="text"
- required
- :rules="[v => !!v || 'Username is required']"
- ></v-text-field>
- <v-text-field
- @keyup.enter="submit"
- v-model="password1"
- prepend-icon="mdi-lock"
- name="password1"
- label="Password"
- id="password1"
- type="password"
- required
- :rules="[v => v.length >= 8 || 'Password must have minimum of 8 characters']"
- ></v-text-field>
- <v-text-field
- @keyup.enter="submit"
- v-model="password2"
- prepend-icon="mdi-lock"
- name="password2"
- label="Repeat password"
- id="password"
- type="password"
- required
- :rules="[
- password1 === password2 || 'Passwords do not match'
- ]"
- ></v-text-field>
- </v-form>
- <v-alert type="success" v-if="success"
- >Useraccount created and server is ready. You will be redirected to the webinterface.
- Login with your newly created credentials and configure the other aspects of the server to get going!
- </v-alert>
- </v-card-text>
- <v-card-actions>
- <v-spacer></v-spacer>
- <v-btn type="submit" :disabled="!valid" color="success" @click="submit"
- class="mr-4">Submit</v-btn>
- </v-card-actions>
- </v-card>
- </v-flex>
- </v-layout>
- </v-container>
- </v-main>
- </v-app>
- </div>
-
- <script src="https://cdn.jsdelivr.net/npm/vue@2.x/dist/vue.js"></script>
- <script src="https://cdn.jsdelivr.net/npm/vuetify@2.x/dist/vuetify.js"></script>
- <script src="https://unpkg.com/axios/dist/axios.min.js"></script>
- <script>
- new Vue({
- el: '#app',
- vuetify: new Vuetify(),
- data: () => ({
- username: '',
- password1: '',
- password2: '',
- valid: true,
- success: false
- }),
- methods: {
- submit () {
- const formData = new FormData()
- formData.append('username', this.username)
- formData.append('password', this.password1)
- axios.post('/setup', formData)
- .then(function (response) {
- this.success = true
- // refresh page, server will redirect to webapp
- setTimeout(function() {
- document.location.reload()
- }, 5000);
- }.bind(this))
- .catch(function (error) {
- console.log('error', error)
- });
- }
- }
- })
- </script>
-</body>
-</html>
\ No newline at end of file
resp = StreamResponse(
status=200, reason="OK", headers={"Content-Type": f"audio/{content_type}"}
)
-
- resp.enable_chunked_encoding()
- resp.enable_compression()
await resp.prepare(request)
# stream track
resp = StreamResponse(
status=200, reason="OK", headers={"Content-Type": "audio/flac"}
)
- resp.enable_chunked_encoding()
- resp.enable_compression()
await resp.prepare(request)
# stream queue
resp = StreamResponse(
status=200, reason="OK", headers={"Content-Type": "audio/flac"}
)
- resp.enable_chunked_encoding()
- resp.enable_compression()
await resp.prepare(request)
async for audio_chunk in request.app["mass"].streams.async_stream_queue_item(
+++ /dev/null
-"""Websocket API endpoint."""
-import asyncio
-import logging
-from typing import Any, Optional
-
-import jwt
-import ujson
-from aiohttp import WSMsgType
-from aiohttp.web import View, WebSocketResponse
-from music_assistant.helpers import repath
-from music_assistant.helpers.typing import MusicAssistantType
-from music_assistant.helpers.web import json_serializer, parse_arguments
-
-LOGGER = logging.getLogger("web.endpoints.websocket")
-
-
-class WebSocketHandler(View):
- """Handler for websockets API."""
-
- authenticated = False
- _ws = None
- mass: MusicAssistantType = None
-
- def __init__(self, *args, **kwargs):
- """Initialize."""
- super().__init__(*args, **kwargs)
- self.mass: MusicAssistantType = self.request.app["mass"]
-
- async def get(self):
- """Handle main ws entrypoint."""
- websocket = WebSocketResponse()
- await websocket.prepare(self.request)
-
- self.request.app["websockets"].append(self)
- self._ws = websocket
-
- LOGGER.debug("new client connected: %s", self.request.remote)
-
- async for msg in websocket:
- if msg.type == WSMsgType.text:
- if msg.data == "close":
- await websocket.close()
- break
- try:
- json_msg = msg.json(loads=ujson.loads)
- if "command" in json_msg and "data" in json_msg:
- # handle command
- await self.handle_command(
- json_msg["command"],
- json_msg["data"],
- json_msg.get("id"),
- )
- elif "event" in json_msg:
- # handle event
- await self.handle_event(json_msg["event"], json_msg.get("data"))
- else:
- raise KeyError
- except (KeyError, ValueError):
- await self.send(
- error='commands must be issued in json format \
- {"command": "command", "data":" optional data"}',
- )
- elif msg.type == WSMsgType.error:
- LOGGER.warning(
- "ws connection closed with exception %s", websocket.exception()
- )
-
- # websocket disconnected
- await self.close()
- return websocket
-
- async def send(self, **kwargs):
- """Send message (back) to websocket client."""
- ws_msg = kwargs
- await self._ws.send_str(json_serializer(ws_msg))
-
- async def close(self, reason=""):
- """Close websocket connection."""
- try:
- await self._ws.close(message=reason.encode())
- except Exception: # pylint: disable=broad-except
- pass
- try:
- self.request.app["websockets"].remove(self)
- except Exception: # pylint: disable=broad-except
- pass
- LOGGER.debug("websocket connection closed: %s", self.request.remote)
-
- async def handle_command(self, command: str, data: Optional[dict], id: Any = None):
- """Handle websocket command."""
- res = None
- try:
- if command == "auth":
- res = await self.auth(data)
- return await self.send(id=id, result=command, data=res)
- if command == "get_token":
- res = await self.mass.web.get_token(**data)
- if not res:
- raise Exception("Invalid credentials")
- return await self.send(id=id, result=command, data=res)
- if not self.authenticated:
- return await self.send(
- id=id,
- result=command,
- error="Not authenticated, please login first.",
- )
- # work out handler for the given path/command
- for key in self.mass.web.api_routes:
- match = repath.match(key, command)
- if match:
- params = match.groupdict()
- handler = self.mass.web.api_routes[key]
- if not data:
- data = {}
- params = parse_arguments(handler, {**params, **data})
- res = handler(**params)
- if asyncio.iscoroutine(res):
- res = await res
- # return result of command to client
- return await self.send(id=id, result=command, data=res)
- raise KeyError("Unknown command")
- except Exception as exc: # pylint:disable=broad-except
- return await self.send(result=command, error=str(exc))
-
- async def handle_event(self, event: str, data: Any):
- """Handle command message."""
- LOGGER.info("received event %s", event)
- if self.authenticated:
- self.mass.signal_event(event, data)
-
- async def auth(self, token: str):
- """Handle authentication with JWT token."""
- token_info = jwt.decode(token, self.mass.web.jwt_key)
- if self.mass.web.is_token_revoked(None, token_info):
- raise Exception("Token is revoked")
- self.authenticated = True
- return token_info
aiohttp_cors==0.7.0
unidecode==1.1.1
PyJWT==1.7.1
-aiohttp_jwt==0.6.1
-zeroconf==0.28.6
+zeroconf==0.28.7
passlib==1.7.4
cryptography==3.2.1
ujson==4.0.1