refactor(storage): 重构消息存储为 2-message 格式

采用 OpenCode 风格的消息存储架构:
- 只有 user 和 assistant 两种角色,移除 tool/system
- ToolPart 使用状态机模式 (pending → running → completed/error)
- 新增 toModelMessages() 转换函数用于调用 AI SDK
- 删除 message-merger.ts,存储层直接返回正确格式

主要改动:
- parts.ts: ToolState 状态机(pending/running/completed/error)
- message.ts: 移除 system role,添加 parentId 关联
- converter.ts: 新增 toModelMessages() 格式转换
- manager.ts: 重构 syncMessages/partsToModelMessages
- sessions.ts: 简化路由,直接从 Core Storage 读取
This commit is contained in:
2025-12-15 13:35:32 +08:00
parent eda2ccb171
commit 9f456c1029
13 changed files with 635 additions and 513 deletions
+157
View File
@@ -0,0 +1,157 @@
/**
* 消息转换器
* 将内部存储格式转换为 AI SDK ModelMessage 格式
*/
import type { ModelMessage } from 'ai';
import type { Message } from './message.js';
import type { Part, ToolPart, TextPart } from './parts.js';
/**
* 将单条内部消息转换为 AI SDK ModelMessage 数组
*
* 对于 assistant 消息,可能产生 1-2 条 ModelMessage
* 1. assistant 消息(文本 + 工具调用)
* 2. tool 消息(如果有已完成的工具调用)
*/
function messageToModelMessages(msg: Message): ModelMessage[] {
const result: ModelMessage[] = [];
if (msg.role === 'user') {
// User 消息:提取所有文本
const textContent = extractTextContent(msg.parts);
if (textContent) {
result.push({
role: 'user',
content: textContent,
});
}
return result;
}
// Assistant 消息
const textParts = msg.parts.filter((p): p is TextPart => p.type === 'text');
const toolParts = msg.parts.filter((p): p is ToolPart => p.type === 'tool');
// 构建 assistant 消息内容
const assistantContent: unknown[] = [];
// 添加文本部分
for (const textPart of textParts) {
assistantContent.push({
type: 'text',
text: textPart.text,
});
}
// 添加工具调用部分(只有 running 或已完成的工具)
for (const toolPart of toolParts) {
if (toolPart.state.status !== 'pending') {
assistantContent.push({
type: 'tool-call',
toolCallId: toolPart.toolCallId,
toolName: toolPart.toolName,
args: toolPart.state.input,
});
}
}
// 添加 assistant 消息
if (assistantContent.length > 0) {
// 简化:如果只有一个文本内容,直接使用字符串
if (
assistantContent.length === 1 &&
(assistantContent[0] as { type: string }).type === 'text'
) {
result.push({
role: 'assistant',
content: (assistantContent[0] as { text: string }).text,
});
} else {
result.push({
role: 'assistant',
content: assistantContent,
} as ModelMessage);
}
}
// 如果有已完成/出错的工具调用,添加 tool 消息
const completedToolParts = toolParts.filter(
(p) => p.state.status === 'completed' || p.state.status === 'error'
);
if (completedToolParts.length > 0) {
const toolContent = completedToolParts.map((toolPart) => {
const state = toolPart.state;
// 使用类型断言获取正确的值
const output = state.status === 'completed'
? (state as { output: unknown }).output
: (state as { error: string }).error;
return {
type: 'tool-result' as const,
toolCallId: toolPart.toolCallId,
toolName: toolPart.toolName,
result: output,
};
});
result.push({
role: 'tool',
content: toolContent,
} as unknown as ModelMessage);
}
return result;
}
/**
* 从 Parts 中提取纯文本内容
*/
function extractTextContent(parts: Part[]): string {
return parts
.filter((p): p is TextPart => p.type === 'text')
.map((p) => p.text)
.join('');
}
/**
* 将内部消息列表转换为 AI SDK ModelMessage 数组
*
* 输入:内部存储的 Message[] (只有 user 和 assistant)
* 输出:AI SDK 格式的 ModelMessage[] (包括 user, assistant, tool)
*/
export function toModelMessages(messages: Message[]): ModelMessage[] {
const result: ModelMessage[] = [];
for (const msg of messages) {
const modelMessages = messageToModelMessages(msg);
result.push(...modelMessages);
}
return result;
}
/**
* 获取工具调用的输入参数(兼容不同状态)
*/
export function getToolInput(toolPart: ToolPart): Record<string, unknown> {
if (toolPart.state.status === 'pending') {
return {};
}
return toolPart.state.input;
}
/**
* 获取工具调用的执行时长(毫秒)
*/
export function getToolDuration(toolPart: ToolPart): number | undefined {
const state = toolPart.state;
if (state.status === 'completed' || state.status === 'error') {
return state.time.end - state.time.start;
}
if (state.status === 'running') {
return Date.now() - state.time.start;
}
return undefined;
}
+9 -2
View File
@@ -11,7 +11,7 @@ export {
createMessageInfo,
} from './message.js';
export type { Part, PartType, ToolStatus } from './parts.js';
export type { Part, PartType, ToolStatus, ToolState, ToolPart, TextPart } from './parts.js';
export {
PartSchema,
TextPartSchema,
@@ -26,10 +26,17 @@ export {
SubtaskPartSchema,
CompactionPartSchema,
RetryPartSchema,
ToolStatusSchema,
ToolStateSchema,
ToolStatePendingSchema,
ToolStateRunningSchema,
ToolStateCompletedSchema,
ToolStateErrorSchema,
createPart,
} from './parts.js';
// 消息转换器
export { toModelMessages, getToolInput, getToolDuration } from './converter.js';
// ID 生成器
export {
generateSessionId,
+194 -98
View File
@@ -175,76 +175,118 @@ export class SessionManager {
for (const messageInfo of messageInfos) {
const parts = await PartStorage.getByIds(messageInfo.id, messageInfo.partIds);
const modelMessage = this.partsToModelMessage(messageInfo.role, parts);
if (modelMessage) {
messages.push(modelMessage);
}
const modelMessages = this.partsToModelMessages(messageInfo.role, parts);
messages.push(...modelMessages);
}
return messages;
}
/**
* 将 Parts 转换为 AI SDK ModelMessage
* 将 Parts 转换为 AI SDK ModelMessage(用于加载历史消息)
*
* 新逻辑:
* - user 消息:直接转换
* - assistant 消息:转换文本和工具调用,然后为已完成的工具生成 tool 消息
*/
private partsToModelMessage(role: string, parts: Part[]): ModelMessage | null {
if (parts.length === 0) return null;
private partsToModelMessages(role: string, parts: Part[]): ModelMessage[] {
if (parts.length === 0) return [];
// 构建消息内容
const content: unknown[] = [];
const result: ModelMessage[] = [];
for (const part of parts) {
switch (part.type) {
case 'text':
if (role === 'user') {
// User 消息:只有文本和文件
const content: unknown[] = [];
for (const part of parts) {
if (part.type === 'text') {
content.push({ type: 'text', text: part.text });
break;
case 'tool':
if (role === 'assistant') {
content.push({
type: 'tool-call',
toolCallId: part.toolCallId,
toolName: part.toolName,
args: part.args,
});
} else if (role === 'tool') {
// Tool result message - AI SDK 的 tool message 格式
return {
role: 'tool',
content: [{
type: 'tool-result',
toolCallId: part.toolCallId,
toolName: part.toolName,
result: part.result,
}],
} as unknown as ModelMessage;
}
break;
case 'file':
} else if (part.type === 'file') {
content.push({
type: 'image',
image: part.data,
mimeType: part.mimeType,
});
break;
case 'reasoning':
// Reasoning 通常作为文本的一部分
}
}
if (content.length === 1 && (content[0] as { type: string }).type === 'text') {
result.push({
role: 'user',
content: (content[0] as { text: string }).text,
});
} else if (content.length > 0) {
result.push({
role: 'user',
content,
} as ModelMessage);
}
} else if (role === 'assistant') {
// Assistant 消息:文本 + 工具调用
const content: unknown[] = [];
const completedTools: Array<{ toolCallId: string; toolName: string; output: unknown }> = [];
for (const part of parts) {
if (part.type === 'text') {
content.push({ type: 'text', text: part.text });
} else if (part.type === 'tool') {
// 只有非 pending 状态的工具调用才添加到 AI SDK 消息
if (part.state.status !== 'pending') {
content.push({
type: 'tool-call',
toolCallId: part.toolCallId,
toolName: part.toolName,
args: part.state.input,
});
// 收集已完成的工具结果
if (part.state.status === 'completed') {
completedTools.push({
toolCallId: part.toolCallId,
toolName: part.toolName,
output: part.state.output,
});
} else if (part.state.status === 'error') {
completedTools.push({
toolCallId: part.toolCallId,
toolName: part.toolName,
output: part.state.error,
});
}
}
} else if (part.type === 'reasoning') {
content.push({ type: 'text', text: `[Reasoning] ${part.text}` });
break;
}
}
// 添加 assistant 消息
if (content.length === 1 && (content[0] as { type: string }).type === 'text') {
result.push({
role: 'assistant',
content: (content[0] as { text: string }).text,
});
} else if (content.length > 0) {
result.push({
role: 'assistant',
content,
} as ModelMessage);
}
// 添加 tool 消息(如果有已完成的工具)
if (completedTools.length > 0) {
result.push({
role: 'tool',
content: completedTools.map((t) => ({
type: 'tool-result',
toolCallId: t.toolCallId,
toolName: t.toolName,
result: t.output,
})),
} as unknown as ModelMessage);
}
}
// 简化:如果只有一个文本内容,直接使用字符串
if (content.length === 1 && (content[0] as { type: string }).type === 'text') {
return {
role: role as 'user' | 'assistant' | 'system',
content: (content[0] as { text: string }).text,
} as ModelMessage;
}
return {
role: role as 'user' | 'assistant' | 'system' | 'tool',
content,
} as ModelMessage;
return result;
}
/**
@@ -297,6 +339,11 @@ export class SessionManager {
/**
* 同步消息到存储(将 AI SDK 消息转换为 Message + Parts
*
* 新逻辑:只存储 user 和 assistant 消息
* - user 消息:直接存储
* - assistant 消息:合并后续的 tool 消息中的工具结果
* - tool 消息:跳过(结果合并到 assistant)
*/
async syncMessages(messages: ModelMessage[]): Promise<void> {
if (!this.currentSession) return;
@@ -306,59 +353,108 @@ export class SessionManager {
// 删除旧消息
await MessageStorage.removeBySession(sessionId);
// 保存新消息
for (const message of messages) {
const messageInfo = await MessageStorage.create(sessionId, message.role as 'user' | 'assistant' | 'system');
// 用于跟踪当前 assistant 消息的工具调用
let currentAssistantMsgId: string | null = null;
let currentUserMsgId: string | null = null;
const toolCallPartIds = new Map<string, string>(); // toolCallId -> partId
// 将消息内容转换为 Parts
const partIds: string[] = [];
for (let i = 0; i < messages.length; i++) {
const message = messages[i];
if (typeof message.content === 'string') {
// 简单文本
const part = await PartStorage.createText(messageInfo.id, message.content);
partIds.push(part.id);
} else if (Array.isArray(message.content)) {
// 复杂内容(多个 parts
for (const item of message.content) {
const itemType = (item as { type: string }).type;
if (itemType === 'text') {
const part = await PartStorage.createText(messageInfo.id, (item as { text: string }).text);
partIds.push(part.id);
} else if (itemType === 'tool-call') {
const toolCall = item as unknown as { toolCallId: string; toolName: string; args: Record<string, unknown> };
const part = await PartStorage.createTool(
messageInfo.id,
toolCall.toolCallId,
toolCall.toolName,
toolCall.args
);
partIds.push(part.id);
} else if (itemType === 'tool-result') {
const toolResult = item as unknown as { toolCallId: string; toolName: string; result: unknown };
const part = await PartStorage.create(messageInfo.id, 'tool', {
toolCallId: toolResult.toolCallId,
toolName: toolResult.toolName,
args: {},
status: 'completed',
result: toolResult.result,
});
partIds.push(part.id);
} else if (itemType === 'image') {
const img = item as unknown as { image: string; mimeType: string };
const part = await PartStorage.create(messageInfo.id, 'file', {
filename: 'image',
mimeType: img.mimeType,
data: typeof img.image === 'string' ? img.image : '',
});
partIds.push(part.id);
if (message.role === 'user') {
// User 消息
const messageInfo = await MessageStorage.create(sessionId, 'user');
currentUserMsgId = messageInfo.id;
const partIds: string[] = [];
if (typeof message.content === 'string') {
const part = await PartStorage.createText(messageInfo.id, message.content);
partIds.push(part.id);
} else if (Array.isArray(message.content)) {
for (const item of message.content) {
const itemType = (item as { type: string }).type;
if (itemType === 'text') {
const part = await PartStorage.createText(messageInfo.id, (item as { text: string }).text);
partIds.push(part.id);
} else if (itemType === 'image') {
const img = item as unknown as { image: string; mimeType: string };
const part = await PartStorage.create(messageInfo.id, 'file', {
filename: 'image',
mimeType: img.mimeType,
data: typeof img.image === 'string' ? img.image : '',
});
partIds.push(part.id);
}
}
}
}
// 更新消息的 partIds
if (partIds.length > 0) {
await MessageStorage.update(sessionId, messageInfo.id, { partIds });
if (partIds.length > 0) {
await MessageStorage.update(sessionId, messageInfo.id, { partIds });
}
// 重置工具调用追踪
currentAssistantMsgId = null;
toolCallPartIds.clear();
} else if (message.role === 'assistant') {
// Assistant 消息
const messageInfo = await MessageStorage.create(sessionId, 'assistant', {
parentId: currentUserMsgId ?? undefined,
});
currentAssistantMsgId = messageInfo.id;
const partIds: string[] = [];
if (typeof message.content === 'string') {
const part = await PartStorage.createText(messageInfo.id, message.content);
partIds.push(part.id);
} else if (Array.isArray(message.content)) {
for (const item of message.content) {
const itemType = (item as { type: string }).type;
if (itemType === 'text') {
const part = await PartStorage.createText(messageInfo.id, (item as { text: string }).text);
partIds.push(part.id);
} else if (itemType === 'tool-call') {
const toolCall = item as unknown as { toolCallId: string; toolName: string; args: Record<string, unknown> };
// 创建 running 状态的工具 Part
const part = await PartStorage.createToolRunning(
messageInfo.id,
toolCall.toolCallId,
toolCall.toolName,
toolCall.args ?? {}
);
partIds.push(part.id);
toolCallPartIds.set(toolCall.toolCallId, part.id);
}
}
}
if (partIds.length > 0) {
await MessageStorage.update(sessionId, messageInfo.id, { partIds });
}
} else if (message.role === 'tool' && currentAssistantMsgId) {
// Tool 消息:更新对应 assistant 消息中的工具 Part 状态
if (Array.isArray(message.content)) {
for (const item of message.content) {
const itemType = (item as { type: string }).type;
if (itemType === 'tool-result') {
const toolResult = item as unknown as { toolCallId: string; toolName: string; result: unknown };
const partId = toolCallPartIds.get(toolResult.toolCallId);
if (partId) {
// 更新工具状态为 completed
// 获取原始 start time
const part = await PartStorage.get(currentAssistantMsgId, partId);
const startTime = part?.type === 'tool' && part.state.status === 'running'
? part.state.time.start
: Date.now();
await PartStorage.setToolCompleted(currentAssistantMsgId, partId, toolResult.result, startTime);
}
}
}
}
// 不创建新消息,跳过 tool role
}
// 忽略 system 消息(system prompt 通过其他方式注入)
}
}
+10 -4
View File
@@ -1,9 +1,9 @@
import { z } from 'zod';
/**
* 消息角色
* 消息角色(只有 user 和 assistant,不存储 tool/system
*/
export const MessageRoleSchema = z.enum(['user', 'assistant', 'system']);
export const MessageRoleSchema = z.enum(['user', 'assistant']);
export type MessageRole = z.infer<typeof MessageRoleSchema>;
/**
@@ -13,6 +13,8 @@ export const MessageInfoSchema = z.object({
id: z.string(),
sessionId: z.string(),
role: MessageRoleSchema,
// assistant 消息指向对应的 user 消息
parentId: z.string().optional(),
createdAt: z.number(), // Unix timestamp in milliseconds
// Part IDs 列表(按顺序)
partIds: z.array(z.string()),
@@ -66,14 +68,18 @@ export function createMessageInfo(
sessionId: string,
role: MessageRole,
partIds: string[] = [],
metadata?: MessageInfo['metadata']
options?: {
parentId?: string;
metadata?: MessageInfo['metadata'];
}
): MessageInfo {
return {
id: generateMessageId(),
sessionId,
role,
parentId: options?.parentId,
createdAt: Date.now(),
partIds,
metadata,
metadata: options?.metadata,
};
}
+55 -10
View File
@@ -18,24 +18,69 @@ export const TextPartSchema = PartBaseSchema.extend({
export type TextPart = z.infer<typeof TextPartSchema>;
/**
* 工具调用状态
* 工具状态机 - Pending(等待执行)
*/
export const ToolStatusSchema = z.enum(['pending', 'running', 'completed', 'error']);
export type ToolStatus = z.infer<typeof ToolStatusSchema>;
export const ToolStatePendingSchema = z.object({
status: z.literal('pending'),
});
export type ToolStatePending = z.infer<typeof ToolStatePendingSchema>;
/**
* 工具调用 Part
* 工具状态机 - Running(执行中)
*/
export const ToolStateRunningSchema = z.object({
status: z.literal('running'),
input: z.record(z.string(), z.unknown()),
time: z.object({ start: z.number() }),
});
export type ToolStateRunning = z.infer<typeof ToolStateRunningSchema>;
/**
* 工具状态机 - Completed(执行完成)
*/
export const ToolStateCompletedSchema = z.object({
status: z.literal('completed'),
input: z.record(z.string(), z.unknown()),
output: z.unknown(),
time: z.object({ start: z.number(), end: z.number() }),
});
export type ToolStateCompleted = z.infer<typeof ToolStateCompletedSchema>;
/**
* 工具状态机 - Error(执行出错)
*/
export const ToolStateErrorSchema = z.object({
status: z.literal('error'),
input: z.record(z.string(), z.unknown()),
error: z.string(),
time: z.object({ start: z.number(), end: z.number() }),
});
export type ToolStateError = z.infer<typeof ToolStateErrorSchema>;
/**
* 工具状态联合类型
*/
export const ToolStateSchema = z.discriminatedUnion('status', [
ToolStatePendingSchema,
ToolStateRunningSchema,
ToolStateCompletedSchema,
ToolStateErrorSchema,
]);
export type ToolState = z.infer<typeof ToolStateSchema>;
/**
* 工具状态字面量(用于类型检查)
*/
export type ToolStatus = ToolState['status'];
/**
* 工具调用 Part(使用状态机模式)
*/
export const ToolPartSchema = PartBaseSchema.extend({
type: z.literal('tool'),
toolCallId: z.string(),
toolName: z.string(),
args: z.record(z.string(), z.unknown()).default({}),
status: ToolStatusSchema,
result: z.unknown().optional(),
error: z.string().optional(),
startedAt: z.number().optional(),
completedAt: z.number().optional(),
state: ToolStateSchema,
});
export type ToolPart = z.infer<typeof ToolPartSchema>;
+1 -1
View File
@@ -26,7 +26,7 @@ export type { MessageInfo } from './message.js';
// Part storage
export * as PartStorage from './part.js';
export type { Part, PartType, ToolPart, ToolStatus } from './part.js';
export type { Part, PartType, ToolPart, ToolStatus, ToolState } from './part.js';
// Todo storage
export * as TodoStorage from './todo.js';
@@ -14,6 +14,7 @@ export async function create(
sessionId: string,
role: MessageInfo['role'],
options?: {
parentId?: string;
partIds?: string[];
metadata?: MessageInfo['metadata'];
}
@@ -22,6 +23,7 @@ export async function create(
id: generateMessageId(),
sessionId,
role,
parentId: options?.parentId,
createdAt: Date.now(),
partIds: options?.partIds || [],
metadata: options?.metadata,
+86 -25
View File
@@ -1,10 +1,10 @@
import * as base from './base.js';
import { generatePartId } from '../id.js';
import type { Part, PartType, ToolPart, ToolStatus } from '../parts.js';
import type { Part, PartType, ToolPart, ToolStatus, ToolState } from '../parts.js';
import { PartSchema } from '../parts.js';
// Re-export types
export type { Part, PartType, ToolPart, ToolStatus } from '../parts.js';
export type { Part, PartType, ToolPart, ToolStatus, ToolState } from '../parts.js';
/**
* 创建 Part
@@ -60,32 +60,75 @@ export async function update<T extends Part>(
}
/**
* 更新 ToolPart 状态
* 更新 ToolPart 状态(新的状态机模式)
*/
export async function updateToolStatus(
export async function updateToolState(
messageId: string,
partId: string,
status: ToolStatus,
result?: unknown,
error?: string
state: ToolState
): Promise<ToolPart> {
return base.update(['part', messageId, partId], (part: ToolPart) => {
part.status = status;
if (status === 'running') {
part.startedAt = Date.now();
}
if (status === 'completed' || status === 'error') {
part.completedAt = Date.now();
if (result !== undefined) {
part.result = result;
}
if (error !== undefined) {
part.error = error;
}
}
part.state = state;
}) as Promise<ToolPart>;
}
/**
* 更新 ToolPart 为 running 状态
*/
export async function setToolRunning(
messageId: string,
partId: string,
input: Record<string, unknown>
): Promise<ToolPart> {
return updateToolState(messageId, partId, {
status: 'running',
input,
time: { start: Date.now() },
});
}
/**
* 更新 ToolPart 为 completed 状态
*/
export async function setToolCompleted(
messageId: string,
partId: string,
output: unknown,
startTime: number
): Promise<ToolPart> {
// 先获取当前 part 以获取 input
const part = await get(messageId, partId) as ToolPart | null;
const input = part?.state.status !== 'pending' ? part?.state.input : {};
return updateToolState(messageId, partId, {
status: 'completed',
input: input ?? {},
output,
time: { start: startTime, end: Date.now() },
});
}
/**
* 更新 ToolPart 为 error 状态
*/
export async function setToolError(
messageId: string,
partId: string,
error: string,
startTime: number
): Promise<ToolPart> {
// 先获取当前 part 以获取 input
const part = await get(messageId, partId) as ToolPart | null;
const input = part?.state.status !== 'pending' ? part?.state.input : {};
return updateToolState(messageId, partId, {
status: 'error',
input: input ?? {},
error,
time: { start: startTime, end: Date.now() },
});
}
/**
* 删除 Part
*/
@@ -144,19 +187,37 @@ export async function createText(messageId: string, text: string): Promise<Part>
}
/**
* 创建工具调用 Part
* 创建工具调用 Partpending 状态)
*/
export async function createTool(
messageId: string,
toolCallId: string,
toolName: string,
args?: Record<string, unknown>
toolName: string
): Promise<ToolPart> {
return create<ToolPart>(messageId, 'tool', {
toolCallId,
toolName,
args: args ?? {},
status: 'pending',
state: { status: 'pending' },
});
}
/**
* 创建工具调用 Part(直接 running 状态)
*/
export async function createToolRunning(
messageId: string,
toolCallId: string,
toolName: string,
input: Record<string, unknown>
): Promise<ToolPart> {
return create<ToolPart>(messageId, 'tool', {
toolCallId,
toolName,
state: {
status: 'running',
input,
time: { start: Date.now() },
},
});
}