4. Bridges WebRTC DataChannel messages to the local WebSocket API
"""
+ # Close code 4000 means this connection was replaced by a new one from the same server
+ # In that case, we should not reconnect as another connection is now active
+ CLOSE_CODE_REPLACED = 4000
+
# Default ICE servers (public STUN only - used as fallback)
DEFAULT_ICE_SERVERS: list[dict[str, Any]] = [
{"urls": "stun:stun.home-assistant.io:3478"},
async def start(self) -> None:
"""Start the WebRTC Gateway."""
+ if self._running:
+ self.logger.warning("WebRTC Gateway already running, skipping start")
+ return
self.logger.info("Starting WebRTC Gateway")
self.logger.debug("Signaling URL: %s", self.signaling_url)
self.logger.debug("Local WS URL: %s", self.local_ws_url)
"""Run the main loop with reconnection logic."""
self.logger.debug("WebRTC Gateway _run() loop starting")
while self._running:
+ should_reconnect = True
try:
- await self._connect_to_signaling()
+ should_reconnect = await self._connect_to_signaling()
# Connection closed gracefully or with error
self._is_connected = False
- if self._running:
+ if self._running and should_reconnect:
self.logger.warning(
"Signaling server connection lost. Reconnecting in %ss...",
self._current_reconnect_delay,
self._current_reconnect_delay,
)
- if self._running:
+ if self._running and should_reconnect:
await asyncio.sleep(self._current_reconnect_delay)
# Exponential backoff with max limit
self._current_reconnect_delay = min(
self._current_reconnect_delay * 2, self._max_reconnect_delay
)
+ elif not should_reconnect:
+ # Connection was replaced by another instance, stop the run loop
+ self.logger.info("Connection replaced, stopping reconnection attempts")
+ self._running = False
+ break
- async def _connect_to_signaling(self) -> None:
- """Connect to the signaling server."""
+ async def _connect_to_signaling(self) -> bool:
+ """Connect to the signaling server.
+
+ :return: True if reconnection should be attempted, False if connection was replaced.
+ """
if self._connecting:
self.logger.warning("Already connecting to signaling server, skipping")
- return
+ return False # Don't trigger another reconnect cycle
self._connecting = True
+ close_code: int | None = None
self.logger.info("Connecting to signaling server: %s", self.signaling_url)
try:
self._signaling_ws = await self.http_session.ws_connect(
self._current_reconnect_delay = self._reconnect_delay
self.logger.debug("Registration sent, waiting for confirmation...")
- # Message loop
- self.logger.debug("Entering message loop")
- async for msg in self._signaling_ws:
- if msg.type == aiohttp.WSMsgType.TEXT:
- try:
- await self._handle_signaling_message(json.loads(msg.data))
- except Exception:
- self.logger.exception("Error handling signaling message")
- elif msg.type == aiohttp.WSMsgType.PING:
- # WebSocket ping - autoping should handle this, just log
- self.logger.log(VERBOSE_LOG_LEVEL, "Received WebSocket PING")
- elif msg.type == aiohttp.WSMsgType.PONG:
- # WebSocket pong response - just log
- self.logger.log(VERBOSE_LOG_LEVEL, "Received WebSocket PONG")
- elif msg.type == aiohttp.WSMsgType.CLOSE:
- # Close frame received
- self.logger.warning(
- "Signaling server sent close frame: code=%s, reason=%s",
- msg.data,
- msg.extra,
- )
- break
- elif msg.type == aiohttp.WSMsgType.CLOSED:
- self.logger.warning("Signaling server closed connection")
- break
- elif msg.type == aiohttp.WSMsgType.ERROR:
- self.logger.error("WebSocket error: %s", self._signaling_ws.exception())
- break
- else:
- self.logger.warning("Unexpected WebSocket message type: %s", msg.type)
+ # Run message loop and get close code
+ close_code = await self._signaling_message_loop(self._signaling_ws)
+ # Get close code from WebSocket if not already set from CLOSE message
+ if close_code is None:
+ close_code = self._signaling_ws.close_code
ws_exception = self._signaling_ws.exception()
self.logger.debug(
"Message loop exited - WebSocket closed: %s, close_code: %s, exception: %s",
self._signaling_ws.closed,
- self._signaling_ws.close_code,
+ close_code,
ws_exception,
)
except TimeoutError:
self._connecting = False
self._signaling_ws = None
+ # Check if this connection was replaced by another one
+ if close_code == self.CLOSE_CODE_REPLACED:
+ self.logger.info("Connection was replaced by another instance - not reconnecting")
+ return False
+
+ return True
+
+ async def _signaling_message_loop(self, ws: aiohttp.ClientWebSocketResponse) -> int | None:
+ """Process messages from the signaling WebSocket.
+
+ :param ws: The WebSocket connection to process messages from.
+ :return: Close code if connection was closed with a code, None otherwise.
+ """
+ close_code: int | None = None
+ self.logger.debug("Entering message loop")
+ async for msg in ws:
+ if msg.type == aiohttp.WSMsgType.TEXT:
+ try:
+ await self._handle_signaling_message(json.loads(msg.data))
+ except Exception:
+ self.logger.exception("Error handling signaling message")
+ elif msg.type == aiohttp.WSMsgType.PING:
+ self.logger.log(VERBOSE_LOG_LEVEL, "Received WebSocket PING")
+ elif msg.type == aiohttp.WSMsgType.PONG:
+ self.logger.log(VERBOSE_LOG_LEVEL, "Received WebSocket PONG")
+ elif msg.type == aiohttp.WSMsgType.CLOSE:
+ close_code = msg.data
+ self.logger.warning(
+ "Signaling server sent close frame: code=%s, reason=%s",
+ msg.data,
+ msg.extra,
+ )
+ break
+ elif msg.type == aiohttp.WSMsgType.CLOSED:
+ self.logger.warning("Signaling server closed connection")
+ break
+ elif msg.type == aiohttp.WSMsgType.ERROR:
+ self.logger.error("WebSocket error: %s", ws.exception())
+ break
+ else:
+ self.logger.warning("Unexpected WebSocket message type: %s", msg.type)
+ return close_code
+
async def _register(self) -> None:
"""Register with the signaling server."""
if self._signaling_ws: