import aiohttp
from aiortc import (
RTCConfiguration,
- RTCIceCandidate,
RTCIceServer,
RTCPeerConnection,
RTCSessionDescription,
)
+from aiortc.sdp import candidate_from_sdp
from music_assistant.constants import MASS_LOGGER_NAME
async def _register(self) -> None:
"""Register with the signaling server."""
if self._signaling_ws:
- # Prepare ICE servers for the signaling server to relay to clients
- # Mask credentials in logs for security
- ice_servers_for_log = [
- {k: v if k != "credential" else "***" for k, v in s.items()}
- for s in self.ice_servers
- ]
- registration_msg = {
- "type": "register-server",
- "remoteId": self.remote_id,
- "iceServers": self.ice_servers,
- }
- self.logger.debug(
- "Sending registration with Remote ID: %s and %d ICE servers: %s",
- self.remote_id,
- len(self.ice_servers),
- ice_servers_for_log,
+ await self._signaling_ws.send_json(
+ {
+ "type": "register-server",
+ "remoteId": self.remote_id,
+ "iceServers": self.ice_servers,
+ }
)
- await self._signaling_ws.send_json(registration_msg)
- self.logger.debug("Registration message sent successfully")
- else:
- self.logger.warning("Cannot register: signaling websocket is not connected")
async def _handle_signaling_message(self, message: dict[str, Any]) -> None:
"""Handle incoming signaling messages.
:param message: The signaling message.
"""
msg_type = message.get("type")
- self.logger.debug("Received signaling message: %s - Full message: %s", msg_type, message)
if msg_type == "ping":
- # Respond to ping with pong
if self._signaling_ws:
await self._signaling_ws.send_json({"type": "pong"})
elif msg_type == "pong":
- # Server responded to our ping, connection is alive
pass
elif msg_type == "registered":
self._is_connected = True
session_id = message.get("sessionId")
if session_id:
await self._create_session(session_id)
- # Send session-ready with fresh ICE servers back to signaling server
- # This allows the signaling server to relay fresh TURN credentials to the client
+ # Send session-ready with fresh ICE servers for the client
fresh_ice_servers = await self._get_fresh_ice_servers()
if self._signaling_ws:
await self._signaling_ws.send_json(
:param session_id: The session ID.
"""
- # Get fresh ICE servers for this session (important for TURN credentials that expire)
session_ice_servers = await self._get_fresh_ice_servers()
- self.logger.debug(
- "Creating session %s with %d ICE servers: %s",
- session_id,
- len(session_ice_servers),
- [
- {k: v if k != "credential" else "***" for k, v in s.items()}
- for s in session_ice_servers
- ],
- )
config = RTCConfiguration(
iceServers=[RTCIceServer(**server) for server in session_ice_servers]
)
@pc.on("datachannel")
def on_datachannel(channel: Any) -> None:
- # Main API channel (ma-api)
- self.logger.debug(
- "Received API DataChannel '%s' for session %s", channel.label, session_id
- )
session.data_channel = channel
asyncio.create_task(self._setup_data_channel(session))
@pc.on("connectionstatechange")
async def on_connectionstatechange() -> None:
- self.logger.debug("Session %s connection state: %s", session_id, pc.connectionState)
if pc.connectionState == "failed":
await self._close_session(session_id)
- @pc.on("iceconnectionstatechange")
- async def on_iceconnectionstatechange() -> None:
- self.logger.debug(
- "Session %s ICE connection state: %s", session_id, pc.iceConnectionState
- )
-
- @pc.on("icegatheringstatechange")
- async def on_icegatheringstatechange() -> None:
- self.logger.debug(
- "Session %s ICE gathering state: %s", session_id, pc.iceGatheringState
- )
-
async def _handle_offer(self, session_id: str, offer: dict[str, Any]) -> None:
"""Handle incoming WebRTC offer.
return
pc = session.peer_connection
- # Check if peer connection is already closed or closing
if pc.connectionState in ("closed", "failed"):
- self.logger.debug(
- "Ignoring offer for session %s - connection state: %s",
- session_id,
- pc.connectionState,
- )
return
sdp = offer.get("sdp")
)
)
- # Check again if session was closed during setRemoteDescription
if session_id not in self.sessions or pc.connectionState in ("closed", "failed"):
- self.logger.debug(
- "Session %s closed during setRemoteDescription, aborting offer handling",
- session_id,
- )
return
answer = await pc.createAnswer()
- # Check again before setLocalDescription
if session_id not in self.sessions or pc.connectionState in ("closed", "failed"):
- self.logger.debug(
- "Session %s closed during createAnswer, aborting offer handling",
- session_id,
- )
return
await pc.setLocalDescription(answer)
- # Final check before sending answer
+ # Wait for ICE gathering to complete before sending the answer
+ # aiortc doesn't support trickle ICE, candidates are embedded in SDP after gathering
+ gather_timeout = 30
+ gather_start = asyncio.get_event_loop().time()
+ while pc.iceGatheringState != "complete":
+ if session_id not in self.sessions or pc.connectionState in ("closed", "failed"):
+ return
+ if asyncio.get_event_loop().time() - gather_start > gather_timeout:
+ self.logger.warning("Session %s ICE gathering timeout", session_id)
+ break
+ await asyncio.sleep(0.1)
+
if session_id not in self.sessions or pc.connectionState in ("closed", "failed"):
- self.logger.debug(
- "Session %s closed during setLocalDescription, skipping answer transmission",
- session_id,
- )
return
- # Debug: Log ICE candidates in the answer SDP
- sdp_lines = pc.localDescription.sdp.split("\n")
- ice_candidates = [line for line in sdp_lines if line.startswith("a=candidate:")]
- self.logger.debug(
- "Session %s answer SDP contains %d ICE candidates: %s",
- session_id,
- len(ice_candidates),
- ice_candidates,
- )
-
if self._signaling_ws:
await self._signaling_ws.send_json(
{
if not session or not candidate:
return
- # Check if peer connection is already closed or closing
pc = session.peer_connection
if pc.connectionState in ("closed", "failed"):
- self.logger.debug(
- "Ignoring ICE candidate for session %s - connection state: %s",
- session_id,
- pc.connectionState,
- )
return
candidate_str = candidate.get("candidate")
if not candidate_str:
return
- # Create RTCIceCandidate from the SDP string
try:
- ice_candidate = RTCIceCandidate(
- component=1,
- foundation="",
- ip="",
- port=0,
- priority=0,
- protocol="udp",
- type="host",
- sdpMid=str(sdp_mid) if sdp_mid else None,
- sdpMLineIndex=int(sdp_mline_index) if sdp_mline_index is not None else None,
+ # Parse ICE candidate - browser sends "candidate:..." format
+ if candidate_str.startswith("candidate:"):
+ sdp_candidate_str = candidate_str[len("candidate:") :]
+ else:
+ sdp_candidate_str = candidate_str
+
+ ice_candidate = candidate_from_sdp(sdp_candidate_str)
+ ice_candidate.sdpMid = str(sdp_mid) if sdp_mid else None
+ ice_candidate.sdpMLineIndex = (
+ int(sdp_mline_index) if sdp_mline_index is not None else None
)
- # Parse the candidate string to populate the fields
- ice_candidate.candidate = str(candidate_str) # type: ignore[attr-defined]
- # Check if session was closed before adding candidate
if session_id not in self.sessions or pc.connectionState in ("closed", "failed"):
- self.logger.debug(
- "Session %s closed before adding ICE candidate, skipping",
- session_id,
- )
return
await session.peer_connection.addIceCandidate(ice_candidate)
except Exception:
- self.logger.exception(
- "Failed to add ICE candidate for session %s: %s", session_id, candidate
- )
+ self.logger.exception("Failed to add ICE candidate for session %s", session_id)
async def _setup_data_channel(self, session: WebRTCSession) -> None:
"""Set up data channel and bridge to local WebSocket.
import aiohttp
from aiortc import (
RTCConfiguration,
- RTCIceCandidate,
RTCIceServer,
RTCPeerConnection,
RTCSessionDescription,
)
+from aiortc.sdp import candidate_from_sdp
from aiosendspin.server import ClientAddedEvent, ClientRemovedEvent, SendspinEvent, SendspinServer
from music_assistant_models.enums import ProviderFeature
answer = await pc.createAnswer()
await pc.setLocalDescription(answer)
- # Wait briefly for ICE gathering
- await asyncio.sleep(0.1)
+ # Wait for ICE gathering to complete before sending the answer
+ # This is required because aiortc doesn't support trickle ICE -
+ # candidates are only embedded in the SDP after gathering is complete
+ # See: https://github.com/aiortc/aiortc/issues/1344
+ gather_timeout = 30 # seconds
+ gather_start = asyncio.get_event_loop().time()
+ while pc.iceGatheringState != "complete":
+ if pc.connectionState in ("closed", "failed"):
+ self.logger.debug(
+ "Sendspin session %s closed during ICE gathering, aborting",
+ session_id,
+ )
+ raise RuntimeError("Connection closed during ICE gathering")
+ if asyncio.get_event_loop().time() - gather_start > gather_timeout:
+ self.logger.warning(
+ "Sendspin session %s ICE gathering timeout after %ds, sending answer anyway",
+ session_id,
+ gather_timeout,
+ )
+ break
+ await asyncio.sleep(0.1)
+
+ self.logger.debug(
+ "Sendspin session %s ICE gathering completed in %.1fs",
+ session_id,
+ asyncio.get_event_loop().time() - gather_start,
+ )
return {
"session_id": session_id,
if not candidate_str:
return {"success": False}
- # Create RTCIceCandidate from the SDP string
- ice_candidate = RTCIceCandidate(
- component=1,
- foundation="",
- ip="",
- port=0,
- priority=0,
- protocol="udp",
- type="host",
- sdpMid=str(sdp_mid) if sdp_mid else None,
- sdpMLineIndex=int(sdp_mline_index) if sdp_mline_index is not None else None,
+ # Parse the ICE candidate using aiortc's candidate_from_sdp function
+ # Browser sends "candidate:..." format, candidate_from_sdp expects without prefix
+ if candidate_str.startswith("candidate:"):
+ sdp_candidate_str = candidate_str[len("candidate:") :]
+ else:
+ sdp_candidate_str = candidate_str
+
+ ice_candidate = candidate_from_sdp(sdp_candidate_str)
+ ice_candidate.sdpMid = str(sdp_mid) if sdp_mid else None
+ ice_candidate.sdpMLineIndex = (
+ int(sdp_mline_index) if sdp_mline_index is not None else None
)
- # Parse the candidate string to populate the fields
- ice_candidate.candidate = str(candidate_str) # type: ignore[attr-defined]
await session.peer_connection.addIceCandidate(ice_candidate)
return {"success": True}