refactor(storage): 统一消息存储到 Core 层

问题:Server 端只存储最终文本响应,工具调用的中间消息丢失。

解决方案:
- Agent.chat() 返回 ChatResult,包含完整消息链
- Server SessionManager 简化为只管理会话元数据
- 消息 API 改为从 Core Storage 读取
- 移除 Server 端的消息存储和 addMessage 方法

影响范围:
- core: Agent.chat() 返回类型变更
- server: SessionManager 接口变更,移除消息存储
- server: GET /sessions/:id/messages 从 Core 读取
- server: 移除 POST /sessions/:id/messages 端点
This commit is contained in:
2025-12-15 10:04:22 +08:00
parent a657af9bb7
commit 6342a46e59
14 changed files with 273 additions and 503 deletions
+40 -27
View File
@@ -39,12 +39,20 @@ export interface CompressionResult {
summaryTokens?: number;
}
/**
* Chat 返回结果
*/
interface ChatResult {
text: string;
messages: unknown[];
}
/**
* Agent 实例接口
*/
interface AgentInstance {
setRegistry(registry: unknown): void;
chat(message: string, onStream?: (chunk: string) => void): Promise<string>;
chat(message: string, onStream?: (chunk: string) => void): Promise<ChatResult>;
getToolCount(): { core: number; discovered: number; total: number };
getContextUsageFormatted(): string;
getContextUsage(): TokenUsage;
@@ -52,6 +60,7 @@ interface AgentInstance {
getCompressionManager(): {
shouldCompress(messages: unknown[]): boolean;
};
getHistory(): unknown[];
}
/**
@@ -256,23 +265,17 @@ export async function processMessage(sessionId: string, content: string): Promis
}
// Core 模块不可用,返回占位响应
const errorContent = 'Agent core module not available. Please build @ai-assistant/core first.';
broadcastToSession(sessionId, {
type: 'chunk',
sessionId,
payload: {
content: 'Agent core module not available. Please build @ai-assistant/core first.',
},
});
const assistantMessage = await sessionManager.addMessage(sessionId, {
role: 'assistant',
content: 'Agent core module not available. Please build @ai-assistant/core first.',
payload: { content: errorContent },
});
broadcastToSession(sessionId, {
type: 'done',
sessionId,
payload: assistantMessage,
payload: { text: errorContent, hasToolCalls: false, messageCount: 0 },
});
sessionManager.updateStatus(sessionId, 'idle' as SessionStatus);
@@ -282,7 +285,7 @@ export async function processMessage(sessionId: string, content: string): Promis
try {
// 调用 Agent 的 chat 方法,使用流式回调
const response = await agent.chat(content, (chunk: string) => {
const result = await agent.chat(content, (chunk: string) => {
// 推送流式内容
broadcastToSession(sessionId, {
type: 'chunk',
@@ -299,31 +302,41 @@ export async function processMessage(sessionId: string, content: string): Promis
}
});
// 保存助手消息
const assistantMessage = await sessionManager.addMessage(sessionId, {
role: 'assistant',
content: response,
// 消息已由 Core Agent 自动持久化,这里只更新 Server 端的会话计数
const session = sessionManager.get(sessionId);
if (session) {
// 从 Agent 获取实际消息数
const history = agent.getHistory();
session.messageCount = history.length;
session.updatedAt = new Date().toISOString();
}
// 检查是否有工具调用
const hasToolCalls = result.messages.some((m: unknown) => {
const msg = m as { content?: unknown };
return Array.isArray(msg.content) && msg.content.some((c: unknown) => {
const block = c as { type?: string };
return block.type === 'tool-call';
});
});
// 发送完成消息
broadcastToSession(sessionId, {
type: 'done',
sessionId,
payload: assistantMessage,
payload: {
text: result.text,
hasToolCalls,
messageCount: result.messages.length,
},
});
// 检查是否需要生成会话标题(首次对话完成后)
const session = sessionManager.get(sessionId);
const messages = sessionManager.getMessages(sessionId);
if (session && !session.name && messages.length === 2) {
// 首条用户消息 + 首条 AI 回复 = 2 条消息
const userMessage = messages.find(m => m.role === 'user');
if (userMessage) {
// 异步生成标题,不阻塞响应
generateSessionTitle(sessionId, userMessage.content, response).catch(err => {
console.error('[Agent] Failed to generate session title:', err);
});
}
if (session && !session.name) {
// 异步生成标题,不阻塞响应
generateSessionTitle(sessionId, content, result.text).catch(err => {
console.error('[Agent] Failed to generate session title:', err);
});
}
emitStatusEvent(sessionId, 'idle');
+23 -60
View File
@@ -6,7 +6,7 @@
import { Hono } from 'hono';
import { getSessionManager } from '../session/manager.js';
import { CreateSessionInputSchema, SendMessageInputSchema } from '../types.js';
import { CreateSessionInputSchema } from '../types.js';
export const sessionsRouter = new Hono();
@@ -99,8 +99,10 @@ sessionsRouter.delete('/:id', async (c) => {
/**
* GET /sessions/:id/messages - 获取会话消息
*
* 从 Core 存储读取完整的消息历史(包含 tool-call 和 tool-result
*/
sessionsRouter.get('/:id/messages', (c) => {
sessionsRouter.get('/:id/messages', async (c) => {
const id = c.req.param('id');
if (!sessionManager.exists(id)) {
@@ -113,66 +115,27 @@ sessionsRouter.get('/:id/messages', (c) => {
);
}
const messages = sessionManager.getMessages(id);
// 从 Core Storage 读取消息
const storage = sessionManager.getStorage();
if (!storage) {
return c.json({
success: true,
data: [],
});
}
const projectId = sessionManager.getProjectId(id);
const sessionData = await storage.loadSession(projectId, id);
if (!sessionData) {
return c.json({
success: true,
data: [],
});
}
return c.json({
success: true,
data: messages,
data: sessionData.messages,
});
});
/**
* POST /sessions/:id/messages - 发送消息
*
* 注意: 这个端点仅用于添加消息记录。
* 实际的 AI 对话应该通过 WebSocket 进行。
*/
sessionsRouter.post('/:id/messages', async (c) => {
const id = c.req.param('id');
if (!sessionManager.exists(id)) {
return c.json(
{
success: false,
error: 'Session not found',
},
404
);
}
try {
const body = await c.req.json();
const input = SendMessageInputSchema.parse(body);
const message = await sessionManager.addMessage(id, {
role: input.role,
content: input.content,
});
if (!message) {
return c.json(
{
success: false,
error: 'Failed to add message',
},
500
);
}
return c.json(
{
success: true,
data: message,
},
201
);
} catch (error) {
return c.json(
{
success: false,
error: error instanceof Error ? error.message : 'Invalid input',
},
400
);
}
});
+49 -98
View File
@@ -1,34 +1,16 @@
/**
* Session Manager
*
* 管理所有活跃的会话,支持文件持久化
* 管理所有活跃的会话元数据(不存储消息,消息由 Core 负责)
*/
import { v4 as uuidv4 } from 'uuid';
import type { Session, CreateSessionInput, Message, SessionStatus } from '../types.js';
import type { Session, CreateSessionInput, SessionStatus } from '../types.js';
// ============================================================================
// Core 模块接口定义(避免构建时依赖)
// ============================================================================
interface SessionMetadata {
id: string;
projectId: string;
parentId?: string;
agentName?: string;
createdAt: string;
updatedAt: string;
workdir: string;
title?: string;
messageCount: number;
discoveredTools: string[];
todos: unknown[];
}
interface SessionData extends Omit<SessionMetadata, 'messageCount'> {
messages: Array<{ role: string; content: unknown }>;
}
interface SessionSummary {
id: string;
title: string;
@@ -45,6 +27,20 @@ interface ProjectMetadata {
isGitRepo: boolean;
}
interface SessionData {
id: string;
projectId: string;
parentId?: string;
agentName?: string;
createdAt: string;
updatedAt: string;
workdir: string;
title?: string;
messages: Array<{ role: string; content: unknown }>;
discoveredTools: string[];
todos: unknown[];
}
interface SessionStorageInterface {
ensureDir(): Promise<void>;
generateSessionId(): string;
@@ -56,46 +52,31 @@ interface SessionStorageInterface {
deleteSession(projectId: string, sessionId: string): Promise<boolean>;
}
// ============================================================================
// 消息格式转换
// ============================================================================
/**
* 将 Server Message 转换为 Core ModelMessage 格式
*/
function toModelMessage(msg: Message): { role: string; content: string } {
return { role: msg.role, content: msg.content };
}
/**
* 将 Core ModelMessage 转换为 Server Message 格式
*/
function fromModelMessage(
msg: { role: string; content: unknown },
sessionId: string,
_index: number
): Message {
return {
id: uuidv4(),
sessionId,
role: msg.role as 'user' | 'assistant' | 'system' | 'tool',
content: typeof msg.content === 'string' ? msg.content : JSON.stringify(msg.content),
createdAt: new Date().toISOString(),
};
}
// ============================================================================
// SessionManager 类
// ============================================================================
export class SessionManager {
private sessions: Map<string, Session> = new Map();
private messages: Map<string, Message[]> = new Map();
private sessionProjects: Map<string, string> = new Map(); // sessionId -> projectId
private storage: SessionStorageInterface | null = null;
private currentProject: ProjectMetadata | null = null;
private initialized = false;
/**
* 获取 storage 实例(供外部使用)
*/
getStorage(): SessionStorageInterface | null {
return this.storage;
}
/**
* 获取 session 所属的 projectId
*/
getProjectId(sessionId: string): string {
return this.sessionProjects.get(sessionId) || this.currentProject?.id || 'default';
}
/**
* 初始化:加载 Core 模块的 SessionStorage 并恢复已有 sessions
*/
@@ -127,7 +108,7 @@ export class SessionManager {
// 记录 session -> project 映射
this.sessionProjects.set(sessionData.id, sessionData.projectId);
// 转换为 Server Session 格式
// 转换为 Server Session 格式(只保存元数据,不存储消息)
const session: Session = {
id: sessionData.id,
name: sessionData.title,
@@ -139,10 +120,6 @@ export class SessionManager {
};
this.sessions.set(session.id, session);
// 转换消息格式
const messages = sessionData.messages.map((msg, i) => fromModelMessage(msg, session.id, i));
this.messages.set(session.id, messages);
}
console.log(`[SessionManager] Loaded ${this.sessions.size} sessions from storage`);
@@ -154,18 +131,20 @@ export class SessionManager {
}
/**
* 持久化单个 session
* 持久化单个 session 的元数据
* 注意:消息存储由 Core Agent 负责,这里只更新会话元数据
*/
private async persist(sessionId: string): Promise<void> {
private async persistMetadata(sessionId: string): Promise<void> {
if (!this.storage) return;
const session = this.sessions.get(sessionId);
const messages = this.messages.get(sessionId) || [];
if (!session) return;
const projectId = this.sessionProjects.get(sessionId) || this.currentProject?.id || 'default';
// 先加载现有的 session 数据(保留消息)
const existingData = await this.storage.loadSession(projectId, sessionId);
const sessionData: SessionData = {
id: session.id,
projectId,
@@ -173,9 +152,9 @@ export class SessionManager {
updatedAt: session.updatedAt,
workdir: session.workdir,
title: session.name,
messages: messages.map(toModelMessage),
discoveredTools: [],
todos: [],
messages: existingData?.messages || [],
discoveredTools: existingData?.discoveredTools || [],
todos: existingData?.todos || [],
};
await this.storage.saveSession(sessionData);
@@ -206,11 +185,10 @@ export class SessionManager {
};
this.sessions.set(session.id, session);
this.messages.set(session.id, []);
this.sessionProjects.set(session.id, projectId);
// 持久化
await this.persist(session.id);
// 持久化空会话
await this.persistMetadata(session.id);
return session;
}
@@ -235,7 +213,6 @@ export class SessionManager {
* 删除会话
*/
async delete(id: string): Promise<boolean> {
this.messages.delete(id);
const deleted = this.sessions.delete(id);
// 从存储中删除
@@ -261,41 +238,15 @@ export class SessionManager {
}
/**
* 获取会话消息
* 更新会话消息计数(由 Agent 调用)
*/
getMessages(sessionId: string): Message[] {
return this.messages.get(sessionId) || [];
}
/**
* 添加消息
*/
async addMessage(
sessionId: string,
message: Omit<Message, 'id' | 'sessionId' | 'createdAt'>
): Promise<Message | undefined> {
const session = this.sessions.get(sessionId);
updateMessageCount(id: string, count: number): Session | undefined {
const session = this.sessions.get(id);
if (!session) return undefined;
const fullMessage: Message = {
...message,
id: uuidv4(),
sessionId,
createdAt: new Date().toISOString(),
};
const messages = this.messages.get(sessionId) || [];
messages.push(fullMessage);
this.messages.set(sessionId, messages);
// 更新会话
session.messageCount = messages.length;
session.messageCount = count;
session.updatedAt = new Date().toISOString();
// 持久化
await this.persist(sessionId);
return fullMessage;
return session;
}
/**
@@ -308,8 +259,8 @@ export class SessionManager {
session.name = name;
session.updatedAt = new Date().toISOString();
// 持久化
await this.persist(sessionId);
// 持久化元数据
await this.persistMetadata(sessionId);
return session;
}
+11 -16
View File
@@ -106,24 +106,19 @@ export async function handleWebSocketMessage(
case 'message': {
// 用户发送消息
const content = message.payload?.content || '';
const userMessage = sessionManager.addMessage(sessionId, {
role: 'user',
content,
// 广播确认收到消息
broadcastToSession(sessionId, {
type: 'message_received',
sessionId,
payload: { content },
});
if (userMessage) {
// 广播用户消息
broadcastToSession(sessionId, {
type: 'message_received',
sessionId,
payload: userMessage,
});
// 调用 Agent 处理消息(异步,不阻塞)
processMessage(sessionId, content).catch((error) => {
console.error('[WS] Agent processing error:', error);
});
}
// 调用 Agent 处理消息(异步,不阻塞)
// 消息存储由 Core Agent 负责
processMessage(sessionId, content).catch((error) => {
console.error('[WS] Agent processing error:', error);
});
break;
}