From 413dd11e539b97c00c91e6c3656586562e5821c3 Mon Sep 17 00:00:00 2001 From: Marcel van der Veldt Date: Mon, 15 Dec 2025 10:49:10 +0100 Subject: [PATCH] Fix signaling server reconnect logic --- .../webserver/remote_access/gateway.py | 109 ++++++++++++------ 1 file changed, 72 insertions(+), 37 deletions(-) diff --git a/music_assistant/controllers/webserver/remote_access/gateway.py b/music_assistant/controllers/webserver/remote_access/gateway.py index 18f247ed..57483248 100644 --- a/music_assistant/controllers/webserver/remote_access/gateway.py +++ b/music_assistant/controllers/webserver/remote_access/gateway.py @@ -55,6 +55,10 @@ class WebRTCGateway: 4. Bridges WebRTC DataChannel messages to the local WebSocket API """ + # Close code 4000 means this connection was replaced by a new one from the same server + # In that case, we should not reconnect as another connection is now active + CLOSE_CODE_REPLACED = 4000 + # Default ICE servers (public STUN only - used as fallback) DEFAULT_ICE_SERVERS: list[dict[str, Any]] = [ {"urls": "stun:stun.home-assistant.io:3478"}, @@ -135,6 +139,9 @@ class WebRTCGateway: async def start(self) -> None: """Start the WebRTC Gateway.""" + if self._running: + self.logger.warning("WebRTC Gateway already running, skipping start") + return self.logger.info("Starting WebRTC Gateway") self.logger.debug("Signaling URL: %s", self.signaling_url) self.logger.debug("Local WS URL: %s", self.local_ws_url) @@ -170,11 +177,12 @@ class WebRTCGateway: """Run the main loop with reconnection logic.""" self.logger.debug("WebRTC Gateway _run() loop starting") while self._running: + should_reconnect = True try: - await self._connect_to_signaling() + should_reconnect = await self._connect_to_signaling() # Connection closed gracefully or with error self._is_connected = False - if self._running: + if self._running and should_reconnect: self.logger.warning( "Signaling server connection lost. Reconnecting in %ss...", self._current_reconnect_delay, @@ -188,19 +196,28 @@ class WebRTCGateway: self._current_reconnect_delay, ) - if self._running: + if self._running and should_reconnect: await asyncio.sleep(self._current_reconnect_delay) # Exponential backoff with max limit self._current_reconnect_delay = min( self._current_reconnect_delay * 2, self._max_reconnect_delay ) + elif not should_reconnect: + # Connection was replaced by another instance, stop the run loop + self.logger.info("Connection replaced, stopping reconnection attempts") + self._running = False + break - async def _connect_to_signaling(self) -> None: - """Connect to the signaling server.""" + async def _connect_to_signaling(self) -> bool: + """Connect to the signaling server. + + :return: True if reconnection should be attempted, False if connection was replaced. + """ if self._connecting: self.logger.warning("Already connecting to signaling server, skipping") - return + return False # Don't trigger another reconnect cycle self._connecting = True + close_code: int | None = None self.logger.info("Connecting to signaling server: %s", self.signaling_url) try: self._signaling_ws = await self.http_session.ws_connect( @@ -213,42 +230,17 @@ class WebRTCGateway: self._current_reconnect_delay = self._reconnect_delay self.logger.debug("Registration sent, waiting for confirmation...") - # Message loop - self.logger.debug("Entering message loop") - async for msg in self._signaling_ws: - if msg.type == aiohttp.WSMsgType.TEXT: - try: - await self._handle_signaling_message(json.loads(msg.data)) - except Exception: - self.logger.exception("Error handling signaling message") - elif msg.type == aiohttp.WSMsgType.PING: - # WebSocket ping - autoping should handle this, just log - self.logger.log(VERBOSE_LOG_LEVEL, "Received WebSocket PING") - elif msg.type == aiohttp.WSMsgType.PONG: - # WebSocket pong response - just log - self.logger.log(VERBOSE_LOG_LEVEL, "Received WebSocket PONG") - elif msg.type == aiohttp.WSMsgType.CLOSE: - # Close frame received - self.logger.warning( - "Signaling server sent close frame: code=%s, reason=%s", - msg.data, - msg.extra, - ) - break - elif msg.type == aiohttp.WSMsgType.CLOSED: - self.logger.warning("Signaling server closed connection") - break - elif msg.type == aiohttp.WSMsgType.ERROR: - self.logger.error("WebSocket error: %s", self._signaling_ws.exception()) - break - else: - self.logger.warning("Unexpected WebSocket message type: %s", msg.type) + # Run message loop and get close code + close_code = await self._signaling_message_loop(self._signaling_ws) + # Get close code from WebSocket if not already set from CLOSE message + if close_code is None: + close_code = self._signaling_ws.close_code ws_exception = self._signaling_ws.exception() self.logger.debug( "Message loop exited - WebSocket closed: %s, close_code: %s, exception: %s", self._signaling_ws.closed, - self._signaling_ws.close_code, + close_code, ws_exception, ) except TimeoutError: @@ -262,6 +254,49 @@ class WebRTCGateway: self._connecting = False self._signaling_ws = None + # Check if this connection was replaced by another one + if close_code == self.CLOSE_CODE_REPLACED: + self.logger.info("Connection was replaced by another instance - not reconnecting") + return False + + return True + + async def _signaling_message_loop(self, ws: aiohttp.ClientWebSocketResponse) -> int | None: + """Process messages from the signaling WebSocket. + + :param ws: The WebSocket connection to process messages from. + :return: Close code if connection was closed with a code, None otherwise. + """ + close_code: int | None = None + self.logger.debug("Entering message loop") + async for msg in ws: + if msg.type == aiohttp.WSMsgType.TEXT: + try: + await self._handle_signaling_message(json.loads(msg.data)) + except Exception: + self.logger.exception("Error handling signaling message") + elif msg.type == aiohttp.WSMsgType.PING: + self.logger.log(VERBOSE_LOG_LEVEL, "Received WebSocket PING") + elif msg.type == aiohttp.WSMsgType.PONG: + self.logger.log(VERBOSE_LOG_LEVEL, "Received WebSocket PONG") + elif msg.type == aiohttp.WSMsgType.CLOSE: + close_code = msg.data + self.logger.warning( + "Signaling server sent close frame: code=%s, reason=%s", + msg.data, + msg.extra, + ) + break + elif msg.type == aiohttp.WSMsgType.CLOSED: + self.logger.warning("Signaling server closed connection") + break + elif msg.type == aiohttp.WSMsgType.ERROR: + self.logger.error("WebSocket error: %s", ws.exception()) + break + else: + self.logger.warning("Unexpected WebSocket message type: %s", msg.type) + return close_code + async def _register(self) -> None: """Register with the signaling server.""" if self._signaling_ws: -- 2.34.1