From: Marcel van der Veldt Date: Sat, 6 Dec 2025 01:06:03 +0000 (+0100) Subject: Always use webrtc for sendspin web player X-Git-Url: https://git.kitaultman.com/?a=commitdiff_plain;h=9df44b991cf8d432406a1ee1ead01057d60dcc64;p=music-assistant-server.git Always use webrtc for sendspin web player --- diff --git a/music_assistant/controllers/webserver/remote_access/__init__.py b/music_assistant/controllers/webserver/remote_access/__init__.py index f65e3eba..b33da00b 100644 --- a/music_assistant/controllers/webserver/remote_access/__init__.py +++ b/music_assistant/controllers/webserver/remote_access/__init__.py @@ -168,6 +168,28 @@ class RemoteAccessManager: self.logger.exception("Error getting HA Cloud status: %s", err) return False, None + async def get_ice_servers(self) -> list[dict[str, str]]: + """Get ICE servers for WebRTC connections. + + Returns HA Cloud TURN servers if available, otherwise returns public STUN servers. + This method can be called regardless of whether remote access is enabled. + + :return: List of ICE server configurations. + """ + # Default public STUN servers + default_ice_servers: list[dict[str, str]] = [ + {"urls": "stun:stun.l.google.com:19302"}, + {"urls": "stun:stun.cloudflare.com:3478"}, + {"urls": "stun:stun.home-assistant.io:3478"}, + ] + + # Try to get HA Cloud ICE servers + _, ice_servers = await self._get_ha_cloud_status() + if ice_servers: + return ice_servers + + return default_ice_servers + @property def is_enabled(self) -> bool: """Return whether WebRTC remote access is enabled.""" diff --git a/music_assistant/controllers/webserver/remote_access/gateway.py b/music_assistant/controllers/webserver/remote_access/gateway.py index 39351b58..ad49584e 100644 --- a/music_assistant/controllers/webserver/remote_access/gateway.py +++ b/music_assistant/controllers/webserver/remote_access/gateway.py @@ -48,7 +48,7 @@ class WebRTCSession: session_id: str peer_connection: RTCPeerConnection - data_channel: Any = None + data_channel: Any = None # Main API channel (ma-api) local_ws: Any = None message_queue: asyncio.Queue[str] = field(default_factory=asyncio.Queue) forward_to_local_task: asyncio.Task[None] | None = None @@ -190,10 +190,10 @@ class WebRTCGateway: await asyncio.sleep(0.5) self.logger.debug("Sending registration") await self._register() - self._is_connected = True + # Note: _is_connected is set to True when we receive "registered" confirmation # Reset reconnect delay on successful connection self._current_reconnect_delay = self._reconnect_delay - self.logger.info("Connected and registered with signaling server") + self.logger.info("Registration sent, waiting for confirmation...") # Message loop self.logger.debug("Entering message loop") @@ -273,12 +273,11 @@ class WebRTCGateway: # Server responded to our ping, connection is alive pass elif msg_type == "registered": + self._is_connected = True self.logger.info("Registered with signaling server as: %s", message.get("remoteId")) elif msg_type == "error": - self.logger.error( - "Signaling server error: %s", - message.get("message", "Unknown error"), - ) + error_msg = message.get("error") or message.get("message", "Unknown error") + self.logger.error("Signaling server error: %s", error_msg) elif msg_type == "client-connected": session_id = message.get("sessionId") if session_id: @@ -312,6 +311,10 @@ class WebRTCGateway: @pc.on("datachannel") def on_datachannel(channel: Any) -> None: + # Main API channel (ma-api) + self.logger.debug( + "Received API DataChannel '%s' for session %s", channel.label, session_id + ) session.data_channel = channel asyncio.create_task(self._setup_data_channel(session)) diff --git a/music_assistant/providers/sendspin/provider.py b/music_assistant/providers/sendspin/provider.py index a7f0cc98..6adba80d 100644 --- a/music_assistant/providers/sendspin/provider.py +++ b/music_assistant/providers/sendspin/provider.py @@ -2,9 +2,15 @@ from __future__ import annotations +import asyncio +import contextlib +import secrets from collections.abc import Callable -from typing import TYPE_CHECKING, cast +from dataclasses import dataclass, field +from typing import TYPE_CHECKING, Any, cast +import aiohttp +from aiortc import RTCConfiguration, RTCIceCandidate, RTCPeerConnection, RTCSessionDescription from aiosendspin.server import ClientAddedEvent, ClientRemovedEvent, SendspinEvent, SendspinServer from music_assistant_models.enums import ProviderFeature @@ -17,11 +23,24 @@ if TYPE_CHECKING: from music_assistant_models.provider import ProviderManifest +@dataclass +class SendspinWebRTCSession: + """Represents an active WebRTC session for a Sendspin client.""" + + session_id: str + peer_connection: RTCPeerConnection + data_channel: Any = None + sendspin_ws: aiohttp.ClientWebSocketResponse | None = None + forward_task: asyncio.Task[None] | None = None + message_queue: asyncio.Queue[str | bytes] = field(default_factory=asyncio.Queue) + + class SendspinProvider(PlayerProvider): """Player Provider for Sendspin.""" server_api: SendspinServer unregister_cbs: list[Callable[[], None]] + _webrtc_sessions: dict[str, SendspinWebRTCSession] def __init__( self, mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig @@ -31,12 +50,16 @@ class SendspinProvider(PlayerProvider): self.server_api = SendspinServer( self.mass.loop, mass.server_id, "Music Assistant", self.mass.http_session ) + self._webrtc_sessions = {} self.unregister_cbs = [ self.server_api.add_event_listener(self.event_cb), - # For the web player - self.mass.webserver.register_dynamic_route( - "/sendspin", self.server_api.on_client_connect - ), + # WebRTC signaling commands for Sendspin connections + # this is used to establish WebRTC DataChannels with Sendspin clients + # for example the WebPlayer in the Music Assistant frontend or supported (mobile) apps + self.mass.register_api_command("sendspin/connect", self.handle_webrtc_connect), + self.mass.register_api_command("sendspin/ice", self.handle_webrtc_ice), + self.mass.register_api_command("sendspin/disconnect", self.handle_webrtc_disconnect), + self.mass.register_api_command("sendspin/ice_servers", self.handle_get_ice_servers), ] async def event_cb(self, server: SendspinServer, event: SendspinEvent) -> None: @@ -78,6 +101,10 @@ class SendspinProvider(PlayerProvider): Called when provider is deregistered (e.g. MA exiting or config reloading). is_removed will be set to True when the provider is removed from the configuration. """ + # Close all WebRTC sessions + for session_id in list(self._webrtc_sessions.keys()): + await self._close_webrtc_session(session_id) + # Stop the Sendspin server await self.server_api.close() @@ -87,3 +114,233 @@ class SendspinProvider(PlayerProvider): for player in self.players: self.logger.debug("Unloading player %s", player.name) await self.mass.players.unregister(player.player_id) + + async def handle_webrtc_connect(self, offer: dict[str, str]) -> dict[str, Any]: + """ + Handle WebRTC connection request for Sendspin. + + This command is called via an authenticated API connection. + The client sends a WebRTC offer and receives an answer. + + :param offer: WebRTC offer with 'sdp' and 'type' fields. + :return: Dictionary with session_id and WebRTC answer. + """ + session_id = secrets.token_urlsafe(16) + self.logger.debug("Creating Sendspin WebRTC session %s", session_id) + + # Create peer connection with STUN servers + config = RTCConfiguration(iceServers=[]) + pc = RTCPeerConnection(configuration=config) + + session = SendspinWebRTCSession( + session_id=session_id, + peer_connection=pc, + ) + self._webrtc_sessions[session_id] = session + + # Track ICE candidates to send back + ice_candidates: list[dict[str, Any]] = [] + + @pc.on("icecandidate") + def on_ice_candidate(candidate: Any) -> None: + if candidate: + ice_candidates.append( + { + "candidate": candidate.candidate, + "sdpMid": candidate.sdpMid, + "sdpMLineIndex": candidate.sdpMLineIndex, + } + ) + + @pc.on("datachannel") + def on_datachannel(channel: Any) -> None: + self.logger.debug("Sendspin DataChannel opened for session %s", session_id) + session.data_channel = channel + asyncio.create_task(self._setup_sendspin_bridge(session)) + + @pc.on("connectionstatechange") + async def on_connection_state_change() -> None: + self.logger.debug( + "Sendspin WebRTC connection state: %s for session %s", + pc.connectionState, + session_id, + ) + if pc.connectionState in ("failed", "closed", "disconnected"): + await self._close_webrtc_session(session_id) + + # Set remote description (offer from client) + await pc.setRemoteDescription(RTCSessionDescription(sdp=offer["sdp"], type=offer["type"])) + + # Create and set local description (answer) + answer = await pc.createAnswer() + await pc.setLocalDescription(answer) + + # Wait briefly for ICE gathering + await asyncio.sleep(0.1) + + return { + "session_id": session_id, + "answer": { + "sdp": pc.localDescription.sdp, + "type": pc.localDescription.type, + }, + "ice_candidates": ice_candidates, + } + + async def handle_webrtc_ice( + self, session_id: str, candidate: dict[str, Any] + ) -> dict[str, bool]: + """ + Handle ICE candidate from client. + + :param session_id: The WebRTC session ID. + :param candidate: ICE candidate with 'candidate', 'sdpMid', 'sdpMLineIndex'. + :return: Dictionary with success status. + """ + session = self._webrtc_sessions.get(session_id) + if not session: + self.logger.warning("ICE candidate for unknown session %s", session_id) + return {"success": False} + + try: + candidate_str = candidate.get("candidate") + sdp_mid = candidate.get("sdpMid") + sdp_mline_index = candidate.get("sdpMLineIndex") + + if not candidate_str: + return {"success": False} + + # Create RTCIceCandidate from the SDP string + ice_candidate = RTCIceCandidate( + component=1, + foundation="", + ip="", + port=0, + priority=0, + protocol="udp", + type="host", + sdpMid=str(sdp_mid) if sdp_mid else None, + sdpMLineIndex=int(sdp_mline_index) if sdp_mline_index is not None else None, + ) + # Parse the candidate string to populate the fields + ice_candidate.candidate = str(candidate_str) # type: ignore[attr-defined] + + await session.peer_connection.addIceCandidate(ice_candidate) + return {"success": True} + except Exception as err: + self.logger.exception("Failed to add ICE candidate: %s", err) + return {"success": False} + + async def handle_webrtc_disconnect(self, session_id: str) -> dict[str, bool]: + """ + Handle WebRTC disconnect request. + + :param session_id: The WebRTC session ID to disconnect. + :return: Dictionary with success status. + """ + await self._close_webrtc_session(session_id) + return {"success": True} + + async def handle_get_ice_servers(self) -> list[dict[str, str]]: + """ + Get ICE servers for Sendspin WebRTC connections. + + Returns HA Cloud TURN servers if available, otherwise returns public STUN servers. + + :return: List of ICE server configurations. + """ + return await self.mass.webserver.remote_access.get_ice_servers() + + async def _close_webrtc_session(self, session_id: str) -> None: + """Close a WebRTC session and clean up resources.""" + session = self._webrtc_sessions.pop(session_id, None) + if not session: + return + + self.logger.debug("Closing Sendspin WebRTC session %s", session_id) + + # Cancel forward task + if session.forward_task and not session.forward_task.done(): + session.forward_task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await session.forward_task + + # Close Sendspin WebSocket + if session.sendspin_ws and not session.sendspin_ws.closed: + await session.sendspin_ws.close() + + # Close peer connection + await session.peer_connection.close() + + async def _forward_to_sendspin(self, session: SendspinWebRTCSession) -> None: + """Forward messages from queue to Sendspin WebSocket.""" + try: + while session.sendspin_ws and not session.sendspin_ws.closed: + msg = await session.message_queue.get() + if session.sendspin_ws and not session.sendspin_ws.closed: + if isinstance(msg, bytes): + await session.sendspin_ws.send_bytes(msg) + else: + await session.sendspin_ws.send_str(msg) + except asyncio.CancelledError: + raise + except Exception: + self.logger.exception("Error forwarding to Sendspin") + + async def _forward_from_sendspin(self, session: SendspinWebRTCSession, channel: Any) -> None: + """Forward messages from Sendspin WebSocket to DataChannel.""" + try: + ws = session.sendspin_ws + if ws is None: + return + async for msg in ws: + if msg.type in {aiohttp.WSMsgType.TEXT, aiohttp.WSMsgType.BINARY}: + if channel and channel.readyState == "open": + channel.send(msg.data) + elif msg.type in {aiohttp.WSMsgType.ERROR, aiohttp.WSMsgType.CLOSED}: + break + except asyncio.CancelledError: + raise + except Exception: + self.logger.exception("Error forwarding from Sendspin") + + async def _setup_sendspin_bridge(self, session: SendspinWebRTCSession) -> None: + """Set up the bridge between WebRTC DataChannel and internal Sendspin server.""" + channel = session.data_channel + if not channel: + return + + loop = asyncio.get_event_loop() + + # Register message handler FIRST to capture any messages sent immediately + @channel.on("message") # type: ignore[misc] + def on_message(message: str | bytes) -> None: + if session.forward_task and not session.forward_task.done(): + loop.call_soon_threadsafe(session.message_queue.put_nowait, message) + else: + # Queue message even if forward task not started yet + session.message_queue.put_nowait(message) + + @channel.on("close") # type: ignore[misc] + def on_close() -> None: + asyncio.run_coroutine_threadsafe(self._close_webrtc_session(session.session_id), loop) + + try: + # Connect to internal Sendspin server + sendspin_url = "ws://127.0.0.1:8927/sendspin" + session.sendspin_ws = await self.mass.http_session.ws_connect(sendspin_url) + self.logger.debug( + "Connected to internal Sendspin server for session %s", session.session_id + ) + + # Start forwarding tasks + session.forward_task = asyncio.create_task(self._forward_to_sendspin(session)) + asyncio.create_task(self._forward_from_sendspin(session, channel)) + + self.logger.debug("Sendspin bridge established for session %s", session.session_id) + + except Exception: + self.logger.exception( + "Failed to setup Sendspin bridge for session %s", session.session_id + ) + await self._close_webrtc_session(session.session_id)