From cafe6d1541be4fe96618f755eb44a9f7c78c4107 Mon Sep 17 00:00:00 2001 From: Marcel van der Veldt Date: Sat, 6 Dec 2025 14:50:57 +0100 Subject: [PATCH] Cleanup webrtc logic --- .../webserver/remote_access/gateway.py | 142 ++++-------------- .../providers/sendspin/provider.py | 55 +++++-- 2 files changed, 70 insertions(+), 127 deletions(-) diff --git a/music_assistant/controllers/webserver/remote_access/gateway.py b/music_assistant/controllers/webserver/remote_access/gateway.py index 9f643042..b1ca4abd 100644 --- a/music_assistant/controllers/webserver/remote_access/gateway.py +++ b/music_assistant/controllers/webserver/remote_access/gateway.py @@ -20,11 +20,11 @@ from typing import Any import aiohttp from aiortc import ( RTCConfiguration, - RTCIceCandidate, RTCIceServer, RTCPeerConnection, RTCSessionDescription, ) +from aiortc.sdp import candidate_from_sdp from music_assistant.constants import MASS_LOGGER_NAME @@ -270,27 +270,13 @@ class WebRTCGateway: async def _register(self) -> None: """Register with the signaling server.""" if self._signaling_ws: - # Prepare ICE servers for the signaling server to relay to clients - # Mask credentials in logs for security - ice_servers_for_log = [ - {k: v if k != "credential" else "***" for k, v in s.items()} - for s in self.ice_servers - ] - registration_msg = { - "type": "register-server", - "remoteId": self.remote_id, - "iceServers": self.ice_servers, - } - self.logger.debug( - "Sending registration with Remote ID: %s and %d ICE servers: %s", - self.remote_id, - len(self.ice_servers), - ice_servers_for_log, + await self._signaling_ws.send_json( + { + "type": "register-server", + "remoteId": self.remote_id, + "iceServers": self.ice_servers, + } ) - await self._signaling_ws.send_json(registration_msg) - self.logger.debug("Registration message sent successfully") - else: - self.logger.warning("Cannot register: signaling websocket is not connected") async def _handle_signaling_message(self, message: dict[str, Any]) -> None: """Handle incoming signaling messages. @@ -298,14 +284,11 @@ class WebRTCGateway: :param message: The signaling message. """ msg_type = message.get("type") - self.logger.debug("Received signaling message: %s - Full message: %s", msg_type, message) if msg_type == "ping": - # Respond to ping with pong if self._signaling_ws: await self._signaling_ws.send_json({"type": "pong"}) elif msg_type == "pong": - # Server responded to our ping, connection is alive pass elif msg_type == "registered": self._is_connected = True @@ -317,8 +300,7 @@ class WebRTCGateway: session_id = message.get("sessionId") if session_id: await self._create_session(session_id) - # Send session-ready with fresh ICE servers back to signaling server - # This allows the signaling server to relay fresh TURN credentials to the client + # Send session-ready with fresh ICE servers for the client fresh_ice_servers = await self._get_fresh_ice_servers() if self._signaling_ws: await self._signaling_ws.send_json( @@ -348,17 +330,7 @@ class WebRTCGateway: :param session_id: The session ID. """ - # Get fresh ICE servers for this session (important for TURN credentials that expire) session_ice_servers = await self._get_fresh_ice_servers() - self.logger.debug( - "Creating session %s with %d ICE servers: %s", - session_id, - len(session_ice_servers), - [ - {k: v if k != "credential" else "***" for k, v in s.items()} - for s in session_ice_servers - ], - ) config = RTCConfiguration( iceServers=[RTCIceServer(**server) for server in session_ice_servers] ) @@ -368,10 +340,6 @@ class WebRTCGateway: @pc.on("datachannel") def on_datachannel(channel: Any) -> None: - # Main API channel (ma-api) - self.logger.debug( - "Received API DataChannel '%s' for session %s", channel.label, session_id - ) session.data_channel = channel asyncio.create_task(self._setup_data_channel(session)) @@ -392,22 +360,9 @@ class WebRTCGateway: @pc.on("connectionstatechange") async def on_connectionstatechange() -> None: - self.logger.debug("Session %s connection state: %s", session_id, pc.connectionState) if pc.connectionState == "failed": await self._close_session(session_id) - @pc.on("iceconnectionstatechange") - async def on_iceconnectionstatechange() -> None: - self.logger.debug( - "Session %s ICE connection state: %s", session_id, pc.iceConnectionState - ) - - @pc.on("icegatheringstatechange") - async def on_icegatheringstatechange() -> None: - self.logger.debug( - "Session %s ICE gathering state: %s", session_id, pc.iceGatheringState - ) - async def _handle_offer(self, session_id: str, offer: dict[str, Any]) -> None: """Handle incoming WebRTC offer. @@ -419,13 +374,7 @@ class WebRTCGateway: return pc = session.peer_connection - # Check if peer connection is already closed or closing if pc.connectionState in ("closed", "failed"): - self.logger.debug( - "Ignoring offer for session %s - connection state: %s", - session_id, - pc.connectionState, - ) return sdp = offer.get("sdp") @@ -442,44 +391,31 @@ class WebRTCGateway: ) ) - # Check again if session was closed during setRemoteDescription if session_id not in self.sessions or pc.connectionState in ("closed", "failed"): - self.logger.debug( - "Session %s closed during setRemoteDescription, aborting offer handling", - session_id, - ) return answer = await pc.createAnswer() - # Check again before setLocalDescription if session_id not in self.sessions or pc.connectionState in ("closed", "failed"): - self.logger.debug( - "Session %s closed during createAnswer, aborting offer handling", - session_id, - ) return await pc.setLocalDescription(answer) - # Final check before sending answer + # Wait for ICE gathering to complete before sending the answer + # aiortc doesn't support trickle ICE, candidates are embedded in SDP after gathering + gather_timeout = 30 + gather_start = asyncio.get_event_loop().time() + while pc.iceGatheringState != "complete": + if session_id not in self.sessions or pc.connectionState in ("closed", "failed"): + return + if asyncio.get_event_loop().time() - gather_start > gather_timeout: + self.logger.warning("Session %s ICE gathering timeout", session_id) + break + await asyncio.sleep(0.1) + if session_id not in self.sessions or pc.connectionState in ("closed", "failed"): - self.logger.debug( - "Session %s closed during setLocalDescription, skipping answer transmission", - session_id, - ) return - # Debug: Log ICE candidates in the answer SDP - sdp_lines = pc.localDescription.sdp.split("\n") - ice_candidates = [line for line in sdp_lines if line.startswith("a=candidate:")] - self.logger.debug( - "Session %s answer SDP contains %d ICE candidates: %s", - session_id, - len(ice_candidates), - ice_candidates, - ) - if self._signaling_ws: await self._signaling_ws.send_json( { @@ -506,14 +442,8 @@ class WebRTCGateway: if not session or not candidate: return - # Check if peer connection is already closed or closing pc = session.peer_connection if pc.connectionState in ("closed", "failed"): - self.logger.debug( - "Ignoring ICE candidate for session %s - connection state: %s", - session_id, - pc.connectionState, - ) return candidate_str = candidate.get("candidate") @@ -523,35 +453,25 @@ class WebRTCGateway: if not candidate_str: return - # Create RTCIceCandidate from the SDP string try: - ice_candidate = RTCIceCandidate( - component=1, - foundation="", - ip="", - port=0, - priority=0, - protocol="udp", - type="host", - sdpMid=str(sdp_mid) if sdp_mid else None, - sdpMLineIndex=int(sdp_mline_index) if sdp_mline_index is not None else None, + # Parse ICE candidate - browser sends "candidate:..." format + if candidate_str.startswith("candidate:"): + sdp_candidate_str = candidate_str[len("candidate:") :] + else: + sdp_candidate_str = candidate_str + + ice_candidate = candidate_from_sdp(sdp_candidate_str) + ice_candidate.sdpMid = str(sdp_mid) if sdp_mid else None + ice_candidate.sdpMLineIndex = ( + int(sdp_mline_index) if sdp_mline_index is not None else None ) - # Parse the candidate string to populate the fields - ice_candidate.candidate = str(candidate_str) # type: ignore[attr-defined] - # Check if session was closed before adding candidate if session_id not in self.sessions or pc.connectionState in ("closed", "failed"): - self.logger.debug( - "Session %s closed before adding ICE candidate, skipping", - session_id, - ) return await session.peer_connection.addIceCandidate(ice_candidate) except Exception: - self.logger.exception( - "Failed to add ICE candidate for session %s: %s", session_id, candidate - ) + self.logger.exception("Failed to add ICE candidate for session %s", session_id) async def _setup_data_channel(self, session: WebRTCSession) -> None: """Set up data channel and bridge to local WebSocket. diff --git a/music_assistant/providers/sendspin/provider.py b/music_assistant/providers/sendspin/provider.py index eb65f0e6..05f92663 100644 --- a/music_assistant/providers/sendspin/provider.py +++ b/music_assistant/providers/sendspin/provider.py @@ -12,11 +12,11 @@ from typing import TYPE_CHECKING, Any, cast import aiohttp from aiortc import ( RTCConfiguration, - RTCIceCandidate, RTCIceServer, RTCPeerConnection, RTCSessionDescription, ) +from aiortc.sdp import candidate_from_sdp from aiosendspin.server import ClientAddedEvent, ClientRemovedEvent, SendspinEvent, SendspinServer from music_assistant_models.enums import ProviderFeature @@ -188,8 +188,33 @@ class SendspinProvider(PlayerProvider): answer = await pc.createAnswer() await pc.setLocalDescription(answer) - # Wait briefly for ICE gathering - await asyncio.sleep(0.1) + # Wait for ICE gathering to complete before sending the answer + # This is required because aiortc doesn't support trickle ICE - + # candidates are only embedded in the SDP after gathering is complete + # See: https://github.com/aiortc/aiortc/issues/1344 + gather_timeout = 30 # seconds + gather_start = asyncio.get_event_loop().time() + while pc.iceGatheringState != "complete": + if pc.connectionState in ("closed", "failed"): + self.logger.debug( + "Sendspin session %s closed during ICE gathering, aborting", + session_id, + ) + raise RuntimeError("Connection closed during ICE gathering") + if asyncio.get_event_loop().time() - gather_start > gather_timeout: + self.logger.warning( + "Sendspin session %s ICE gathering timeout after %ds, sending answer anyway", + session_id, + gather_timeout, + ) + break + await asyncio.sleep(0.1) + + self.logger.debug( + "Sendspin session %s ICE gathering completed in %.1fs", + session_id, + asyncio.get_event_loop().time() - gather_start, + ) return { "session_id": session_id, @@ -223,20 +248,18 @@ class SendspinProvider(PlayerProvider): if not candidate_str: return {"success": False} - # Create RTCIceCandidate from the SDP string - ice_candidate = RTCIceCandidate( - component=1, - foundation="", - ip="", - port=0, - priority=0, - protocol="udp", - type="host", - sdpMid=str(sdp_mid) if sdp_mid else None, - sdpMLineIndex=int(sdp_mline_index) if sdp_mline_index is not None else None, + # Parse the ICE candidate using aiortc's candidate_from_sdp function + # Browser sends "candidate:..." format, candidate_from_sdp expects without prefix + if candidate_str.startswith("candidate:"): + sdp_candidate_str = candidate_str[len("candidate:") :] + else: + sdp_candidate_str = candidate_str + + ice_candidate = candidate_from_sdp(sdp_candidate_str) + ice_candidate.sdpMid = str(sdp_mid) if sdp_mid else None + ice_candidate.sdpMLineIndex = ( + int(sdp_mline_index) if sdp_mline_index is not None else None ) - # Parse the candidate string to populate the fields - ice_candidate.candidate = str(candidate_str) # type: ignore[attr-defined] await session.peer_connection.addIceCandidate(ice_candidate) return {"success": True} -- 2.34.1