refactor(session): 升级存储系统架构

- 添加读写锁机制防止并发写入冲突
- 实现数据迁移框架支持版本化升级
- 分层存储结构:项目/会话/消息独立存储
- 使用 Git root commit hash 作为稳定项目 ID
- 增量消息同步避免重复写入
- 每条消息独立文件,按序号命名 (0001.json, 0002.json)
This commit is contained in:
2025-12-13 10:41:36 +08:00
parent 26e8646518
commit 9ff2934089
11 changed files with 1277 additions and 487 deletions
+332 -95
View File
@@ -1,7 +1,18 @@
import * as fs from 'fs/promises';
import * as path from 'path';
import * as os from 'os';
import type { SessionData, SessionSummary } from './types.js';
import type { ModelMessage } from 'ai';
import type {
SessionData,
SessionMetadata,
SessionSummary,
StoredMessage,
CurrentSessionPointer,
ProjectMetadata,
} from './types.js';
import { Lock } from '../utils/lock.js';
import { getProjectId, isGitRepository } from './project.js';
import { runMigrations } from './migration.js';
/**
* 获取默认存储目录
@@ -17,24 +28,38 @@ function getDefaultStorageDir(): string {
/**
* 会话存储类
* 负责会话数据的读写操作
* 负责会话数据的读写操作,采用分层存储结构
*/
export class SessionStorage {
private storageDir: string;
private sessionsDir: string;
private projectDir: string;
private sessionDir: string;
private messageDir: string;
private currentSessionFile: string;
private initialized = false;
constructor(storageDir?: string) {
this.storageDir = storageDir || getDefaultStorageDir();
this.sessionsDir = path.join(this.storageDir, 'sessions');
this.projectDir = path.join(this.storageDir, 'project');
this.sessionDir = path.join(this.storageDir, 'session');
this.messageDir = path.join(this.storageDir, 'message');
this.currentSessionFile = path.join(this.storageDir, 'current-session.json');
}
/**
* 确保存储目录存在
* 确保存储目录存在并执行迁移
*/
async ensureDir(): Promise<void> {
await fs.mkdir(this.sessionsDir, { recursive: true });
if (this.initialized) return;
await fs.mkdir(this.projectDir, { recursive: true });
await fs.mkdir(this.sessionDir, { recursive: true });
await fs.mkdir(this.messageDir, { recursive: true });
// 执行数据迁移
await runMigrations(this.storageDir);
this.initialized = true;
}
/**
@@ -47,47 +72,255 @@ export class SessionStorage {
return `${timestamp}_${random}`;
}
// ========== 项目管理 ==========
/**
* 保存当前会话
* 获取或创建项目
*/
async saveCurrentSession(session: SessionData): Promise<void> {
async getOrCreateProject(workdir: string): Promise<ProjectMetadata> {
await this.ensureDir();
session.updatedAt = new Date().toISOString();
await fs.writeFile(
this.currentSessionFile,
JSON.stringify(session, null, 2),
'utf-8'
);
const projectId = await getProjectId(workdir);
const projectFile = path.join(this.projectDir, `${projectId}.json`);
using _ = await Lock.read(projectFile);
try {
const content = await fs.readFile(projectFile, 'utf-8');
return JSON.parse(content) as ProjectMetadata;
} catch {
// 项目不存在,创建新项目
const isGitRepo = await isGitRepository(workdir);
const project: ProjectMetadata = {
id: projectId,
workdir,
createdAt: new Date().toISOString(),
isGitRepo,
};
using __ = await Lock.write(projectFile);
await fs.writeFile(projectFile, JSON.stringify(project, null, 2), 'utf-8');
// 确保项目的会话目录存在
await fs.mkdir(path.join(this.sessionDir, projectId), { recursive: true });
return project;
}
}
// ========== 会话元数据管理 ==========
/**
* 保存会话元数据
*/
async saveSessionMetadata(session: SessionData): Promise<void> {
await this.ensureDir();
const metadata: SessionMetadata = {
id: session.id,
projectId: session.projectId,
parentId: session.parentId,
agentName: session.agentName,
createdAt: session.createdAt,
updatedAt: new Date().toISOString(),
workdir: session.workdir,
title: session.title,
messageCount: session.messages.length,
discoveredTools: session.discoveredTools,
todos: session.todos,
};
const sessionFile = path.join(this.sessionDir, session.projectId, `${session.id}.json`);
// 确保项目目录存在
await fs.mkdir(path.join(this.sessionDir, session.projectId), { recursive: true });
using _ = await Lock.write(sessionFile);
await fs.writeFile(sessionFile, JSON.stringify(metadata, null, 2), 'utf-8');
}
/**
* 加载当前会话
* 加载会话元数据
*/
async loadCurrentSession(): Promise<SessionData | null> {
async loadSessionMetadata(projectId: string, sessionId: string): Promise<SessionMetadata | null> {
await this.ensureDir();
const sessionFile = path.join(this.sessionDir, projectId, `${sessionId}.json`);
using _ = await Lock.read(sessionFile);
try {
const content = await fs.readFile(sessionFile, 'utf-8');
return JSON.parse(content) as SessionMetadata;
} catch {
return null;
}
}
// ========== 消息管理 ==========
/**
* 追加消息
*/
async appendMessage(sessionId: string, message: ModelMessage, index: number): Promise<void> {
await this.ensureDir();
const sessionMessageDir = path.join(this.messageDir, sessionId);
await fs.mkdir(sessionMessageDir, { recursive: true });
const messageFile = path.join(sessionMessageDir, `${String(index).padStart(4, '0')}.json`);
const storedMessage: StoredMessage = {
index,
role: message.role as StoredMessage['role'],
content: message.content,
createdAt: new Date().toISOString(),
};
using _ = await Lock.write(messageFile);
await fs.writeFile(messageFile, JSON.stringify(storedMessage, null, 2), 'utf-8');
}
/**
* 批量追加消息(用于增量同步)
*/
async syncMessages(sessionId: string, messages: ModelMessage[], startIndex: number): Promise<void> {
for (let i = startIndex; i < messages.length; i++) {
await this.appendMessage(sessionId, messages[i], i + 1);
}
}
/**
* 加载会话的所有消息
*/
async loadMessages(sessionId: string): Promise<ModelMessage[]> {
await this.ensureDir();
const sessionMessageDir = path.join(this.messageDir, sessionId);
try {
const files = await fs.readdir(sessionMessageDir);
const messageFiles = files.filter((f) => f.endsWith('.json')).sort();
const messages: ModelMessage[] = [];
for (const file of messageFiles) {
const filePath = path.join(sessionMessageDir, file);
using _ = await Lock.read(filePath);
const content = await fs.readFile(filePath, 'utf-8');
const stored = JSON.parse(content) as StoredMessage;
messages.push({
role: stored.role,
content: stored.content,
} as ModelMessage);
}
return messages;
} catch {
return [];
}
}
/**
* 删除会话的所有消息
*/
async deleteMessages(sessionId: string): Promise<void> {
const sessionMessageDir = path.join(this.messageDir, sessionId);
try {
await fs.rm(sessionMessageDir, { recursive: true, force: true });
} catch {
// 忽略错误
}
}
// ========== 完整会话操作 ==========
/**
* 保存完整会话(元数据 + 消息)
*/
async saveSession(session: SessionData, lastSyncedCount: number = 0): Promise<void> {
// 增量同步消息
await this.syncMessages(session.id, session.messages, lastSyncedCount);
// 保存元数据
await this.saveSessionMetadata(session);
}
/**
* 加载完整会话
*/
async loadSession(projectId: string, sessionId: string): Promise<SessionData | null> {
const metadata = await this.loadSessionMetadata(projectId, sessionId);
if (!metadata) return null;
const messages = await this.loadMessages(sessionId);
return {
id: metadata.id,
projectId: metadata.projectId,
parentId: metadata.parentId,
agentName: metadata.agentName,
createdAt: metadata.createdAt,
updatedAt: metadata.updatedAt,
workdir: metadata.workdir,
title: metadata.title,
discoveredTools: metadata.discoveredTools,
todos: metadata.todos,
messages,
};
}
/**
* 删除会话
*/
async deleteSession(projectId: string, sessionId: string): Promise<boolean> {
try {
// 删除消息
await this.deleteMessages(sessionId);
// 删除元数据
const sessionFile = path.join(this.sessionDir, projectId, `${sessionId}.json`);
await fs.unlink(sessionFile);
return true;
} catch {
return false;
}
}
// ========== 当前会话管理 ==========
/**
* 设置当前会话
*/
async setCurrentSession(sessionId: string): Promise<void> {
await this.ensureDir();
const pointer: CurrentSessionPointer = { sessionId };
using _ = await Lock.write(this.currentSessionFile);
await fs.writeFile(this.currentSessionFile, JSON.stringify(pointer, null, 2), 'utf-8');
}
/**
* 获取当前会话 ID
*/
async getCurrentSessionId(): Promise<string | null> {
await this.ensureDir();
using _ = await Lock.read(this.currentSessionFile);
try {
const content = await fs.readFile(this.currentSessionFile, 'utf-8');
return JSON.parse(content) as SessionData;
const pointer = JSON.parse(content) as CurrentSessionPointer;
return pointer.sessionId;
} catch {
return null;
}
}
/**
* 归档当前会话到历史
*/
async archiveCurrentSession(): Promise<void> {
const current = await this.loadCurrentSession();
if (!current || current.messages.length === 0) {
return;
}
await this.ensureDir();
const archivePath = path.join(this.sessionsDir, `${current.id}.json`);
await fs.writeFile(archivePath, JSON.stringify(current, null, 2), 'utf-8');
}
/**
* 删除当前会话文件
* 清除当前会话指针
*/
async clearCurrentSession(): Promise<void> {
try {
@@ -97,74 +330,69 @@ export class SessionStorage {
}
}
// ========== 会话列表 ==========
/**
* 列出历史会话
* 列出项目的所有会话
*/
async listSessions(): Promise<SessionSummary[]> {
async listSessionsByProject(projectId: string): Promise<SessionSummary[]> {
await this.ensureDir();
const files = await fs.readdir(this.sessionsDir);
const summaries: SessionSummary[] = [];
for (const file of files) {
if (!file.endsWith('.json')) continue;
const projectSessionDir = path.join(this.sessionDir, projectId);
try {
const filePath = path.join(this.sessionsDir, file);
const content = await fs.readFile(filePath, 'utf-8');
const session = JSON.parse(content) as SessionData;
try {
const files = await fs.readdir(projectSessionDir);
const summaries: SessionSummary[] = [];
summaries.push({
id: session.id,
title: session.title || this.generateTitle(session),
workdir: session.workdir,
messageCount: session.messages.length,
createdAt: session.createdAt,
updatedAt: session.updatedAt,
});
} catch {
// 跳过无法解析的文件
for (const file of files) {
if (!file.endsWith('.json')) continue;
try {
const filePath = path.join(projectSessionDir, file);
using _ = await Lock.read(filePath);
const content = await fs.readFile(filePath, 'utf-8');
const metadata = JSON.parse(content) as SessionMetadata;
summaries.push({
id: metadata.id,
title: metadata.title || this.generateTitleFromId(metadata.id),
workdir: metadata.workdir,
messageCount: metadata.messageCount,
createdAt: metadata.createdAt,
updatedAt: metadata.updatedAt,
});
} catch {
// 跳过无法解析的文件
}
}
}
// 按更新时间降序排列
return summaries.sort(
(a, b) => new Date(b.updatedAt).getTime() - new Date(a.updatedAt).getTime()
);
}
/**
* 加载指定会话
*/
async loadSession(sessionId: string): Promise<SessionData | null> {
try {
const filePath = path.join(this.sessionsDir, `${sessionId}.json`);
const content = await fs.readFile(filePath, 'utf-8');
return JSON.parse(content) as SessionData;
// 按更新时间降序排列
return summaries.sort((a, b) => new Date(b.updatedAt).getTime() - new Date(a.updatedAt).getTime());
} catch {
return null;
return [];
}
}
/**
* 保存指定会话(用于子会话
* 列出所有会话(跨项目
*/
async saveSession(session: SessionData): Promise<void> {
async listAllSessions(): Promise<SessionSummary[]> {
await this.ensureDir();
session.updatedAt = new Date().toISOString();
const filePath = path.join(this.sessionsDir, `${session.id}.json`);
await fs.writeFile(filePath, JSON.stringify(session, null, 2), 'utf-8');
}
/**
* 删除指定会话
*/
async deleteSession(sessionId: string): Promise<boolean> {
const allSummaries: SessionSummary[] = [];
try {
const filePath = path.join(this.sessionsDir, `${sessionId}.json`);
await fs.unlink(filePath);
return true;
const projectDirs = await fs.readdir(this.sessionDir);
for (const projectId of projectDirs) {
const summaries = await this.listSessionsByProject(projectId);
allSummaries.push(...summaries);
}
// 按更新时间降序排列
return allSummaries.sort((a, b) => new Date(b.updatedAt).getTime() - new Date(a.updatedAt).getTime());
} catch {
return false;
return [];
}
}
@@ -172,7 +400,7 @@ export class SessionStorage {
* 清理旧会话(保留最近 N 个)
*/
async cleanupOldSessions(keepCount: number = 50): Promise<number> {
const sessions = await this.listSessions();
const sessions = await this.listAllSessions();
if (sessions.length <= keepCount) {
return 0;
}
@@ -181,8 +409,24 @@ export class SessionStorage {
let deletedCount = 0;
for (const session of toDelete) {
if (await this.deleteSession(session.id)) {
deletedCount++;
// 需要找到对应的 projectId
// 这里通过遍历项目目录来查找
try {
const projectDirs = await fs.readdir(this.sessionDir);
for (const projectId of projectDirs) {
const sessionFile = path.join(this.sessionDir, projectId, `${session.id}.json`);
try {
await fs.access(sessionFile);
if (await this.deleteSession(projectId, session.id)) {
deletedCount++;
}
break;
} catch {
// 不在这个项目目录,继续查找
}
}
} catch {
// 忽略错误
}
}
@@ -190,17 +434,10 @@ export class SessionStorage {
}
/**
* 从会话生成标题
* 从会话 ID 生成标题
*/
private generateTitle(session: SessionData): string {
// 从第一条用户消息生成标题
const firstUserMessage = session.messages.find((m) => m.role === 'user');
if (firstUserMessage && typeof firstUserMessage.content === 'string') {
const content = firstUserMessage.content;
// 取前 50 个字符
return content.length > 50 ? content.substring(0, 50) + '...' : content;
}
return `会话 ${session.id}`;
private generateTitleFromId(sessionId: string): string {
return `会话 ${sessionId}`;
}
/**