return
try:
- session.sendspin_ws = await self.http_session.ws_connect("ws://127.0.0.1:8927/sendspin")
- self.logger.debug("Sendspin channel connected for session %s", session.session_id)
-
loop = asyncio.get_event_loop()
- session.sendspin_to_local_task = asyncio.create_task(
- self._forward_sendspin_to_local(session)
- )
- session.sendspin_from_local_task = asyncio.create_task(
- self._forward_sendspin_from_local(session)
- )
-
@channel.on("message") # type: ignore[untyped-decorator]
def on_message(message: str | bytes) -> None:
- if session.sendspin_to_local_task and not session.sendspin_to_local_task.done():
+ # Queue if task not yet created (None) or still running.
+ # Only drop when task exists and is done (shutdown).
+ if (
+ session.sendspin_to_local_task is None
+ or not session.sendspin_to_local_task.done()
+ ):
loop.call_soon_threadsafe(session.sendspin_queue.put_nowait, message)
@channel.on("close") # type: ignore[untyped-decorator]
if session.sendspin_ws and not session.sendspin_ws.closed:
asyncio.run_coroutine_threadsafe(session.sendspin_ws.close(), loop)
+ session.sendspin_ws = await self.http_session.ws_connect("ws://127.0.0.1:8927/sendspin")
+ self.logger.debug("Sendspin channel connected for session %s", session.session_id)
+
+ # Start forwarding tasks - queued messages will be processed
+ session.sendspin_to_local_task = asyncio.create_task(
+ self._forward_sendspin_to_local(session)
+ )
+ session.sendspin_from_local_task = asyncio.create_task(
+ self._forward_sendspin_from_local(session)
+ )
+
except Exception:
self.logger.exception(
"Failed to connect sendspin channel to internal server for session %s",