Files

820 lines
28 KiB
Go

package taskexec
import (
"context"
"database/sql"
"errors"
"os"
"os/exec"
"path/filepath"
"strings"
"testing"
"time"
"inbox/internal/app/lanematerialize"
"inbox/internal/app/runtimeconfig"
"inbox/internal/base/timeutil"
"inbox/internal/domain/lane"
"inbox/internal/domain/message"
"inbox/internal/domain/task"
"inbox/internal/domain/topic"
"inbox/internal/domain/workflow"
sqlitestore "inbox/internal/store/sqlite"
)
func TestCompletePromotesDependentTaskAndNotifiesLeader(t *testing.T) {
ctx := context.Background()
clock := timeutil.FixedClock{Time: time.Date(2026, 3, 16, 10, 0, 0, 0, time.UTC)}
store, err := sqlitestore.OpenInMemory(clock)
if err != nil {
t.Fatalf("OpenInMemory() error = %v", err)
}
defer store.Close()
_, chainRecord := seedTaskExecGraph(t, ctx, store)
svc := NewService(store, newResolvedRoleResolver(store, clock), clock, WithSnapshotter(noopSnapshotter{}), WithMaterializer(noopMaterializer{}))
assignment, err := svc.ClaimNext(ctx, chainRecord.ID, "runner-1")
if err != nil {
t.Fatalf("ClaimNext() error = %v", err)
}
if assignment.Task.Title != "Task A" {
t.Fatalf("expected Task A to be claimed first, got %#v", assignment.Task)
}
if !strings.Contains(assignment.Prompt, "Task A") {
t.Fatalf("expected task prompt to include task title, got %q", assignment.Prompt)
}
if !strings.Contains(assignment.Prompt, "## Skills") || !strings.Contains(assignment.Prompt, "Use Inbox V2") {
t.Fatalf("expected task prompt to include bound skills, got %q", assignment.Prompt)
}
updatedRun, err := svc.Complete(ctx, Completion{
RunID: assignment.Run.ID,
Status: workflow.RunStatusSucceeded,
ExitCode: 0,
ResultMarkdown: "Implemented Task A.",
})
if err != nil {
t.Fatalf("Complete() error = %v", err)
}
if updatedRun.Status != workflow.RunStatusSucceeded {
t.Fatalf("unexpected run status: %#v", updatedRun)
}
tasks, err := store.ListTasksByLane(ctx, chainRecord.ID)
if err != nil {
t.Fatalf("ListTasksByLane() error = %v", err)
}
byTitle := make(map[string]task.Record, len(tasks))
for _, item := range tasks {
byTitle[item.Title] = item
}
if byTitle["Task A"].Status != task.StatusSucceeded {
t.Fatalf("expected Task A succeeded, got %#v", byTitle["Task A"])
}
if byTitle["Task B"].Status != task.StatusReady {
t.Fatalf("expected Task B promoted to ready, got %#v", byTitle["Task B"])
}
messages, err := store.ListMessagesByTopic(ctx, chainRecord.TopicID)
if err != nil {
t.Fatalf("ListMessagesByTopic() error = %v", err)
}
var workerSummary *message.Record
for _, item := range messages {
if item.FromRoleName == "worker" && item.ToExpr == "leader" {
workerSummary = &item
}
}
if workerSummary == nil {
t.Fatalf("expected worker summary message, got %#v", messages)
}
if !strings.Contains(workerSummary.BodyMarkdown, "Implemented Task A.") {
t.Fatalf("unexpected worker summary body: %q", workerSummary.BodyMarkdown)
}
chainState, err := store.GetLane(ctx, chainRecord.ID)
if err != nil {
t.Fatalf("GetLane() error = %v", err)
}
if chainState.Status != lane.StatusReady {
t.Fatalf("expected chain to stay ready for next task, got %#v", chainState)
}
}
func TestCompleteAutoCompletesMilestoneAndPromotesDownstreamTask(t *testing.T) {
ctx := context.Background()
clock := timeutil.FixedClock{Time: time.Date(2026, 3, 16, 11, 0, 0, 0, time.UTC)}
store, err := sqlitestore.OpenInMemory(clock)
if err != nil {
t.Fatalf("OpenInMemory() error = %v", err)
}
defer store.Close()
ws, chainRecord := seedTaskExecGraph(t, ctx, store)
items, err := store.ListTasksByLane(ctx, chainRecord.ID)
if err != nil {
t.Fatalf("ListTasksByLane() error = %v", err)
}
var taskAID string
for _, item := range items {
if item.Title == "Task A" {
taskAID = item.ID
break
}
}
if taskAID == "" {
t.Fatal("expected Task A in seed graph")
}
milestone, err := store.CreateTask(ctx, task.Record{
WorkspaceID: ws.ID,
TopicID: chainRecord.TopicID,
LaneID: chainRecord.ID,
Title: "Ready for Verification",
BodyMarkdown: "Milestone node.",
Kind: task.KindMilestone,
Status: task.StatusDraft,
Priority: 8,
TaskOrder: 3,
CreatedByRoleName: "leader",
}, []task.Dependency{{DependsOnTaskID: taskAID}})
if err != nil {
t.Fatalf("CreateTask(milestone) error = %v", err)
}
if _, err := store.CreateTask(ctx, task.Record{
WorkspaceID: ws.ID,
TopicID: chainRecord.TopicID,
LaneID: chainRecord.ID,
Title: "Task C",
BodyMarkdown: "Run final verification.",
Kind: task.KindVerification,
Status: task.StatusDraft,
Priority: 7,
TaskOrder: 4,
CreatedByRoleName: "leader",
}, []task.Dependency{{DependsOnTaskID: milestone.ID}}); err != nil {
t.Fatalf("CreateTask(task C) error = %v", err)
}
svc := NewService(store, newResolvedRoleResolver(store, clock), clock, WithSnapshotter(noopSnapshotter{}), WithMaterializer(noopMaterializer{}))
assignment, err := svc.ClaimNext(ctx, chainRecord.ID, "runner-1")
if err != nil {
t.Fatalf("ClaimNext() error = %v", err)
}
if assignment.Task.Title != "Task A" {
t.Fatalf("expected Task A first, got %#v", assignment.Task)
}
if _, err := svc.Complete(ctx, Completion{
RunID: assignment.Run.ID,
Status: workflow.RunStatusSucceeded,
ExitCode: 0,
ResultMarkdown: "Implemented Task A.",
}); err != nil {
t.Fatalf("Complete() error = %v", err)
}
items, err = store.ListTasksByLane(ctx, chainRecord.ID)
if err != nil {
t.Fatalf("ListTasksByLane() error = %v", err)
}
byTitle := make(map[string]task.Record, len(items))
for _, item := range items {
byTitle[item.Title] = item
}
if byTitle["Ready for Verification"].Status != task.StatusSucceeded {
t.Fatalf("expected milestone auto-succeeded, got %#v", byTitle["Ready for Verification"])
}
if byTitle["Task C"].Status != task.StatusReady {
t.Fatalf("expected downstream task promoted after milestone, got %#v", byTitle["Task C"])
}
next, err := svc.ClaimNext(ctx, chainRecord.ID, "runner-2")
if err != nil {
t.Fatalf("ClaimNext(second) error = %v", err)
}
if next.Task.Title != "Task B" {
t.Fatalf("expected Task B next due ordering before Task C, got %#v", next.Task)
}
}
func TestClaimNextCommitsMainStateWhenStartedEventFails(t *testing.T) {
ctx := context.Background()
clock := timeutil.FixedClock{Time: time.Date(2026, 3, 16, 10, 0, 0, 0, time.UTC)}
store, err := sqlitestore.OpenInMemory(clock)
if err != nil {
t.Fatalf("OpenInMemory() error = %v", err)
}
defer store.Close()
_, chainRecord := seedTaskExecGraph(t, ctx, store)
repo := &flakyTaskExecRepo{
Store: store,
failAppendTaskEvent: true,
}
svc := NewService(repo, newResolvedRoleResolver(store, clock), clock, WithSnapshotter(noopSnapshotter{}), WithMaterializer(noopMaterializer{}))
assignment, err := svc.ClaimNext(ctx, chainRecord.ID, "runner-1")
if err != nil {
t.Fatalf("ClaimNext() should succeed even if started event fails, got %v", err)
}
if assignment.Run.Status != workflow.RunStatusRunning {
t.Fatalf("expected running run, got %#v", assignment.Run)
}
if assignment.Task.Status != task.StatusRunning {
t.Fatalf("expected running task, got %#v", assignment.Task)
}
runs, err := store.ListWorkflowRunsByTopic(ctx, chainRecord.TopicID)
if err != nil {
t.Fatalf("ListWorkflowRunsByTopic() error = %v", err)
}
if len(runs) != 1 || runs[0].ID != assignment.Run.ID || runs[0].Status != workflow.RunStatusRunning {
t.Fatalf("unexpected workflow runs after claim: %#v", runs)
}
persistedTask, err := store.GetTask(ctx, assignment.Task.ID)
if err != nil {
t.Fatalf("GetTask() error = %v", err)
}
if persistedTask.Status != task.StatusRunning || persistedTask.AssignedRunID != assignment.Run.ID {
t.Fatalf("expected persisted task running with assigned run, got %#v", persistedTask)
}
events, err := store.ListTaskEvents(ctx, assignment.Task.ID)
if err != nil {
t.Fatalf("ListTaskEvents() error = %v", err)
}
for _, item := range events {
if item.EventType == "started" {
t.Fatalf("expected no started event when side effect fails, got %#v", events)
}
}
logs, err := store.ListWorkflowRunLogs(ctx, assignment.Run.ID, 0)
if err != nil {
t.Fatalf("ListWorkflowRunLogs() error = %v", err)
}
if len(logs) == 0 {
t.Fatalf("expected warning log after started-event failure")
}
if !strings.Contains(logs[len(logs)-1].Content, "claim side effects failed after main state commit") {
t.Fatalf("unexpected warning log: %#v", logs[len(logs)-1])
}
}
func TestCompleteCommitsMainStateWhenNotificationSideEffectsFail(t *testing.T) {
ctx := context.Background()
clock := timeutil.FixedClock{Time: time.Date(2026, 3, 16, 10, 0, 0, 0, time.UTC)}
store, err := sqlitestore.OpenInMemory(clock)
if err != nil {
t.Fatalf("OpenInMemory() error = %v", err)
}
defer store.Close()
_, chainRecord := seedTaskExecGraph(t, ctx, store)
repo := &flakyTaskExecRepo{
Store: store,
failCreateMessage: true,
}
svc := NewService(repo, newResolvedRoleResolver(store, clock), clock, WithSnapshotter(noopSnapshotter{}), WithMaterializer(noopMaterializer{}))
assignment, err := svc.ClaimNext(ctx, chainRecord.ID, "runner-1")
if err != nil {
t.Fatalf("ClaimNext() error = %v", err)
}
updatedRun, err := svc.Complete(ctx, Completion{
RunID: assignment.Run.ID,
Status: workflow.RunStatusSucceeded,
ExitCode: 0,
ResultMarkdown: "Implemented Task A.",
})
if err != nil {
t.Fatalf("Complete() should succeed even if side effects fail, got %v", err)
}
if updatedRun.Status != workflow.RunStatusSucceeded {
t.Fatalf("unexpected run status: %#v", updatedRun)
}
tasks, err := store.ListTasksByLane(ctx, chainRecord.ID)
if err != nil {
t.Fatalf("ListTasksByLane() error = %v", err)
}
byTitle := make(map[string]task.Record, len(tasks))
for _, item := range tasks {
byTitle[item.Title] = item
}
if byTitle["Task A"].Status != task.StatusSucceeded {
t.Fatalf("expected Task A succeeded, got %#v", byTitle["Task A"])
}
if byTitle["Task B"].Status != task.StatusReady {
t.Fatalf("expected Task B promoted to ready, got %#v", byTitle["Task B"])
}
chainState, err := store.GetLane(ctx, chainRecord.ID)
if err != nil {
t.Fatalf("GetLane() error = %v", err)
}
if chainState.Status != lane.StatusReady {
t.Fatalf("expected chain ready after commit, got %#v", chainState)
}
messages, err := store.ListMessagesByTopic(ctx, chainRecord.TopicID)
if err != nil {
t.Fatalf("ListMessagesByTopic() error = %v", err)
}
for _, item := range messages {
if item.FromRoleName == "worker" && item.ToExpr == "leader" {
t.Fatalf("expected no worker summary message when notification fails, got %#v", item)
}
}
logs, err := store.ListWorkflowRunLogs(ctx, assignment.Run.ID, 0)
if err != nil {
t.Fatalf("ListWorkflowRunLogs() error = %v", err)
}
if len(logs) == 0 {
t.Fatalf("expected warning log after side-effect failure")
}
if !strings.Contains(logs[len(logs)-1].Content, "completion side effects failed after main state commit") {
t.Fatalf("unexpected warning log: %#v", logs[len(logs)-1])
}
}
func TestCompleteSnapshotsLaneHeadCommit(t *testing.T) {
ctx := context.Background()
clock := timeutil.FixedClock{Time: time.Date(2026, 3, 16, 12, 0, 0, 0, time.UTC)}
store, err := sqlitestore.OpenInMemory(clock)
if err != nil {
t.Fatalf("OpenInMemory() error = %v", err)
}
defer store.Close()
repoDir := createGitRepo(t)
initialHead := gitHead(t, repoDir)
seedTaskExecRepoGraph(t, ctx, store, repoDir, initialHead)
svc := NewService(store, newResolvedRoleResolver(store, clock), clock, WithMaterializer(noopMaterializer{}))
assignment, err := svc.ClaimNext(ctx, "chain_1", "runner-1")
if err != nil {
t.Fatalf("ClaimNext() error = %v", err)
}
if err := os.WriteFile(filepath.Join(repoDir, "todo.txt"), []byte("hello lane snapshot\n"), 0644); err != nil {
t.Fatalf("WriteFile() error = %v", err)
}
if _, err := svc.Complete(ctx, Completion{
RunID: assignment.Run.ID,
Status: workflow.RunStatusSucceeded,
ExitCode: 0,
ResultMarkdown: "Implemented Task A.",
}); err != nil {
t.Fatalf("Complete() error = %v", err)
}
laneRecord, err := store.GetLane(ctx, "chain_1")
if err != nil {
t.Fatalf("GetLane() error = %v", err)
}
if laneRecord.HeadCommit == "" || laneRecord.HeadCommit == initialHead {
t.Fatalf("expected updated lane head commit, got %#v", laneRecord)
}
if status := gitStatusShort(t, repoDir); strings.TrimSpace(status) != "" {
t.Fatalf("expected clean repo after snapshot, got %q", status)
}
}
func TestClaimNextMaterializesUpstreamLaneCommit(t *testing.T) {
ctx := context.Background()
clock := timeutil.FixedClock{Time: time.Date(2026, 3, 16, 13, 0, 0, 0, time.UTC)}
store, err := sqlitestore.OpenInMemory(clock)
if err != nil {
t.Fatalf("OpenInMemory() error = %v", err)
}
defer store.Close()
rootRepo := createGitRepo(t)
runGit(t, rootRepo, "checkout", "-b", "upstream")
if err := os.WriteFile(filepath.Join(rootRepo, "backend.txt"), []byte("backend\n"), 0644); err != nil {
t.Fatalf("WriteFile(upstream) error = %v", err)
}
runGit(t, rootRepo, "add", "-A")
runGit(t, rootRepo, "commit", "-m", "upstream change")
upstreamHead := gitHead(t, rootRepo)
runGit(t, rootRepo, "checkout", "main")
downstreamDir := filepath.Join(t.TempDir(), "downstream")
runGit(t, rootRepo, "worktree", "add", "-b", "downstream", downstreamDir, "main")
seedMaterializationGraph(t, ctx, store, rootRepo, downstreamDir, upstreamHead)
svc := NewService(store, newResolvedRoleResolver(store, clock), clock, WithSnapshotter(noopSnapshotter{}))
assignment, err := svc.ClaimNext(ctx, "lane_downstream", "runner-1")
if err != nil {
t.Fatalf("ClaimNext() error = %v", err)
}
if assignment.Task.ID != "task_downstream" {
t.Fatalf("unexpected task claimed: %#v", assignment.Task)
}
if _, err := os.Stat(filepath.Join(downstreamDir, "backend.txt")); err != nil {
t.Fatalf("expected upstream file materialized into downstream lane: %v", err)
}
var count int
if err := store.DB().QueryRow(`SELECT COUNT(*) FROM lane_syncs WHERE downstream_lane_id = ? AND upstream_lane_id = ? AND status = ?`,
"lane_downstream", "lane_upstream", "applied").Scan(&count); err != nil {
t.Fatalf("count lane_syncs: %v", err)
}
if count != 1 {
t.Fatalf("expected one applied lane sync, got %d", count)
}
}
func TestClaimNextBlocksTaskWhenUpstreamLaneHasNoHeadCommit(t *testing.T) {
ctx := context.Background()
clock := timeutil.FixedClock{Time: time.Date(2026, 3, 16, 14, 0, 0, 0, time.UTC)}
store, err := sqlitestore.OpenInMemory(clock)
if err != nil {
t.Fatalf("OpenInMemory() error = %v", err)
}
defer store.Close()
repoDir := createGitRepo(t)
downstreamDir := filepath.Join(t.TempDir(), "downstream")
runGit(t, repoDir, "worktree", "add", "-b", "downstream", downstreamDir, "main")
seedMaterializationGraph(t, ctx, store, repoDir, downstreamDir, "")
svc := NewService(store, newResolvedRoleResolver(store, clock), clock, WithSnapshotter(noopSnapshotter{}))
assignment, err := svc.ClaimNext(ctx, "lane_downstream", "runner-1")
if !errors.Is(err, sql.ErrNoRows) {
t.Fatalf("expected sql.ErrNoRows after block, got assignment=%#v err=%v", assignment, err)
}
taskRecord, err := store.GetTask(ctx, "task_downstream")
if err != nil {
t.Fatalf("GetTask() error = %v", err)
}
if taskRecord.Status != task.StatusBlocked {
t.Fatalf("expected blocked task, got %#v", taskRecord)
}
laneRecord, err := store.GetLane(ctx, "lane_downstream")
if err != nil {
t.Fatalf("GetLane() error = %v", err)
}
if laneRecord.Status != lane.StatusBlocked {
t.Fatalf("expected blocked lane, got %#v", laneRecord)
}
}
func TestCompleteReleasesLaneRuntimeWhenTopicCompletes(t *testing.T) {
ctx := context.Background()
clock := timeutil.FixedClock{Time: time.Date(2026, 3, 16, 15, 0, 0, 0, time.UTC)}
store, err := sqlitestore.OpenInMemory(clock)
if err != nil {
t.Fatalf("OpenInMemory() error = %v", err)
}
defer store.Close()
_, chainRecord := seedTaskExecGraph(t, ctx, store)
releaser := &recordingLaneRuntimeReleaser{}
svc := NewService(
store,
newResolvedRoleResolver(store, clock),
clock,
WithSnapshotter(noopSnapshotter{}),
WithMaterializer(noopMaterializer{}),
WithLaneRuntimeReleaser(releaser),
)
first, err := svc.ClaimNext(ctx, chainRecord.ID, "runner-1")
if err != nil {
t.Fatalf("ClaimNext(first) error = %v", err)
}
if _, err := svc.Complete(ctx, Completion{
RunID: first.Run.ID,
Status: workflow.RunStatusSucceeded,
ExitCode: 0,
ResultMarkdown: "Implemented Task A.",
}); err != nil {
t.Fatalf("Complete(first) error = %v", err)
}
second, err := svc.ClaimNext(ctx, chainRecord.ID, "runner-2")
if err != nil {
t.Fatalf("ClaimNext(second) error = %v", err)
}
if second.Task.Title != "Task B" {
t.Fatalf("expected Task B second, got %#v", second.Task)
}
if _, err := svc.Complete(ctx, Completion{
RunID: second.Run.ID,
Status: workflow.RunStatusSucceeded,
ExitCode: 0,
ResultMarkdown: "Implemented Task B.",
}); err != nil {
t.Fatalf("Complete(second) error = %v", err)
}
topicRecord, err := store.GetTopic(ctx, chainRecord.TopicID)
if err != nil {
t.Fatalf("GetTopic() error = %v", err)
}
if topicRecord.Status != "completed" {
t.Fatalf("expected completed topic, got %#v", topicRecord)
}
if len(releaser.laneIDs) != 1 || releaser.laneIDs[0] != chainRecord.ID {
t.Fatalf("expected runtime release for completed lane, got %#v", releaser.laneIDs)
}
}
func seedTaskExecGraph(t *testing.T, ctx context.Context, store *sqlitestore.Store) (roleWorkspace, lane.Record) {
t.Helper()
now := timeutil.FormatRFC3339(time.Date(2026, 3, 16, 10, 0, 0, 0, time.UTC))
if _, err := store.DB().Exec(`
INSERT INTO projects(id, slug, name, root_path, default_branch, status, created_at, updated_at)
VALUES('proj_1', 'proj', 'Project', '/tmp/project', 'main', 'active', ?, ?)
`, now, now); err != nil {
t.Fatalf("insert project: %v", err)
}
if _, err := store.DB().Exec(`
INSERT INTO workspaces(id, project_id, slug, name, root_path, base_branch, worktree_branch, runtime_backend, status, created_at, updated_at)
VALUES('ws_1', 'proj_1', 'main', 'Main', '/tmp/workspace', 'main', 'worktree/main', 'host', 'active', ?, ?)
`, now, now); err != nil {
t.Fatalf("insert workspace: %v", err)
}
if _, err := store.DB().Exec(`
INSERT INTO topics(id, workspace_id, slug, title, space, status, created_at, updated_at)
VALUES('topic_1', 'ws_1', 'sample', 'Sample', ?, 'execution', ?, ?)
`, string(topic.SpaceWorkflow), now, now); err != nil {
t.Fatalf("insert topic: %v", err)
}
if _, err := store.CreateLane(ctx, lane.Record{
ID: "chain_1",
WorkspaceID: "ws_1",
TopicID: "topic_1",
Name: "Backend Chain",
Slug: "backend-chain",
Status: lane.StatusReady,
BaseBranch: "main",
BranchName: "lane/main/backend-lane",
WorktreePath: "/tmp/workspace--backend-chain",
ContainerName: "lane-main-backend-lane",
CreatedByRoleName: "leader",
}); err != nil {
t.Fatalf("CreateLane() error = %v", err)
}
taskA, err := store.CreateTask(ctx, task.Record{
WorkspaceID: "ws_1",
TopicID: "topic_1",
LaneID: "chain_1",
Title: "Task A",
BodyMarkdown: "Implement backend changes.",
Status: task.StatusReady,
TaskOrder: 1,
Priority: 10,
CreatedByRoleName: "leader",
}, nil)
if err != nil {
t.Fatalf("CreateTask(Task A) error = %v", err)
}
if _, err := store.CreateTask(ctx, task.Record{
WorkspaceID: "ws_1",
TopicID: "topic_1",
LaneID: "chain_1",
Title: "Task B",
BodyMarkdown: "Add verification.",
Status: task.StatusDraft,
TaskOrder: 2,
Priority: 5,
CreatedByRoleName: "leader",
}, []task.Dependency{{DependsOnTaskID: taskA.ID}}); err != nil {
t.Fatalf("CreateTask(Task B) error = %v", err)
}
return roleWorkspace{ID: "ws_1"}, lane.Record{ID: "chain_1", TopicID: "topic_1"}
}
func seedTaskExecRepoGraph(t *testing.T, ctx context.Context, store *sqlitestore.Store, repoDir, headCommit string) {
t.Helper()
now := timeutil.FormatRFC3339(time.Date(2026, 3, 16, 12, 0, 0, 0, time.UTC))
mustExec(t, store, `
INSERT INTO projects(id, slug, name, root_path, default_branch, status, created_at, updated_at)
VALUES('proj_git', 'proj-git', 'Project Git', ?, 'main', 'active', ?, ?)
`, repoDir, now, now)
mustExec(t, store, `
INSERT INTO workspaces(id, project_id, slug, name, root_path, base_branch, worktree_branch, runtime_backend, status, created_at, updated_at)
VALUES('ws_git', 'proj_git', 'main', 'Main', ?, 'main', 'worktree/main', 'host', 'active', ?, ?)
`, repoDir, now, now)
mustExec(t, store, `
INSERT INTO topics(id, workspace_id, slug, title, space, status, created_at, updated_at)
VALUES('topic_git', 'ws_git', 'sample-git', 'Sample Git', ?, 'execution', ?, ?)
`, string(topic.SpaceWorkflow), now, now)
if _, err := store.CreateLane(ctx, lane.Record{
ID: "chain_1",
WorkspaceID: "ws_git",
TopicID: "topic_git",
Name: "Snapshot Lane",
Slug: "snapshot-lane",
Status: lane.StatusReady,
BaseBranch: "main",
BranchName: "main",
HeadCommit: headCommit,
WorktreePath: repoDir,
ContainerName: "lane-snapshot",
CreatedByRoleName: "leader",
}); err != nil {
t.Fatalf("CreateLane(snapshot) error = %v", err)
}
if _, err := store.CreateTask(ctx, task.Record{
ID: "task_snapshot",
WorkspaceID: "ws_git",
TopicID: "topic_git",
LaneID: "chain_1",
Title: "Task A",
BodyMarkdown: "Implement backend changes.",
Status: task.StatusReady,
TaskOrder: 1,
Priority: 10,
CreatedByRoleName: "leader",
}, nil); err != nil {
t.Fatalf("CreateTask(snapshot) error = %v", err)
}
}
func seedMaterializationGraph(t *testing.T, ctx context.Context, store *sqlitestore.Store, projectRoot, downstreamDir, upstreamHead string) {
t.Helper()
now := timeutil.FormatRFC3339(time.Date(2026, 3, 16, 13, 0, 0, 0, time.UTC))
mustExec(t, store, `
INSERT INTO projects(id, slug, name, root_path, default_branch, status, created_at, updated_at)
VALUES('proj_mat', 'proj-mat', 'Project Mat', ?, 'main', 'active', ?, ?)
`, projectRoot, now, now)
mustExec(t, store, `
INSERT INTO workspaces(id, project_id, slug, name, root_path, base_branch, worktree_branch, runtime_backend, status, created_at, updated_at)
VALUES('ws_mat', 'proj_mat', 'Main', 'Main', ?, 'main', 'worktree/main', 'host', 'active', ?, ?)
`, projectRoot, now, now)
mustExec(t, store, `
INSERT INTO topics(id, workspace_id, slug, title, space, status, created_at, updated_at)
VALUES('topic_mat', 'ws_mat', 'sample-mat', 'Sample Mat', ?, 'execution', ?, ?)
`, string(topic.SpaceWorkflow), now, now)
if _, err := store.CreateLane(ctx, lane.Record{
ID: "lane_upstream",
WorkspaceID: "ws_mat",
TopicID: "topic_mat",
Name: "Upstream Lane",
Slug: "upstream-lane",
Status: lane.StatusSucceeded,
BaseBranch: "main",
BranchName: "upstream",
HeadCommit: upstreamHead,
WorktreePath: projectRoot,
ContainerName: "lane-upstream",
CreatedByRoleName: "leader",
}); err != nil {
t.Fatalf("CreateLane(upstream) error = %v", err)
}
if _, err := store.CreateLane(ctx, lane.Record{
ID: "lane_downstream",
WorkspaceID: "ws_mat",
TopicID: "topic_mat",
Name: "Downstream Lane",
Slug: "downstream-lane",
Status: lane.StatusReady,
BaseBranch: "main",
BranchName: "downstream",
WorktreePath: downstreamDir,
ContainerName: "lane-downstream",
CreatedByRoleName: "leader",
}); err != nil {
t.Fatalf("CreateLane(downstream) error = %v", err)
}
upstreamTask, err := store.CreateTask(ctx, task.Record{
ID: "task_upstream",
WorkspaceID: "ws_mat",
TopicID: "topic_mat",
LaneID: "lane_upstream",
Title: "Task Upstream",
BodyMarkdown: "Implement upstream work.",
Status: task.StatusSucceeded,
TaskOrder: 1,
Priority: 10,
CreatedByRoleName: "leader",
ResultSummaryMarkdown: "Done.",
BlockingReasonMarkdown: "",
CompletedAt: now,
}, nil)
if err != nil {
t.Fatalf("CreateTask(upstream) error = %v", err)
}
if _, err := store.CreateTask(ctx, task.Record{
ID: "task_downstream",
WorkspaceID: "ws_mat",
TopicID: "topic_mat",
LaneID: "lane_downstream",
Title: "Task Downstream",
BodyMarkdown: "Integrate upstream work.",
Status: task.StatusReady,
TaskOrder: 1,
Priority: 5,
CreatedByRoleName: "leader",
}, []task.Dependency{{DependsOnTaskID: upstreamTask.ID}}); err != nil {
t.Fatalf("CreateTask(downstream) error = %v", err)
}
}
func mustExec(t *testing.T, store *sqlitestore.Store, query string, args ...any) {
t.Helper()
if _, err := store.DB().Exec(query, args...); err != nil {
t.Fatalf("Exec(%q) error = %v", query, err)
}
}
func createGitRepo(t *testing.T) string {
t.Helper()
repoDir := filepath.Join(t.TempDir(), "repo")
if err := os.MkdirAll(repoDir, 0755); err != nil {
t.Fatalf("MkdirAll() error = %v", err)
}
runGit(t, repoDir, "init", "-b", "main")
runGit(t, repoDir, "config", "user.name", "Test User")
runGit(t, repoDir, "config", "user.email", "test@example.com")
runGit(t, repoDir, "commit", "--allow-empty", "-m", "initial commit")
return repoDir
}
func gitHead(t *testing.T, dir string) string {
t.Helper()
return strings.TrimSpace(runGit(t, dir, "rev-parse", "HEAD"))
}
func gitStatusShort(t *testing.T, dir string) string {
t.Helper()
return runGit(t, dir, "status", "--short")
}
func runGit(t *testing.T, dir string, args ...string) string {
t.Helper()
cmd := exec.Command("git", args...)
cmd.Dir = dir
out, err := cmd.CombinedOutput()
if err != nil {
t.Fatalf("git %s: %v\n%s", strings.Join(args, " "), err, strings.TrimSpace(string(out)))
}
return strings.TrimSpace(string(out))
}
type roleWorkspace struct {
ID string
}
type resolvedRoleResolver struct {
store *sqlitestore.Store
clock timeutil.Clock
}
func newResolvedRoleResolver(store *sqlitestore.Store, clock timeutil.Clock) *resolvedRoleResolver {
return &resolvedRoleResolver{store: store, clock: clock}
}
func (r *resolvedRoleResolver) ResolveRole(ctx context.Context, workspaceID, roleName string) (runtimeconfig.ResolvedRole, error) {
return runtimeconfig.NewService(r.store, r.store, r.clock).ResolveRole(ctx, workspaceID, roleName)
}
type noopSnapshotter struct{}
func (noopSnapshotter) Capture(_ context.Context, item lane.Record, _ task.Record) (string, error) {
return item.HeadCommit, nil
}
type noopMaterializer struct{}
func (noopMaterializer) Materialize(_ context.Context, _ lane.Record, _ string, _ []lanematerialize.Upstream) error {
return nil
}
type recordingLaneRuntimeReleaser struct {
laneIDs []string
}
func (r *recordingLaneRuntimeReleaser) ReleaseLaneRuntime(_ context.Context, laneID string) (lane.Record, error) {
r.laneIDs = append(r.laneIDs, laneID)
return lane.Record{ID: laneID}, nil
}
type flakyTaskExecRepo struct {
*sqlitestore.Store
failCreateMessage bool
failAppendTaskEvent bool
}
func (r *flakyTaskExecRepo) CreateMessage(ctx context.Context, value message.Record) (message.Record, error) {
if r.failCreateMessage {
return message.Record{}, errors.New("forced create message failure")
}
return r.Store.CreateMessage(ctx, value)
}
func (r *flakyTaskExecRepo) AppendTaskEvent(ctx context.Context, value task.Event) (task.Event, error) {
if r.failAppendTaskEvent {
return task.Event{}, errors.New("forced append task event failure")
}
return r.Store.AppendTaskEvent(ctx, value)
}