feat(agent): 实现流式输出取消功能
- 添加 AbortController 管理,支持取消正在进行的请求 - Agent.chat() 新增 abortSignal 参数,传递给 streamText/generateText - 取消时保存用户消息和 AI 已输出的部分内容 - cancelProcessing 实际调用 abort() 中止流式请求
This commit is contained in:
@@ -23,6 +23,14 @@ import { getProviderRegistry, resolveApiKey } from '../provider/index.js';
|
|||||||
import { getHookManager } from '../hooks/index.js';
|
import { getHookManager } from '../hooks/index.js';
|
||||||
import { getGitManager } from '../git/index.js';
|
import { getGitManager } from '../git/index.js';
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Agent.chat() 选项
|
||||||
|
*/
|
||||||
|
export interface AgentChatOptions {
|
||||||
|
onStream?: (text: string) => void;
|
||||||
|
abortSignal?: AbortSignal;
|
||||||
|
}
|
||||||
|
|
||||||
export class Agent {
|
export class Agent {
|
||||||
private getModel: (model: string) => LanguageModel;
|
private getModel: (model: string) => LanguageModel;
|
||||||
private config: AgentConfig;
|
private config: AgentConfig;
|
||||||
@@ -310,10 +318,13 @@ export class Agent {
|
|||||||
/**
|
/**
|
||||||
* 发送消息并处理响应(流式)
|
* 发送消息并处理响应(流式)
|
||||||
* @param userMessage 用户消息文本或包含图片的 UserInput
|
* @param userMessage 用户消息文本或包含图片的 UserInput
|
||||||
* @param onStream 流式输出回调
|
* @param options 选项,包含 onStream 回调和 abortSignal
|
||||||
* @returns ChatResult 包含最终文本和完整的响应消息链
|
* @returns ChatResult 包含最终文本和完整的响应消息链
|
||||||
*/
|
*/
|
||||||
async chat(userMessage: string | UserInput, onStream?: (text: string) => void): Promise<ChatResult> {
|
async chat(userMessage: string | UserInput, options?: AgentChatOptions | ((text: string) => void)): Promise<ChatResult> {
|
||||||
|
// 兼容旧的 onStream 参数
|
||||||
|
const opts: AgentChatOptions = typeof options === 'function' ? { onStream: options } : (options || {});
|
||||||
|
const { onStream, abortSignal } = opts;
|
||||||
// 处理带图片的消息
|
// 处理带图片的消息
|
||||||
let processedMessage = userMessage;
|
let processedMessage = userMessage;
|
||||||
|
|
||||||
@@ -391,6 +402,7 @@ export class Agent {
|
|||||||
tools: vercelTools,
|
tools: vercelTools,
|
||||||
maxOutputTokens: this.config.maxTokens,
|
maxOutputTokens: this.config.maxTokens,
|
||||||
stopWhen: stepCountIs(10), // 允许最多 10 轮工具调用
|
stopWhen: stepCountIs(10), // 允许最多 10 轮工具调用
|
||||||
|
abortSignal, // 支持取消
|
||||||
onChunk: ({ chunk }) => {
|
onChunk: ({ chunk }) => {
|
||||||
if (chunk.type === 'tool-call') {
|
if (chunk.type === 'tool-call') {
|
||||||
onStream(`\n[调用工具: ${chunk.toolName}]\n`);
|
onStream(`\n[调用工具: ${chunk.toolName}]\n`);
|
||||||
@@ -413,14 +425,50 @@ export class Agent {
|
|||||||
});
|
});
|
||||||
|
|
||||||
// 流式输出文本
|
// 流式输出文本
|
||||||
for await (const chunk of result.textStream) {
|
let aborted = false;
|
||||||
fullResponse += chunk;
|
try {
|
||||||
onStream(chunk);
|
for await (const chunk of result.textStream) {
|
||||||
}
|
// 检查是否已中止
|
||||||
|
if (abortSignal?.aborted) {
|
||||||
|
aborted = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
fullResponse += chunk;
|
||||||
|
onStream(chunk);
|
||||||
|
}
|
||||||
|
|
||||||
// 等待完成并获取完整的响应消息(包括工具调用和结果)
|
// 如果是手动中止(通过 break 退出),保存已收到的内容
|
||||||
const response = await result.response;
|
if (aborted) {
|
||||||
responseMessages = response.messages as ModelMessage[];
|
onStream?.('\n[已取消]\n');
|
||||||
|
if (fullResponse) {
|
||||||
|
this.conversationHistory.push({
|
||||||
|
role: 'assistant',
|
||||||
|
content: fullResponse + '\n[已取消]',
|
||||||
|
} as ModelMessage);
|
||||||
|
await this.persistSession();
|
||||||
|
}
|
||||||
|
return { text: fullResponse, messages: [] };
|
||||||
|
}
|
||||||
|
|
||||||
|
// 等待完成并获取完整的响应消息(包括工具调用和结果)
|
||||||
|
const response = await result.response;
|
||||||
|
responseMessages = response.messages as ModelMessage[];
|
||||||
|
} catch (error) {
|
||||||
|
// 如果是中止错误(AbortController.abort() 抛出),优雅处理
|
||||||
|
if (error instanceof Error && (error.name === 'AbortError' || abortSignal?.aborted)) {
|
||||||
|
onStream?.('\n[已取消]\n');
|
||||||
|
// 取消时也要保存已收到的内容
|
||||||
|
if (fullResponse) {
|
||||||
|
this.conversationHistory.push({
|
||||||
|
role: 'assistant',
|
||||||
|
content: fullResponse + '\n[已取消]',
|
||||||
|
} as ModelMessage);
|
||||||
|
await this.persistSession();
|
||||||
|
}
|
||||||
|
return { text: fullResponse, messages: [] };
|
||||||
|
}
|
||||||
|
throw error;
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
// 非流式模式
|
// 非流式模式
|
||||||
const result = await generateText({
|
const result = await generateText({
|
||||||
@@ -430,6 +478,7 @@ export class Agent {
|
|||||||
tools: vercelTools,
|
tools: vercelTools,
|
||||||
maxOutputTokens: this.config.maxTokens,
|
maxOutputTokens: this.config.maxTokens,
|
||||||
stopWhen: stepCountIs(10), // 允许最多 10 轮工具调用
|
stopWhen: stepCountIs(10), // 允许最多 10 轮工具调用
|
||||||
|
abortSignal, // 支持取消
|
||||||
});
|
});
|
||||||
|
|
||||||
fullResponse = result.text;
|
fullResponse = result.text;
|
||||||
|
|||||||
@@ -1,4 +1,5 @@
|
|||||||
export { Agent } from './core/agent.js';
|
export { Agent } from './core/agent.js';
|
||||||
|
export type { AgentChatOptions } from './core/agent.js';
|
||||||
export { toolRegistry, todoManager, initTaskContext, updateTaskDescription, updateSkillDescription } from './tools/index.js';
|
export { toolRegistry, todoManager, initTaskContext, updateTaskDescription, updateSkillDescription } from './tools/index.js';
|
||||||
export { loadConfig, saveConfig, getConfig, loadVisionConfig, ConfigurationError } from './utils/config.js';
|
export { loadConfig, saveConfig, getConfig, loadVisionConfig, ConfigurationError } from './utils/config.js';
|
||||||
export type { VisionConfig } from './utils/config.js';
|
export type { VisionConfig } from './utils/config.js';
|
||||||
|
|||||||
@@ -67,13 +67,21 @@ interface SessionManagerConstructor {
|
|||||||
new (): SessionManagerInstance;
|
new (): SessionManagerInstance;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Chat 选项接口
|
||||||
|
*/
|
||||||
|
interface ChatOptions {
|
||||||
|
onStream?: (chunk: string) => void;
|
||||||
|
abortSignal?: AbortSignal;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Agent 实例接口
|
* Agent 实例接口
|
||||||
*/
|
*/
|
||||||
interface AgentInstance {
|
interface AgentInstance {
|
||||||
setRegistry(registry: unknown): void;
|
setRegistry(registry: unknown): void;
|
||||||
setSessionManager(manager: SessionManagerInstance): void;
|
setSessionManager(manager: SessionManagerInstance): void;
|
||||||
chat(message: string, onStream?: (chunk: string) => void): Promise<ChatResult>;
|
chat(message: string, options?: ChatOptions): Promise<ChatResult>;
|
||||||
getToolCount(): { core: number; discovered: number; total: number };
|
getToolCount(): { core: number; discovered: number; total: number };
|
||||||
getContextUsageFormatted(): string;
|
getContextUsageFormatted(): string;
|
||||||
getContextUsage(): TokenUsage;
|
getContextUsage(): TokenUsage;
|
||||||
@@ -149,6 +157,9 @@ const agentCache: Map<string, AgentInstance> = new Map();
|
|||||||
// SessionManager 实例缓存(每个 session 一个)
|
// SessionManager 实例缓存(每个 session 一个)
|
||||||
const sessionManagerCache: Map<string, SessionManagerInstance> = new Map();
|
const sessionManagerCache: Map<string, SessionManagerInstance> = new Map();
|
||||||
|
|
||||||
|
// AbortController 缓存(每个 session 一个,用于取消正在进行的请求)
|
||||||
|
const abortControllerCache: Map<string, AbortController> = new Map();
|
||||||
|
|
||||||
// 配置错误缓存(用于向客户端返回友好错误)
|
// 配置错误缓存(用于向客户端返回友好错误)
|
||||||
let lastConfigError: { provider: string; message: string } | null = null;
|
let lastConfigError: { provider: string; message: string } | null = null;
|
||||||
|
|
||||||
@@ -313,6 +324,16 @@ export async function destroyAgent(sessionId: string): Promise<void> {
|
|||||||
export async function processMessage(sessionId: string, content: string): Promise<void> {
|
export async function processMessage(sessionId: string, content: string): Promise<void> {
|
||||||
const sessionManager = getSessionManager();
|
const sessionManager = getSessionManager();
|
||||||
|
|
||||||
|
// 取消之前可能存在的请求
|
||||||
|
const existingController = abortControllerCache.get(sessionId);
|
||||||
|
if (existingController) {
|
||||||
|
existingController.abort();
|
||||||
|
}
|
||||||
|
|
||||||
|
// 创建新的 AbortController
|
||||||
|
const abortController = new AbortController();
|
||||||
|
abortControllerCache.set(sessionId, abortController);
|
||||||
|
|
||||||
// 更新状态
|
// 更新状态
|
||||||
sessionManager.updateStatus(sessionId, 'busy' as SessionStatus);
|
sessionManager.updateStatus(sessionId, 'busy' as SessionStatus);
|
||||||
emitStatusEvent(sessionId, 'processing', { message: '正在处理...' });
|
emitStatusEvent(sessionId, 'processing', { message: '正在处理...' });
|
||||||
@@ -338,6 +359,7 @@ export async function processMessage(sessionId: string, content: string): Promis
|
|||||||
emitLogEvent(sessionId, 'error', `配置错误: ${lastConfigError.message}`);
|
emitLogEvent(sessionId, 'error', `配置错误: ${lastConfigError.message}`);
|
||||||
sessionManager.updateStatus(sessionId, 'idle' as SessionStatus);
|
sessionManager.updateStatus(sessionId, 'idle' as SessionStatus);
|
||||||
emitStatusEvent(sessionId, 'idle');
|
emitStatusEvent(sessionId, 'idle');
|
||||||
|
abortControllerCache.delete(sessionId);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -357,26 +379,33 @@ export async function processMessage(sessionId: string, content: string): Promis
|
|||||||
|
|
||||||
sessionManager.updateStatus(sessionId, 'idle' as SessionStatus);
|
sessionManager.updateStatus(sessionId, 'idle' as SessionStatus);
|
||||||
emitStatusEvent(sessionId, 'idle');
|
emitStatusEvent(sessionId, 'idle');
|
||||||
|
abortControllerCache.delete(sessionId);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// 调用 Agent 的 chat 方法,使用流式回调
|
// 调用 Agent 的 chat 方法,使用流式回调和 AbortSignal
|
||||||
const result = await agent.chat(content, (chunk: string) => {
|
const result = await agent.chat(content, {
|
||||||
// 推送流式内容
|
onStream: (chunk: string) => {
|
||||||
broadcastToSession(sessionId, {
|
// 检查是否已取消
|
||||||
type: 'chunk',
|
if (abortController.signal.aborted) return;
|
||||||
sessionId,
|
|
||||||
payload: { content: chunk },
|
|
||||||
});
|
|
||||||
|
|
||||||
// 检测工具调用
|
// 推送流式内容
|
||||||
if (chunk.includes('[调用工具:')) {
|
broadcastToSession(sessionId, {
|
||||||
const match = chunk.match(/\[调用工具: (.+?)\]/);
|
type: 'chunk',
|
||||||
if (match) {
|
sessionId,
|
||||||
emitLogEvent(sessionId, 'info', `调用工具: ${match[1]}`);
|
payload: { content: chunk },
|
||||||
|
});
|
||||||
|
|
||||||
|
// 检测工具调用
|
||||||
|
if (chunk.includes('[调用工具:')) {
|
||||||
|
const match = chunk.match(/\[调用工具: (.+?)\]/);
|
||||||
|
if (match) {
|
||||||
|
emitLogEvent(sessionId, 'info', `调用工具: ${match[1]}`);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
},
|
||||||
|
abortSignal: abortController.signal,
|
||||||
});
|
});
|
||||||
|
|
||||||
// 消息已由 Core Agent 自动持久化,这里只更新 Server 端的会话计数
|
// 消息已由 Core Agent 自动持久化,这里只更新 Server 端的会话计数
|
||||||
@@ -429,6 +458,8 @@ export async function processMessage(sessionId: string, content: string): Promis
|
|||||||
|
|
||||||
emitLogEvent(sessionId, 'error', errorMessage);
|
emitLogEvent(sessionId, 'error', errorMessage);
|
||||||
} finally {
|
} finally {
|
||||||
|
// 清理 AbortController
|
||||||
|
abortControllerCache.delete(sessionId);
|
||||||
sessionManager.updateStatus(sessionId, 'idle' as SessionStatus);
|
sessionManager.updateStatus(sessionId, 'idle' as SessionStatus);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -437,11 +468,23 @@ export async function processMessage(sessionId: string, content: string): Promis
|
|||||||
* 取消正在进行的处理
|
* 取消正在进行的处理
|
||||||
*/
|
*/
|
||||||
export function cancelProcessing(sessionId: string): void {
|
export function cancelProcessing(sessionId: string): void {
|
||||||
// TODO: 实现取消逻辑
|
// 获取并中止 AbortController
|
||||||
// 目前 AI SDK 的 streamText 不支持取消
|
const controller = abortControllerCache.get(sessionId);
|
||||||
|
if (controller) {
|
||||||
|
controller.abort();
|
||||||
|
abortControllerCache.delete(sessionId);
|
||||||
|
}
|
||||||
|
|
||||||
const sessionManager = getSessionManager();
|
const sessionManager = getSessionManager();
|
||||||
sessionManager.updateStatus(sessionId, 'idle' as SessionStatus);
|
sessionManager.updateStatus(sessionId, 'idle' as SessionStatus);
|
||||||
emitStatusEvent(sessionId, 'cancelled');
|
emitStatusEvent(sessionId, 'cancelled');
|
||||||
|
|
||||||
|
// 通知客户端已取消
|
||||||
|
broadcastToSession(sessionId, {
|
||||||
|
type: 'cancelled',
|
||||||
|
sessionId,
|
||||||
|
payload: { message: '用户取消了请求' },
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|||||||
Reference in New Issue
Block a user