From: Marcel van der Veldt Date: Thu, 18 Dec 2025 15:53:55 +0000 (+0100) Subject: Add Sendspin proxy for web player (#2840) X-Git-Url: https://git.kitaultman.com/?a=commitdiff_plain;h=4068a908065fc336df38c9bf7c8b80cf12ebda84;p=music-assistant-server.git Add Sendspin proxy for web player (#2840) --- diff --git a/music_assistant/controllers/webserver/controller.py b/music_assistant/controllers/webserver/controller.py index 15070875..3f0f4951 100644 --- a/music_assistant/controllers/webserver/controller.py +++ b/music_assistant/controllers/webserver/controller.py @@ -56,6 +56,7 @@ from .helpers.auth_middleware import ( ) from .helpers.auth_providers import BuiltinLoginProvider, get_ha_user_role from .remote_access import RemoteAccessManager +from .sendspin_proxy import SendspinProxyHandler from .websocket_client import WebsocketClientHandler if TYPE_CHECKING: @@ -93,6 +94,7 @@ class WebserverController(CoreController): self.manifest.icon = "web-box" self.auth = AuthenticationManager(self) self.remote_access = RemoteAccessManager(self) + self._sendspin_proxy = SendspinProxyHandler(self) @property def base_url(self) -> str: @@ -288,6 +290,8 @@ class WebserverController(CoreController): # add first-time setup routes routes.append(("GET", "/setup", self._handle_setup_page)) routes.append(("POST", "/setup", self._handle_setup)) + # add sendspin proxy route (authenticated WebSocket proxy to internal sendspin server) + routes.append(("GET", "/sendspin", self._sendspin_proxy.handle_sendspin_proxy)) await self.auth.setup() # start the webserver all_ip_addresses = await get_ip_addresses() diff --git a/music_assistant/controllers/webserver/remote_access/gateway.py b/music_assistant/controllers/webserver/remote_access/gateway.py index da5ab52a..323ef19b 100644 --- a/music_assistant/controllers/webserver/remote_access/gateway.py +++ b/music_assistant/controllers/webserver/remote_access/gateway.py @@ -38,11 +38,18 @@ class WebRTCSession: session_id: str peer_connection: RTCPeerConnection - data_channel: Any = None # Main API channel (ma-api) + # Main API channel (ma-api) - bridges to local MA WebSocket API + data_channel: Any = None local_ws: Any = None message_queue: asyncio.Queue[str] = field(default_factory=asyncio.Queue) forward_to_local_task: asyncio.Task[None] | None = None forward_from_local_task: asyncio.Task[None] | None = None + # Sendspin channel - bridges to internal sendspin server + sendspin_channel: Any = None + sendspin_ws: Any = None + sendspin_queue: asyncio.Queue[str | bytes] = field(default_factory=asyncio.Queue) + sendspin_to_local_task: asyncio.Task[None] | None = None + sendspin_from_local_task: asyncio.Task[None] | None = None class WebRTCGateway: @@ -77,17 +84,16 @@ class WebRTCGateway: ice_servers: list[dict[str, Any]] | None = None, ice_servers_callback: Callable[[], Awaitable[list[dict[str, Any]]]] | None = None, ) -> None: - """Initialize the WebRTC Gateway. + """ + Initialize the WebRTC Gateway. - :param http_session: Shared aiohttp ClientSession to use for HTTP/WebSocket connections. + :param http_session: Shared aiohttp ClientSession for HTTP/WebSocket connections. :param remote_id: Remote ID for this server instance. :param certificate: Persistent RTCCertificate for DTLS, enabling client-side pinning. :param signaling_url: WebSocket URL of the signaling server. :param local_ws_url: Local WebSocket URL to bridge to. :param ice_servers: List of ICE server configurations (used at registration time). :param ice_servers_callback: Optional callback to fetch fresh ICE servers for each session. - If provided, this will be called for each client connection to get fresh TURN - credentials. If not provided, the static ice_servers will be used. """ self.http_session = http_session self.signaling_url = signaling_url @@ -369,8 +375,12 @@ class WebRTCGateway: @pc.on("datachannel") def on_datachannel(channel: Any) -> None: - session.data_channel = channel - asyncio.create_task(self._setup_data_channel(session)) + if channel.label == "sendspin": + session.sendspin_channel = channel + asyncio.create_task(self._setup_sendspin_channel(session)) + else: + session.data_channel = channel + asyncio.create_task(self._setup_data_channel(session)) @pc.on("icecandidate") async def on_icecandidate(candidate: Any) -> None: @@ -658,9 +668,115 @@ class WebRTCGateway: with contextlib.suppress(asyncio.CancelledError): await session.forward_from_local_task + # Cancel sendspin forwarding tasks + if session.sendspin_to_local_task and not session.sendspin_to_local_task.done(): + session.sendspin_to_local_task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await session.sendspin_to_local_task + + if session.sendspin_from_local_task and not session.sendspin_from_local_task.done(): + session.sendspin_from_local_task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await session.sendspin_from_local_task + # Close connections if session.local_ws and not session.local_ws.closed: await session.local_ws.close() + if session.sendspin_ws and not session.sendspin_ws.closed: + await session.sendspin_ws.close() if session.data_channel: session.data_channel.close() + if session.sendspin_channel: + session.sendspin_channel.close() await session.peer_connection.close() + + async def _setup_sendspin_channel(self, session: WebRTCSession) -> None: + """Set up sendspin data channel and bridge to internal sendspin server. + + :param session: The WebRTC session. + """ + channel = session.sendspin_channel + if not channel: + return + + try: + session.sendspin_ws = await self.http_session.ws_connect("ws://127.0.0.1:8927/sendspin") + self.logger.debug("Sendspin channel connected for session %s", session.session_id) + + loop = asyncio.get_event_loop() + + session.sendspin_to_local_task = asyncio.create_task( + self._forward_sendspin_to_local(session) + ) + session.sendspin_from_local_task = asyncio.create_task( + self._forward_sendspin_from_local(session) + ) + + @channel.on("message") # type: ignore[untyped-decorator] + def on_message(message: str | bytes) -> None: + if session.sendspin_to_local_task and not session.sendspin_to_local_task.done(): + loop.call_soon_threadsafe(session.sendspin_queue.put_nowait, message) + + @channel.on("close") # type: ignore[untyped-decorator] + def on_close() -> None: + if session.sendspin_ws and not session.sendspin_ws.closed: + asyncio.run_coroutine_threadsafe(session.sendspin_ws.close(), loop) + + except Exception: + self.logger.exception( + "Failed to connect sendspin channel to internal server for session %s", + session.session_id, + ) + # Clean up partial state on failure + if session.sendspin_to_local_task: + session.sendspin_to_local_task.cancel() + if session.sendspin_from_local_task: + session.sendspin_from_local_task.cancel() + if session.sendspin_ws and not session.sendspin_ws.closed: + await session.sendspin_ws.close() + + async def _forward_sendspin_to_local(self, session: WebRTCSession) -> None: + """Forward messages from sendspin DataChannel to internal sendspin server. + + :param session: The WebRTC session. + """ + try: + while session.sendspin_ws and not session.sendspin_ws.closed: + message = await session.sendspin_queue.get() + if session.sendspin_ws and not session.sendspin_ws.closed: + if isinstance(message, bytes): + await session.sendspin_ws.send_bytes(message) + else: + await session.sendspin_ws.send_str(message) + except asyncio.CancelledError: + self.logger.debug( + "Sendspin forward to local task cancelled for session %s", + session.session_id, + ) + raise + except Exception: + self.logger.exception("Error forwarding sendspin to local") + + async def _forward_sendspin_from_local(self, session: WebRTCSession) -> None: + """Forward messages from internal sendspin server to sendspin DataChannel. + + :param session: The WebRTC session. + """ + if not session.sendspin_ws or session.sendspin_ws.closed: + return + + try: + async for msg in session.sendspin_ws: + if msg.type in {aiohttp.WSMsgType.TEXT, aiohttp.WSMsgType.BINARY}: + if session.sendspin_channel and session.sendspin_channel.readyState == "open": + session.sendspin_channel.send(msg.data) + elif msg.type in (aiohttp.WSMsgType.ERROR, aiohttp.WSMsgType.CLOSED): + break + except asyncio.CancelledError: + self.logger.debug( + "Sendspin forward from local task cancelled for session %s", + session.session_id, + ) + raise + except Exception: + self.logger.exception("Error forwarding sendspin from local") diff --git a/music_assistant/controllers/webserver/sendspin_proxy.py b/music_assistant/controllers/webserver/sendspin_proxy.py new file mode 100644 index 00000000..50385546 --- /dev/null +++ b/music_assistant/controllers/webserver/sendspin_proxy.py @@ -0,0 +1,202 @@ +"""Sendspin WebSocket proxy handler for Music Assistant. + +This module provides an authenticated WebSocket proxy to the internal Sendspin server, +allowing web clients to connect through the main webserver instead of requiring direct +access to the Sendspin port. +""" + +from __future__ import annotations + +import asyncio +import contextlib +import json +import logging +from typing import TYPE_CHECKING + +from aiohttp import WSMsgType, web + +from music_assistant.constants import MASS_LOGGER_NAME + +if TYPE_CHECKING: + import aiohttp + + from music_assistant.controllers.webserver import WebserverController + +LOGGER = logging.getLogger(f"{MASS_LOGGER_NAME}.sendspin_proxy") +INTERNAL_SENDSPIN_URL = "ws://127.0.0.1:8927/sendspin" + + +class SendspinProxyHandler: + """Handler for proxying WebSocket connections to the internal Sendspin server.""" + + def __init__(self, webserver: WebserverController) -> None: + """Initialize the Sendspin proxy handler. + + :param webserver: The webserver controller instance. + """ + self.webserver = webserver + self.mass = webserver.mass + self.logger = LOGGER + + async def handle_sendspin_proxy(self, request: web.Request) -> web.WebSocketResponse: + """ + Handle incoming WebSocket connection and proxy to internal Sendspin server. + + Authentication is required as the first message. The client must send: + {"type": "auth", "token": ""} + + After successful authentication, all messages are proxied bidirectionally. + + :param request: The incoming HTTP request to upgrade to WebSocket. + :return: The WebSocket response. + """ + wsock = web.WebSocketResponse(heartbeat=55) + await wsock.prepare(request) + + self.logger.debug("Sendspin proxy connection from %s", request.remote) + + try: + auth_result = await self._authenticate(wsock) + if not auth_result: + return wsock + except TimeoutError: + self.logger.warning("Auth timeout for sendspin proxy from %s", request.remote) + await wsock.close(code=4001, message=b"Authentication timeout") + return wsock + except Exception: + self.logger.exception("Auth error for sendspin proxy") + await wsock.close(code=4001, message=b"Authentication error") + return wsock + + try: + internal_ws = await self.mass.http_session.ws_connect(INTERNAL_SENDSPIN_URL) + except Exception: + self.logger.exception("Failed to connect to internal Sendspin server") + await wsock.close(code=1011, message=b"Internal server error") + return wsock + + self.logger.debug("Sendspin proxy authenticated and connected for %s", request.remote) + + try: + await self._proxy_messages(wsock, internal_ws) + finally: + if not internal_ws.closed: + await internal_ws.close() + if not wsock.closed: + await wsock.close() + + return wsock + + async def _authenticate(self, wsock: web.WebSocketResponse) -> bool: + """Wait for and validate authentication message. + + :param wsock: The client WebSocket connection. + :return: True if authentication succeeded, False otherwise. + """ + async with asyncio.timeout(10): + msg = await wsock.receive() + + if msg.type != WSMsgType.TEXT: + await wsock.close(code=4001, message=b"Expected text message for auth") + return False + + try: + auth_data = json.loads(msg.data) + except json.JSONDecodeError: + await wsock.close(code=4001, message=b"Invalid JSON in auth message") + return False + + if auth_data.get("type") != "auth": + await wsock.close(code=4001, message=b"First message must be auth") + return False + + token = auth_data.get("token") + if not token: + await wsock.close(code=4001, message=b"Token required in auth message") + return False + + user = await self.webserver.auth.authenticate_with_token(token) + if not user: + await wsock.close(code=4001, message=b"Invalid or expired token") + return False + + # Auto-whitelist player for users with player filters + client_id = auth_data.get("client_id") + if client_id and user.player_filter and client_id not in user.player_filter: + self.logger.debug( + "Auto-whitelisting Sendspin player %s for user %s", client_id, user.username + ) + new_filter = [*user.player_filter, client_id] + await self.webserver.auth.update_user_filters( + user, player_filter=new_filter, provider_filter=None + ) + + self.logger.debug("Sendspin proxy authenticated user: %s", user.username) + await wsock.send_str('{"type": "auth_ok"}') + return True + + async def _proxy_messages( + self, + client_ws: web.WebSocketResponse, + internal_ws: aiohttp.ClientWebSocketResponse, + ) -> None: + """ + Proxy messages bidirectionally between client and internal Sendspin server. + + :param client_ws: The client WebSocket connection. + :param internal_ws: The internal Sendspin server WebSocket connection. + """ + client_to_internal = asyncio.create_task( + self._forward_client_to_internal(client_ws, internal_ws) + ) + internal_to_client = asyncio.create_task( + self._forward_internal_to_client(client_ws, internal_ws) + ) + + _done, pending = await asyncio.wait( + [client_to_internal, internal_to_client], + return_when=asyncio.FIRST_COMPLETED, + ) + + for task in pending: + task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await task + + async def _forward_client_to_internal( + self, + client_ws: web.WebSocketResponse, + internal_ws: aiohttp.ClientWebSocketResponse, + ) -> None: + """ + Forward messages from client to internal Sendspin server. + + :param client_ws: The client WebSocket connection. + :param internal_ws: The internal Sendspin server WebSocket connection. + """ + async for msg in client_ws: + if msg.type == WSMsgType.TEXT: + await internal_ws.send_str(msg.data) + elif msg.type == WSMsgType.BINARY: + await internal_ws.send_bytes(msg.data) + elif msg.type in (WSMsgType.CLOSE, WSMsgType.CLOSED, WSMsgType.ERROR): + break + + async def _forward_internal_to_client( + self, + client_ws: web.WebSocketResponse, + internal_ws: aiohttp.ClientWebSocketResponse, + ) -> None: + """ + Forward messages from internal Sendspin server to client. + + :param client_ws: The client WebSocket connection. + :param internal_ws: The internal Sendspin server WebSocket connection. + """ + async for msg in internal_ws: + if msg.type == WSMsgType.TEXT: + await client_ws.send_str(msg.data) + elif msg.type == WSMsgType.BINARY: + await client_ws.send_bytes(msg.data) + elif msg.type in (WSMsgType.CLOSE, WSMsgType.CLOSED, WSMsgType.ERROR): + break diff --git a/music_assistant/providers/sendspin/provider.py b/music_assistant/providers/sendspin/provider.py index bcfb16c4..8ed7aa58 100644 --- a/music_assistant/providers/sendspin/provider.py +++ b/music_assistant/providers/sendspin/provider.py @@ -3,20 +3,12 @@ from __future__ import annotations import asyncio -import contextlib -import secrets from collections.abc import Callable -from dataclasses import dataclass, field -from typing import TYPE_CHECKING, Any, cast +from typing import TYPE_CHECKING, cast -import aiohttp -from aiortc import RTCConfiguration, RTCIceServer, RTCPeerConnection, RTCSessionDescription -from aiortc.sdp import candidate_from_sdp from aiosendspin.server import ClientAddedEvent, ClientRemovedEvent, SendspinEvent, SendspinServer from music_assistant_models.enums import ProviderFeature -from music_assistant.controllers.webserver.helpers.auth_middleware import get_current_user -from music_assistant.helpers.webrtc_certificate import create_peer_connection_with_certificate from music_assistant.mass import MusicAssistant from music_assistant.models.player_provider import PlayerProvider from music_assistant.providers.sendspin.player import SendspinPlayer @@ -26,24 +18,11 @@ if TYPE_CHECKING: from music_assistant_models.provider import ProviderManifest -@dataclass -class SendspinWebRTCSession: - """Represents an active WebRTC session for a Sendspin client.""" - - session_id: str - peer_connection: RTCPeerConnection - data_channel: Any = None - sendspin_ws: aiohttp.ClientWebSocketResponse | None = None - forward_task: asyncio.Task[None] | None = None - message_queue: asyncio.Queue[str | bytes] = field(default_factory=asyncio.Queue) - - class SendspinProvider(PlayerProvider): """Player Provider for Sendspin.""" server_api: SendspinServer unregister_cbs: list[Callable[[], None]] - _webrtc_sessions: dict[str, SendspinWebRTCSession] _pending_unregisters: dict[str, asyncio.Event] def __init__( @@ -54,20 +33,9 @@ class SendspinProvider(PlayerProvider): self.server_api = SendspinServer( self.mass.loop, mass.server_id, "Music Assistant", self.mass.http_session ) - self._webrtc_sessions = {} self._pending_unregisters = {} self.unregister_cbs = [ self.server_api.add_event_listener(self.event_cb), - # WebRTC signaling commands for Sendspin connections - # this is used to establish WebRTC DataChannels with Sendspin clients - # for example the WebPlayer in the Music Assistant frontend or supported (mobile) apps - self.mass.register_api_command( - "sendspin/connection_info", self.handle_get_connection_info - ), - self.mass.register_api_command("sendspin/connect", self.handle_webrtc_connect), - self.mass.register_api_command("sendspin/ice", self.handle_webrtc_ice), - self.mass.register_api_command("sendspin/disconnect", self.handle_webrtc_disconnect), - self.mass.register_api_command("sendspin/ice_servers", self.handle_get_ice_servers), ] async def event_cb(self, server: SendspinServer, event: SendspinEvent) -> None: @@ -121,12 +89,9 @@ class SendspinProvider(PlayerProvider): Handle unload/close of the provider. Called when provider is deregistered (e.g. MA exiting or config reloading). - is_removed will be set to True when the provider is removed from the configuration. - """ - # Close all WebRTC sessions - for session_id in list(self._webrtc_sessions.keys()): - await self._close_webrtc_session(session_id) + :param is_removed: True when the provider is removed from the configuration. + """ # Stop the Sendspin server await self.server_api.close() @@ -136,300 +101,3 @@ class SendspinProvider(PlayerProvider): for player in self.players: self.logger.debug("Unloading player %s", player.name) await self.mass.players.unregister(player.player_id) - - async def handle_webrtc_connect(self, offer: dict[str, str]) -> dict[str, Any]: - """ - Handle WebRTC connection request for Sendspin. - - This command is called via an authenticated API connection. - The client sends a WebRTC offer and receives an answer. - - :param offer: WebRTC offer with 'sdp' and 'type' fields. - :return: Dictionary with session_id and WebRTC answer. - """ - session_id = secrets.token_urlsafe(16) - self.logger.debug("Creating Sendspin WebRTC session %s", session_id) - - # Get ICE servers (may include HA Cloud TURN servers) - ice_servers = await self.mass.webserver.remote_access.get_ice_servers() - self.logger.debug( - "Creating Sendspin WebRTC session with %d ICE servers", - len(ice_servers), - ) - - # Create peer connection with ICE servers and persistent certificate - config = RTCConfiguration(iceServers=[RTCIceServer(**server) for server in ice_servers]) - certificate = self.mass.webserver.remote_access.certificate - pc = create_peer_connection_with_certificate(certificate, configuration=config) - - session = SendspinWebRTCSession( - session_id=session_id, - peer_connection=pc, - ) - self._webrtc_sessions[session_id] = session - - # Track ICE candidates to send back - ice_candidates: list[dict[str, Any]] = [] - - @pc.on("icecandidate") - def on_ice_candidate(candidate: Any) -> None: - if candidate: - ice_candidates.append( - { - "candidate": candidate.candidate, - "sdpMid": candidate.sdpMid, - "sdpMLineIndex": candidate.sdpMLineIndex, - } - ) - - @pc.on("datachannel") - def on_datachannel(channel: Any) -> None: - self.logger.debug("Sendspin DataChannel opened for session %s", session_id) - session.data_channel = channel - asyncio.create_task(self._setup_sendspin_bridge(session)) - - @pc.on("connectionstatechange") - async def on_connection_state_change() -> None: - self.logger.debug( - "Sendspin WebRTC connection state: %s for session %s", - pc.connectionState, - session_id, - ) - if pc.connectionState in ("failed", "closed", "disconnected"): - await self._close_webrtc_session(session_id) - - # Set remote description (offer from client) - await pc.setRemoteDescription(RTCSessionDescription(sdp=offer["sdp"], type=offer["type"])) - - # Create and set local description (answer) - answer = await pc.createAnswer() - await pc.setLocalDescription(answer) - - # Wait for ICE gathering to complete before sending the answer - # This is required because aiortc doesn't support trickle ICE - - # candidates are only embedded in the SDP after gathering is complete - # See: https://github.com/aiortc/aiortc/issues/1344 - gather_timeout = 30 # seconds - gather_start = asyncio.get_event_loop().time() - while pc.iceGatheringState != "complete": - if pc.connectionState in ("closed", "failed"): - self.logger.debug( - "Sendspin session %s closed during ICE gathering, aborting", - session_id, - ) - raise RuntimeError("Connection closed during ICE gathering") - if asyncio.get_event_loop().time() - gather_start > gather_timeout: - self.logger.warning( - "Sendspin session %s ICE gathering timeout after %ds, sending answer anyway", - session_id, - gather_timeout, - ) - break - await asyncio.sleep(0.1) - - self.logger.debug( - "Sendspin session %s ICE gathering completed in %.1fs", - session_id, - asyncio.get_event_loop().time() - gather_start, - ) - - return { - "session_id": session_id, - "answer": { - "sdp": pc.localDescription.sdp, - "type": pc.localDescription.type, - }, - "ice_candidates": ice_candidates, - # Include local WebSocket URL for direct connection attempts - # Frontend can try this first before falling back to WebRTC - "local_ws_url": f"ws://{self.mass.streams.publish_ip}:8927/sendspin", - } - - async def handle_webrtc_ice( - self, session_id: str, candidate: dict[str, Any] - ) -> dict[str, bool]: - """ - Handle ICE candidate from client. - - :param session_id: The WebRTC session ID. - :param candidate: ICE candidate with 'candidate', 'sdpMid', 'sdpMLineIndex'. - :return: Dictionary with success status. - """ - session = self._webrtc_sessions.get(session_id) - if not session: - self.logger.warning("ICE candidate for unknown session %s", session_id) - return {"success": False} - - try: - candidate_str = candidate.get("candidate") - sdp_mid = candidate.get("sdpMid") - sdp_mline_index = candidate.get("sdpMLineIndex") - - if not candidate_str: - return {"success": False} - - # Parse the ICE candidate using aiortc's candidate_from_sdp function - # Browser sends "candidate:..." format, candidate_from_sdp expects without prefix - if candidate_str.startswith("candidate:"): - sdp_candidate_str = candidate_str[len("candidate:") :] - else: - sdp_candidate_str = candidate_str - - ice_candidate = candidate_from_sdp(sdp_candidate_str) - ice_candidate.sdpMid = str(sdp_mid) if sdp_mid else None - ice_candidate.sdpMLineIndex = ( - int(sdp_mline_index) if sdp_mline_index is not None else None - ) - - await session.peer_connection.addIceCandidate(ice_candidate) - return {"success": True} - except Exception as err: - self.logger.exception("Failed to add ICE candidate: %s", err) - return {"success": False} - - async def handle_webrtc_disconnect(self, session_id: str) -> dict[str, bool]: - """ - Handle WebRTC disconnect request. - - :param session_id: The WebRTC session ID to disconnect. - :return: Dictionary with success status. - """ - await self._close_webrtc_session(session_id) - return {"success": True} - - async def handle_get_ice_servers(self) -> list[dict[str, str]]: - """ - Get ICE servers for Sendspin WebRTC connections. - - Returns HA Cloud TURN servers if available, otherwise returns public STUN servers. - - :return: List of ICE server configurations. - """ - return await self.mass.webserver.remote_access.get_ice_servers() - - async def handle_get_connection_info(self, client_id: str | None = None) -> dict[str, Any]: - """ - Get connection info for Sendspin. - - Returns the local WebSocket URL for direct connection attempts, - and ICE servers for WebRTC fallback. - - The frontend should try the local WebSocket URL first for lower latency, - and fall back to WebRTC if the direct connection fails. - - :param client_id: Optional Sendspin client ID for auto-whitelisting. - :return: Dictionary with local_ws_url and ice_servers. - """ - # Auto-whitelist the player for users with player filters enabled - # This allows users with restricted player access to still use the web player - if client_id and (user := get_current_user()): - if user.player_filter and client_id not in user.player_filter: - self.logger.debug( - "Auto-whitelisting Sendspin player %s for user %s", - client_id, - user.username, - ) - new_filter = [*user.player_filter, client_id] - await self.mass.webserver.auth.update_user_filters( - user, player_filter=new_filter, provider_filter=None - ) - user.player_filter = new_filter - - return { - "local_ws_url": f"ws://{self.mass.streams.publish_ip}:8927/sendspin", - "ice_servers": await self.mass.webserver.remote_access.get_ice_servers(), - } - - async def _close_webrtc_session(self, session_id: str) -> None: - """Close a WebRTC session and clean up resources.""" - session = self._webrtc_sessions.pop(session_id, None) - if not session: - return - - self.logger.debug("Closing Sendspin WebRTC session %s", session_id) - - # Cancel forward task - if session.forward_task and not session.forward_task.done(): - session.forward_task.cancel() - with contextlib.suppress(asyncio.CancelledError): - await session.forward_task - - # Close Sendspin WebSocket - if session.sendspin_ws and not session.sendspin_ws.closed: - await session.sendspin_ws.close() - - # Close peer connection - await session.peer_connection.close() - - async def _forward_to_sendspin(self, session: SendspinWebRTCSession) -> None: - """Forward messages from queue to Sendspin WebSocket.""" - try: - while session.sendspin_ws and not session.sendspin_ws.closed: - msg = await session.message_queue.get() - if session.sendspin_ws and not session.sendspin_ws.closed: - if isinstance(msg, bytes): - await session.sendspin_ws.send_bytes(msg) - else: - await session.sendspin_ws.send_str(msg) - except asyncio.CancelledError: - raise - except Exception: - self.logger.exception("Error forwarding to Sendspin") - - async def _forward_from_sendspin(self, session: SendspinWebRTCSession, channel: Any) -> None: - """Forward messages from Sendspin WebSocket to DataChannel.""" - try: - ws = session.sendspin_ws - if ws is None: - return - async for msg in ws: - if msg.type in {aiohttp.WSMsgType.TEXT, aiohttp.WSMsgType.BINARY}: - if channel and channel.readyState == "open": - channel.send(msg.data) - elif msg.type in {aiohttp.WSMsgType.ERROR, aiohttp.WSMsgType.CLOSED}: - break - except asyncio.CancelledError: - raise - except Exception: - self.logger.exception("Error forwarding from Sendspin") - - async def _setup_sendspin_bridge(self, session: SendspinWebRTCSession) -> None: - """Set up the bridge between WebRTC DataChannel and internal Sendspin server.""" - channel = session.data_channel - if not channel: - return - - loop = asyncio.get_event_loop() - - # Register message handler FIRST to capture any messages sent immediately - @channel.on("message") # type: ignore[untyped-decorator] - def on_message(message: str | bytes) -> None: - if session.forward_task and not session.forward_task.done(): - loop.call_soon_threadsafe(session.message_queue.put_nowait, message) - else: - # Queue message even if forward task not started yet - session.message_queue.put_nowait(message) - - @channel.on("close") # type: ignore[untyped-decorator] - def on_close() -> None: - asyncio.run_coroutine_threadsafe(self._close_webrtc_session(session.session_id), loop) - - try: - # Connect to internal Sendspin server - sendspin_url = "ws://127.0.0.1:8927/sendspin" - session.sendspin_ws = await self.mass.http_session.ws_connect(sendspin_url) - self.logger.debug( - "Connected to internal Sendspin server for session %s", session.session_id - ) - - # Start forwarding tasks - session.forward_task = asyncio.create_task(self._forward_to_sendspin(session)) - asyncio.create_task(self._forward_from_sendspin(session, channel)) - - self.logger.debug("Sendspin bridge established for session %s", session.session_id) - - except Exception: - self.logger.exception( - "Failed to setup Sendspin bridge for session %s", session.session_id - ) - await self._close_webrtc_session(session.session_id)