from concurrent import futures
from contextlib import suppress
from functools import partial
-from typing import TYPE_CHECKING, Any, Final
+from typing import TYPE_CHECKING, Any, Final, cast
import aiofiles
from aiohttp import WSMsgType, web
assert isinstance(port_value, int)
self.publish_port = port_value
self.publish_ip = default_publish_ip
- bind_ip = config.get_value(CONF_BIND_IP)
- assert isinstance(bind_ip, str)
+ bind_ip = cast("str | None", config.get_value(CONF_BIND_IP))
# print a big fat message in the log where the webserver is running
# because this is a common source of issues for people with more complex setups
if not self.mass.config.onboard_done:
try:
args = parse_arguments(handler.signature, handler.type_hints, command_msg.args)
result: Any = handler.target(**args)
- if asyncio.iscoroutine(result):
- result = await result
if hasattr(result, "__anext__"):
# handle async generator (for really large listings)
result = [item async for item in result]
while not self.wsock.closed:
if (process := await self._to_write.get()) is None:
break
- self._logger.log(VERBOSE_LOG_LEVEL, "Writing: %s", process)
- await self.wsock.send_str(process)
if callable(process):
message: str = process()