/** * SSE (Server-Sent Events) Handler * * 处理单向事件推送,用于状态更新、日志流、进度通知等 */ import type { Context } from 'hono'; import { streamSSE } from 'hono/streaming'; import { getSessionManager } from './session/manager.js'; import type { SSEEvent } from './types.js'; // 存储 SSE 订阅者 interface SSESubscriber { sessionId: string; controller: ReadableStreamDefaultController; active: boolean; } const subscribers: Map> = new Map(); /** * 向会话的所有订阅者发送 SSE 事件 */ export function emitEvent(sessionId: string, event: SSEEvent): void { const subs = subscribers.get(sessionId); if (!subs) return; const eventString = formatSSEMessage(event); for (const sub of subs) { if (!sub.active) continue; try { sub.controller.enqueue(eventString); } catch (error) { // 订阅者已断开 sub.active = false; } } } /** * 广播事件到所有会话 */ export function broadcastEvent(event: SSEEvent): void { for (const sessionId of subscribers.keys()) { emitEvent(sessionId, event); } } /** * 格式化 SSE 消息 */ function formatSSEMessage(event: SSEEvent): string { const lines: string[] = []; if (event.event) { lines.push(`event: ${event.event}`); } const data = JSON.stringify(event.data); lines.push(`data: ${data}`); lines.push(''); // 空行结束消息 return lines.join('\n') + '\n'; } /** * SSE 路由处理器 * * GET /api/sessions/:id/events */ export async function handleSSE(c: Context): Promise { const sessionId = c.req.param('id'); const sessionManager = getSessionManager(); // 验证会话 if (!sessionManager.exists(sessionId)) { return c.json( { success: false, error: 'Session not found', }, 404 ); } return streamSSE(c, async (stream) => { // 创建订阅者 const subscriber: SSESubscriber = { sessionId, controller: null as any, // Will be set by stream internals active: true, }; // 注册订阅者 if (!subscribers.has(sessionId)) { subscribers.set(sessionId, new Set()); } subscribers.get(sessionId)!.add(subscriber); console.log(`[SSE] Client subscribed to session: ${sessionId}`); // 发送初始连接事件 await stream.writeSSE({ event: 'connected', data: JSON.stringify({ timestamp: Date.now(), payload: { sessionId }, }), }); // 发送心跳保持连接 const heartbeatInterval = setInterval(async () => { if (!subscriber.active) { clearInterval(heartbeatInterval); return; } try { await stream.writeSSE({ event: 'heartbeat', data: JSON.stringify({ timestamp: Date.now(), payload: null, }), }); } catch { subscriber.active = false; clearInterval(heartbeatInterval); } }, 30000); // 每 30 秒发送心跳 // 监听关闭 stream.onAbort(() => { subscriber.active = false; clearInterval(heartbeatInterval); const subs = subscribers.get(sessionId); if (subs) { subs.delete(subscriber); if (subs.size === 0) { subscribers.delete(sessionId); } } console.log(`[SSE] Client unsubscribed from session: ${sessionId}`); }); // 保持连接打开 // 事件将通过 emitEvent() 发送 while (subscriber.active) { await new Promise((resolve) => setTimeout(resolve, 1000)); } }); } /** * 发送状态更新事件 */ export function emitStatusEvent( sessionId: string, status: string, details?: Record ): void { emitEvent(sessionId, { event: 'status', data: { timestamp: Date.now(), payload: { status, ...details }, }, }); } /** * 发送日志事件 */ export function emitLogEvent( sessionId: string, level: 'info' | 'warn' | 'error', message: string ): void { emitEvent(sessionId, { event: 'log', data: { timestamp: Date.now(), payload: { level, message }, }, }); } /** * 发送进度事件 */ export function emitProgressEvent( sessionId: string, progress: number, message?: string ): void { emitEvent(sessionId, { event: 'progress', data: { timestamp: Date.now(), payload: { progress, message }, }, }); } /** * 发送文件变更事件 */ export function emitFileChangeEvent( sessionId: string, type: 'created' | 'modified' | 'deleted', path: string ): void { emitEvent(sessionId, { event: 'file_change', data: { timestamp: Date.now(), payload: { type, path }, }, }); } /** * 获取订阅统计 */ export function getSSEStats(): { sessions: number; subscribers: number } { let totalSubscribers = 0; for (const subs of subscribers.values()) { totalSubscribers += subs.size; } return { sessions: subscribers.size, subscribers: totalSubscribers, }; }