package leaderloop

import (
	"context"
	"encoding/json"
	"os"
	"path/filepath"
	"strings"
	"testing"
	"time"

	"inbox/internal/app/runtimecodex"
	"inbox/internal/app/runtimeconfig"
	"inbox/internal/app/workspaceruntime"
	"inbox/internal/base/timeutil"
	"inbox/internal/domain/lane"
	"inbox/internal/domain/message"
	"inbox/internal/domain/role"
	"inbox/internal/domain/task"
	"inbox/internal/domain/topic"
	"inbox/internal/domain/workflow"
	"inbox/internal/domain/workspace"
	sqlitestore "inbox/internal/store/sqlite"
)

// TestProcessOnceConsumesLeaderMessageAndCreatesLaneTasks covers an initial
// plan_only graph. The leader's tasks are persisted and no lane is started.
// The topic moves to awaiting_confirmation and a draft graph version is recorded.
func TestProcessOnceConsumesLeaderMessageAndCreatesLaneTasks(t *testing.T) {
	ctx := context.Background()
	clock := timeutil.FixedClock{Time: time.Date(2026, 3, 17, 4, 0, 0, 0, time.UTC)}
	store, err := sqlitestore.OpenInMemory(clock)
	if err != nil {
		t.Fatalf("OpenInMemory() error = %v", err)
	}
	defer store.Close()

	ws, topicRecord := seedLeaderLoopWorkspace(t, ctx, store, clock)
	if _, err := store.CreateMessage(ctx, message.Record{
		WorkspaceID:  ws.ID,
		TopicID:      topicRecord.ID,
		FromRoleName: "user",
		ToExpr:       "leader",
		Type:         message.TypeChat,
		Stage:        "plan",
		BodyMarkdown: "Build a todo app.",
	}); err != nil {
		t.Fatalf("CreateMessage() error = %v", err)
	}

	fakeRuntime := &fakeWorkspaceRuntime{workspace: ws}
	runner := &fakeRunner{result: RunResult{
		Status: workflow.RunStatusSucceeded,
		ResultJSON: `{
			"plan_summary_markdown":"Leader created the first execution lane.",
			"plan_mode":"initial",
			"execution_mode":"plan_only",
			"leader_reply":{"markdown":"I split the work into a UI graph and will wait for confirmation.","type":"chat"},
			"tasks":[
				{"key":"design","title":"Design the UI","body_markdown":"Create the React UI.","acceptance_markdown":"UI is polished.","kind":"execution","deliverables":["apps/web/src"],"priority":10,"task_order":1,"depends_on":[]},
				{"key":"verify","title":"Verify the UI","body_markdown":"Verify the UI flow.","acceptance_markdown":"Verification complete.","kind":"verification","deliverables":["reports/ui-check.md"],"priority":5,"task_order":2,"depends_on":["design"]}
			]
		}`,
	}}
	service := NewService(
		store,
		runtimeconfig.NewService(store, store, clock),
		fakeRuntime,
		runner,
		clock,
		"",
	)

	result, err := service.ProcessOnce(ctx)
	if err != nil {
		t.Fatalf("ProcessOnce() error = %v", err)
	}
	if result == nil {
		t.Fatal("expected leader result")
	}
	if len(result.Lanes) != 1 || result.Lanes[0].Name != "Design the UI" {
		t.Fatalf("unexpected lanes: %#v", result.Lanes)
	}
	if len(result.Tasks) != 2 {
		t.Fatalf("unexpected tasks: %#v", result.Tasks)
	}
	if fakeRuntime.startedLaneID != "" {
		t.Fatalf("expected initial graph to wait for confirmation, got started lane %q", fakeRuntime.startedLaneID)
	}
	if result.Reply == nil || result.Reply.ToExpr != "user" {
		t.Fatalf("expected reply to user, got %#v", result.Reply)
	}
	if !strings.Contains(runner.prompt, "## Skills") || !strings.Contains(runner.prompt, "Use Inbox V2") {
		t.Fatalf("expected leader prompt to include bound skills, got %q", runner.prompt)
	}
	if !strings.Contains(result.Run.CommandJSON, `"planning"`) || !strings.Contains(result.Run.CommandJSON, `"execution_mode":"plan_only"`) {
		t.Fatalf("expected planning payload in command_json, got %s", result.Run.CommandJSON)
	}

	tasks, err := store.ListTasksByTopic(ctx, topicRecord.ID)
	if err != nil {
		t.Fatalf("ListTasksByTopic() error = %v", err)
	}
	if len(tasks) != 2 {
		t.Fatalf("expected 2 tasks in store, got %d", len(tasks))
	}
	if tasks[0].Status != task.StatusReady || tasks[1].Status != task.StatusDraft {
		t.Fatalf("unexpected task statuses: %#v", tasks)
	}

	updatedTopic, err := store.GetTopic(ctx, topicRecord.ID)
	if err != nil {
		t.Fatalf("GetTopic() error = %v", err)
	}
	if updatedTopic.Status != "awaiting_confirmation" {
		t.Fatalf("expected awaiting_confirmation topic, got %#v", updatedTopic)
	}

	graphVersion, err := store.GetLatestTaskGraphVersionByTopic(ctx, topicRecord.ID)
	if err != nil {
		t.Fatalf("GetLatestTaskGraphVersionByTopic() error = %v", err)
	}
	if graphVersion.Status != "draft" {
		t.Fatalf("expected draft graph version, got %#v", graphVersion)
	}
}

// TestParseLeaderOutputRejectsInitialFreezeStartNodes checks that a
// "start_nodes" field is rejected as unknown when initialFreeze is true.
func TestParseLeaderOutputRejectsInitialFreezeStartNodes(t *testing.T) {
	_, err := parseLeaderOutput(`{
		"plan_summary_markdown":"Draft the initial graph.",
		"plan_mode":"initial",
		"execution_mode":"plan_only",
		"leader_reply":{"markdown":"Please confirm the graph first.","type":"summary"},
		"tasks":[
			{"key":"build","title":"Build app","body_markdown":"Implement app.","acceptance_markdown":"App works.","kind":"execution","deliverables":["apps/web"],"priority":1,"task_order":1,"depends_on":[]}
		],
		"start_nodes":["app"]
	}`, true)
	if err == nil || !strings.Contains(err.Error(), `unknown field "start_nodes"`) {
		t.Fatalf("expected initial freeze start_nodes error, got %v", err)
	}
}

// TestLeaderOutputSchemaRequiresEveryDeclaredProperty checks the generated
// schema in both initialFreeze modes. For the top-level object, each task item
// and leader_reply, every declared property must also appear in "required".
func TestLeaderOutputSchemaRequiresEveryDeclaredProperty(t *testing.T) {
	// schemaObject returns parent[key] as a nested schema object. A missing or
	// mistyped node fails the test with a message instead of panicking on an
	// unchecked type assertion.
	schemaObject := func(parent map[string]any, key string) map[string]any {
		t.Helper()
		child, ok := parent[key].(map[string]any)
		if !ok {
			t.Fatalf("schema %q missing or not an object: %#v", key, parent[key])
		}
		return child
	}

	for _, initialFreeze := range []bool{true, false} {
		schemaJSON := leaderOutputSchema(initialFreeze)
		var schema map[string]any
		if err := json.Unmarshal([]byte(schemaJSON), &schema); err != nil {
			t.Fatalf("json.Unmarshal(schema) error = %v", err)
		}
		assertSchemaRequiredMatchesProperties(t, schema)

		properties := schemaObject(schema, "properties")
		tasks := schemaObject(properties, "tasks")
		items := schemaObject(tasks, "items")
		assertSchemaRequiredMatchesProperties(t, items)

		leaderReply := schemaObject(properties, "leader_reply")
		assertSchemaRequiredMatchesProperties(t, leaderReply)
	}
}

// TestProcessOnceReusesExistingLaneOnWorkerFollowUp seeds a blocked lane and
// a worker summary. It then expects a plan_and_start patch to start a derived
// lane, giving two lanes in total, and to persist the single retry task.
func TestProcessOnceReusesExistingLaneOnWorkerFollowUp(t *testing.T) {
	ctx := context.Background()
	clock := timeutil.FixedClock{Time: time.Date(2026, 3, 17, 4, 30, 0, 0, time.UTC)}
	store, err := sqlitestore.OpenInMemory(clock)
	if err != nil {
		t.Fatalf("OpenInMemory() error = %v", err)
	}
	defer store.Close()

	ws, topicRecord := seedLeaderLoopWorkspace(t, ctx, store, clock)
	_, err = store.CreateLane(ctx, lane.Record{
		WorkspaceID:       ws.ID,
		TopicID:           topicRecord.ID,
		Name:              "UI Chain",
		Slug:              "ui-chain",
		Status:            lane.StatusBlocked,
		CreatedByRoleName: "leader",
	})
	if err != nil {
		t.Fatalf("CreateLane() error = %v", err)
	}
	if _, err := store.CreateMessage(ctx, message.Record{
		WorkspaceID:  ws.ID,
		TopicID:      topicRecord.ID,
		FromRoleName: "worker",
		ToExpr:       "leader",
		Type:         message.TypeSummary,
		Stage:        "execution",
		BodyMarkdown: "UI chain failed the first task.",
	}); err != nil {
		t.Fatalf("CreateMessage() error = %v", err)
	}

	fakeRuntime := &fakeWorkspaceRuntime{workspace: ws}
	service := NewService(
		store,
		runtimeconfig.NewService(store, store, clock),
		fakeRuntime,
		&fakeRunner{result: RunResult{
			Status: workflow.RunStatusSucceeded,
			ResultJSON: `{
				"plan_summary_markdown":"Reused the UI chain and queued a retry task.",
				"plan_mode":"patch",
				"replan_reason":"UI chain failed and needs a focused retry.",
				"execution_mode":"plan_and_start",
				"leader_reply":{"markdown":"继续在现有图上修复。","type":"decision"},
				"tasks":[
					{"key":"retry-ui","title":"修复 UI 初始化失败","body_markdown":"在现有 UI 图上修复失败原因。","acceptance_markdown":"UI graph 恢复可执行。","kind":"execution","deliverables":["apps/web/src"],"priority":10,"task_order":3,"depends_on":[]}
				]
			}`,
		}},
		clock,
		"",
	)

	result, err := service.ProcessOnce(ctx)
	if err != nil {
		t.Fatalf("ProcessOnce() error = %v", err)
	}
	if result == nil {
		t.Fatal("expected leader result")
	}
	if fakeRuntime.startedLaneID == "" {
		t.Fatal("expected a derived lane to be started, got none")
	}

	lanes, err := store.ListLanesByTopic(ctx, topicRecord.ID)
	if err != nil {
		t.Fatalf("ListLanesByTopic() error = %v", err)
	}
	if len(lanes) != 2 {
		t.Fatalf("expected existing blocked lane plus one derived execution lane, got %#v", lanes)
	}

	tasks, err := store.ListTasksByTopic(ctx, topicRecord.ID)
	if err != nil {
		t.Fatalf("ListTasksByTopic() error = %v", err)
	}
	if len(tasks) != 1 {
		t.Fatalf("expected retry task on existing chain, got %#v", tasks)
	}
}

// TestParseLeaderOutputRejectsInvalidPlanOnlyStartNodes checks that
// "start_nodes" is also rejected as unknown when initialFreeze is false.
func TestParseLeaderOutputRejectsInvalidPlanOnlyStartNodes(t *testing.T) {
	_, err := parseLeaderOutput(`{
		"plan_summary_markdown":"Planned only.",
		"plan_mode":"initial",
		"execution_mode":"plan_only",
		"leader_reply":{"markdown":"先只输出计划。","type":"summary"},
		"tasks":[
			{"key":"design","title":"Design the UI","body_markdown":"Create the React UI.","acceptance_markdown":"UI is polished.","kind":"execution","deliverables":["apps/web/src"],"priority":10,"task_order":1,"depends_on":[]}
		],
		"start_nodes":["ui"]
	}`, false)
	if err == nil || !strings.Contains(err.Error(), `unknown field "start_nodes"`) {
		t.Fatalf("expected invalid plan_only start_nodes error, got %v", err)
	}
}

// TestParseLeaderOutputRejectsMissingDeliverables checks that an execution
// task with an empty deliverables list is rejected.
func TestParseLeaderOutputRejectsMissingDeliverables(t *testing.T) {
	_, err := parseLeaderOutput(`{
		"plan_summary_markdown":"Plan.",
		"plan_mode":"initial",
		"execution_mode":"plan_and_start",
		"leader_reply":{"markdown":"开始执行。","type":"decision"},
		"tasks":[
			{"key":"design","title":"Design the UI","body_markdown":"Create the React UI.","acceptance_markdown":"UI is polished.","kind":"execution","deliverables":[],"priority":10,"task_order":1,"depends_on":[]}
		]
	}`, false)
	if err == nil || !strings.Contains(err.Error(), "must declare deliverables") {
		t.Fatalf("expected missing deliverables error, got %v", err)
	}
}

// TestParseLeaderOutputRejectsDuplicateTaskKeys checks that two tasks
// sharing the same key are rejected.
func TestParseLeaderOutputRejectsDuplicateTaskKeys(t *testing.T) {
	_, err := parseLeaderOutput(`{
		"plan_summary_markdown":"Plan.",
		"plan_mode":"initial",
		"execution_mode":"plan_only",
		"leader_reply":{"markdown":"先看计划。","type":"summary"},
		"tasks":[
			{"key":"ui","title":"Build UI","body_markdown":"Create the React UI.","acceptance_markdown":"UI is polished.","kind":"execution","deliverables":["apps/web/src"],"depends_on":[]},
			{"key":"ui","title":"Verify UI","body_markdown":"Verify the React UI.","acceptance_markdown":"Verification complete.","kind":"verification","deliverables":["reports/ui-check.md"],"depends_on":["ui"]}
		]
	}`, true)
	if err == nil || !strings.Contains(err.Error(), `must be unique`) {
		t.Fatalf("expected duplicate task key error, got %v", err)
	}
}

// TestParseLeaderOutputAllowsMilestoneWithoutDeliverables checks that a
// milestone task may omit the deliverables field entirely.
func TestParseLeaderOutputAllowsMilestoneWithoutDeliverables(t *testing.T) {
	output, err := parseLeaderOutput(`{
		"plan_summary_markdown":"Plan with milestone.",
		"plan_mode":"initial",
		"execution_mode":"plan_only",
		"leader_reply":{"markdown":"先看计划。","type":"summary"},
		"tasks":[
			{"key":"ui","title":"Build UI","body_markdown":"Build the UI.","acceptance_markdown":"UI works.","kind":"execution","deliverables":["apps/web/src"],"priority":10,"task_order":1,"depends_on":[]},
			{"key":"ready-for-demo","title":"Ready for Demo","body_markdown":"Aggregate completion.","acceptance_markdown":"Milestone reached.","kind":"milestone","priority":5,"task_order":2,"depends_on":["ui"]}
		]
	}`, true)
	if err != nil {
		t.Fatalf("parseLeaderOutput() error = %v", err)
	}
	if len(output.Tasks) != 2 || output.Tasks[1].Kind != "milestone" {
		t.Fatalf("expected milestone task, got %#v", output.Tasks)
	}
}

// TestProcessOnceKeepsMilestoneOnExecutionLane expects a milestone to be
// placed on the same lane as the execution task it depends on, rather than
// getting a lane of its own.
func TestProcessOnceKeepsMilestoneOnExecutionLane(t *testing.T) {
	ctx := context.Background()
	clock := timeutil.FixedClock{Time: time.Date(2026, 3, 17, 4, 45, 0, 0, time.UTC)}
	store, err := sqlitestore.OpenInMemory(clock)
	if err != nil {
		t.Fatalf("OpenInMemory() error = %v", err)
	}
	defer store.Close()

	ws, topicRecord := seedLeaderLoopWorkspace(t, ctx, store, clock)
	if _, err := store.CreateMessage(ctx, message.Record{
		WorkspaceID:  ws.ID,
		TopicID:      topicRecord.ID,
		FromRoleName: "user",
		ToExpr:       "leader",
		Type:         message.TypeChat,
		Stage:        "plan",
		BodyMarkdown: "Build UI and mark the review milestone.",
	}); err != nil {
		t.Fatalf("CreateMessage() error = %v", err)
	}

	service := NewService(
		store,
		runtimeconfig.NewService(store, store, clock),
		&fakeWorkspaceRuntime{workspace: ws},
		&fakeRunner{result: RunResult{
			Status: workflow.RunStatusSucceeded,
			ResultJSON: `{
				"plan_summary_markdown":"One execution lane plus milestone.",
				"plan_mode":"initial",
				"execution_mode":"plan_only",
				"leader_reply":{"markdown":"先确认这个图。","type":"summary"},
				"tasks":[
					{"key":"ui","title":"Build UI","body_markdown":"Build the UI.","acceptance_markdown":"UI works.","kind":"execution","deliverables":["apps/web/src"],"priority":10,"task_order":1,"depends_on":[]},
					{"key":"ready-for-demo","title":"Ready for Demo","body_markdown":"Aggregate completion.","acceptance_markdown":"Milestone reached.","kind":"milestone","priority":5,"task_order":2,"depends_on":["ui"]}
				]
			}`,
		}},
		clock,
		"",
	)

	result, err := service.ProcessOnce(ctx)
	if err != nil {
		t.Fatalf("ProcessOnce() error = %v", err)
	}
	if result == nil {
		t.Fatal("expected leader result")
	}

	lanes, err := store.ListLanesByTopic(ctx, topicRecord.ID)
	if err != nil {
		t.Fatalf("ListLanesByTopic() error = %v", err)
	}
	if len(lanes) != 1 {
		t.Fatalf("expected milestone to reuse the execution lane, got %#v", lanes)
	}

	tasks, err := store.ListTasksByTopic(ctx, topicRecord.ID)
	if err != nil {
		t.Fatalf("ListTasksByTopic() error = %v", err)
	}
	if len(tasks) != 2 || tasks[0].LaneID != tasks[1].LaneID {
		t.Fatalf("expected milestone and execution task to share a lane, got %#v", tasks)
	}
}

// TestProcessOnceStartsOnlyGateLanesInitially expects an initial plan with a
// gate to behave as follows:
//   - the gate and execution tasks share one lane;
//   - only the gate task is ready and the execution task stays draft;
//   - no lane is started;
//   - the topic waits for confirmation.
func TestProcessOnceStartsOnlyGateLanesInitially(t *testing.T) {
	ctx := context.Background()
	clock := timeutil.FixedClock{Time: time.Date(2026, 3, 17, 5, 0, 0, 0, time.UTC)}
	store, err := sqlitestore.OpenInMemory(clock)
	if err != nil {
		t.Fatalf("OpenInMemory() error = %v", err)
	}
	defer store.Close()

	ws, topicRecord := seedLeaderLoopWorkspace(t, ctx, store, clock)
	if _, err := store.CreateMessage(ctx, message.Record{
		WorkspaceID:  ws.ID,
		TopicID:      topicRecord.ID,
		FromRoleName: "user",
		ToExpr:       "leader",
		Type:         message.TypeChat,
		Stage:        "plan",
		BodyMarkdown: "Build a gated todo app.",
	}); err != nil {
		t.Fatalf("CreateMessage() error = %v", err)
	}

	fakeRuntime := &fakeWorkspaceRuntime{workspace: ws}
	service := NewService(
		store,
		runtimeconfig.NewService(store, store, clock),
		fakeRuntime,
		&fakeRunner{result: RunResult{
			Status: workflow.RunStatusSucceeded,
			ResultJSON: `{
				"plan_summary_markdown":"Start with foundation gating.",
				"plan_mode":"initial",
				"execution_mode":"plan_only",
				"leader_reply":{"markdown":"先跑基础检查。","type":"decision"},
				"tasks":[
					{"key":"gate","title":"Check workspace baseline","body_markdown":"Validate setup.","acceptance_markdown":"Setup is valid.","kind":"gate","deliverables":["reports/gate.md"],"priority":10,"task_order":1,"depends_on":[]},
					{"key":"ui","title":"Build UI","body_markdown":"Implement the UI.","acceptance_markdown":"UI works.","kind":"execution","deliverables":["apps/web/src"],"priority":10,"task_order":1,"depends_on":[]}
				]
			}`,
		}},
		clock,
		"",
	)

	result, err := service.ProcessOnce(ctx)
	if err != nil {
		t.Fatalf("ProcessOnce() error = %v", err)
	}
	if result == nil {
		t.Fatal("expected result")
	}

	lanes, err := store.ListLanesByTopic(ctx, topicRecord.ID)
	if err != nil {
		t.Fatalf("ListLanesByTopic() error = %v", err)
	}
	tasks, err := store.ListTasksByTopic(ctx, topicRecord.ID)
	if err != nil {
		t.Fatalf("ListTasksByTopic() error = %v", err)
	}

	var gateTask, uiTask task.Record
	for _, item := range tasks {
		if item.Kind == task.KindGate {
			gateTask = item
		}
		if item.Title == "Build UI" {
			uiTask = item
		}
	}

	if len(lanes) != 1 {
		t.Fatalf("expected gate task to reuse the execution lane, got %#v", lanes)
	}
	if gateTask.LaneID == "" || gateTask.LaneID != uiTask.LaneID {
		t.Fatalf("expected gate and execution task to share one lane, got gate=%#v ui=%#v", gateTask, uiTask)
	}
	if gateTask.Status != task.StatusReady {
		t.Fatalf("expected gate task ready, got %#v", gateTask)
	}
	if uiTask.Status != task.StatusDraft {
		t.Fatalf("expected non-gate task blocked behind gate, got %#v", uiTask)
	}
	if len(fakeRuntime.startedLaneIDs) != 0 {
		t.Fatalf("expected initial gated plan to wait for confirmation, got %#v", fakeRuntime.startedLaneIDs)
	}

	updatedTopic, err := store.GetTopic(ctx, topicRecord.ID)
	if err != nil {
		t.Fatalf("GetTopic() error = %v", err)
	}
	if updatedTopic.Status != "awaiting_confirmation" {
		t.Fatalf("expected awaiting_confirmation topic, got %#v lanes=%#v", updatedTopic, lanes)
	}
}

// TestProcessOncePlanOnlyDoesNotAutoStartReadyLanesAfterGateSuccess seeds a
// topic in execution with a succeeded gate lane and a ready frontend lane. It
// then expects a plan_only patch with no new tasks to start nothing.
func TestProcessOncePlanOnlyDoesNotAutoStartReadyLanesAfterGateSuccess(t *testing.T) {
	ctx := context.Background()
	clock := timeutil.FixedClock{Time: time.Date(2026, 3, 17, 5, 30, 0, 0, time.UTC)}
	store, err := sqlitestore.OpenInMemory(clock)
	if err != nil {
		t.Fatalf("OpenInMemory() error = %v", err)
	}
	defer store.Close()

	ws, topicRecord := seedLeaderLoopWorkspace(t, ctx, store, clock)
	topicRecord.Status = "execution"
	if _, err := store.UpdateTopic(ctx, topicRecord); err != nil {
		t.Fatalf("UpdateTopic() error = %v", err)
	}

	gateChain, err := store.CreateLane(ctx, lane.Record{
		WorkspaceID:       ws.ID,
		TopicID:           topicRecord.ID,
		Name:              "Foundation",
		Slug:              "foundation",
		Status:            lane.StatusSucceeded,
		BranchName:        "lane/todo/foundation",
		WorktreePath:      filepath.Join(t.TempDir(), "foundation"),
		ContainerName:     "lane-foundation-test",
		CreatedByRoleName: "leader",
	})
	if err != nil {
		t.Fatalf("CreateLane(gate) error = %v", err)
	}
	frontendChain, err := store.CreateLane(ctx, lane.Record{
		WorkspaceID:       ws.ID,
		TopicID:           topicRecord.ID,
		Name:              "Frontend",
		Slug:              "frontend",
		Status:            lane.StatusReady,
		BranchName:        "lane/todo/frontend",
		WorktreePath:      filepath.Join(t.TempDir(), "frontend"),
		ContainerName:     "lane-frontend-test",
		CreatedByRoleName: "leader",
	})
	if err != nil {
		t.Fatalf("CreateLane(frontend) error = %v", err)
	}

	if _, err := store.CreateTask(ctx, task.Record{
		WorkspaceID:       ws.ID,
		TopicID:           topicRecord.ID,
		LaneID:            gateChain.ID,
		Title:             "Check workspace baseline",
		BodyMarkdown:      "Validate setup.",
		Kind:              task.KindGate,
		Status:            task.StatusSucceeded,
		Priority:          10,
		TaskOrder:         1,
		CreatedByRoleName: "leader",
	}, nil); err != nil {
		t.Fatalf("CreateTask(gate) error = %v", err)
	}
	if _, err := store.CreateTask(ctx, task.Record{
		WorkspaceID:       ws.ID,
		TopicID:           topicRecord.ID,
		LaneID:            frontendChain.ID,
		Title:             "Build UI",
		BodyMarkdown:      "Implement UI.",
		Kind:              task.KindExecution,
		Status:            task.StatusReady,
		Priority:          10,
		TaskOrder:         1,
		CreatedByRoleName: "leader",
	}, nil); err != nil {
		t.Fatalf("CreateTask(frontend) error = %v", err)
	}

	if _, err := store.CreateMessage(ctx, message.Record{
		WorkspaceID:  ws.ID,
		TopicID:      topicRecord.ID,
		FromRoleName: "worker",
		ToExpr:       "leader",
		Type:         message.TypeSummary,
		Stage:        "execution",
		BodyMarkdown: "Foundation gate completed successfully. Keep the next lane stopped for now.",
	}); err != nil {
		t.Fatalf("CreateMessage() error = %v", err)
	}

	fakeRuntime := &fakeWorkspaceRuntime{workspace: ws}
	service := NewService(
		store,
		runtimeconfig.NewService(store, store, clock),
		fakeRuntime,
		&fakeRunner{result: RunResult{
			Status: workflow.RunStatusSucceeded,
			ResultJSON: `{
				"plan_summary_markdown":"Gate passed; continue existing plan.",
				"plan_mode":"patch",
				"replan_reason":"Gate completed successfully; continue existing graph.",
				"execution_mode":"plan_only",
				"leader_reply":{"markdown":"基础检查通过,继续后续链路。","type":"decision"},
				"tasks":[]
			}`,
		}},
		clock,
		"",
	)

	result, err := service.ProcessOnce(ctx)
	if err != nil {
		t.Fatalf("ProcessOnce() error = %v", err)
	}
	if result == nil {
		t.Fatal("expected result")
	}
	if len(fakeRuntime.startedLaneIDs) != 0 {
		t.Fatalf("expected plan_only to keep lanes stopped, got %#v", fakeRuntime.startedLaneIDs)
	}
}

// TestProcessOncePlanAndStartAutoStartsReadyLanesAfterGateSuccess uses the
// same seed as the plan_only variant. It expects a plan_and_start patch to
// start exactly the ready frontend lane.
func TestProcessOncePlanAndStartAutoStartsReadyLanesAfterGateSuccess(t *testing.T) {
	ctx := context.Background()
	clock := timeutil.FixedClock{Time: time.Date(2026, 3, 17, 5, 35, 0, 0, time.UTC)}
	store, err := sqlitestore.OpenInMemory(clock)
	if err != nil {
		t.Fatalf("OpenInMemory() error = %v", err)
	}
	defer store.Close()

	ws, topicRecord := seedLeaderLoopWorkspace(t, ctx, store, clock)
	topicRecord.Status = "execution"
	if _, err := store.UpdateTopic(ctx, topicRecord); err != nil {
		t.Fatalf("UpdateTopic() error = %v", err)
	}

	gateChain, err := store.CreateLane(ctx, lane.Record{
		WorkspaceID:       ws.ID,
		TopicID:           topicRecord.ID,
		Name:              "Foundation",
		Slug:              "foundation",
		Status:            lane.StatusSucceeded,
		BranchName:        "lane/todo/foundation",
		WorktreePath:      filepath.Join(t.TempDir(), "foundation"),
		ContainerName:     "lane-foundation-test",
		CreatedByRoleName: "leader",
	})
	if err != nil {
		t.Fatalf("CreateLane(gate) error = %v", err)
	}
	frontendChain, err := store.CreateLane(ctx, lane.Record{
		WorkspaceID:       ws.ID,
		TopicID:           topicRecord.ID,
		Name:              "Frontend",
		Slug:              "frontend",
		Status:            lane.StatusReady,
		BranchName:        "lane/todo/frontend",
		WorktreePath:      filepath.Join(t.TempDir(), "frontend"),
		ContainerName:     "lane-frontend-test",
		CreatedByRoleName: "leader",
	})
	if err != nil {
		t.Fatalf("CreateLane(frontend) error = %v", err)
	}

	if _, err := store.CreateTask(ctx, task.Record{
		WorkspaceID:       ws.ID,
		TopicID:           topicRecord.ID,
		LaneID:            gateChain.ID,
		Title:             "Check workspace baseline",
		BodyMarkdown:      "Validate setup.",
		Kind:              task.KindGate,
		Status:            task.StatusSucceeded,
		Priority:          10,
		TaskOrder:         1,
		CreatedByRoleName: "leader",
	}, nil); err != nil {
		t.Fatalf("CreateTask(gate) error = %v", err)
	}
	if _, err := store.CreateTask(ctx, task.Record{
		WorkspaceID:       ws.ID,
		TopicID:           topicRecord.ID,
		LaneID:            frontendChain.ID,
		Title:             "Build UI",
		BodyMarkdown:      "Implement UI.",
		Kind:              task.KindExecution,
		Status:            task.StatusReady,
		Priority:          10,
		TaskOrder:         1,
		CreatedByRoleName: "leader",
	}, nil); err != nil {
		t.Fatalf("CreateTask(frontend) error = %v", err)
	}

	if _, err := store.CreateMessage(ctx, message.Record{
		WorkspaceID:  ws.ID,
		TopicID:      topicRecord.ID,
		FromRoleName: "worker",
		ToExpr:       "leader",
		Type:         message.TypeSummary,
		Stage:        "execution",
		BodyMarkdown: "Foundation gate completed successfully. Start the next ready lane.",
	}); err != nil {
		t.Fatalf("CreateMessage() error = %v", err)
	}

	fakeRuntime := &fakeWorkspaceRuntime{workspace: ws}
	service := NewService(
		store,
		runtimeconfig.NewService(store, store, clock),
		fakeRuntime,
		&fakeRunner{result: RunResult{
			Status: workflow.RunStatusSucceeded,
			ResultJSON: `{
				"plan_summary_markdown":"Gate passed; continue existing plan.",
				"plan_mode":"patch",
				"replan_reason":"Gate completed successfully; continue existing graph.",
				"execution_mode":"plan_and_start",
				"leader_reply":{"markdown":"基础检查通过,继续后续链路。","type":"decision"},
				"tasks":[]
			}`,
		}},
		clock,
		"",
	)

	result, err := service.ProcessOnce(ctx)
	if err != nil {
		t.Fatalf("ProcessOnce() error = %v", err)
	}
	if result == nil {
		t.Fatal("expected result")
	}
	if len(fakeRuntime.startedLaneIDs) != 1 || fakeRuntime.startedLaneIDs[0] != frontendChain.ID {
		t.Fatalf("expected auto-start of frontend lane, got %#v", fakeRuntime.startedLaneIDs)
	}
}

// TestProcessOnceUsesResolvedLeaderPromptOverrides checks a workspace-level
// system prompt override for the leader role. The runner's prompt must contain
// the override text and must not contain defaultLeaderSystemPrompt.
func TestProcessOnceUsesResolvedLeaderPromptOverrides(t *testing.T) {
	ctx := context.Background()
	clock := timeutil.FixedClock{Time: time.Date(2026, 3, 17, 6, 0, 0, 0, time.UTC)}
	store, err := sqlitestore.OpenInMemory(clock)
	if err != nil {
		t.Fatalf("OpenInMemory() error = %v", err)
	}
	defer store.Close()

	ws, topicRecord := seedLeaderLoopWorkspace(t, ctx, store, clock)
	if _, err := store.UpsertRolePrompt(ctx, role.Prompt{
		RoleName:        "leader",
		WorkspaceID:     ws.ID,
		PromptKind:      role.PromptSystem,
		ContentMarkdown: "你是自定义 leader。\n\n优先把范围压缩到最小可执行图。",
	}, "test"); err != nil {
		t.Fatalf("UpsertRolePrompt() error = %v", err)
	}
	if _, err := store.CreateMessage(ctx, message.Record{
		WorkspaceID:  ws.ID,
		TopicID:      topicRecord.ID,
		FromRoleName: "user",
		ToExpr:       "leader",
		Type:         message.TypeChat,
		Stage:        "plan",
		BodyMarkdown: "Build a todo app.",
	}); err != nil {
		t.Fatalf("CreateMessage() error = %v", err)
	}

	runner := &fakeRunner{result: RunResult{
		Status: workflow.RunStatusSucceeded,
		ResultJSON: `{
			"plan_summary_markdown":"Need clarification first.",
			"plan_mode":"initial",
			"execution_mode":"clarify",
			"leader_reply":{"markdown":"先缩小范围。","type":"question"},
			"tasks":[]
		}`,
	}}
	service := NewService(
		store,
		runtimeconfig.NewService(store, store, clock),
		&fakeWorkspaceRuntime{workspace: ws},
		runner,
		clock,
		"",
	)

	if _, err := service.ProcessOnce(ctx); err != nil {
		t.Fatalf("ProcessOnce() error = %v", err)
	}
	if !strings.Contains(runner.prompt, "你是自定义 leader。") {
		t.Fatalf("expected custom leader prompt in runtime instructions, got %q", runner.prompt)
	}
	if strings.Contains(runner.prompt, defaultLeaderSystemPrompt) {
		t.Fatalf("expected custom prompt to replace fallback system prompt, got %q", runner.prompt)
	}
}

// TestValidateLeaderOutputForTopicRejectsPatchOnEmptyGraph: plan_mode=patch
// is invalid when the topic has no lanes or tasks yet.
func TestValidateLeaderOutputForTopicRejectsPatchOnEmptyGraph(t *testing.T) {
	err := validateLeaderOutputForTopic(leaderOutput{
		PlanSummaryMarkdown: "Patch on empty graph.",
		PlanMode:            "patch",
		ReplanReason:        "Need to update graph.",
		ExecutionMode:       "plan_only",
		LeaderReply:         leaderReplySpec{Markdown: "Patch.", Type: "summary"},
	}, nil, nil)
	if err == nil || !strings.Contains(err.Error(), "cannot use plan_mode=patch on an empty topic graph") {
		t.Fatalf("expected patch on empty graph error, got %v", err)
	}
}

// TestValidateLeaderOutputForTopicRejectsInitialOnExistingGraph:
// plan_mode=initial is invalid once the topic already has a lane.
func TestValidateLeaderOutputForTopicRejectsInitialOnExistingGraph(t *testing.T) {
	err := validateLeaderOutputForTopic(leaderOutput{
		PlanSummaryMarkdown: "Initial on existing graph.",
		PlanMode:            "initial",
		ReplanReason:        "",
		ExecutionMode:       "plan_only",
		LeaderReply:         leaderReplySpec{Markdown: "Initial.", Type: "summary"},
	}, []lane.Record{{ID: "chain_1", Slug: "ui-chain", Name: "UI Chain"}}, nil)
	if err == nil || !strings.Contains(err.Error(), "must use plan_mode=patch") {
		t.Fatalf("expected initial on existing graph error, got %v", err)
	}
}

// TestValidateLeaderOutputForTopicRejectsPatchWithoutReason: a patch must
// carry a non-empty replan_reason.
func TestValidateLeaderOutputForTopicRejectsPatchWithoutReason(t *testing.T) {
	err := validateLeaderOutputForTopic(leaderOutput{
		PlanSummaryMarkdown: "Patch without reason.",
		PlanMode:            "patch",
		ReplanReason:        "",
		ExecutionMode:       "plan_only",
		LeaderReply:         leaderReplySpec{Markdown: "Patch.", Type: "summary"},
	}, []lane.Record{{ID: "chain_1", Slug: "ui-chain", Name: "UI Chain"}}, nil)
	if err == nil || !strings.Contains(err.Error(), "requires replan_reason") {
		t.Fatalf("expected patch without reason error, got %v", err)
	}
}

// TestValidateLeaderOutputForTopicAllowsPatchDependenciesOnExistingTaskIDs:
// a patched task may depend on the ID of a task already in the topic.
func TestValidateLeaderOutputForTopicAllowsPatchDependenciesOnExistingTaskIDs(t *testing.T) {
	err := validateLeaderOutputForTopic(leaderOutput{
		PlanSummaryMarkdown: "Patch existing graph.",
		PlanMode:            "patch",
		ReplanReason:        "Attach a follow-up task to the completed gate.",
		ExecutionMode:       "plan_only",
		LeaderReply:         leaderReplySpec{Markdown: "Continue from the existing gate.", Type: "summary"},
		Tasks: []leaderTaskSpec{
			{
				Key:                "follow-up",
				Title:              "Continue implementation",
				BodyMarkdown:       "Build the next step.",
				AcceptanceMarkdown: "Ready to proceed.",
				Kind:               "execution",
				Deliverables:       []string{"apps/web/src"},
				Priority:           10,
				TaskOrder:          2,
				DependsOn:          []string{"task-existing"},
			},
		},
	}, []lane.Record{{ID: "chain_1", Slug: "ui-chain", Name: "UI Chain"}}, []task.Record{{ID: "task-existing", Title: "Existing Gate"}})
	if err != nil {
		t.Fatalf("expected existing task dependency to validate, got %v", err)
	}
}

// TestValidateLeaderOutputForTopicRejectsUnknownPatchDependencyKey: a
// dependency that matches neither a new task key nor an existing task ID is
// rejected.
func TestValidateLeaderOutputForTopicRejectsUnknownPatchDependencyKey(t *testing.T) {
	err := validateLeaderOutputForTopic(leaderOutput{
		PlanSummaryMarkdown: "Patch existing graph.",
		PlanMode:            "patch",
		ReplanReason:        "Attach a follow-up task to the completed gate.",
		ExecutionMode:       "plan_only",
		LeaderReply:         leaderReplySpec{Markdown: "Continue from the existing gate.", Type: "summary"},
		Tasks: []leaderTaskSpec{
			{
				Key:                "follow-up",
				Title:              "Continue implementation",
				BodyMarkdown:       "Build the next step.",
				AcceptanceMarkdown: "Ready to proceed.",
				Kind:               "execution",
				Deliverables:       []string{"apps/web/src"},
				Priority:           10,
				TaskOrder:          2,
				DependsOn:          []string{"missing-task"},
			},
		},
	}, []lane.Record{{ID: "chain_1", Slug: "ui-chain", Name: "UI Chain"}}, []task.Record{{ID: "task-existing", Title: "Existing Gate"}})
	if err == nil || !strings.Contains(err.Error(), "unknown dependency key") {
		t.Fatalf("expected unknown dependency key error, got %v", err)
	}
}

// TestLeaderCommandEnvWritesRuntimeCodexHomeUnderProjectRuntime checks that
// leaderCommandEnv behaves as follows:
//   - it sets HOME and CODEX_HOME to the runtimecodex host-leader dirs;
//   - it writes the role's config.toml and auth.json into the codex dir.
func TestLeaderCommandEnvWritesRuntimeCodexHomeUnderProjectRuntime(t *testing.T) {
	projectRoot := t.TempDir()
	env := leaderCommandEnv(projectRoot, runtimeconfig.ResolvedRole{
		WorkspaceID: "ws_1",
		Role:        role.Definition{Name: "leader"},
		Config: role.Config{
			RoleName: "leader",
			ConfigTOML: strings.Join([]string{
				`model = "gpt-5.4"`,
				`model_provider = "custom"`,
				``,
				`[model_providers.custom]`,
				`base_url = "http://example.test/v1"`,
				`wire_api = "responses"`,
			}, "\n"),
			AuthJSON: `{"OPENAI_API_KEY":"token-1"}`,
		},
	})

	expectedHome := runtimecodex.HostLeaderHomeDir(projectRoot, "ws_1", "leader")
	expectedCodexHome := runtimecodex.HostLeaderCodexDir(projectRoot, "ws_1", "leader")
	joined := strings.Join(env, "\n")

	// hasEntryPrefix matches against whole KEY=VALUE entries. With a plain
	// substring search over the joined env, "HOME=<dir>" would also match inside
	// "CODEX_HOME=<dir>/..." whenever the codex dir path begins with the home
	// dir path. The HOME check would then pass even without a HOME entry.
	hasEntryPrefix := func(prefix string) bool {
		for _, entry := range env {
			if strings.HasPrefix(entry, prefix) {
				return true
			}
		}
		return false
	}
	if !hasEntryPrefix("HOME=" + expectedHome) {
		t.Fatalf("expected isolated HOME in env, env=%q", joined)
	}
	if !hasEntryPrefix("CODEX_HOME=" + expectedCodexHome) {
		t.Fatalf("expected isolated CODEX_HOME in env, env=%q", joined)
	}

	configBytes, err := os.ReadFile(filepath.Join(expectedCodexHome, "config.toml"))
	if err != nil {
		t.Fatalf("ReadFile(config.toml) error = %v", err)
	}
	configText := string(configBytes)
	for _, needle := range []string{
		`model = "gpt-5.4"`,
		`model_provider = "custom"`,
		`[model_providers.custom]`,
		`base_url = "http://example.test/v1"`,
	} {
		if !strings.Contains(configText, needle) {
			t.Fatalf("expected seeded config to contain %q, got:\n%s", needle, configText)
		}
	}

	authBytes, err := os.ReadFile(filepath.Join(expectedCodexHome, "auth.json"))
	if err != nil {
		t.Fatalf("ReadFile(auth.json) error = %v", err)
	}
	var auth map[string]any
	if err := json.Unmarshal(authBytes, &auth); err != nil {
		t.Fatalf("Unmarshal(auth.json) error = %v", err)
	}
	if auth["OPENAI_API_KEY"] != "token-1" {
		t.Fatalf("unexpected copied auth.json: %s", string(authBytes))
	}
}

// assertSchemaRequiredMatchesProperties fails the test unless schema has a
// "properties" object and a "required" array of strings, and every key of
// "properties" is listed in "required".
func assertSchemaRequiredMatchesProperties(t *testing.T, schema map[string]any) {
	t.Helper()
	properties, ok := schema["properties"].(map[string]any)
	if !ok {
		t.Fatalf("schema properties missing or invalid: %#v", schema)
	}
	requiredRaw, ok := schema["required"].([]any)
	if !ok {
		t.Fatalf("schema required missing or invalid: %#v", schema)
	}
	required := make(map[string]struct{}, len(requiredRaw))
	for _, item := range requiredRaw {
		key, ok := item.(string)
		if !ok {
			t.Fatalf("schema required item must be string, got %#v", item)
		}
		required[key] = struct{}{}
	}
	for key := range properties {
		if _, ok := required[key]; !ok {
			t.Fatalf("schema required is missing property %q", key)
		}
	}
}

// fakeRunner is a test double for the leader runner. It remembers the prompt
// from the most recent Run call and always returns result with a nil error.
type fakeRunner struct {
	result RunResult
	prompt string
}

// Run records prompt for later assertions and returns the canned result.
func (f *fakeRunner) Run(_ context.Context, _ string, _ runtimeconfig.ResolvedRole, prompt, _ string) (RunResult, error) {
	f.prompt = prompt
	return f.result, nil
}

// fakeWorkspaceRuntime is a test double for the workspace runtime. Its
// methods do no real work; they only record which lanes were requested.
type fakeWorkspaceRuntime struct {
	workspace      workspace.Workspace
	startedLaneID  string   // last lane ID passed to EnsureLane
	startedLaneIDs []string // every lane ID passed to EnsureLane, in call order
}

// Ensure returns the configured workspace and a zero-value Runtime.
func (f *fakeWorkspaceRuntime) Ensure(_ context.Context, _ string) (workspace.Workspace, workspaceruntime.Runtime, error) {
	return f.workspace, workspaceruntime.Runtime{}, nil
}

// EnsureLane records laneID and returns a running lane record with that ID.
// The record is always named "UI Chain".
func (f *fakeWorkspaceRuntime) EnsureLane(_ context.Context, laneID string) (lane.Record, error) {
	f.startedLaneID = laneID
	f.startedLaneIDs = append(f.startedLaneIDs, laneID)
	return lane.Record{ID: laneID, Name: "UI Chain", Status: lane.StatusRunning}, nil
}

// seedLeaderLoopWorkspace creates the fixtures the tests share:
//   - a "demo" project;
//   - a host-backed "todo" workspace;
//   - a workflow topic "v1" in status "plan".
//
// It returns the workspace and the topic.
func seedLeaderLoopWorkspace(t *testing.T, ctx context.Context, store *sqlitestore.Store, clock timeutil.Clock) (workspace.Workspace, topic.Record) {
	t.Helper()
	now := timeutil.FormatRFC3339(clock.Now())

	project, err := store.CreateProject(ctx, workspace.Project{
		Slug:          "demo",
		Name:          "Demo",
		RootPath:      t.TempDir(),
		DefaultBranch: "main",
		Status:        "active",
		CreatedAt:     now,
		UpdatedAt:     now,
	})
	if err != nil {
		t.Fatalf("CreateProject() error = %v", err)
	}

	ws, err := store.CreateWorkspace(ctx, workspace.Workspace{
		ProjectID:      project.ID,
		Slug:           "todo",
		Name:           "todo",
		RootPath:       t.TempDir(),
		BaseBranch:     "main",
		WorktreeBranch: "worktree/todo",
		RuntimeBackend: "host",
		Status:         "active",
		CreatedAt:      now,
		UpdatedAt:      now,
	})
	if err != nil {
		t.Fatalf("CreateWorkspace() error = %v", err)
	}

	record, err := store.CreateTopic(ctx, topic.Record{
		WorkspaceID: ws.ID,
		Slug:        "v1",
		Title:       "v1",
		Space:       topic.SpaceWorkflow,
		Status:      "plan",
		CreatedAt:   now,
		UpdatedAt:   now,
	})
	if err != nil {
		t.Fatalf("CreateTopic() error = %v", err)
	}
	return ws, record
}