diff --git a/packages/core/src/core/agent.ts b/packages/core/src/core/agent.ts index d622bcc..3341eb9 100644 --- a/packages/core/src/core/agent.ts +++ b/packages/core/src/core/agent.ts @@ -23,6 +23,14 @@ import { getProviderRegistry, resolveApiKey } from '../provider/index.js'; import { getHookManager } from '../hooks/index.js'; import { getGitManager } from '../git/index.js'; +/** + * Agent.chat() 选项 + */ +export interface AgentChatOptions { + onStream?: (text: string) => void; + abortSignal?: AbortSignal; +} + export class Agent { private getModel: (model: string) => LanguageModel; private config: AgentConfig; @@ -310,10 +318,13 @@ export class Agent { /** * 发送消息并处理响应(流式) * @param userMessage 用户消息文本或包含图片的 UserInput - * @param onStream 流式输出回调 + * @param options 选项,包含 onStream 回调和 abortSignal * @returns ChatResult 包含最终文本和完整的响应消息链 */ - async chat(userMessage: string | UserInput, onStream?: (text: string) => void): Promise { + async chat(userMessage: string | UserInput, options?: AgentChatOptions | ((text: string) => void)): Promise { + // 兼容旧的 onStream 参数 + const opts: AgentChatOptions = typeof options === 'function' ? { onStream: options } : (options || {}); + const { onStream, abortSignal } = opts; // 处理带图片的消息 let processedMessage = userMessage; @@ -391,6 +402,7 @@ export class Agent { tools: vercelTools, maxOutputTokens: this.config.maxTokens, stopWhen: stepCountIs(10), // 允许最多 10 轮工具调用 + abortSignal, // 支持取消 onChunk: ({ chunk }) => { if (chunk.type === 'tool-call') { onStream(`\n[调用工具: ${chunk.toolName}]\n`); @@ -413,14 +425,50 @@ export class Agent { }); // 流式输出文本 - for await (const chunk of result.textStream) { - fullResponse += chunk; - onStream(chunk); - } + let aborted = false; + try { + for await (const chunk of result.textStream) { + // 检查是否已中止 + if (abortSignal?.aborted) { + aborted = true; + break; + } + fullResponse += chunk; + onStream(chunk); + } - // 等待完成并获取完整的响应消息(包括工具调用和结果) - const response = await result.response; - responseMessages = response.messages as ModelMessage[]; + // 如果是手动中止(通过 break 退出),保存已收到的内容 + if (aborted) { + onStream?.('\n[已取消]\n'); + if (fullResponse) { + this.conversationHistory.push({ + role: 'assistant', + content: fullResponse + '\n[已取消]', + } as ModelMessage); + await this.persistSession(); + } + return { text: fullResponse, messages: [] }; + } + + // 等待完成并获取完整的响应消息(包括工具调用和结果) + const response = await result.response; + responseMessages = response.messages as ModelMessage[]; + } catch (error) { + // 如果是中止错误(AbortController.abort() 抛出),优雅处理 + if (error instanceof Error && (error.name === 'AbortError' || abortSignal?.aborted)) { + onStream?.('\n[已取消]\n'); + // 取消时也要保存已收到的内容 + if (fullResponse) { + this.conversationHistory.push({ + role: 'assistant', + content: fullResponse + '\n[已取消]', + } as ModelMessage); + await this.persistSession(); + } + return { text: fullResponse, messages: [] }; + } + throw error; + } } else { // 非流式模式 const result = await generateText({ @@ -430,6 +478,7 @@ export class Agent { tools: vercelTools, maxOutputTokens: this.config.maxTokens, stopWhen: stepCountIs(10), // 允许最多 10 轮工具调用 + abortSignal, // 支持取消 }); fullResponse = result.text; diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index 1718cc2..08ed199 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -1,4 +1,5 @@ export { Agent } from './core/agent.js'; +export type { AgentChatOptions } from './core/agent.js'; export { toolRegistry, todoManager, initTaskContext, updateTaskDescription, updateSkillDescription } from './tools/index.js'; export { loadConfig, saveConfig, getConfig, loadVisionConfig, ConfigurationError } from './utils/config.js'; export type { VisionConfig } from './utils/config.js'; diff --git a/packages/server/src/agent/adapter.ts b/packages/server/src/agent/adapter.ts index 0b79940..031a2e1 100644 --- a/packages/server/src/agent/adapter.ts +++ b/packages/server/src/agent/adapter.ts @@ -67,13 +67,21 @@ interface SessionManagerConstructor { new (): SessionManagerInstance; } +/** + * Chat 选项接口 + */ +interface ChatOptions { + onStream?: (chunk: string) => void; + abortSignal?: AbortSignal; +} + /** * Agent 实例接口 */ interface AgentInstance { setRegistry(registry: unknown): void; setSessionManager(manager: SessionManagerInstance): void; - chat(message: string, onStream?: (chunk: string) => void): Promise; + chat(message: string, options?: ChatOptions): Promise; getToolCount(): { core: number; discovered: number; total: number }; getContextUsageFormatted(): string; getContextUsage(): TokenUsage; @@ -149,6 +157,9 @@ const agentCache: Map = new Map(); // SessionManager 实例缓存(每个 session 一个) const sessionManagerCache: Map = new Map(); +// AbortController 缓存(每个 session 一个,用于取消正在进行的请求) +const abortControllerCache: Map = new Map(); + // 配置错误缓存(用于向客户端返回友好错误) let lastConfigError: { provider: string; message: string } | null = null; @@ -313,6 +324,16 @@ export async function destroyAgent(sessionId: string): Promise { export async function processMessage(sessionId: string, content: string): Promise { const sessionManager = getSessionManager(); + // 取消之前可能存在的请求 + const existingController = abortControllerCache.get(sessionId); + if (existingController) { + existingController.abort(); + } + + // 创建新的 AbortController + const abortController = new AbortController(); + abortControllerCache.set(sessionId, abortController); + // 更新状态 sessionManager.updateStatus(sessionId, 'busy' as SessionStatus); emitStatusEvent(sessionId, 'processing', { message: '正在处理...' }); @@ -338,6 +359,7 @@ export async function processMessage(sessionId: string, content: string): Promis emitLogEvent(sessionId, 'error', `配置错误: ${lastConfigError.message}`); sessionManager.updateStatus(sessionId, 'idle' as SessionStatus); emitStatusEvent(sessionId, 'idle'); + abortControllerCache.delete(sessionId); return; } @@ -357,26 +379,33 @@ export async function processMessage(sessionId: string, content: string): Promis sessionManager.updateStatus(sessionId, 'idle' as SessionStatus); emitStatusEvent(sessionId, 'idle'); + abortControllerCache.delete(sessionId); return; } try { - // 调用 Agent 的 chat 方法,使用流式回调 - const result = await agent.chat(content, (chunk: string) => { - // 推送流式内容 - broadcastToSession(sessionId, { - type: 'chunk', - sessionId, - payload: { content: chunk }, - }); + // 调用 Agent 的 chat 方法,使用流式回调和 AbortSignal + const result = await agent.chat(content, { + onStream: (chunk: string) => { + // 检查是否已取消 + if (abortController.signal.aborted) return; - // 检测工具调用 - if (chunk.includes('[调用工具:')) { - const match = chunk.match(/\[调用工具: (.+?)\]/); - if (match) { - emitLogEvent(sessionId, 'info', `调用工具: ${match[1]}`); + // 推送流式内容 + broadcastToSession(sessionId, { + type: 'chunk', + sessionId, + payload: { content: chunk }, + }); + + // 检测工具调用 + if (chunk.includes('[调用工具:')) { + const match = chunk.match(/\[调用工具: (.+?)\]/); + if (match) { + emitLogEvent(sessionId, 'info', `调用工具: ${match[1]}`); + } } - } + }, + abortSignal: abortController.signal, }); // 消息已由 Core Agent 自动持久化,这里只更新 Server 端的会话计数 @@ -429,6 +458,8 @@ export async function processMessage(sessionId: string, content: string): Promis emitLogEvent(sessionId, 'error', errorMessage); } finally { + // 清理 AbortController + abortControllerCache.delete(sessionId); sessionManager.updateStatus(sessionId, 'idle' as SessionStatus); } } @@ -437,11 +468,23 @@ export async function processMessage(sessionId: string, content: string): Promis * 取消正在进行的处理 */ export function cancelProcessing(sessionId: string): void { - // TODO: 实现取消逻辑 - // 目前 AI SDK 的 streamText 不支持取消 + // 获取并中止 AbortController + const controller = abortControllerCache.get(sessionId); + if (controller) { + controller.abort(); + abortControllerCache.delete(sessionId); + } + const sessionManager = getSessionManager(); sessionManager.updateStatus(sessionId, 'idle' as SessionStatus); emitStatusEvent(sessionId, 'cancelled'); + + // 通知客户端已取消 + broadcastToSession(sessionId, { + type: 'cancelled', + sessionId, + payload: { message: '用户取消了请求' }, + }); } /**