Files

998 lines
34 KiB
Go

package leaderloop
import (
"context"
"encoding/json"
"os"
"path/filepath"
"strings"
"testing"
"time"
"inbox/internal/app/runtimecodex"
"inbox/internal/app/runtimeconfig"
"inbox/internal/app/workspaceruntime"
"inbox/internal/base/timeutil"
"inbox/internal/domain/lane"
"inbox/internal/domain/message"
"inbox/internal/domain/role"
"inbox/internal/domain/task"
"inbox/internal/domain/topic"
"inbox/internal/domain/workflow"
"inbox/internal/domain/workspace"
sqlitestore "inbox/internal/store/sqlite"
)
// TestProcessOnceConsumesLeaderMessageAndCreatesLaneTasks drives one leader
// iteration on a fresh topic. A user->leader chat message is queued, and the
// fake runner returns an initial "plan_only" graph with a design task and a
// verification task that depends on it. The test then checks that ProcessOnce:
//   - materializes one lane and two tasks without starting any lane,
//   - replies to the user,
//   - embeds skills in the leader prompt and the planning payload in the run's
//     command JSON, and
//   - leaves the topic "awaiting_confirmation" with a "draft" graph version.
func TestProcessOnceConsumesLeaderMessageAndCreatesLaneTasks(t *testing.T) {
	ctx := context.Background()
	clock := timeutil.FixedClock{Time: time.Date(2026, 3, 17, 4, 0, 0, 0, time.UTC)}
	store, err := sqlitestore.OpenInMemory(clock)
	if err != nil {
		t.Fatalf("OpenInMemory() error = %v", err)
	}
	defer store.Close()
	ws, topicRecord := seedLeaderLoopWorkspace(t, ctx, store, clock)
	// The inbound message ProcessOnce is expected to consume.
	if _, err := store.CreateMessage(ctx, message.Record{
		WorkspaceID:  ws.ID,
		TopicID:      topicRecord.ID,
		FromRoleName: "user",
		ToExpr:       "leader",
		Type:         message.TypeChat,
		Stage:        "plan",
		BodyMarkdown: "Build a todo app.",
	}); err != nil {
		t.Fatalf("CreateMessage() error = %v", err)
	}
	fakeRuntime := &fakeWorkspaceRuntime{workspace: ws}
	// Canned leader output: an initial plan in plan_only mode, so nothing
	// should be started until the user confirms.
	runner := &fakeRunner{result: RunResult{
		Status: workflow.RunStatusSucceeded,
		ResultJSON: `{
"plan_summary_markdown":"Leader created the first execution lane.",
"plan_mode":"initial",
"execution_mode":"plan_only",
"leader_reply":{"markdown":"I split the work into a UI graph and will wait for confirmation.","type":"chat"},
"tasks":[
{"key":"design","title":"Design the UI","body_markdown":"Create the React UI.","acceptance_markdown":"UI is polished.","kind":"execution","deliverables":["apps/web/src"],"priority":10,"task_order":1,"depends_on":[]},
{"key":"verify","title":"Verify the UI","body_markdown":"Verify the UI flow.","acceptance_markdown":"Verification complete.","kind":"verification","deliverables":["reports/ui-check.md"],"priority":5,"task_order":2,"depends_on":["design"]}
]
}`,
	}}
	service := NewService(
		store,
		runtimeconfig.NewService(store, store, clock),
		fakeRuntime,
		runner,
		clock,
		"",
	)
	result, err := service.ProcessOnce(ctx)
	if err != nil {
		t.Fatalf("ProcessOnce() error = %v", err)
	}
	if result == nil {
		t.Fatal("expected leader result")
	}
	// Both tasks are expected on a single lane named after the first task's title.
	if len(result.Lanes) != 1 || result.Lanes[0].Name != "Design the UI" {
		t.Fatalf("unexpected lanes: %#v", result.Lanes)
	}
	if len(result.Tasks) != 2 {
		t.Fatalf("unexpected tasks: %#v", result.Tasks)
	}
	// plan_only: EnsureLane on the fake runtime must not have been called.
	if fakeRuntime.startedLaneID != "" {
		t.Fatalf("expected initial graph to wait for confirmation, got started lane %q", fakeRuntime.startedLaneID)
	}
	if result.Reply == nil || result.Reply.ToExpr != "user" {
		t.Fatalf("expected reply to user, got %#v", result.Reply)
	}
	// NOTE(review): this test does not bind any skill itself; the "Use Inbox V2"
	// text presumably comes from default role/skill bindings created by the
	// store or runtimeconfig - confirm.
	if !strings.Contains(runner.prompt, "## Skills") || !strings.Contains(runner.prompt, "Use Inbox V2") {
		t.Fatalf("expected leader prompt to include bound skills, got %q", runner.prompt)
	}
	if !strings.Contains(result.Run.CommandJSON, `"planning"`) || !strings.Contains(result.Run.CommandJSON, `"execution_mode":"plan_only"`) {
		t.Fatalf("expected planning payload in command_json, got %s", result.Run.CommandJSON)
	}
	tasks, err := store.ListTasksByTopic(ctx, topicRecord.ID)
	if err != nil {
		t.Fatalf("ListTasksByTopic() error = %v", err)
	}
	if len(tasks) != 2 {
		t.Fatalf("expected 2 tasks in store, got %d", len(tasks))
	}
	// Assumes ListTasksByTopic returns tasks in creation/task_order order: the
	// dependency-free design task is ready, the dependent verify task is draft.
	if tasks[0].Status != task.StatusReady || tasks[1].Status != task.StatusDraft {
		t.Fatalf("unexpected task statuses: %#v", tasks)
	}
	updatedTopic, err := store.GetTopic(ctx, topicRecord.ID)
	if err != nil {
		t.Fatalf("GetTopic() error = %v", err)
	}
	if updatedTopic.Status != "awaiting_confirmation" {
		t.Fatalf("expected awaiting_confirmation topic, got %#v", updatedTopic)
	}
	graphVersion, err := store.GetLatestTaskGraphVersionByTopic(ctx, topicRecord.ID)
	if err != nil {
		t.Fatalf("GetLatestTaskGraphVersionByTopic() error = %v", err)
	}
	if graphVersion.Status != "draft" {
		t.Fatalf("expected draft graph version, got %#v", graphVersion)
	}
}
// TestParseLeaderOutputRejectsInitialFreezeStartNodes checks that, with the
// initial-freeze flag set, a leader payload carrying a "start_nodes" field is
// refused by the parser as an unknown field.
func TestParseLeaderOutputRejectsInitialFreezeStartNodes(t *testing.T) {
	const payload = `{
"plan_summary_markdown":"Draft the initial graph.",
"plan_mode":"initial",
"execution_mode":"plan_only",
"leader_reply":{"markdown":"Please confirm the graph first.","type":"summary"},
"tasks":[
{"key":"build","title":"Build app","body_markdown":"Implement app.","acceptance_markdown":"App works.","kind":"execution","deliverables":["apps/web"],"priority":1,"task_order":1,"depends_on":[]}
],
"start_nodes":["app"]
}`
	const wantFragment = `unknown field "start_nodes"`

	_, parseErr := parseLeaderOutput(payload, true)
	if parseErr != nil && strings.Contains(parseErr.Error(), wantFragment) {
		return
	}
	t.Fatalf("expected initial freeze start_nodes error, got %v", parseErr)
}
// TestLeaderOutputSchemaRequiresEveryDeclaredProperty verifies, for both
// values of initialFreeze, that the leader output JSON schema lists exactly
// its declared properties as required. It checks three levels: the top-level
// object, each item of "tasks", and the "leader_reply" object.
// NOTE(review): presumably this matters because the schema is used for strict
// structured output, which rejects optional properties - confirm.
func TestLeaderOutputSchemaRequiresEveryDeclaredProperty(t *testing.T) {
	// object fetches parent[key] as a JSON object. It fails the test with a
	// readable message instead of panicking on an unchecked type assertion
	// when the schema shape is unexpected.
	object := func(t *testing.T, parent map[string]any, key string) map[string]any {
		t.Helper()
		value, ok := parent[key].(map[string]any)
		if !ok {
			t.Fatalf("schema %q missing or not an object: %#v", key, parent[key])
		}
		return value
	}
	for _, initialFreeze := range []bool{true, false} {
		name := "initial_freeze=false"
		if initialFreeze {
			name = "initial_freeze=true"
		}
		// Subtests run synchronously (no t.Parallel), so capturing the loop
		// variable is safe on every Go version.
		t.Run(name, func(t *testing.T) {
			schemaJSON := leaderOutputSchema(initialFreeze)
			var schema map[string]any
			if err := json.Unmarshal([]byte(schemaJSON), &schema); err != nil {
				t.Fatalf("json.Unmarshal(schema) error = %v", err)
			}
			assertSchemaRequiredMatchesProperties(t, schema)
			properties := object(t, schema, "properties")
			items := object(t, object(t, properties, "tasks"), "items")
			assertSchemaRequiredMatchesProperties(t, items)
			assertSchemaRequiredMatchesProperties(t, object(t, properties, "leader_reply"))
		})
	}
}
// TestProcessOnceReusesExistingLaneOnWorkerFollowUp starts from a topic that
// already has a blocked "UI Chain" lane and a worker->leader failure summary.
// The fake runner answers with a "patch" plan in "plan_and_start" mode that
// adds one retry task. The test expects a lane to be started, two lanes in
// the topic, and exactly one task.
// NOTE(review): despite the name, the assertions expect a newly derived
// execution lane next to the existing blocked lane rather than reuse of
// "UI Chain"; consider renaming the test to match.
func TestProcessOnceReusesExistingLaneOnWorkerFollowUp(t *testing.T) {
	ctx := context.Background()
	clock := timeutil.FixedClock{Time: time.Date(2026, 3, 17, 4, 30, 0, 0, time.UTC)}
	store, err := sqlitestore.OpenInMemory(clock)
	if err != nil {
		t.Fatalf("OpenInMemory() error = %v", err)
	}
	defer store.Close()
	ws, topicRecord := seedLeaderLoopWorkspace(t, ctx, store, clock)
	// Pre-existing graph: a blocked lane with no tasks, so the leader must patch.
	_, err = store.CreateLane(ctx, lane.Record{
		WorkspaceID:       ws.ID,
		TopicID:           topicRecord.ID,
		Name:              "UI Chain",
		Slug:              "ui-chain",
		Status:            lane.StatusBlocked,
		CreatedByRoleName: "leader",
	})
	if err != nil {
		t.Fatalf("CreateLane() error = %v", err)
	}
	if _, err := store.CreateMessage(ctx, message.Record{
		WorkspaceID:  ws.ID,
		TopicID:      topicRecord.ID,
		FromRoleName: "worker",
		ToExpr:       "leader",
		Type:         message.TypeSummary,
		Stage:        "execution",
		BodyMarkdown: "UI chain failed the first task.",
	}); err != nil {
		t.Fatalf("CreateMessage() error = %v", err)
	}
	fakeRuntime := &fakeWorkspaceRuntime{workspace: ws}
	service := NewService(
		store,
		runtimeconfig.NewService(store, store, clock),
		fakeRuntime,
		&fakeRunner{result: RunResult{
			Status: workflow.RunStatusSucceeded,
			ResultJSON: `{
"plan_summary_markdown":"Reused the UI chain and queued a retry task.",
"plan_mode":"patch",
"replan_reason":"UI chain failed and needs a focused retry.",
"execution_mode":"plan_and_start",
"leader_reply":{"markdown":"继续在现有图上修复。","type":"decision"},
"tasks":[
{"key":"retry-ui","title":"修复 UI 初始化失败","body_markdown":"在现有 UI 图上修复失败原因。","acceptance_markdown":"UI graph 恢复可执行。","kind":"execution","deliverables":["apps/web/src"],"priority":10,"task_order":3,"depends_on":[]}
]
}`,
		}},
		clock,
		"",
	)
	result, err := service.ProcessOnce(ctx)
	if err != nil {
		t.Fatalf("ProcessOnce() error = %v", err)
	}
	if result == nil {
		t.Fatal("expected leader result")
	}
	// plan_and_start: the fake runtime must have received an EnsureLane call.
	if fakeRuntime.startedLaneID == "" {
		t.Fatalf("expected a derived lane to be started, got none")
	}
	lanes, err := store.ListLanesByTopic(ctx, topicRecord.ID)
	if err != nil {
		t.Fatalf("ListLanesByTopic() error = %v", err)
	}
	if len(lanes) != 2 {
		t.Fatalf("expected existing blocked lane plus one derived execution lane, got %#v", lanes)
	}
	tasks, err := store.ListTasksByTopic(ctx, topicRecord.ID)
	if err != nil {
		t.Fatalf("ListTasksByTopic() error = %v", err)
	}
	if len(tasks) != 1 {
		t.Fatalf("expected retry task on existing chain, got %#v", tasks)
	}
}
// TestParseLeaderOutputRejectsInvalidPlanOnlyStartNodes checks that, without
// the initial-freeze flag, a plan_only payload that still carries
// "start_nodes" is refused as an unknown field.
func TestParseLeaderOutputRejectsInvalidPlanOnlyStartNodes(t *testing.T) {
	const payload = `{
"plan_summary_markdown":"Planned only.",
"plan_mode":"initial",
"execution_mode":"plan_only",
"leader_reply":{"markdown":"先只输出计划。","type":"summary"},
"tasks":[
{"key":"design","title":"Design the UI","body_markdown":"Create the React UI.","acceptance_markdown":"UI is polished.","kind":"execution","deliverables":["apps/web/src"],"priority":10,"task_order":1,"depends_on":[]}
],
"start_nodes":["ui"]
}`
	const wantFragment = `unknown field "start_nodes"`

	_, parseErr := parseLeaderOutput(payload, false)
	if parseErr != nil && strings.Contains(parseErr.Error(), wantFragment) {
		return
	}
	t.Fatalf("expected invalid plan_only start_nodes error, got %v", parseErr)
}
// TestParseLeaderOutputRejectsMissingDeliverables checks that an execution
// task with an empty deliverables list is rejected by the parser.
func TestParseLeaderOutputRejectsMissingDeliverables(t *testing.T) {
	const payload = `{
"plan_summary_markdown":"Plan.",
"plan_mode":"initial",
"execution_mode":"plan_and_start",
"leader_reply":{"markdown":"开始执行。","type":"decision"},
"tasks":[
{"key":"design","title":"Design the UI","body_markdown":"Create the React UI.","acceptance_markdown":"UI is polished.","kind":"execution","deliverables":[],"priority":10,"task_order":1,"depends_on":[]}
]
}`
	const wantFragment = "must declare deliverables"

	_, parseErr := parseLeaderOutput(payload, false)
	if parseErr != nil && strings.Contains(parseErr.Error(), wantFragment) {
		return
	}
	t.Fatalf("expected missing deliverables error, got %v", parseErr)
}
// TestParseLeaderOutputRejectsDuplicateTaskKeys checks that two tasks sharing
// the same key are rejected with a uniqueness error.
func TestParseLeaderOutputRejectsDuplicateTaskKeys(t *testing.T) {
	const payload = `{
"plan_summary_markdown":"Plan.",
"plan_mode":"initial",
"execution_mode":"plan_only",
"leader_reply":{"markdown":"先看计划。","type":"summary"},
"tasks":[
{"key":"ui","title":"Build UI","body_markdown":"Create the React UI.","acceptance_markdown":"UI is polished.","kind":"execution","deliverables":["apps/web/src"],"depends_on":[]},
{"key":"ui","title":"Verify UI","body_markdown":"Verify the React UI.","acceptance_markdown":"Verification complete.","kind":"verification","deliverables":["reports/ui-check.md"],"depends_on":["ui"]}
]
}`
	const wantFragment = `must be unique`

	_, parseErr := parseLeaderOutput(payload, true)
	if parseErr != nil && strings.Contains(parseErr.Error(), wantFragment) {
		return
	}
	t.Fatalf("expected duplicate task key error, got %v", parseErr)
}
// TestParseLeaderOutputAllowsMilestoneWithoutDeliverables checks that a
// milestone task may omit deliverables and is returned as the second task.
func TestParseLeaderOutputAllowsMilestoneWithoutDeliverables(t *testing.T) {
	const payload = `{
"plan_summary_markdown":"Plan with milestone.",
"plan_mode":"initial",
"execution_mode":"plan_only",
"leader_reply":{"markdown":"先看计划。","type":"summary"},
"tasks":[
{"key":"ui","title":"Build UI","body_markdown":"Build the UI.","acceptance_markdown":"UI works.","kind":"execution","deliverables":["apps/web/src"],"priority":10,"task_order":1,"depends_on":[]},
{"key":"ready-for-demo","title":"Ready for Demo","body_markdown":"Aggregate completion.","acceptance_markdown":"Milestone reached.","kind":"milestone","priority":5,"task_order":2,"depends_on":["ui"]}
]
}`

	parsed, parseErr := parseLeaderOutput(payload, true)
	if parseErr != nil {
		t.Fatalf("parseLeaderOutput() error = %v", parseErr)
	}
	specs := parsed.Tasks
	if len(specs) == 2 && specs[1].Kind == "milestone" {
		return
	}
	t.Fatalf("expected milestone task, got %#v", specs)
}
// TestProcessOnceKeepsMilestoneOnExecutionLane runs an initial plan_only graph
// made of one execution task and one milestone that depends on it. It checks
// that the milestone does not get a lane of its own: exactly one lane is
// created, and both tasks are attached to that lane.
func TestProcessOnceKeepsMilestoneOnExecutionLane(t *testing.T) {
	ctx := context.Background()
	clock := timeutil.FixedClock{Time: time.Date(2026, 3, 17, 4, 45, 0, 0, time.UTC)}
	store, err := sqlitestore.OpenInMemory(clock)
	if err != nil {
		t.Fatalf("OpenInMemory() error = %v", err)
	}
	defer store.Close()
	ws, topicRecord := seedLeaderLoopWorkspace(t, ctx, store, clock)
	if _, err := store.CreateMessage(ctx, message.Record{
		WorkspaceID:  ws.ID,
		TopicID:      topicRecord.ID,
		FromRoleName: "user",
		ToExpr:       "leader",
		Type:         message.TypeChat,
		Stage:        "plan",
		BodyMarkdown: "Build UI and mark the review milestone.",
	}); err != nil {
		t.Fatalf("CreateMessage() error = %v", err)
	}
	service := NewService(
		store,
		runtimeconfig.NewService(store, store, clock),
		&fakeWorkspaceRuntime{workspace: ws},
		&fakeRunner{result: RunResult{
			Status: workflow.RunStatusSucceeded,
			ResultJSON: `{
"plan_summary_markdown":"One execution lane plus milestone.",
"plan_mode":"initial",
"execution_mode":"plan_only",
"leader_reply":{"markdown":"先确认这个图。","type":"summary"},
"tasks":[
{"key":"ui","title":"Build UI","body_markdown":"Build the UI.","acceptance_markdown":"UI works.","kind":"execution","deliverables":["apps/web/src"],"priority":10,"task_order":1,"depends_on":[]},
{"key":"ready-for-demo","title":"Ready for Demo","body_markdown":"Aggregate completion.","acceptance_markdown":"Milestone reached.","kind":"milestone","priority":5,"task_order":2,"depends_on":["ui"]}
]
}`,
		}},
		clock,
		"",
	)
	result, err := service.ProcessOnce(ctx)
	if err != nil {
		t.Fatalf("ProcessOnce() error = %v", err)
	}
	if result == nil {
		t.Fatal("expected leader result")
	}
	lanes, err := store.ListLanesByTopic(ctx, topicRecord.ID)
	if err != nil {
		t.Fatalf("ListLanesByTopic() error = %v", err)
	}
	if len(lanes) != 1 {
		t.Fatalf("expected milestone to reuse the execution lane, got %#v", lanes)
	}
	tasks, err := store.ListTasksByTopic(ctx, topicRecord.ID)
	if err != nil {
		t.Fatalf("ListTasksByTopic() error = %v", err)
	}
	if len(tasks) != 2 {
		t.Fatalf("expected milestone and execution task to share a lane, got %#v", tasks)
	}
	// Comparing the two tasks' LaneIDs with each other would pass when both
	// are empty. Pin each one to the single stored lane instead.
	for _, item := range tasks {
		if item.LaneID != lanes[0].ID {
			t.Fatalf("expected milestone and execution task to share lane %q, got %#v", lanes[0].ID, tasks)
		}
	}
}
// TestProcessOnceStartsOnlyGateLanesInitially runs an initial plan_only graph
// with a gate task and an execution task, neither declaring dependencies. It
// expects:
//   - both tasks on one shared lane,
//   - the gate task ready and the execution task draft (held behind the gate),
//   - no lane started, and
//   - the topic awaiting confirmation.
//
// NOTE(review): the name suggests gate lanes are started, but the assertions
// require that nothing is started under plan_only - consider renaming.
func TestProcessOnceStartsOnlyGateLanesInitially(t *testing.T) {
	ctx := context.Background()
	clock := timeutil.FixedClock{Time: time.Date(2026, 3, 17, 5, 0, 0, 0, time.UTC)}
	store, err := sqlitestore.OpenInMemory(clock)
	if err != nil {
		t.Fatalf("OpenInMemory() error = %v", err)
	}
	defer store.Close()
	ws, topicRecord := seedLeaderLoopWorkspace(t, ctx, store, clock)
	if _, err := store.CreateMessage(ctx, message.Record{
		WorkspaceID:  ws.ID,
		TopicID:      topicRecord.ID,
		FromRoleName: "user",
		ToExpr:       "leader",
		Type:         message.TypeChat,
		Stage:        "plan",
		BodyMarkdown: "Build a gated todo app.",
	}); err != nil {
		t.Fatalf("CreateMessage() error = %v", err)
	}
	fakeRuntime := &fakeWorkspaceRuntime{workspace: ws}
	service := NewService(
		store,
		runtimeconfig.NewService(store, store, clock),
		fakeRuntime,
		&fakeRunner{result: RunResult{
			Status: workflow.RunStatusSucceeded,
			ResultJSON: `{
"plan_summary_markdown":"Start with foundation gating.",
"plan_mode":"initial",
"execution_mode":"plan_only",
"leader_reply":{"markdown":"先跑基础检查。","type":"decision"},
"tasks":[
{"key":"gate","title":"Check workspace baseline","body_markdown":"Validate setup.","acceptance_markdown":"Setup is valid.","kind":"gate","deliverables":["reports/gate.md"],"priority":10,"task_order":1,"depends_on":[]},
{"key":"ui","title":"Build UI","body_markdown":"Implement the UI.","acceptance_markdown":"UI works.","kind":"execution","deliverables":["apps/web/src"],"priority":10,"task_order":1,"depends_on":[]}
]
}`,
		}},
		clock,
		"",
	)
	result, err := service.ProcessOnce(ctx)
	if err != nil {
		t.Fatalf("ProcessOnce() error = %v", err)
	}
	if result == nil {
		t.Fatal("expected result")
	}
	lanes, err := store.ListLanesByTopic(ctx, topicRecord.ID)
	if err != nil {
		t.Fatalf("ListLanesByTopic() error = %v", err)
	}
	tasks, err := store.ListTasksByTopic(ctx, topicRecord.ID)
	if err != nil {
		t.Fatalf("ListTasksByTopic() error = %v", err)
	}
	// Pick out the two tasks by kind/title. A missing task leaves a zero Record,
	// whose empty LaneID makes the shared-lane assertion below fail.
	var gateTask, uiTask task.Record
	for _, item := range tasks {
		if item.Kind == task.KindGate {
			gateTask = item
		}
		if item.Title == "Build UI" {
			uiTask = item
		}
	}
	if len(lanes) != 1 {
		t.Fatalf("expected gate task to reuse the execution lane, got %#v", lanes)
	}
	if gateTask.LaneID == "" || gateTask.LaneID != uiTask.LaneID {
		t.Fatalf("expected gate and execution task to share one lane, got gate=%#v ui=%#v", gateTask, uiTask)
	}
	if gateTask.Status != task.StatusReady {
		t.Fatalf("expected gate task ready, got %#v", gateTask)
	}
	if uiTask.Status != task.StatusDraft {
		t.Fatalf("expected non-gate task blocked behind gate, got %#v", uiTask)
	}
	if len(fakeRuntime.startedLaneIDs) != 0 {
		t.Fatalf("expected initial gated plan to wait for confirmation, got %#v", fakeRuntime.startedLaneIDs)
	}
	updatedTopic, err := store.GetTopic(ctx, topicRecord.ID)
	if err != nil {
		t.Fatalf("GetTopic() error = %v", err)
	}
	if updatedTopic.Status != "awaiting_confirmation" {
		t.Fatalf("expected awaiting_confirmation topic, got %#v lanes=%#v", updatedTopic, lanes)
	}
}
func TestProcessOncePlanOnlyDoesNotAutoStartReadyLanesAfterGateSuccess(t *testing.T) {
ctx := context.Background()
clock := timeutil.FixedClock{Time: time.Date(2026, 3, 17, 5, 30, 0, 0, time.UTC)}
store, err := sqlitestore.OpenInMemory(clock)
if err != nil {
t.Fatalf("OpenInMemory() error = %v", err)
}
defer store.Close()
ws, topicRecord := seedLeaderLoopWorkspace(t, ctx, store, clock)
topicRecord.Status = "execution"
if _, err := store.UpdateTopic(ctx, topicRecord); err != nil {
t.Fatalf("UpdateTopic() error = %v", err)
}
gateChain, err := store.CreateLane(ctx, lane.Record{
WorkspaceID: ws.ID,
TopicID: topicRecord.ID,
Name: "Foundation",
Slug: "foundation",
Status: lane.StatusSucceeded,
BranchName: "lane/todo/foundation",
WorktreePath: filepath.Join(t.TempDir(), "foundation"),
ContainerName: "lane-foundation-test",
CreatedByRoleName: "leader",
})
if err != nil {
t.Fatalf("CreateLane(gate) error = %v", err)
}
frontendChain, err := store.CreateLane(ctx, lane.Record{
WorkspaceID: ws.ID,
TopicID: topicRecord.ID,
Name: "Frontend",
Slug: "frontend",
Status: lane.StatusReady,
BranchName: "lane/todo/frontend",
WorktreePath: filepath.Join(t.TempDir(), "frontend"),
ContainerName: "lane-frontend-test",
CreatedByRoleName: "leader",
})
if err != nil {
t.Fatalf("CreateLane(frontend) error = %v", err)
}
if _, err := store.CreateTask(ctx, task.Record{
WorkspaceID: ws.ID,
TopicID: topicRecord.ID,
LaneID: gateChain.ID,
Title: "Check workspace baseline",
BodyMarkdown: "Validate setup.",
Kind: task.KindGate,
Status: task.StatusSucceeded,
Priority: 10,
TaskOrder: 1,
CreatedByRoleName: "leader",
}, nil); err != nil {
t.Fatalf("CreateTask(gate) error = %v", err)
}
if _, err := store.CreateTask(ctx, task.Record{
WorkspaceID: ws.ID,
TopicID: topicRecord.ID,
LaneID: frontendChain.ID,
Title: "Build UI",
BodyMarkdown: "Implement UI.",
Kind: task.KindExecution,
Status: task.StatusReady,
Priority: 10,
TaskOrder: 1,
CreatedByRoleName: "leader",
}, nil); err != nil {
t.Fatalf("CreateTask(frontend) error = %v", err)
}
if _, err := store.CreateMessage(ctx, message.Record{
WorkspaceID: ws.ID,
TopicID: topicRecord.ID,
FromRoleName: "worker",
ToExpr: "leader",
Type: message.TypeSummary,
Stage: "execution",
BodyMarkdown: "Foundation gate completed successfully. Keep the next lane stopped for now.",
}); err != nil {
t.Fatalf("CreateMessage() error = %v", err)
}
fakeRuntime := &fakeWorkspaceRuntime{workspace: ws}
service := NewService(
store,
runtimeconfig.NewService(store, store, clock),
fakeRuntime,
&fakeRunner{result: RunResult{
Status: workflow.RunStatusSucceeded,
ResultJSON: `{
"plan_summary_markdown":"Gate passed; continue existing plan.",
"plan_mode":"patch",
"replan_reason":"Gate completed successfully; continue existing graph.",
"execution_mode":"plan_only",
"leader_reply":{"markdown":"基础检查通过,继续后续链路。","type":"decision"},
"tasks":[]
}`,
}},
clock,
"",
)
result, err := service.ProcessOnce(ctx)
if err != nil {
t.Fatalf("ProcessOnce() error = %v", err)
}
if result == nil {
t.Fatal("expected result")
}
if len(fakeRuntime.startedLaneIDs) != 0 {
t.Fatalf("expected plan_only to keep lanes stopped, got %#v", fakeRuntime.startedLaneIDs)
}
}
func TestProcessOncePlanAndStartAutoStartsReadyLanesAfterGateSuccess(t *testing.T) {
ctx := context.Background()
clock := timeutil.FixedClock{Time: time.Date(2026, 3, 17, 5, 35, 0, 0, time.UTC)}
store, err := sqlitestore.OpenInMemory(clock)
if err != nil {
t.Fatalf("OpenInMemory() error = %v", err)
}
defer store.Close()
ws, topicRecord := seedLeaderLoopWorkspace(t, ctx, store, clock)
topicRecord.Status = "execution"
if _, err := store.UpdateTopic(ctx, topicRecord); err != nil {
t.Fatalf("UpdateTopic() error = %v", err)
}
gateChain, err := store.CreateLane(ctx, lane.Record{
WorkspaceID: ws.ID,
TopicID: topicRecord.ID,
Name: "Foundation",
Slug: "foundation",
Status: lane.StatusSucceeded,
BranchName: "lane/todo/foundation",
WorktreePath: filepath.Join(t.TempDir(), "foundation"),
ContainerName: "lane-foundation-test",
CreatedByRoleName: "leader",
})
if err != nil {
t.Fatalf("CreateLane(gate) error = %v", err)
}
frontendChain, err := store.CreateLane(ctx, lane.Record{
WorkspaceID: ws.ID,
TopicID: topicRecord.ID,
Name: "Frontend",
Slug: "frontend",
Status: lane.StatusReady,
BranchName: "lane/todo/frontend",
WorktreePath: filepath.Join(t.TempDir(), "frontend"),
ContainerName: "lane-frontend-test",
CreatedByRoleName: "leader",
})
if err != nil {
t.Fatalf("CreateLane(frontend) error = %v", err)
}
if _, err := store.CreateTask(ctx, task.Record{
WorkspaceID: ws.ID,
TopicID: topicRecord.ID,
LaneID: gateChain.ID,
Title: "Check workspace baseline",
BodyMarkdown: "Validate setup.",
Kind: task.KindGate,
Status: task.StatusSucceeded,
Priority: 10,
TaskOrder: 1,
CreatedByRoleName: "leader",
}, nil); err != nil {
t.Fatalf("CreateTask(gate) error = %v", err)
}
if _, err := store.CreateTask(ctx, task.Record{
WorkspaceID: ws.ID,
TopicID: topicRecord.ID,
LaneID: frontendChain.ID,
Title: "Build UI",
BodyMarkdown: "Implement UI.",
Kind: task.KindExecution,
Status: task.StatusReady,
Priority: 10,
TaskOrder: 1,
CreatedByRoleName: "leader",
}, nil); err != nil {
t.Fatalf("CreateTask(frontend) error = %v", err)
}
if _, err := store.CreateMessage(ctx, message.Record{
WorkspaceID: ws.ID,
TopicID: topicRecord.ID,
FromRoleName: "worker",
ToExpr: "leader",
Type: message.TypeSummary,
Stage: "execution",
BodyMarkdown: "Foundation gate completed successfully. Start the next ready lane.",
}); err != nil {
t.Fatalf("CreateMessage() error = %v", err)
}
fakeRuntime := &fakeWorkspaceRuntime{workspace: ws}
service := NewService(
store,
runtimeconfig.NewService(store, store, clock),
fakeRuntime,
&fakeRunner{result: RunResult{
Status: workflow.RunStatusSucceeded,
ResultJSON: `{
"plan_summary_markdown":"Gate passed; continue existing plan.",
"plan_mode":"patch",
"replan_reason":"Gate completed successfully; continue existing graph.",
"execution_mode":"plan_and_start",
"leader_reply":{"markdown":"基础检查通过,继续后续链路。","type":"decision"},
"tasks":[]
}`,
}},
clock,
"",
)
result, err := service.ProcessOnce(ctx)
if err != nil {
t.Fatalf("ProcessOnce() error = %v", err)
}
if result == nil {
t.Fatal("expected result")
}
if len(fakeRuntime.startedLaneIDs) != 1 || fakeRuntime.startedLaneIDs[0] != frontendChain.ID {
t.Fatalf("expected auto-start of frontend lane, got %#v", fakeRuntime.startedLaneIDs)
}
}
// TestProcessOnceUsesResolvedLeaderPromptOverrides stores a workspace-level
// system prompt for the leader role and runs one leader iteration. It checks
// that the prompt given to the runner contains the override and no longer
// contains defaultLeaderSystemPrompt.
func TestProcessOnceUsesResolvedLeaderPromptOverrides(t *testing.T) {
	ctx := context.Background()
	clock := timeutil.FixedClock{Time: time.Date(2026, 3, 17, 6, 0, 0, 0, time.UTC)}
	store, err := sqlitestore.OpenInMemory(clock)
	if err != nil {
		t.Fatalf("OpenInMemory() error = %v", err)
	}
	defer store.Close()

	ws, topicRecord := seedLeaderLoopWorkspace(t, ctx, store, clock)

	override := role.Prompt{
		RoleName:        "leader",
		WorkspaceID:     ws.ID,
		PromptKind:      role.PromptSystem,
		ContentMarkdown: "你是自定义 leader。\n\n优先把范围压缩到最小可执行图。",
	}
	if _, err := store.UpsertRolePrompt(ctx, override, "test"); err != nil {
		t.Fatalf("UpsertRolePrompt() error = %v", err)
	}

	request := message.Record{
		WorkspaceID:  ws.ID,
		TopicID:      topicRecord.ID,
		FromRoleName: "user",
		ToExpr:       "leader",
		Type:         message.TypeChat,
		Stage:        "plan",
		BodyMarkdown: "Build a todo app.",
	}
	if _, err := store.CreateMessage(ctx, request); err != nil {
		t.Fatalf("CreateMessage() error = %v", err)
	}

	leaderRunner := &fakeRunner{result: RunResult{
		Status: workflow.RunStatusSucceeded,
		ResultJSON: `{
"plan_summary_markdown":"Need clarification first.",
"plan_mode":"initial",
"execution_mode":"clarify",
"leader_reply":{"markdown":"先缩小范围。","type":"question"},
"tasks":[]
}`,
	}}
	runtimeConfig := runtimeconfig.NewService(store, store, clock)
	service := NewService(store, runtimeConfig, &fakeWorkspaceRuntime{workspace: ws}, leaderRunner, clock, "")
	if _, err := service.ProcessOnce(ctx); err != nil {
		t.Fatalf("ProcessOnce() error = %v", err)
	}

	sentPrompt := leaderRunner.prompt
	switch {
	case !strings.Contains(sentPrompt, "你是自定义 leader。"):
		t.Fatalf("expected custom leader prompt in runtime instructions, got %q", sentPrompt)
	case strings.Contains(sentPrompt, defaultLeaderSystemPrompt):
		t.Fatalf("expected custom prompt to replace fallback system prompt, got %q", sentPrompt)
	}
}
func TestValidateLeaderOutputForTopicRejectsPatchOnEmptyGraph(t *testing.T) {
err := validateLeaderOutputForTopic(leaderOutput{
PlanSummaryMarkdown: "Patch on empty graph.",
PlanMode: "patch",
ReplanReason: "Need to update graph.",
ExecutionMode: "plan_only",
LeaderReply: leaderReplySpec{Markdown: "Patch.", Type: "summary"},
}, nil, nil)
if err == nil || !strings.Contains(err.Error(), "cannot use plan_mode=patch on an empty topic graph") {
t.Fatalf("expected patch on empty graph error, got %v", err)
}
}
func TestValidateLeaderOutputForTopicRejectsInitialOnExistingGraph(t *testing.T) {
err := validateLeaderOutputForTopic(leaderOutput{
PlanSummaryMarkdown: "Initial on existing graph.",
PlanMode: "initial",
ReplanReason: "",
ExecutionMode: "plan_only",
LeaderReply: leaderReplySpec{Markdown: "Initial.", Type: "summary"},
}, []lane.Record{{ID: "chain_1", Slug: "ui-chain", Name: "UI Chain"}}, nil)
if err == nil || !strings.Contains(err.Error(), "must use plan_mode=patch") {
t.Fatalf("expected initial on existing graph error, got %v", err)
}
}
func TestValidateLeaderOutputForTopicRejectsPatchWithoutReason(t *testing.T) {
err := validateLeaderOutputForTopic(leaderOutput{
PlanSummaryMarkdown: "Patch without reason.",
PlanMode: "patch",
ReplanReason: "",
ExecutionMode: "plan_only",
LeaderReply: leaderReplySpec{Markdown: "Patch.", Type: "summary"},
}, []lane.Record{{ID: "chain_1", Slug: "ui-chain", Name: "UI Chain"}}, nil)
if err == nil || !strings.Contains(err.Error(), "requires replan_reason") {
t.Fatalf("expected patch without reason error, got %v", err)
}
}
func TestValidateLeaderOutputForTopicAllowsPatchDependenciesOnExistingTaskIDs(t *testing.T) {
err := validateLeaderOutputForTopic(leaderOutput{
PlanSummaryMarkdown: "Patch existing graph.",
PlanMode: "patch",
ReplanReason: "Attach a follow-up task to the completed gate.",
ExecutionMode: "plan_only",
LeaderReply: leaderReplySpec{Markdown: "Continue from the existing gate.", Type: "summary"},
Tasks: []leaderTaskSpec{
{
Key: "follow-up",
Title: "Continue implementation",
BodyMarkdown: "Build the next step.",
AcceptanceMarkdown: "Ready to proceed.",
Kind: "execution",
Deliverables: []string{"apps/web/src"},
Priority: 10,
TaskOrder: 2,
DependsOn: []string{"task-existing"},
},
},
}, []lane.Record{{ID: "chain_1", Slug: "ui-chain", Name: "UI Chain"}}, []task.Record{{ID: "task-existing", Title: "Existing Gate"}})
if err != nil {
t.Fatalf("expected existing task dependency to validate, got %v", err)
}
}
func TestValidateLeaderOutputForTopicRejectsUnknownPatchDependencyKey(t *testing.T) {
err := validateLeaderOutputForTopic(leaderOutput{
PlanSummaryMarkdown: "Patch existing graph.",
PlanMode: "patch",
ReplanReason: "Attach a follow-up task to the completed gate.",
ExecutionMode: "plan_only",
LeaderReply: leaderReplySpec{Markdown: "Continue from the existing gate.", Type: "summary"},
Tasks: []leaderTaskSpec{
{
Key: "follow-up",
Title: "Continue implementation",
BodyMarkdown: "Build the next step.",
AcceptanceMarkdown: "Ready to proceed.",
Kind: "execution",
Deliverables: []string{"apps/web/src"},
Priority: 10,
TaskOrder: 2,
DependsOn: []string{"missing-task"},
},
},
}, []lane.Record{{ID: "chain_1", Slug: "ui-chain", Name: "UI Chain"}}, []task.Record{{ID: "task-existing", Title: "Existing Gate"}})
if err == nil || !strings.Contains(err.Error(), "unknown dependency key") {
t.Fatalf("expected unknown dependency key error, got %v", err)
}
}
// TestLeaderCommandEnvWritesRuntimeCodexHomeUnderProjectRuntime checks that
// leaderCommandEnv does three things:
//   - points HOME and CODEX_HOME at the per-workspace, per-role directories
//     reported by runtimecodex,
//   - writes the role's config TOML to <CODEX_HOME>/config.toml, and
//   - writes the role's auth JSON to <CODEX_HOME>/auth.json.
func TestLeaderCommandEnvWritesRuntimeCodexHomeUnderProjectRuntime(t *testing.T) {
	projectRoot := t.TempDir()
	env := leaderCommandEnv(projectRoot, runtimeconfig.ResolvedRole{
		WorkspaceID: "ws_1",
		Role:        role.Definition{Name: "leader"},
		Config: role.Config{
			RoleName: "leader",
			ConfigTOML: strings.Join([]string{
				`model = "gpt-5.4"`,
				`model_provider = "custom"`,
				``,
				`[model_providers.custom]`,
				`base_url = "http://example.test/v1"`,
				`wire_api = "responses"`,
			}, "\n"),
			AuthJSON: `{"OPENAI_API_KEY":"token-1"}`,
		},
	})
	expectedHome := runtimecodex.HostLeaderHomeDir(projectRoot, "ws_1", "leader")
	expectedCodexHome := runtimecodex.HostLeaderCodexDir(projectRoot, "ws_1", "leader")
	joined := strings.Join(env, "\n")

	// lookup returns the effective value of key in env.
	//   - Entries are matched on the exact key. A substring search for
	//     "HOME=<home>" would also match "CODEX_HOME=<home>/...", so a missing
	//     HOME would go unnoticed.
	//   - The last occurrence wins, as it does for os/exec.Cmd.Env.
	lookup := func(key string) (string, bool) {
		value, found := "", false
		for _, entry := range env {
			if k, v, ok := strings.Cut(entry, "="); ok && k == key {
				value, found = v, true
			}
		}
		return value, found
	}
	if got, ok := lookup("HOME"); !ok || got != expectedHome {
		t.Fatalf("expected isolated HOME in env, env=%q", joined)
	}
	if got, ok := lookup("CODEX_HOME"); !ok || got != expectedCodexHome {
		t.Fatalf("expected isolated CODEX_HOME in env, env=%q", joined)
	}

	configBytes, err := os.ReadFile(filepath.Join(expectedCodexHome, "config.toml"))
	if err != nil {
		t.Fatalf("ReadFile(config.toml) error = %v", err)
	}
	configText := string(configBytes)
	for _, needle := range []string{
		`model = "gpt-5.4"`,
		`model_provider = "custom"`,
		`[model_providers.custom]`,
		`base_url = "http://example.test/v1"`,
	} {
		if !strings.Contains(configText, needle) {
			t.Fatalf("expected seeded config to contain %q, got:\n%s", needle, configText)
		}
	}

	authBytes, err := os.ReadFile(filepath.Join(expectedCodexHome, "auth.json"))
	if err != nil {
		t.Fatalf("ReadFile(auth.json) error = %v", err)
	}
	var auth map[string]any
	if err := json.Unmarshal(authBytes, &auth); err != nil {
		t.Fatalf("Unmarshal(auth.json) error = %v", err)
	}
	if auth["OPENAI_API_KEY"] != "token-1" {
		t.Fatalf("unexpected copied auth.json: %s", string(authBytes))
	}
}
// assertSchemaRequiredMatchesProperties fails the test unless the schema's
// "required" array names exactly the keys of its "properties" object, each
// once. That means:
//   - every declared property is required,
//   - every required name is a declared property, and
//   - no name is repeated.
//
// All mismatches are reported before the test stops, so one run shows every
// discrepancy. A schema without a "properties" object or a "required" array
// of strings fails immediately.
func assertSchemaRequiredMatchesProperties(t *testing.T, schema map[string]any) {
	t.Helper()
	properties, ok := schema["properties"].(map[string]any)
	if !ok {
		t.Fatalf("schema properties missing or invalid: %#v", schema)
	}
	requiredRaw, ok := schema["required"].([]any)
	if !ok {
		t.Fatalf("schema required missing or invalid: %#v", schema)
	}

	mismatches := 0
	required := make(map[string]struct{}, len(requiredRaw))
	for _, item := range requiredRaw {
		key, ok := item.(string)
		if !ok {
			t.Fatalf("schema required item must be string, got %#v", item)
		}
		if _, dup := required[key]; dup {
			t.Errorf("schema required lists %q more than once", key)
			mismatches++
		}
		required[key] = struct{}{}
	}
	for key := range properties {
		if _, ok := required[key]; !ok {
			t.Errorf("schema required is missing property %q", key)
			mismatches++
		}
	}
	for key := range required {
		if _, ok := properties[key]; !ok {
			t.Errorf("schema required lists undeclared property %q", key)
			mismatches++
		}
	}
	if mismatches > 0 {
		t.FailNow()
	}
}
// fakeRunner is a test double for the leader runner. It hands back a canned
// RunResult and remembers the prompt it was last asked to run.
type fakeRunner struct {
	result RunResult // returned unchanged from every Run call
	prompt string    // prompt argument of the most recent Run call
}

// Run stores prompt for later inspection and returns the canned result with a
// nil error. All other arguments are ignored.
func (r *fakeRunner) Run(_ context.Context, _ string, _ runtimeconfig.ResolvedRole, prompt, _ string) (RunResult, error) {
	r.prompt = prompt
	canned := r.result
	return canned, nil
}
// fakeWorkspaceRuntime is a test double for the workspace runtime. Ensure
// returns a fixed workspace. EnsureLane records which lanes were asked to
// start and reports them as running.
type fakeWorkspaceRuntime struct {
	workspace      workspace.Workspace // returned by Ensure regardless of its arguments
	startedLaneID  string              // lane ID from the most recent EnsureLane call
	startedLaneIDs []string            // every lane ID passed to EnsureLane, in call order
}

// Ensure returns the configured workspace, a zero-value Runtime, and no error.
func (rt *fakeWorkspaceRuntime) Ensure(_ context.Context, _ string) (workspace.Workspace, workspaceruntime.Runtime, error) {
	var zeroRuntime workspaceruntime.Runtime
	return rt.workspace, zeroRuntime, nil
}

// EnsureLane records laneID and returns a running "UI Chain" lane record with
// that ID.
func (rt *fakeWorkspaceRuntime) EnsureLane(_ context.Context, laneID string) (lane.Record, error) {
	rt.startedLaneIDs = append(rt.startedLaneIDs, laneID)
	rt.startedLaneID = laneID
	running := lane.Record{
		ID:     laneID,
		Name:   "UI Chain",
		Status: lane.StatusRunning,
	}
	return running, nil
}
// seedLeaderLoopWorkspace creates the fixture shared by the leader-loop tests.
// It adds three records, each with root paths under fresh temp directories
// where applicable and timestamps taken from clock.Now():
//   - an active "demo" project,
//   - an active host-backed "todo" workspace inside that project, and
//   - a workflow topic "v1" in the "plan" status.
//
// It returns the stored workspace and topic. Any store error fails the test.
func seedLeaderLoopWorkspace(t *testing.T, ctx context.Context, store *sqlitestore.Store, clock timeutil.Clock) (workspace.Workspace, topic.Record) {
	t.Helper()
	stamp := timeutil.FormatRFC3339(clock.Now())

	proj, err := store.CreateProject(ctx, workspace.Project{
		Slug:          "demo",
		Name:          "Demo",
		RootPath:      t.TempDir(),
		DefaultBranch: "main",
		Status:        "active",
		CreatedAt:     stamp,
		UpdatedAt:     stamp,
	})
	if err != nil {
		t.Fatalf("CreateProject() error = %v", err)
	}

	ws, err := store.CreateWorkspace(ctx, workspace.Workspace{
		ProjectID:      proj.ID,
		Slug:           "todo",
		Name:           "todo",
		RootPath:       t.TempDir(),
		BaseBranch:     "main",
		WorktreeBranch: "worktree/todo",
		RuntimeBackend: "host",
		Status:         "active",
		CreatedAt:      stamp,
		UpdatedAt:      stamp,
	})
	if err != nil {
		t.Fatalf("CreateWorkspace() error = %v", err)
	}

	topicRec, err := store.CreateTopic(ctx, topic.Record{
		WorkspaceID: ws.ID,
		Slug:        "v1",
		Title:       "v1",
		Space:       topic.SpaceWorkflow,
		Status:      "plan",
		CreatedAt:   stamp,
		UpdatedAt:   stamp,
	})
	if err != nil {
		t.Fatalf("CreateTopic() error = %v", err)
	}
	return ws, topicRec
}