import logging
import secrets
import string
+from collections.abc import Awaitable, Callable
from dataclasses import dataclass, field
from typing import Any
4. Bridges WebRTC DataChannel messages to the local WebSocket API
"""
+ # Default ICE servers (public STUN only - used as fallback)
+ DEFAULT_ICE_SERVERS: list[dict[str, Any]] = [
+ {"urls": "stun:stun.home-assistant.io:3478"},
+ {"urls": "stun:stun.l.google.com:19302"},
+ {"urls": "stun:stun1.l.google.com:19302"},
+ {"urls": "stun:stun.cloudflare.com:3478"},
+ ]
+
def __init__(
self,
http_session: aiohttp.ClientSession,
local_ws_url: str = "ws://localhost:8095/ws",
ice_servers: list[dict[str, Any]] | None = None,
remote_id: str | None = None,
+ ice_servers_callback: Callable[[], Awaitable[list[dict[str, Any]]]] | None = None,
) -> None:
"""Initialize the WebRTC Gateway.
:param http_session: Shared aiohttp ClientSession to use for HTTP/WebSocket connections.
:param signaling_url: WebSocket URL of the signaling server.
:param local_ws_url: Local WebSocket URL to bridge to.
- :param ice_servers: List of ICE server configurations.
+ :param ice_servers: List of ICE server configurations (used at registration time).
:param remote_id: Optional Remote ID to use (generated if not provided).
+ :param ice_servers_callback: Optional callback to fetch fresh ICE servers for each session.
+ If provided, this will be called for each client connection to get fresh TURN
+ credentials. If not provided, the static ice_servers will be used.
"""
self.http_session = http_session
self.signaling_url = signaling_url
self.local_ws_url = local_ws_url
self.remote_id = remote_id or generate_remote_id()
self.logger = LOGGER
+ self._ice_servers_callback = ice_servers_callback
- self.ice_servers = ice_servers or [
- {"urls": "stun:stun.home-assistant.io:3478"},
- {"urls": "stun:stun.l.google.com:19302"},
- {"urls": "stun:stun1.l.google.com:19302"},
- {"urls": "stun:stun.cloudflare.com:3478"},
- ]
+ # Static ICE servers used at registration time (relayed to clients via signaling server)
+ self.ice_servers = ice_servers or self.DEFAULT_ICE_SERVERS
self.sessions: dict[str, WebRTCSession] = {}
self._signaling_ws: aiohttp.ClientWebSocketResponse | None = None
"""Return whether the gateway is connected to the signaling server."""
return self._is_connected
+ async def _get_fresh_ice_servers(self) -> list[dict[str, Any]]:
+ """Get fresh ICE servers for a new WebRTC session.
+
+ If an ice_servers_callback was provided, it will be called to get fresh
+ TURN credentials. Otherwise, returns the static ice_servers.
+
+ :return: List of ICE server configurations with fresh credentials.
+ """
+ if self._ice_servers_callback:
+ try:
+ fresh_servers = await self._ice_servers_callback()
+ if fresh_servers:
+ return fresh_servers
+ except Exception:
+ self.logger.exception("Failed to fetch fresh ICE servers, using cached servers")
+ return self.ice_servers
+
async def start(self) -> None:
"""Start the WebRTC Gateway."""
self.logger.info("Starting WebRTC Gateway with Remote ID: %s", self.remote_id)
session_id = message.get("sessionId")
if session_id:
await self._create_session(session_id)
+ # Send session-ready with fresh ICE servers back to signaling server
+ # This allows the signaling server to relay fresh TURN credentials to the client
+ fresh_ice_servers = await self._get_fresh_ice_servers()
+ if self._signaling_ws:
+ await self._signaling_ws.send_json(
+ {
+ "type": "session-ready",
+ "sessionId": session_id,
+ "iceServers": fresh_ice_servers,
+ }
+ )
elif msg_type == "client-disconnected":
session_id = message.get("sessionId")
if session_id:
:param session_id: The session ID.
"""
+ # Get fresh ICE servers for this session (important for TURN credentials that expire)
+ session_ice_servers = await self._get_fresh_ice_servers()
self.logger.debug(
"Creating session %s with %d ICE servers: %s",
session_id,
- len(self.ice_servers),
+ len(session_ice_servers),
[
{k: v if k != "credential" else "***" for k, v in s.items()}
- for s in self.ice_servers
+ for s in session_ice_servers
],
)
config = RTCConfiguration(
- iceServers=[RTCIceServer(**server) for server in self.ice_servers]
+ iceServers=[RTCIceServer(**server) for server in session_ice_servers]
)
pc = RTCPeerConnection(configuration=config)
session = WebRTCSession(session_id=session_id, peer_connection=pc)
@pc.on("connectionstatechange")
async def on_connectionstatechange() -> None:
+ self.logger.debug("Session %s connection state: %s", session_id, pc.connectionState)
if pc.connectionState == "failed":
await self._close_session(session_id)
+ @pc.on("iceconnectionstatechange")
+ async def on_iceconnectionstatechange() -> None:
+ self.logger.debug(
+ "Session %s ICE connection state: %s", session_id, pc.iceConnectionState
+ )
+
+ @pc.on("icegatheringstatechange")
+ async def on_icegatheringstatechange() -> None:
+ self.logger.debug(
+ "Session %s ICE gathering state: %s", session_id, pc.iceGatheringState
+ )
+
async def _handle_offer(self, session_id: str, offer: dict[str, Any]) -> None:
"""Handle incoming WebRTC offer.