Cleanup webrtc logic
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Sat, 6 Dec 2025 13:50:57 +0000 (14:50 +0100)
committerMarcel van der Veldt <m.vanderveldt@outlook.com>
Sat, 6 Dec 2025 13:50:57 +0000 (14:50 +0100)
music_assistant/controllers/webserver/remote_access/gateway.py
music_assistant/providers/sendspin/provider.py

index 9f643042f52c9581b92e0d2f47d9f3a8a93051a4..b1ca4abd6a19fcd5911eacb06b62f40932c17f5f 100644 (file)
@@ -20,11 +20,11 @@ from typing import Any
 import aiohttp
 from aiortc import (
     RTCConfiguration,
-    RTCIceCandidate,
     RTCIceServer,
     RTCPeerConnection,
     RTCSessionDescription,
 )
+from aiortc.sdp import candidate_from_sdp
 
 from music_assistant.constants import MASS_LOGGER_NAME
 
@@ -270,27 +270,13 @@ class WebRTCGateway:
     async def _register(self) -> None:
         """Register with the signaling server."""
         if self._signaling_ws:
-            # Prepare ICE servers for the signaling server to relay to clients
-            # Mask credentials in logs for security
-            ice_servers_for_log = [
-                {k: v if k != "credential" else "***" for k, v in s.items()}
-                for s in self.ice_servers
-            ]
-            registration_msg = {
-                "type": "register-server",
-                "remoteId": self.remote_id,
-                "iceServers": self.ice_servers,
-            }
-            self.logger.debug(
-                "Sending registration with Remote ID: %s and %d ICE servers: %s",
-                self.remote_id,
-                len(self.ice_servers),
-                ice_servers_for_log,
+            await self._signaling_ws.send_json(
+                {
+                    "type": "register-server",
+                    "remoteId": self.remote_id,
+                    "iceServers": self.ice_servers,
+                }
             )
-            await self._signaling_ws.send_json(registration_msg)
-            self.logger.debug("Registration message sent successfully")
-        else:
-            self.logger.warning("Cannot register: signaling websocket is not connected")
 
     async def _handle_signaling_message(self, message: dict[str, Any]) -> None:
         """Handle incoming signaling messages.
@@ -298,14 +284,11 @@ class WebRTCGateway:
         :param message: The signaling message.
         """
         msg_type = message.get("type")
-        self.logger.debug("Received signaling message: %s - Full message: %s", msg_type, message)
 
         if msg_type == "ping":
-            # Respond to ping with pong
             if self._signaling_ws:
                 await self._signaling_ws.send_json({"type": "pong"})
         elif msg_type == "pong":
-            # Server responded to our ping, connection is alive
             pass
         elif msg_type == "registered":
             self._is_connected = True
@@ -317,8 +300,7 @@ class WebRTCGateway:
             session_id = message.get("sessionId")
             if session_id:
                 await self._create_session(session_id)
-                # Send session-ready with fresh ICE servers back to signaling server
-                # This allows the signaling server to relay fresh TURN credentials to the client
+                # Send session-ready with fresh ICE servers for the client
                 fresh_ice_servers = await self._get_fresh_ice_servers()
                 if self._signaling_ws:
                     await self._signaling_ws.send_json(
@@ -348,17 +330,7 @@ class WebRTCGateway:
 
         :param session_id: The session ID.
         """
-        # Get fresh ICE servers for this session (important for TURN credentials that expire)
         session_ice_servers = await self._get_fresh_ice_servers()
-        self.logger.debug(
-            "Creating session %s with %d ICE servers: %s",
-            session_id,
-            len(session_ice_servers),
-            [
-                {k: v if k != "credential" else "***" for k, v in s.items()}
-                for s in session_ice_servers
-            ],
-        )
         config = RTCConfiguration(
             iceServers=[RTCIceServer(**server) for server in session_ice_servers]
         )
@@ -368,10 +340,6 @@ class WebRTCGateway:
 
         @pc.on("datachannel")
         def on_datachannel(channel: Any) -> None:
-            # Main API channel (ma-api)
-            self.logger.debug(
-                "Received API DataChannel '%s' for session %s", channel.label, session_id
-            )
             session.data_channel = channel
             asyncio.create_task(self._setup_data_channel(session))
 
@@ -392,22 +360,9 @@ class WebRTCGateway:
 
         @pc.on("connectionstatechange")
         async def on_connectionstatechange() -> None:
-            self.logger.debug("Session %s connection state: %s", session_id, pc.connectionState)
             if pc.connectionState == "failed":
                 await self._close_session(session_id)
 
-        @pc.on("iceconnectionstatechange")
-        async def on_iceconnectionstatechange() -> None:
-            self.logger.debug(
-                "Session %s ICE connection state: %s", session_id, pc.iceConnectionState
-            )
-
-        @pc.on("icegatheringstatechange")
-        async def on_icegatheringstatechange() -> None:
-            self.logger.debug(
-                "Session %s ICE gathering state: %s", session_id, pc.iceGatheringState
-            )
-
     async def _handle_offer(self, session_id: str, offer: dict[str, Any]) -> None:
         """Handle incoming WebRTC offer.
 
@@ -419,13 +374,7 @@ class WebRTCGateway:
             return
         pc = session.peer_connection
 
-        # Check if peer connection is already closed or closing
         if pc.connectionState in ("closed", "failed"):
-            self.logger.debug(
-                "Ignoring offer for session %s - connection state: %s",
-                session_id,
-                pc.connectionState,
-            )
             return
 
         sdp = offer.get("sdp")
@@ -442,44 +391,31 @@ class WebRTCGateway:
                 )
             )
 
-            # Check again if session was closed during setRemoteDescription
             if session_id not in self.sessions or pc.connectionState in ("closed", "failed"):
-                self.logger.debug(
-                    "Session %s closed during setRemoteDescription, aborting offer handling",
-                    session_id,
-                )
                 return
 
             answer = await pc.createAnswer()
 
-            # Check again before setLocalDescription
             if session_id not in self.sessions or pc.connectionState in ("closed", "failed"):
-                self.logger.debug(
-                    "Session %s closed during createAnswer, aborting offer handling",
-                    session_id,
-                )
                 return
 
             await pc.setLocalDescription(answer)
 
-            # Final check before sending answer
+            # Wait for ICE gathering to complete before sending the answer
+            # aiortc doesn't support trickle ICE, candidates are embedded in SDP after gathering
+            gather_timeout = 30
+            gather_start = asyncio.get_event_loop().time()
+            while pc.iceGatheringState != "complete":
+                if session_id not in self.sessions or pc.connectionState in ("closed", "failed"):
+                    return
+                if asyncio.get_event_loop().time() - gather_start > gather_timeout:
+                    self.logger.warning("Session %s ICE gathering timeout", session_id)
+                    break
+                await asyncio.sleep(0.1)
+
             if session_id not in self.sessions or pc.connectionState in ("closed", "failed"):
-                self.logger.debug(
-                    "Session %s closed during setLocalDescription, skipping answer transmission",
-                    session_id,
-                )
                 return
 
-            # Debug: Log ICE candidates in the answer SDP
-            sdp_lines = pc.localDescription.sdp.split("\n")
-            ice_candidates = [line for line in sdp_lines if line.startswith("a=candidate:")]
-            self.logger.debug(
-                "Session %s answer SDP contains %d ICE candidates: %s",
-                session_id,
-                len(ice_candidates),
-                ice_candidates,
-            )
-
             if self._signaling_ws:
                 await self._signaling_ws.send_json(
                     {
@@ -506,14 +442,8 @@ class WebRTCGateway:
         if not session or not candidate:
             return
 
-        # Check if peer connection is already closed or closing
         pc = session.peer_connection
         if pc.connectionState in ("closed", "failed"):
-            self.logger.debug(
-                "Ignoring ICE candidate for session %s - connection state: %s",
-                session_id,
-                pc.connectionState,
-            )
             return
 
         candidate_str = candidate.get("candidate")
@@ -523,35 +453,25 @@ class WebRTCGateway:
         if not candidate_str:
             return
 
-        # Create RTCIceCandidate from the SDP string
         try:
-            ice_candidate = RTCIceCandidate(
-                component=1,
-                foundation="",
-                ip="",
-                port=0,
-                priority=0,
-                protocol="udp",
-                type="host",
-                sdpMid=str(sdp_mid) if sdp_mid else None,
-                sdpMLineIndex=int(sdp_mline_index) if sdp_mline_index is not None else None,
+            # Parse ICE candidate - browser sends "candidate:..." format
+            if candidate_str.startswith("candidate:"):
+                sdp_candidate_str = candidate_str[len("candidate:") :]
+            else:
+                sdp_candidate_str = candidate_str
+
+            ice_candidate = candidate_from_sdp(sdp_candidate_str)
+            ice_candidate.sdpMid = str(sdp_mid) if sdp_mid else None
+            ice_candidate.sdpMLineIndex = (
+                int(sdp_mline_index) if sdp_mline_index is not None else None
             )
-            # Parse the candidate string to populate the fields
-            ice_candidate.candidate = str(candidate_str)  # type: ignore[attr-defined]
 
-            # Check if session was closed before adding candidate
             if session_id not in self.sessions or pc.connectionState in ("closed", "failed"):
-                self.logger.debug(
-                    "Session %s closed before adding ICE candidate, skipping",
-                    session_id,
-                )
                 return
 
             await session.peer_connection.addIceCandidate(ice_candidate)
         except Exception:
-            self.logger.exception(
-                "Failed to add ICE candidate for session %s: %s", session_id, candidate
-            )
+            self.logger.exception("Failed to add ICE candidate for session %s", session_id)
 
     async def _setup_data_channel(self, session: WebRTCSession) -> None:
         """Set up data channel and bridge to local WebSocket.
index eb65f0e6839a1ca7548b11f92a778a9e1900372b..05f92663b1410951e23435acd2e367ba4e64a41c 100644 (file)
@@ -12,11 +12,11 @@ from typing import TYPE_CHECKING, Any, cast
 import aiohttp
 from aiortc import (
     RTCConfiguration,
-    RTCIceCandidate,
     RTCIceServer,
     RTCPeerConnection,
     RTCSessionDescription,
 )
+from aiortc.sdp import candidate_from_sdp
 from aiosendspin.server import ClientAddedEvent, ClientRemovedEvent, SendspinEvent, SendspinServer
 from music_assistant_models.enums import ProviderFeature
 
@@ -188,8 +188,33 @@ class SendspinProvider(PlayerProvider):
         answer = await pc.createAnswer()
         await pc.setLocalDescription(answer)
 
-        # Wait briefly for ICE gathering
-        await asyncio.sleep(0.1)
+        # Wait for ICE gathering to complete before sending the answer
+        # This is required because aiortc doesn't support trickle ICE -
+        # candidates are only embedded in the SDP after gathering is complete
+        # See: https://github.com/aiortc/aiortc/issues/1344
+        gather_timeout = 30  # seconds
+        gather_start = asyncio.get_event_loop().time()
+        while pc.iceGatheringState != "complete":
+            if pc.connectionState in ("closed", "failed"):
+                self.logger.debug(
+                    "Sendspin session %s closed during ICE gathering, aborting",
+                    session_id,
+                )
+                raise RuntimeError("Connection closed during ICE gathering")
+            if asyncio.get_event_loop().time() - gather_start > gather_timeout:
+                self.logger.warning(
+                    "Sendspin session %s ICE gathering timeout after %ds, sending answer anyway",
+                    session_id,
+                    gather_timeout,
+                )
+                break
+            await asyncio.sleep(0.1)
+
+        self.logger.debug(
+            "Sendspin session %s ICE gathering completed in %.1fs",
+            session_id,
+            asyncio.get_event_loop().time() - gather_start,
+        )
 
         return {
             "session_id": session_id,
@@ -223,20 +248,18 @@ class SendspinProvider(PlayerProvider):
             if not candidate_str:
                 return {"success": False}
 
-            # Create RTCIceCandidate from the SDP string
-            ice_candidate = RTCIceCandidate(
-                component=1,
-                foundation="",
-                ip="",
-                port=0,
-                priority=0,
-                protocol="udp",
-                type="host",
-                sdpMid=str(sdp_mid) if sdp_mid else None,
-                sdpMLineIndex=int(sdp_mline_index) if sdp_mline_index is not None else None,
+            # Parse the ICE candidate using aiortc's candidate_from_sdp function
+            # Browser sends "candidate:..." format, candidate_from_sdp expects without prefix
+            if candidate_str.startswith("candidate:"):
+                sdp_candidate_str = candidate_str[len("candidate:") :]
+            else:
+                sdp_candidate_str = candidate_str
+
+            ice_candidate = candidate_from_sdp(sdp_candidate_str)
+            ice_candidate.sdpMid = str(sdp_mid) if sdp_mid else None
+            ice_candidate.sdpMLineIndex = (
+                int(sdp_mline_index) if sdp_mline_index is not None else None
             )
-            # Parse the candidate string to populate the fields
-            ice_candidate.candidate = str(candidate_str)  # type: ignore[attr-defined]
 
             await session.peer_connection.addIceCandidate(ice_candidate)
             return {"success": True}