Always use webrtc for sendspin web player
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Sat, 6 Dec 2025 01:06:03 +0000 (02:06 +0100)
committerMarcel van der Veldt <m.vanderveldt@outlook.com>
Sat, 6 Dec 2025 01:06:03 +0000 (02:06 +0100)
music_assistant/controllers/webserver/remote_access/__init__.py
music_assistant/controllers/webserver/remote_access/gateway.py
music_assistant/providers/sendspin/provider.py

index f65e3eba2e7ae5711399c3e85ae3a1da786a2654..b33da00bd9b9da52dbcc24d465ee17b55d8f7885 100644 (file)
@@ -168,6 +168,28 @@ class RemoteAccessManager:
             self.logger.exception("Error getting HA Cloud status: %s", err)
         return False, None
 
+    async def get_ice_servers(self) -> list[dict[str, str]]:
+        """Get ICE servers for WebRTC connections.
+
+        Returns HA Cloud TURN servers if available, otherwise returns public STUN servers.
+        This method can be called regardless of whether remote access is enabled.
+
+        :return: List of ICE server configurations.
+        """
+        # Default public STUN servers
+        default_ice_servers: list[dict[str, str]] = [
+            {"urls": "stun:stun.l.google.com:19302"},
+            {"urls": "stun:stun.cloudflare.com:3478"},
+            {"urls": "stun:stun.home-assistant.io:3478"},
+        ]
+
+        # Try to get HA Cloud ICE servers
+        _, ice_servers = await self._get_ha_cloud_status()
+        if ice_servers:
+            return ice_servers
+
+        return default_ice_servers
+
     @property
     def is_enabled(self) -> bool:
         """Return whether WebRTC remote access is enabled."""
index 39351b58e668f66fcec94e981b01e56834f06173..ad49584ee7e35e35b0ded851ab5e37225e1eee7d 100644 (file)
@@ -48,7 +48,7 @@ class WebRTCSession:
 
     session_id: str
     peer_connection: RTCPeerConnection
-    data_channel: Any = None
+    data_channel: Any = None  # Main API channel (ma-api)
     local_ws: Any = None
     message_queue: asyncio.Queue[str] = field(default_factory=asyncio.Queue)
     forward_to_local_task: asyncio.Task[None] | None = None
@@ -190,10 +190,10 @@ class WebRTCGateway:
             await asyncio.sleep(0.5)
             self.logger.debug("Sending registration")
             await self._register()
-            self._is_connected = True
+            # Note: _is_connected is set to True when we receive "registered" confirmation
             # Reset reconnect delay on successful connection
             self._current_reconnect_delay = self._reconnect_delay
-            self.logger.info("Connected and registered with signaling server")
+            self.logger.info("Registration sent, waiting for confirmation...")
 
             # Message loop
             self.logger.debug("Entering message loop")
@@ -273,12 +273,11 @@ class WebRTCGateway:
             # Server responded to our ping, connection is alive
             pass
         elif msg_type == "registered":
+            self._is_connected = True
             self.logger.info("Registered with signaling server as: %s", message.get("remoteId"))
         elif msg_type == "error":
-            self.logger.error(
-                "Signaling server error: %s",
-                message.get("message", "Unknown error"),
-            )
+            error_msg = message.get("error") or message.get("message", "Unknown error")
+            self.logger.error("Signaling server error: %s", error_msg)
         elif msg_type == "client-connected":
             session_id = message.get("sessionId")
             if session_id:
@@ -312,6 +311,10 @@ class WebRTCGateway:
 
         @pc.on("datachannel")
         def on_datachannel(channel: Any) -> None:
+            # Main API channel (ma-api)
+            self.logger.debug(
+                "Received API DataChannel '%s' for session %s", channel.label, session_id
+            )
             session.data_channel = channel
             asyncio.create_task(self._setup_data_channel(session))
 
index a7f0cc980507b2bd21b9fb811e39db828950a411..6adba80d6f23f93f9c8eeda541ad23817e2266f1 100644 (file)
@@ -2,9 +2,15 @@
 
 from __future__ import annotations
 
+import asyncio
+import contextlib
+import secrets
 from collections.abc import Callable
-from typing import TYPE_CHECKING, cast
+from dataclasses import dataclass, field
+from typing import TYPE_CHECKING, Any, cast
 
+import aiohttp
+from aiortc import RTCConfiguration, RTCIceCandidate, RTCPeerConnection, RTCSessionDescription
 from aiosendspin.server import ClientAddedEvent, ClientRemovedEvent, SendspinEvent, SendspinServer
 from music_assistant_models.enums import ProviderFeature
 
@@ -17,11 +23,24 @@ if TYPE_CHECKING:
     from music_assistant_models.provider import ProviderManifest
 
 
+@dataclass
+class SendspinWebRTCSession:
+    """Represents an active WebRTC session for a Sendspin client."""
+
+    session_id: str
+    peer_connection: RTCPeerConnection
+    data_channel: Any = None
+    sendspin_ws: aiohttp.ClientWebSocketResponse | None = None
+    forward_task: asyncio.Task[None] | None = None
+    message_queue: asyncio.Queue[str | bytes] = field(default_factory=asyncio.Queue)
+
+
 class SendspinProvider(PlayerProvider):
     """Player Provider for Sendspin."""
 
     server_api: SendspinServer
     unregister_cbs: list[Callable[[], None]]
+    _webrtc_sessions: dict[str, SendspinWebRTCSession]
 
     def __init__(
         self, mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
@@ -31,12 +50,16 @@ class SendspinProvider(PlayerProvider):
         self.server_api = SendspinServer(
             self.mass.loop, mass.server_id, "Music Assistant", self.mass.http_session
         )
+        self._webrtc_sessions = {}
         self.unregister_cbs = [
             self.server_api.add_event_listener(self.event_cb),
-            # For the web player
-            self.mass.webserver.register_dynamic_route(
-                "/sendspin", self.server_api.on_client_connect
-            ),
+            # WebRTC signaling commands for Sendspin connections
+            # this is used to establish WebRTC DataChannels with Sendspin clients
+            # for example the WebPlayer in the Music Assistant frontend or supported (mobile) apps
+            self.mass.register_api_command("sendspin/connect", self.handle_webrtc_connect),
+            self.mass.register_api_command("sendspin/ice", self.handle_webrtc_ice),
+            self.mass.register_api_command("sendspin/disconnect", self.handle_webrtc_disconnect),
+            self.mass.register_api_command("sendspin/ice_servers", self.handle_get_ice_servers),
         ]
 
     async def event_cb(self, server: SendspinServer, event: SendspinEvent) -> None:
@@ -78,6 +101,10 @@ class SendspinProvider(PlayerProvider):
         Called when provider is deregistered (e.g. MA exiting or config reloading).
         is_removed will be set to True when the provider is removed from the configuration.
         """
+        # Close all WebRTC sessions
+        for session_id in list(self._webrtc_sessions.keys()):
+            await self._close_webrtc_session(session_id)
+
         # Stop the Sendspin server
         await self.server_api.close()
 
@@ -87,3 +114,233 @@ class SendspinProvider(PlayerProvider):
         for player in self.players:
             self.logger.debug("Unloading player %s", player.name)
             await self.mass.players.unregister(player.player_id)
+
+    async def handle_webrtc_connect(self, offer: dict[str, str]) -> dict[str, Any]:
+        """
+        Handle WebRTC connection request for Sendspin.
+
+        This command is called via an authenticated API connection.
+        The client sends a WebRTC offer and receives an answer.
+
+        :param offer: WebRTC offer with 'sdp' and 'type' fields.
+        :return: Dictionary with session_id and WebRTC answer.
+        """
+        session_id = secrets.token_urlsafe(16)
+        self.logger.debug("Creating Sendspin WebRTC session %s", session_id)
+
+        # Create peer connection with STUN servers
+        config = RTCConfiguration(iceServers=[])
+        pc = RTCPeerConnection(configuration=config)
+
+        session = SendspinWebRTCSession(
+            session_id=session_id,
+            peer_connection=pc,
+        )
+        self._webrtc_sessions[session_id] = session
+
+        # Track ICE candidates to send back
+        ice_candidates: list[dict[str, Any]] = []
+
+        @pc.on("icecandidate")
+        def on_ice_candidate(candidate: Any) -> None:
+            if candidate:
+                ice_candidates.append(
+                    {
+                        "candidate": candidate.candidate,
+                        "sdpMid": candidate.sdpMid,
+                        "sdpMLineIndex": candidate.sdpMLineIndex,
+                    }
+                )
+
+        @pc.on("datachannel")
+        def on_datachannel(channel: Any) -> None:
+            self.logger.debug("Sendspin DataChannel opened for session %s", session_id)
+            session.data_channel = channel
+            asyncio.create_task(self._setup_sendspin_bridge(session))
+
+        @pc.on("connectionstatechange")
+        async def on_connection_state_change() -> None:
+            self.logger.debug(
+                "Sendspin WebRTC connection state: %s for session %s",
+                pc.connectionState,
+                session_id,
+            )
+            if pc.connectionState in ("failed", "closed", "disconnected"):
+                await self._close_webrtc_session(session_id)
+
+        # Set remote description (offer from client)
+        await pc.setRemoteDescription(RTCSessionDescription(sdp=offer["sdp"], type=offer["type"]))
+
+        # Create and set local description (answer)
+        answer = await pc.createAnswer()
+        await pc.setLocalDescription(answer)
+
+        # Wait briefly for ICE gathering
+        await asyncio.sleep(0.1)
+
+        return {
+            "session_id": session_id,
+            "answer": {
+                "sdp": pc.localDescription.sdp,
+                "type": pc.localDescription.type,
+            },
+            "ice_candidates": ice_candidates,
+        }
+
+    async def handle_webrtc_ice(
+        self, session_id: str, candidate: dict[str, Any]
+    ) -> dict[str, bool]:
+        """
+        Handle ICE candidate from client.
+
+        :param session_id: The WebRTC session ID.
+        :param candidate: ICE candidate with 'candidate', 'sdpMid', 'sdpMLineIndex'.
+        :return: Dictionary with success status.
+        """
+        session = self._webrtc_sessions.get(session_id)
+        if not session:
+            self.logger.warning("ICE candidate for unknown session %s", session_id)
+            return {"success": False}
+
+        try:
+            candidate_str = candidate.get("candidate")
+            sdp_mid = candidate.get("sdpMid")
+            sdp_mline_index = candidate.get("sdpMLineIndex")
+
+            if not candidate_str:
+                return {"success": False}
+
+            # Create RTCIceCandidate from the SDP string
+            ice_candidate = RTCIceCandidate(
+                component=1,
+                foundation="",
+                ip="",
+                port=0,
+                priority=0,
+                protocol="udp",
+                type="host",
+                sdpMid=str(sdp_mid) if sdp_mid else None,
+                sdpMLineIndex=int(sdp_mline_index) if sdp_mline_index is not None else None,
+            )
+            # Parse the candidate string to populate the fields
+            ice_candidate.candidate = str(candidate_str)  # type: ignore[attr-defined]
+
+            await session.peer_connection.addIceCandidate(ice_candidate)
+            return {"success": True}
+        except Exception as err:
+            self.logger.exception("Failed to add ICE candidate: %s", err)
+            return {"success": False}
+
+    async def handle_webrtc_disconnect(self, session_id: str) -> dict[str, bool]:
+        """
+        Handle WebRTC disconnect request.
+
+        :param session_id: The WebRTC session ID to disconnect.
+        :return: Dictionary with success status.
+        """
+        await self._close_webrtc_session(session_id)
+        return {"success": True}
+
+    async def handle_get_ice_servers(self) -> list[dict[str, str]]:
+        """
+        Get ICE servers for Sendspin WebRTC connections.
+
+        Returns HA Cloud TURN servers if available, otherwise returns public STUN servers.
+
+        :return: List of ICE server configurations.
+        """
+        return await self.mass.webserver.remote_access.get_ice_servers()
+
+    async def _close_webrtc_session(self, session_id: str) -> None:
+        """Close a WebRTC session and clean up resources."""
+        session = self._webrtc_sessions.pop(session_id, None)
+        if not session:
+            return
+
+        self.logger.debug("Closing Sendspin WebRTC session %s", session_id)
+
+        # Cancel forward task
+        if session.forward_task and not session.forward_task.done():
+            session.forward_task.cancel()
+            with contextlib.suppress(asyncio.CancelledError):
+                await session.forward_task
+
+        # Close Sendspin WebSocket
+        if session.sendspin_ws and not session.sendspin_ws.closed:
+            await session.sendspin_ws.close()
+
+        # Close peer connection
+        await session.peer_connection.close()
+
+    async def _forward_to_sendspin(self, session: SendspinWebRTCSession) -> None:
+        """Forward messages from queue to Sendspin WebSocket."""
+        try:
+            while session.sendspin_ws and not session.sendspin_ws.closed:
+                msg = await session.message_queue.get()
+                if session.sendspin_ws and not session.sendspin_ws.closed:
+                    if isinstance(msg, bytes):
+                        await session.sendspin_ws.send_bytes(msg)
+                    else:
+                        await session.sendspin_ws.send_str(msg)
+        except asyncio.CancelledError:
+            raise
+        except Exception:
+            self.logger.exception("Error forwarding to Sendspin")
+
+    async def _forward_from_sendspin(self, session: SendspinWebRTCSession, channel: Any) -> None:
+        """Forward messages from Sendspin WebSocket to DataChannel."""
+        try:
+            ws = session.sendspin_ws
+            if ws is None:
+                return
+            async for msg in ws:
+                if msg.type in {aiohttp.WSMsgType.TEXT, aiohttp.WSMsgType.BINARY}:
+                    if channel and channel.readyState == "open":
+                        channel.send(msg.data)
+                elif msg.type in {aiohttp.WSMsgType.ERROR, aiohttp.WSMsgType.CLOSED}:
+                    break
+        except asyncio.CancelledError:
+            raise
+        except Exception:
+            self.logger.exception("Error forwarding from Sendspin")
+
+    async def _setup_sendspin_bridge(self, session: SendspinWebRTCSession) -> None:
+        """Set up the bridge between WebRTC DataChannel and internal Sendspin server."""
+        channel = session.data_channel
+        if not channel:
+            return
+
+        loop = asyncio.get_event_loop()
+
+        # Register message handler FIRST to capture any messages sent immediately
+        @channel.on("message")  # type: ignore[misc]
+        def on_message(message: str | bytes) -> None:
+            if session.forward_task and not session.forward_task.done():
+                loop.call_soon_threadsafe(session.message_queue.put_nowait, message)
+            else:
+                # Queue message even if forward task not started yet
+                session.message_queue.put_nowait(message)
+
+        @channel.on("close")  # type: ignore[misc]
+        def on_close() -> None:
+            asyncio.run_coroutine_threadsafe(self._close_webrtc_session(session.session_id), loop)
+
+        try:
+            # Connect to internal Sendspin server
+            sendspin_url = "ws://127.0.0.1:8927/sendspin"
+            session.sendspin_ws = await self.mass.http_session.ws_connect(sendspin_url)
+            self.logger.debug(
+                "Connected to internal Sendspin server for session %s", session.session_id
+            )
+
+            # Start forwarding tasks
+            session.forward_task = asyncio.create_task(self._forward_to_sendspin(session))
+            asyncio.create_task(self._forward_from_sendspin(session, channel))
+
+            self.logger.debug("Sendspin bridge established for session %s", session.session_id)
+
+        except Exception:
+            self.logger.exception(
+                "Failed to setup Sendspin bridge for session %s", session.session_id
+            )
+            await self._close_webrtc_session(session.session_id)