check_audio_support,
crossfade_pcm_parts,
get_media_stream,
- get_preview_stream,
get_stream_details,
)
from music_assistant.server.helpers.process import AsyncProcess
bind_port=self.publish_port,
base_url=f"http://{self.publish_ip}:{self.publish_port}",
static_routes=[
- ("GET", "/preview", self.serve_preview_stream),
(
"GET",
"/{queue_id}/multi/{job_id}/{player_id}/{queue_item_id}.{fmt}",
return resp
- async def serve_preview_stream(self, request: web.Request):
- """Serve short preview sample."""
- self._log_request(request)
- provider_instance_id_or_domain = request.query["provider"]
- item_id = urllib.parse.unquote(request.query["item_id"])
- resp = web.StreamResponse(status=200, reason="OK", headers={"Content-Type": "audio/mp3"})
- await resp.prepare(request)
- async for chunk in get_preview_stream(self.mass, provider_instance_id_or_domain, item_id):
- await resp.write(chunk)
- return resp
-
async def get_flow_stream(
self,
queue: PlayerQueue,
import inspect
import logging
import os
+import urllib.parse
from collections.abc import Awaitable
from concurrent import futures
from contextlib import suppress
from music_assistant.common.models.event import MassEvent
from music_assistant.constants import CONF_BIND_IP, CONF_BIND_PORT
from music_assistant.server.helpers.api import APICommandHandler, parse_arguments
+from music_assistant.server.helpers.audio import get_preview_stream
from music_assistant.server.helpers.util import get_ips
from music_assistant.server.helpers.webserver import Webserver
from music_assistant.server.models.core_controller import CoreController
# if a user also wants to expose a the webserver non securely on his internal
# network he/she should open the port in the add-on config.
internal_ip = next((x for x in await get_ips() if x.startswith("172")), await get_ip())
- base_url = f"http://{internal_ip:8095}"
+ base_url = f"http://{internal_ip}:8095"
return (
ConfigEntry(
key=CONF_BIND_PORT,
# also host the image proxy on the webserver
routes.append(("GET", "/imageproxy", self.mass.metadata.handle_imageproxy))
# also host the audio preview service
- routes.append(("GET", "/preview", self.mass.streams.serve_preview_stream))
+ routes.append(("GET", "/preview", self.serve_preview_stream))
# start the webserver
+ self.publish_port = config.get_value(CONF_BIND_PORT)
+ self.publish_ip = config.get_value(CONF_BIND_IP)
await self._server.setup(
- bind_ip=config.get_value(CONF_BIND_IP),
- bind_port=config.get_value(CONF_BIND_PORT),
+ bind_ip=self.publish_ip,
+ bind_port=self.publish_port,
base_url=config.get_value(CONF_BASE_URL),
static_routes=routes,
# add assets subdir as static_content
await client.disconnect()
await self._server.close()
+ async def serve_preview_stream(self, request: web.Request):
+ """Serve short preview sample."""
+ self._log_request(request)
+ provider_instance_id_or_domain = request.query["provider"]
+ item_id = urllib.parse.unquote(request.query["item_id"])
+ resp = web.StreamResponse(status=200, reason="OK", headers={"Content-Type": "audio/mp3"})
+ await resp.prepare(request)
+ async for chunk in get_preview_stream(self.mass, provider_instance_id_or_domain, item_id):
+ await resp.write(chunk)
+ return resp
+
async def _handle_server_info(self, request: web.Request) -> web.Response: # noqa: ARG002
"""Handle request for server info."""
return web.json_response(self.mass.get_server_info().to_dict())
import urllib.error
import urllib.parse
import urllib.request
-from contextlib import suppress
from functools import lru_cache
from importlib.metadata import PackageNotFoundError
from importlib.metadata import version as pkg_version
async def is_hass_supervisor() -> bool:
"""Return if we're running inside the HA Supervisor (e.g. HAOS)."""
- with suppress(urllib.error.URLError):
- res = await asyncio.to_thread(urllib.request.urlopen, "http://supervisor/core")
- # this should return a 401 unauthorized if it exists
- return res.code == 401
- return False
+
+ def _check():
+ try:
+ urllib.request.urlopen("http://supervisor/core")
+ except urllib.error.URLError as err:
+ # this should return a 401 unauthorized if it exists
+ return getattr(err, "code", 999) == 401
+ except Exception:
+ return False
+
+ return await asyncio.to_thread(_check)
async def get_provider_module(domain: str) -> ProviderModuleType: