session_id: str
peer_connection: RTCPeerConnection
- data_channel: Any = None # Main API channel (ma-api)
+ # Main API channel (ma-api) - bridges to local MA WebSocket API
+ data_channel: Any = None
local_ws: Any = None
message_queue: asyncio.Queue[str] = field(default_factory=asyncio.Queue)
forward_to_local_task: asyncio.Task[None] | None = None
forward_from_local_task: asyncio.Task[None] | None = None
+ # Sendspin channel - bridges to internal sendspin server
+ sendspin_channel: Any = None
+ sendspin_ws: Any = None
+ sendspin_queue: asyncio.Queue[str | bytes] = field(default_factory=asyncio.Queue)
+ sendspin_to_local_task: asyncio.Task[None] | None = None
+ sendspin_from_local_task: asyncio.Task[None] | None = None
class WebRTCGateway:
ice_servers: list[dict[str, Any]] | None = None,
ice_servers_callback: Callable[[], Awaitable[list[dict[str, Any]]]] | None = None,
) -> None:
- """Initialize the WebRTC Gateway.
+ """
+ Initialize the WebRTC Gateway.
- :param http_session: Shared aiohttp ClientSession to use for HTTP/WebSocket connections.
+ :param http_session: Shared aiohttp ClientSession for HTTP/WebSocket connections.
:param remote_id: Remote ID for this server instance.
:param certificate: Persistent RTCCertificate for DTLS, enabling client-side pinning.
:param signaling_url: WebSocket URL of the signaling server.
:param local_ws_url: Local WebSocket URL to bridge to.
:param ice_servers: List of ICE server configurations (used at registration time).
:param ice_servers_callback: Optional callback to fetch fresh ICE servers for each session.
- If provided, this will be called for each client connection to get fresh TURN
- credentials. If not provided, the static ice_servers will be used.
"""
self.http_session = http_session
self.signaling_url = signaling_url
@pc.on("datachannel")
def on_datachannel(channel: Any) -> None:
- session.data_channel = channel
- asyncio.create_task(self._setup_data_channel(session))
+ if channel.label == "sendspin":
+ session.sendspin_channel = channel
+ asyncio.create_task(self._setup_sendspin_channel(session))
+ else:
+ session.data_channel = channel
+ asyncio.create_task(self._setup_data_channel(session))
@pc.on("icecandidate")
async def on_icecandidate(candidate: Any) -> None:
with contextlib.suppress(asyncio.CancelledError):
await session.forward_from_local_task
+ # Cancel sendspin forwarding tasks
+ if session.sendspin_to_local_task and not session.sendspin_to_local_task.done():
+ session.sendspin_to_local_task.cancel()
+ with contextlib.suppress(asyncio.CancelledError):
+ await session.sendspin_to_local_task
+
+ if session.sendspin_from_local_task and not session.sendspin_from_local_task.done():
+ session.sendspin_from_local_task.cancel()
+ with contextlib.suppress(asyncio.CancelledError):
+ await session.sendspin_from_local_task
+
# Close connections
if session.local_ws and not session.local_ws.closed:
await session.local_ws.close()
+ if session.sendspin_ws and not session.sendspin_ws.closed:
+ await session.sendspin_ws.close()
if session.data_channel:
session.data_channel.close()
+ if session.sendspin_channel:
+ session.sendspin_channel.close()
await session.peer_connection.close()
+
+ async def _setup_sendspin_channel(self, session: WebRTCSession) -> None:
+ """Set up sendspin data channel and bridge to internal sendspin server.
+
+ :param session: The WebRTC session.
+ """
+ channel = session.sendspin_channel
+ if not channel:
+ return
+
+ try:
+ session.sendspin_ws = await self.http_session.ws_connect("ws://127.0.0.1:8927/sendspin")
+ self.logger.debug("Sendspin channel connected for session %s", session.session_id)
+
+ loop = asyncio.get_event_loop()
+
+ session.sendspin_to_local_task = asyncio.create_task(
+ self._forward_sendspin_to_local(session)
+ )
+ session.sendspin_from_local_task = asyncio.create_task(
+ self._forward_sendspin_from_local(session)
+ )
+
+ @channel.on("message") # type: ignore[untyped-decorator]
+ def on_message(message: str | bytes) -> None:
+ if session.sendspin_to_local_task and not session.sendspin_to_local_task.done():
+ loop.call_soon_threadsafe(session.sendspin_queue.put_nowait, message)
+
+ @channel.on("close") # type: ignore[untyped-decorator]
+ def on_close() -> None:
+ if session.sendspin_ws and not session.sendspin_ws.closed:
+ asyncio.run_coroutine_threadsafe(session.sendspin_ws.close(), loop)
+
+ except Exception:
+ self.logger.exception(
+ "Failed to connect sendspin channel to internal server for session %s",
+ session.session_id,
+ )
+ # Clean up partial state on failure
+ if session.sendspin_to_local_task:
+ session.sendspin_to_local_task.cancel()
+ if session.sendspin_from_local_task:
+ session.sendspin_from_local_task.cancel()
+ if session.sendspin_ws and not session.sendspin_ws.closed:
+ await session.sendspin_ws.close()
+
+ async def _forward_sendspin_to_local(self, session: WebRTCSession) -> None:
+ """Forward messages from sendspin DataChannel to internal sendspin server.
+
+ :param session: The WebRTC session.
+ """
+ try:
+ while session.sendspin_ws and not session.sendspin_ws.closed:
+ message = await session.sendspin_queue.get()
+ if session.sendspin_ws and not session.sendspin_ws.closed:
+ if isinstance(message, bytes):
+ await session.sendspin_ws.send_bytes(message)
+ else:
+ await session.sendspin_ws.send_str(message)
+ except asyncio.CancelledError:
+ self.logger.debug(
+ "Sendspin forward to local task cancelled for session %s",
+ session.session_id,
+ )
+ raise
+ except Exception:
+ self.logger.exception("Error forwarding sendspin to local")
+
+ async def _forward_sendspin_from_local(self, session: WebRTCSession) -> None:
+ """Forward messages from internal sendspin server to sendspin DataChannel.
+
+ :param session: The WebRTC session.
+ """
+ if not session.sendspin_ws or session.sendspin_ws.closed:
+ return
+
+ try:
+ async for msg in session.sendspin_ws:
+ if msg.type in {aiohttp.WSMsgType.TEXT, aiohttp.WSMsgType.BINARY}:
+ if session.sendspin_channel and session.sendspin_channel.readyState == "open":
+ session.sendspin_channel.send(msg.data)
+ elif msg.type in (aiohttp.WSMsgType.ERROR, aiohttp.WSMsgType.CLOSED):
+ break
+ except asyncio.CancelledError:
+ self.logger.debug(
+ "Sendspin forward from local task cancelled for session %s",
+ session.session_id,
+ )
+ raise
+ except Exception:
+ self.logger.exception("Error forwarding sendspin from local")
--- /dev/null
+"""Sendspin WebSocket proxy handler for Music Assistant.
+
+This module provides an authenticated WebSocket proxy to the internal Sendspin server,
+allowing web clients to connect through the main webserver instead of requiring direct
+access to the Sendspin port.
+"""
+
+from __future__ import annotations
+
+import asyncio
+import contextlib
+import json
+import logging
+from typing import TYPE_CHECKING
+
+from aiohttp import WSMsgType, web
+
+from music_assistant.constants import MASS_LOGGER_NAME
+
+if TYPE_CHECKING:
+ import aiohttp
+
+ from music_assistant.controllers.webserver import WebserverController
+
+LOGGER = logging.getLogger(f"{MASS_LOGGER_NAME}.sendspin_proxy")
+INTERNAL_SENDSPIN_URL = "ws://127.0.0.1:8927/sendspin"
+
+
+class SendspinProxyHandler:
+ """Handler for proxying WebSocket connections to the internal Sendspin server."""
+
+ def __init__(self, webserver: WebserverController) -> None:
+ """Initialize the Sendspin proxy handler.
+
+ :param webserver: The webserver controller instance.
+ """
+ self.webserver = webserver
+ self.mass = webserver.mass
+ self.logger = LOGGER
+
+ async def handle_sendspin_proxy(self, request: web.Request) -> web.WebSocketResponse:
+ """
+ Handle incoming WebSocket connection and proxy to internal Sendspin server.
+
+ Authentication is required as the first message. The client must send:
+ {"type": "auth", "token": "<access_token>"}
+
+ After successful authentication, all messages are proxied bidirectionally.
+
+ :param request: The incoming HTTP request to upgrade to WebSocket.
+ :return: The WebSocket response.
+ """
+ wsock = web.WebSocketResponse(heartbeat=55)
+ await wsock.prepare(request)
+
+ self.logger.debug("Sendspin proxy connection from %s", request.remote)
+
+ try:
+ auth_result = await self._authenticate(wsock)
+ if not auth_result:
+ return wsock
+ except TimeoutError:
+ self.logger.warning("Auth timeout for sendspin proxy from %s", request.remote)
+ await wsock.close(code=4001, message=b"Authentication timeout")
+ return wsock
+ except Exception:
+ self.logger.exception("Auth error for sendspin proxy")
+ await wsock.close(code=4001, message=b"Authentication error")
+ return wsock
+
+ try:
+ internal_ws = await self.mass.http_session.ws_connect(INTERNAL_SENDSPIN_URL)
+ except Exception:
+ self.logger.exception("Failed to connect to internal Sendspin server")
+ await wsock.close(code=1011, message=b"Internal server error")
+ return wsock
+
+ self.logger.debug("Sendspin proxy authenticated and connected for %s", request.remote)
+
+ try:
+ await self._proxy_messages(wsock, internal_ws)
+ finally:
+ if not internal_ws.closed:
+ await internal_ws.close()
+ if not wsock.closed:
+ await wsock.close()
+
+ return wsock
+
+ async def _authenticate(self, wsock: web.WebSocketResponse) -> bool:
+ """Wait for and validate authentication message.
+
+ :param wsock: The client WebSocket connection.
+ :return: True if authentication succeeded, False otherwise.
+ """
+ async with asyncio.timeout(10):
+ msg = await wsock.receive()
+
+ if msg.type != WSMsgType.TEXT:
+ await wsock.close(code=4001, message=b"Expected text message for auth")
+ return False
+
+ try:
+ auth_data = json.loads(msg.data)
+ except json.JSONDecodeError:
+ await wsock.close(code=4001, message=b"Invalid JSON in auth message")
+ return False
+
+ if auth_data.get("type") != "auth":
+ await wsock.close(code=4001, message=b"First message must be auth")
+ return False
+
+ token = auth_data.get("token")
+ if not token:
+ await wsock.close(code=4001, message=b"Token required in auth message")
+ return False
+
+ user = await self.webserver.auth.authenticate_with_token(token)
+ if not user:
+ await wsock.close(code=4001, message=b"Invalid or expired token")
+ return False
+
+ # Auto-whitelist player for users with player filters
+ client_id = auth_data.get("client_id")
+ if client_id and user.player_filter and client_id not in user.player_filter:
+ self.logger.debug(
+ "Auto-whitelisting Sendspin player %s for user %s", client_id, user.username
+ )
+ new_filter = [*user.player_filter, client_id]
+ await self.webserver.auth.update_user_filters(
+ user, player_filter=new_filter, provider_filter=None
+ )
+
+ self.logger.debug("Sendspin proxy authenticated user: %s", user.username)
+ await wsock.send_str('{"type": "auth_ok"}')
+ return True
+
+ async def _proxy_messages(
+ self,
+ client_ws: web.WebSocketResponse,
+ internal_ws: aiohttp.ClientWebSocketResponse,
+ ) -> None:
+ """
+ Proxy messages bidirectionally between client and internal Sendspin server.
+
+ :param client_ws: The client WebSocket connection.
+ :param internal_ws: The internal Sendspin server WebSocket connection.
+ """
+ client_to_internal = asyncio.create_task(
+ self._forward_client_to_internal(client_ws, internal_ws)
+ )
+ internal_to_client = asyncio.create_task(
+ self._forward_internal_to_client(client_ws, internal_ws)
+ )
+
+ _done, pending = await asyncio.wait(
+ [client_to_internal, internal_to_client],
+ return_when=asyncio.FIRST_COMPLETED,
+ )
+
+ for task in pending:
+ task.cancel()
+ with contextlib.suppress(asyncio.CancelledError):
+ await task
+
+ async def _forward_client_to_internal(
+ self,
+ client_ws: web.WebSocketResponse,
+ internal_ws: aiohttp.ClientWebSocketResponse,
+ ) -> None:
+ """
+ Forward messages from client to internal Sendspin server.
+
+ :param client_ws: The client WebSocket connection.
+ :param internal_ws: The internal Sendspin server WebSocket connection.
+ """
+ async for msg in client_ws:
+ if msg.type == WSMsgType.TEXT:
+ await internal_ws.send_str(msg.data)
+ elif msg.type == WSMsgType.BINARY:
+ await internal_ws.send_bytes(msg.data)
+ elif msg.type in (WSMsgType.CLOSE, WSMsgType.CLOSED, WSMsgType.ERROR):
+ break
+
+ async def _forward_internal_to_client(
+ self,
+ client_ws: web.WebSocketResponse,
+ internal_ws: aiohttp.ClientWebSocketResponse,
+ ) -> None:
+ """
+ Forward messages from internal Sendspin server to client.
+
+ :param client_ws: The client WebSocket connection.
+ :param internal_ws: The internal Sendspin server WebSocket connection.
+ """
+ async for msg in internal_ws:
+ if msg.type == WSMsgType.TEXT:
+ await client_ws.send_str(msg.data)
+ elif msg.type == WSMsgType.BINARY:
+ await client_ws.send_bytes(msg.data)
+ elif msg.type in (WSMsgType.CLOSE, WSMsgType.CLOSED, WSMsgType.ERROR):
+ break
from __future__ import annotations
import asyncio
-import contextlib
-import secrets
from collections.abc import Callable
-from dataclasses import dataclass, field
-from typing import TYPE_CHECKING, Any, cast
+from typing import TYPE_CHECKING, cast
-import aiohttp
-from aiortc import RTCConfiguration, RTCIceServer, RTCPeerConnection, RTCSessionDescription
-from aiortc.sdp import candidate_from_sdp
from aiosendspin.server import ClientAddedEvent, ClientRemovedEvent, SendspinEvent, SendspinServer
from music_assistant_models.enums import ProviderFeature
-from music_assistant.controllers.webserver.helpers.auth_middleware import get_current_user
-from music_assistant.helpers.webrtc_certificate import create_peer_connection_with_certificate
from music_assistant.mass import MusicAssistant
from music_assistant.models.player_provider import PlayerProvider
from music_assistant.providers.sendspin.player import SendspinPlayer
from music_assistant_models.provider import ProviderManifest
-@dataclass
-class SendspinWebRTCSession:
- """Represents an active WebRTC session for a Sendspin client."""
-
- session_id: str
- peer_connection: RTCPeerConnection
- data_channel: Any = None
- sendspin_ws: aiohttp.ClientWebSocketResponse | None = None
- forward_task: asyncio.Task[None] | None = None
- message_queue: asyncio.Queue[str | bytes] = field(default_factory=asyncio.Queue)
-
-
class SendspinProvider(PlayerProvider):
"""Player Provider for Sendspin."""
server_api: SendspinServer
unregister_cbs: list[Callable[[], None]]
- _webrtc_sessions: dict[str, SendspinWebRTCSession]
_pending_unregisters: dict[str, asyncio.Event]
def __init__(
self.server_api = SendspinServer(
self.mass.loop, mass.server_id, "Music Assistant", self.mass.http_session
)
- self._webrtc_sessions = {}
self._pending_unregisters = {}
self.unregister_cbs = [
self.server_api.add_event_listener(self.event_cb),
- # WebRTC signaling commands for Sendspin connections
- # this is used to establish WebRTC DataChannels with Sendspin clients
- # for example the WebPlayer in the Music Assistant frontend or supported (mobile) apps
- self.mass.register_api_command(
- "sendspin/connection_info", self.handle_get_connection_info
- ),
- self.mass.register_api_command("sendspin/connect", self.handle_webrtc_connect),
- self.mass.register_api_command("sendspin/ice", self.handle_webrtc_ice),
- self.mass.register_api_command("sendspin/disconnect", self.handle_webrtc_disconnect),
- self.mass.register_api_command("sendspin/ice_servers", self.handle_get_ice_servers),
]
async def event_cb(self, server: SendspinServer, event: SendspinEvent) -> None:
Handle unload/close of the provider.
Called when provider is deregistered (e.g. MA exiting or config reloading).
- is_removed will be set to True when the provider is removed from the configuration.
- """
- # Close all WebRTC sessions
- for session_id in list(self._webrtc_sessions.keys()):
- await self._close_webrtc_session(session_id)
+ :param is_removed: True when the provider is removed from the configuration.
+ """
# Stop the Sendspin server
await self.server_api.close()
for player in self.players:
self.logger.debug("Unloading player %s", player.name)
await self.mass.players.unregister(player.player_id)
-
- async def handle_webrtc_connect(self, offer: dict[str, str]) -> dict[str, Any]:
- """
- Handle WebRTC connection request for Sendspin.
-
- This command is called via an authenticated API connection.
- The client sends a WebRTC offer and receives an answer.
-
- :param offer: WebRTC offer with 'sdp' and 'type' fields.
- :return: Dictionary with session_id and WebRTC answer.
- """
- session_id = secrets.token_urlsafe(16)
- self.logger.debug("Creating Sendspin WebRTC session %s", session_id)
-
- # Get ICE servers (may include HA Cloud TURN servers)
- ice_servers = await self.mass.webserver.remote_access.get_ice_servers()
- self.logger.debug(
- "Creating Sendspin WebRTC session with %d ICE servers",
- len(ice_servers),
- )
-
- # Create peer connection with ICE servers and persistent certificate
- config = RTCConfiguration(iceServers=[RTCIceServer(**server) for server in ice_servers])
- certificate = self.mass.webserver.remote_access.certificate
- pc = create_peer_connection_with_certificate(certificate, configuration=config)
-
- session = SendspinWebRTCSession(
- session_id=session_id,
- peer_connection=pc,
- )
- self._webrtc_sessions[session_id] = session
-
- # Track ICE candidates to send back
- ice_candidates: list[dict[str, Any]] = []
-
- @pc.on("icecandidate")
- def on_ice_candidate(candidate: Any) -> None:
- if candidate:
- ice_candidates.append(
- {
- "candidate": candidate.candidate,
- "sdpMid": candidate.sdpMid,
- "sdpMLineIndex": candidate.sdpMLineIndex,
- }
- )
-
- @pc.on("datachannel")
- def on_datachannel(channel: Any) -> None:
- self.logger.debug("Sendspin DataChannel opened for session %s", session_id)
- session.data_channel = channel
- asyncio.create_task(self._setup_sendspin_bridge(session))
-
- @pc.on("connectionstatechange")
- async def on_connection_state_change() -> None:
- self.logger.debug(
- "Sendspin WebRTC connection state: %s for session %s",
- pc.connectionState,
- session_id,
- )
- if pc.connectionState in ("failed", "closed", "disconnected"):
- await self._close_webrtc_session(session_id)
-
- # Set remote description (offer from client)
- await pc.setRemoteDescription(RTCSessionDescription(sdp=offer["sdp"], type=offer["type"]))
-
- # Create and set local description (answer)
- answer = await pc.createAnswer()
- await pc.setLocalDescription(answer)
-
- # Wait for ICE gathering to complete before sending the answer
- # This is required because aiortc doesn't support trickle ICE -
- # candidates are only embedded in the SDP after gathering is complete
- # See: https://github.com/aiortc/aiortc/issues/1344
- gather_timeout = 30 # seconds
- gather_start = asyncio.get_event_loop().time()
- while pc.iceGatheringState != "complete":
- if pc.connectionState in ("closed", "failed"):
- self.logger.debug(
- "Sendspin session %s closed during ICE gathering, aborting",
- session_id,
- )
- raise RuntimeError("Connection closed during ICE gathering")
- if asyncio.get_event_loop().time() - gather_start > gather_timeout:
- self.logger.warning(
- "Sendspin session %s ICE gathering timeout after %ds, sending answer anyway",
- session_id,
- gather_timeout,
- )
- break
- await asyncio.sleep(0.1)
-
- self.logger.debug(
- "Sendspin session %s ICE gathering completed in %.1fs",
- session_id,
- asyncio.get_event_loop().time() - gather_start,
- )
-
- return {
- "session_id": session_id,
- "answer": {
- "sdp": pc.localDescription.sdp,
- "type": pc.localDescription.type,
- },
- "ice_candidates": ice_candidates,
- # Include local WebSocket URL for direct connection attempts
- # Frontend can try this first before falling back to WebRTC
- "local_ws_url": f"ws://{self.mass.streams.publish_ip}:8927/sendspin",
- }
-
- async def handle_webrtc_ice(
- self, session_id: str, candidate: dict[str, Any]
- ) -> dict[str, bool]:
- """
- Handle ICE candidate from client.
-
- :param session_id: The WebRTC session ID.
- :param candidate: ICE candidate with 'candidate', 'sdpMid', 'sdpMLineIndex'.
- :return: Dictionary with success status.
- """
- session = self._webrtc_sessions.get(session_id)
- if not session:
- self.logger.warning("ICE candidate for unknown session %s", session_id)
- return {"success": False}
-
- try:
- candidate_str = candidate.get("candidate")
- sdp_mid = candidate.get("sdpMid")
- sdp_mline_index = candidate.get("sdpMLineIndex")
-
- if not candidate_str:
- return {"success": False}
-
- # Parse the ICE candidate using aiortc's candidate_from_sdp function
- # Browser sends "candidate:..." format, candidate_from_sdp expects without prefix
- if candidate_str.startswith("candidate:"):
- sdp_candidate_str = candidate_str[len("candidate:") :]
- else:
- sdp_candidate_str = candidate_str
-
- ice_candidate = candidate_from_sdp(sdp_candidate_str)
- ice_candidate.sdpMid = str(sdp_mid) if sdp_mid else None
- ice_candidate.sdpMLineIndex = (
- int(sdp_mline_index) if sdp_mline_index is not None else None
- )
-
- await session.peer_connection.addIceCandidate(ice_candidate)
- return {"success": True}
- except Exception as err:
- self.logger.exception("Failed to add ICE candidate: %s", err)
- return {"success": False}
-
- async def handle_webrtc_disconnect(self, session_id: str) -> dict[str, bool]:
- """
- Handle WebRTC disconnect request.
-
- :param session_id: The WebRTC session ID to disconnect.
- :return: Dictionary with success status.
- """
- await self._close_webrtc_session(session_id)
- return {"success": True}
-
- async def handle_get_ice_servers(self) -> list[dict[str, str]]:
- """
- Get ICE servers for Sendspin WebRTC connections.
-
- Returns HA Cloud TURN servers if available, otherwise returns public STUN servers.
-
- :return: List of ICE server configurations.
- """
- return await self.mass.webserver.remote_access.get_ice_servers()
-
- async def handle_get_connection_info(self, client_id: str | None = None) -> dict[str, Any]:
- """
- Get connection info for Sendspin.
-
- Returns the local WebSocket URL for direct connection attempts,
- and ICE servers for WebRTC fallback.
-
- The frontend should try the local WebSocket URL first for lower latency,
- and fall back to WebRTC if the direct connection fails.
-
- :param client_id: Optional Sendspin client ID for auto-whitelisting.
- :return: Dictionary with local_ws_url and ice_servers.
- """
- # Auto-whitelist the player for users with player filters enabled
- # This allows users with restricted player access to still use the web player
- if client_id and (user := get_current_user()):
- if user.player_filter and client_id not in user.player_filter:
- self.logger.debug(
- "Auto-whitelisting Sendspin player %s for user %s",
- client_id,
- user.username,
- )
- new_filter = [*user.player_filter, client_id]
- await self.mass.webserver.auth.update_user_filters(
- user, player_filter=new_filter, provider_filter=None
- )
- user.player_filter = new_filter
-
- return {
- "local_ws_url": f"ws://{self.mass.streams.publish_ip}:8927/sendspin",
- "ice_servers": await self.mass.webserver.remote_access.get_ice_servers(),
- }
-
- async def _close_webrtc_session(self, session_id: str) -> None:
- """Close a WebRTC session and clean up resources."""
- session = self._webrtc_sessions.pop(session_id, None)
- if not session:
- return
-
- self.logger.debug("Closing Sendspin WebRTC session %s", session_id)
-
- # Cancel forward task
- if session.forward_task and not session.forward_task.done():
- session.forward_task.cancel()
- with contextlib.suppress(asyncio.CancelledError):
- await session.forward_task
-
- # Close Sendspin WebSocket
- if session.sendspin_ws and not session.sendspin_ws.closed:
- await session.sendspin_ws.close()
-
- # Close peer connection
- await session.peer_connection.close()
-
- async def _forward_to_sendspin(self, session: SendspinWebRTCSession) -> None:
- """Forward messages from queue to Sendspin WebSocket."""
- try:
- while session.sendspin_ws and not session.sendspin_ws.closed:
- msg = await session.message_queue.get()
- if session.sendspin_ws and not session.sendspin_ws.closed:
- if isinstance(msg, bytes):
- await session.sendspin_ws.send_bytes(msg)
- else:
- await session.sendspin_ws.send_str(msg)
- except asyncio.CancelledError:
- raise
- except Exception:
- self.logger.exception("Error forwarding to Sendspin")
-
- async def _forward_from_sendspin(self, session: SendspinWebRTCSession, channel: Any) -> None:
- """Forward messages from Sendspin WebSocket to DataChannel."""
- try:
- ws = session.sendspin_ws
- if ws is None:
- return
- async for msg in ws:
- if msg.type in {aiohttp.WSMsgType.TEXT, aiohttp.WSMsgType.BINARY}:
- if channel and channel.readyState == "open":
- channel.send(msg.data)
- elif msg.type in {aiohttp.WSMsgType.ERROR, aiohttp.WSMsgType.CLOSED}:
- break
- except asyncio.CancelledError:
- raise
- except Exception:
- self.logger.exception("Error forwarding from Sendspin")
-
- async def _setup_sendspin_bridge(self, session: SendspinWebRTCSession) -> None:
- """Set up the bridge between WebRTC DataChannel and internal Sendspin server."""
- channel = session.data_channel
- if not channel:
- return
-
- loop = asyncio.get_event_loop()
-
- # Register message handler FIRST to capture any messages sent immediately
- @channel.on("message") # type: ignore[untyped-decorator]
- def on_message(message: str | bytes) -> None:
- if session.forward_task and not session.forward_task.done():
- loop.call_soon_threadsafe(session.message_queue.put_nowait, message)
- else:
- # Queue message even if forward task not started yet
- session.message_queue.put_nowait(message)
-
- @channel.on("close") # type: ignore[untyped-decorator]
- def on_close() -> None:
- asyncio.run_coroutine_threadsafe(self._close_webrtc_session(session.session_id), loop)
-
- try:
- # Connect to internal Sendspin server
- sendspin_url = "ws://127.0.0.1:8927/sendspin"
- session.sendspin_ws = await self.mass.http_session.ws_connect(sendspin_url)
- self.logger.debug(
- "Connected to internal Sendspin server for session %s", session.session_id
- )
-
- # Start forwarding tasks
- session.forward_task = asyncio.create_task(self._forward_to_sendspin(session))
- asyncio.create_task(self._forward_from_sendspin(session, channel))
-
- self.logger.debug("Sendspin bridge established for session %s", session.session_id)
-
- except Exception:
- self.logger.exception(
- "Failed to setup Sendspin bridge for session %s", session.session_id
- )
- await self._close_webrtc_session(session.session_id)