/** * WebSocket Handler * * 处理实时双向通信,主要用于 AI 对话流 */ import type { WSContext } from 'hono/ws'; import { getSessionManager } from './session/manager.js'; import { processMessage, cancelProcessing } from './agent/index.js'; import type { ClientMessage, ServerMessage } from './types.js'; // 存储活跃的 WebSocket 连接 const connections: Map> = new Map(); /** * 获取会话的所有连接 */ export function getSessionConnections(sessionId: string): Set { return connections.get(sessionId) || new Set(); } /** * 向会话的所有连接发送消息 */ export function broadcastToSession(sessionId: string, message: ServerMessage): void { const conns = connections.get(sessionId); if (!conns) return; const data = JSON.stringify(message); for (const ws of conns) { try { ws.send(data); } catch (error) { console.error('Failed to send message:', error); } } } /** * WebSocket 连接处理器 */ export function handleWebSocket(ws: WSContext, sessionId: string): void { const sessionManager = getSessionManager(); // 验证会话 if (!sessionManager.exists(sessionId)) { ws.send( JSON.stringify({ type: 'error', sessionId, payload: { message: 'Session not found' }, } as ServerMessage) ); ws.close(4004, 'Session not found'); return; } // 注册连接 if (!connections.has(sessionId)) { connections.set(sessionId, new Set()); } connections.get(sessionId)!.add(ws); // 更新会话状态 sessionManager.updateStatus(sessionId, 'active'); // 发送连接成功消息 ws.send( JSON.stringify({ type: 'connected', sessionId, payload: { message: 'Connected to session' }, } as ServerMessage) ); console.log(`[WS] Client connected to session: ${sessionId}`); } /** * WebSocket 消息处理器 */ export async function handleWebSocketMessage( ws: WSContext, sessionId: string, data: unknown ): Promise { const sessionManager = getSessionManager(); try { // 处理不同类型的数据 let text: string; if (typeof data === 'string') { text = data; } else if (data instanceof ArrayBuffer || data instanceof SharedArrayBuffer) { text = new TextDecoder().decode(data as ArrayBuffer); } else if (data instanceof Blob) { text = await data.text(); } else { text = String(data); } const message: ClientMessage = JSON.parse(text); switch (message.type) { case 'message': { // 用户发送消息 const content = message.payload?.content || ''; const userMessage = sessionManager.addMessage(sessionId, { role: 'user', content, }); if (userMessage) { // 广播用户消息 broadcastToSession(sessionId, { type: 'message_received', sessionId, payload: userMessage, }); // 调用 Agent 处理消息(异步,不阻塞) processMessage(sessionId, content).catch((error) => { console.error('[WS] Agent processing error:', error); }); } break; } case 'cancel': { // 取消当前操作 cancelProcessing(sessionId); broadcastToSession(sessionId, { type: 'cancelled', sessionId, payload: { message: 'Operation cancelled' }, }); break; } case 'tool_response': { // 工具执行结果 (用于人工确认场景) // TODO: 处理工具响应 break; } default: ws.send( JSON.stringify({ type: 'error', sessionId, payload: { message: `Unknown message type: ${(message as any).type}` }, } as ServerMessage) ); } } catch (error) { ws.send( JSON.stringify({ type: 'error', sessionId, payload: { message: error instanceof Error ? error.message : 'Failed to process message', }, } as ServerMessage) ); } } /** * WebSocket 关闭处理器 */ export function handleWebSocketClose(ws: WSContext, sessionId: string): void { const conns = connections.get(sessionId); if (conns) { conns.delete(ws); if (conns.size === 0) { connections.delete(sessionId); // 当没有连接时,更新会话状态 const sessionManager = getSessionManager(); sessionManager.updateStatus(sessionId, 'idle'); } } console.log(`[WS] Client disconnected from session: ${sessionId}`); } /** * 获取连接统计 */ export function getConnectionStats(): { sessions: number; connections: number } { let totalConnections = 0; for (const conns of connections.values()) { totalConnections += conns.size; } return { sessions: connections.size, connections: totalConnections, }; }