From 07f4a6fdae370423f29c9e45e74aee83618ee748 Mon Sep 17 00:00:00 2001 From: kurihada Date: Thu, 19 Mar 2026 13:13:36 +0800 Subject: [PATCH] Implement orch core scheduling --- cmd/orch/main.go | 4 +- docs/implementation-roadmap.md | 45 +- docs/roadmaps/archive/orch-core-scheduling.md | 64 + internal/cli/orch/answer.go | 77 + internal/cli/orch/blocked.go | 64 + internal/cli/orch/body.go | 22 + internal/cli/orch/db.go | 22 + internal/cli/orch/dep.go | 76 + internal/cli/orch/dispatch.go | 94 + internal/cli/orch/execute.go | 113 ++ internal/cli/orch/integration_test.go | 506 +++++ internal/cli/orch/ready.go | 69 + internal/cli/orch/reconcile.go | 58 + internal/cli/orch/root.go | 16 +- internal/cli/orch/run.go | 122 +- internal/cli/orch/status.go | 65 + internal/cli/orch/task.go | 88 + internal/cli/orch/test_helpers_test.go | 109 ++ internal/store/orch.go | 1639 +++++++++++++++++ 19 files changed, 3230 insertions(+), 23 deletions(-) create mode 100644 docs/roadmaps/archive/orch-core-scheduling.md create mode 100644 internal/cli/orch/answer.go create mode 100644 internal/cli/orch/blocked.go create mode 100644 internal/cli/orch/body.go create mode 100644 internal/cli/orch/db.go create mode 100644 internal/cli/orch/dep.go create mode 100644 internal/cli/orch/dispatch.go create mode 100644 internal/cli/orch/execute.go create mode 100644 internal/cli/orch/integration_test.go create mode 100644 internal/cli/orch/ready.go create mode 100644 internal/cli/orch/reconcile.go create mode 100644 internal/cli/orch/status.go create mode 100644 internal/cli/orch/task.go create mode 100644 internal/cli/orch/test_helpers_test.go create mode 100644 internal/store/orch.go diff --git a/cmd/orch/main.go b/cmd/orch/main.go index 48b2ffe..caa5019 100644 --- a/cmd/orch/main.go +++ b/cmd/orch/main.go @@ -7,7 +7,5 @@ import ( ) func main() { - if err := orchcli.NewRootCmd().Execute(); err != nil { - os.Exit(1) - } + os.Exit(orchcli.Execute(os.Args[1:], os.Stdout, os.Stderr)) } diff --git a/docs/implementation-roadmap.md b/docs/implementation-roadmap.md index 8222b5b..1fc59fe 100644 --- a/docs/implementation-roadmap.md +++ b/docs/implementation-roadmap.md @@ -24,10 +24,11 @@ As of now: - a reusable Codex skill package for `inbox` now exists under `skills/inbox/`, with a formal `SKILL.md`, `agents/openai.yaml`, and a bundled CLI binary asset - an inbox skill forward-test plan directory now exists under `docs/tests/inbox-skill/`, with a shared execution template and multiple scenario cases - an execution-roadmap workflow now exists under `docs/roadmaps/active/` and `docs/roadmaps/archive/` for agent-level work traces and completion archives -- `orch` currently exists as a command skeleton only -- no scheduler workflows have been implemented yet +- `orch` now implements `run init/show`, `task add`, `dep add`, `ready`, `dispatch`, `reconcile`, `blocked`, `answer`, and `status` +- `orch` can create runs, gate tasks through dependencies, dispatch work through `inbox`, reconcile worker thread state back into task state, and answer blocked tasks +- automated integration tests now cover the main `orch` scheduler slice, including dependency gating, dispatch, blocked-answer flow, and reconcile -This means the project is past design discovery and ready for `orch` implementation. +This means the project now has a working `orch` core scheduler and is ready for strict worktree-backed execution support. ## Source Of Truth @@ -67,9 +68,10 @@ Current implementation status: - `Milestone 1: Go Skeleton` is complete - `Milestone 2: Shared DB Layer` is complete enough for both CLIs - `Milestone 3: Inbox Happy Path` is complete +- `Milestone 4: Orch Core Scheduling` is complete for the current non-worktree scheduler scope - `Milestone 6: Waiting Primitives` is partially complete through `inbox wait-reply` -The next practical coding target is `Milestone 4: Orch Core Scheduling`. +The next practical coding target is `Milestone 5: Strict Worktree Support`. ### Milestone 1: Go Skeleton @@ -217,6 +219,30 @@ Definition of done: - dispatch a task through `orch` - see worker state reflected back after `reconcile` +Status: + +- completed for the current non-worktree scheduling scope + +Completed so far: + +- `orch run init` +- `orch run show` +- `orch task add` +- `orch dep add` +- `orch ready` +- `orch dispatch` +- `orch reconcile` +- `orch blocked` +- `orch answer` +- `orch status` +- CLI integration tests cover single-task dispatch/reconcile, dependency gating, blocked-answer flow, and non-ready dispatch rejection + +Remaining: + +- strict worktree provisioning on dispatch +- `orch wait` +- retry, reassign, cancel, and cleanup workflows + ### Milestone 5: Strict Worktree Support Goal: @@ -273,10 +299,11 @@ Definition of done: If a new agent is taking over now, the next concrete step should be: -1. start `Milestone 4: Orch Core Scheduling` -2. keep the authored inbox test-plan set in `docs/tests/inbox/` synchronized if CLI behavior changes during `orch` work +1. start `Milestone 5: Strict Worktree Support` +2. add real worktree metadata population to `orch dispatch` +3. keep the authored inbox test-plan set in `docs/tests/inbox/` synchronized if CLI behavior changes during further `orch` work -The inbox implementation and its human-readable test-plan set are already in place, so the next meaningful project step is to build scheduler behavior on top of that stable base. +The inbox implementation and its human-readable test-plan set are already in place, and the initial `orch` scheduler loop now exists, so the next meaningful project step is to isolate code-writing attempts in real worktrees. ## Recommended Driver Choices @@ -297,11 +324,13 @@ Completed so far: - schema init test - inbox command-level CLI integration coverage aligned to `docs/tests/inbox/` - inbox workflow lifecycle coverage +- orch scheduler lifecycle coverage for run/task/dependency/dispatch/reconcile +- orch blocked-question and answer coverage Still recommended before the codebase grows too much: -- single-task orch dispatch and reconcile test - worktree path generation test +- `orch wait` event wake test - council tally grouping test ## Inbox Test Documentation Roadmap diff --git a/docs/roadmaps/archive/orch-core-scheduling.md b/docs/roadmaps/archive/orch-core-scheduling.md new file mode 100644 index 0000000..9c50d33 --- /dev/null +++ b/docs/roadmaps/archive/orch-core-scheduling.md @@ -0,0 +1,64 @@ +# Orch Core Scheduling + +## Status + +- `completed` + +## Owner + +- codex + +## Started At + +- `2026-03-19` + +## Goal + +- implement the first usable `orch` scheduling slice on top of the existing shared SQLite schema and `inbox` transport +- deliver a leader workflow that can create a run, add tasks and dependencies, dispatch ready work, reconcile inbox state, and answer blocked tasks + +## Scope + +- add `orch` store primitives for runs, tasks, dependencies, attempts, readiness, dispatch, reconcile, blocked lookup, and run status views +- add CLI commands for the first Milestone 4 surface +- add automated tests for the happy-path scheduler workflow and core state transitions +- update the implementation roadmap with the new progress state + +## Checklist + +- [x] inspect the current `orch` skeleton, schema, and project roadmaps +- [x] implement `orch` store types and DB operations for runs, tasks, dependencies, attempts, and task-state transitions +- [x] add CLI commands for `run init`, `run show`, `task add`, `dep add`, `ready`, `dispatch`, `reconcile`, `blocked`, `answer`, and `status` +- [x] add automated tests covering run creation, dependency gating, dispatch, blocked-answer flow, and reconcile +- [x] run `go test ./...` +- [x] update `docs/implementation-roadmap.md` +- [x] archive this roadmap with a completion summary when the workstream is complete + +## Files + +- `docs/roadmaps/active/orch-core-scheduling.md` +- `docs/implementation-roadmap.md` +- `cmd/orch/main.go` +- `internal/cli/orch/root.go` +- `internal/cli/orch/*.go` +- `internal/store/orch.go` +- `internal/cli/orch/*_test.go` + +## Decisions + +- start with the scheduler loop that reuses existing `inbox` behavior rather than attempting worktree orchestration in the same slice +- keep JSON response style aligned with `inbox` so both CLIs expose consistent machine-readable contracts + +## Blockers + +- none + +## Next Step + +- start `Milestone 5: Strict Worktree Support` by extending `orch dispatch` to resolve a concrete base commit and create real worktree metadata + +## Completion Summary + +- `orch` now has a usable core scheduler loop backed by shared SQLite state and `inbox` +- the implemented CLI surface covers `run init/show`, `task add`, `dep add`, `ready`, `dispatch`, `reconcile`, `blocked`, `answer`, and `status` +- integration tests now verify dispatch/reconcile lifecycle, dependency gating, blocked-question answers, and non-ready dispatch rejection diff --git a/internal/cli/orch/answer.go b/internal/cli/orch/answer.go new file mode 100644 index 0000000..373dce7 --- /dev/null +++ b/internal/cli/orch/answer.go @@ -0,0 +1,77 @@ +package orch + +import ( + "fmt" + + "ai-workflow-skill/internal/protocol" + "ai-workflow-skill/internal/store" + + "github.com/spf13/cobra" +) + +type answerOptions struct { + runID string + taskID string + body string + bodyFile string + payloadJSON string +} + +func newAnswerCmd(root *rootOptions) *cobra.Command { + opts := &answerOptions{} + + cmd := &cobra.Command{ + Use: "answer", + Short: "Answer the active blocked question for a task", + RunE: func(cmd *cobra.Command, args []string) error { + body, err := resolveBodyValue(opts.body, opts.bodyFile) + if err != nil { + return err + } + + ctx := cmd.Context() + sqlDB, err := openOrchDB(ctx, root.dbPath) + if err != nil { + return err + } + defer sqlDB.Close() + + result, err := store.NewOrchStore(sqlDB).AnswerTask(ctx, store.AnswerInput{ + RunID: opts.runID, + TaskID: opts.taskID, + Body: body, + PayloadJSON: opts.payloadJSON, + }) + if err != nil { + return err + } + + resp := protocol.Success{ + OK: true, + Command: "answer", + Data: map[string]any{ + "task": result.Task, + "attempt": result.Attempt, + "thread": result.Thread, + "message": result.Message, + }, + } + if root.json { + return protocol.WriteJSON(cmd.OutOrStdout(), resp) + } + + _, err = fmt.Fprintf(cmd.OutOrStdout(), "answered task %s on thread %s\n", result.Task.TaskID, result.Thread.ThreadID) + return err + }, + } + + cmd.Flags().StringVar(&opts.runID, "run", "", "Run ID") + cmd.Flags().StringVar(&opts.taskID, "task", "", "Task ID") + cmd.Flags().StringVar(&opts.body, "body", "", "Answer body") + cmd.Flags().StringVar(&opts.bodyFile, "body-file", "", "Read answer body from file") + cmd.Flags().StringVar(&opts.payloadJSON, "payload-json", "", "Structured payload JSON string") + _ = cmd.MarkFlagRequired("run") + _ = cmd.MarkFlagRequired("task") + + return cmd +} diff --git a/internal/cli/orch/blocked.go b/internal/cli/orch/blocked.go new file mode 100644 index 0000000..5e0cb93 --- /dev/null +++ b/internal/cli/orch/blocked.go @@ -0,0 +1,64 @@ +package orch + +import ( + "fmt" + + "ai-workflow-skill/internal/protocol" + "ai-workflow-skill/internal/store" + + "github.com/spf13/cobra" +) + +type blockedOptions struct { + runID string +} + +func newBlockedCmd(root *rootOptions) *cobra.Command { + opts := &blockedOptions{} + + cmd := &cobra.Command{ + Use: "blocked", + Short: "List blocked tasks and their latest question", + RunE: func(cmd *cobra.Command, args []string) error { + ctx := cmd.Context() + + sqlDB, err := openOrchDB(ctx, root.dbPath) + if err != nil { + return err + } + defer sqlDB.Close() + + blocked, err := store.NewOrchStore(sqlDB).ListBlockedTasks(ctx, opts.runID) + if err != nil { + return err + } + + resp := protocol.Success{ + OK: true, + Command: "blocked", + Data: map[string]any{ + "blocked": blocked, + }, + } + if root.json { + return protocol.WriteJSON(cmd.OutOrStdout(), resp) + } + + if len(blocked) == 0 { + _, err = fmt.Fprintln(cmd.OutOrStdout(), "no blocked tasks") + return err + } + for _, item := range blocked { + if _, err := fmt.Fprintf(cmd.OutOrStdout(), "%s\t%s\n", item.Task.TaskID, item.Question.Summary); err != nil { + return err + } + } + return nil + }, + } + + cmd.Flags().StringVar(&opts.runID, "run", "", "Run ID") + _ = cmd.MarkFlagRequired("run") + + return cmd +} diff --git a/internal/cli/orch/body.go b/internal/cli/orch/body.go new file mode 100644 index 0000000..4663691 --- /dev/null +++ b/internal/cli/orch/body.go @@ -0,0 +1,22 @@ +package orch + +import ( + "os" + + "ai-workflow-skill/internal/protocol" +) + +func resolveBodyValue(body, bodyFile string) (string, error) { + if body != "" && bodyFile != "" { + return "", protocol.InvalidInput("body and body-file are mutually exclusive", nil) + } + if bodyFile == "" { + return body, nil + } + + content, err := os.ReadFile(bodyFile) + if err != nil { + return "", protocol.InvalidInput("failed to read body-file", err) + } + return string(content), nil +} diff --git a/internal/cli/orch/db.go b/internal/cli/orch/db.go new file mode 100644 index 0000000..a6e1861 --- /dev/null +++ b/internal/cli/orch/db.go @@ -0,0 +1,22 @@ +package orch + +import ( + "context" + "database/sql" + + "ai-workflow-skill/internal/db" +) + +func openOrchDB(ctx context.Context, dbPath string) (*sql.DB, error) { + sqlDB, err := db.Open(ctx, dbPath) + if err != nil { + return nil, err + } + + if err := db.ApplyMigrations(ctx, sqlDB); err != nil { + _ = sqlDB.Close() + return nil, err + } + + return sqlDB, nil +} diff --git a/internal/cli/orch/dep.go b/internal/cli/orch/dep.go new file mode 100644 index 0000000..14bcac3 --- /dev/null +++ b/internal/cli/orch/dep.go @@ -0,0 +1,76 @@ +package orch + +import ( + "fmt" + + "ai-workflow-skill/internal/protocol" + "ai-workflow-skill/internal/store" + + "github.com/spf13/cobra" +) + +type depAddOptions struct { + runID string + taskID string + dependsOn string +} + +func newDepCmd(root *rootOptions) *cobra.Command { + cmd := &cobra.Command{ + Use: "dep", + Short: "Task dependency commands", + } + + cmd.AddCommand(newDepAddCmd(root)) + return cmd +} + +func newDepAddCmd(root *rootOptions) *cobra.Command { + opts := &depAddOptions{} + + cmd := &cobra.Command{ + Use: "add", + Short: "Add a dependency edge to a task", + RunE: func(cmd *cobra.Command, args []string) error { + ctx := cmd.Context() + + sqlDB, err := openOrchDB(ctx, root.dbPath) + if err != nil { + return err + } + defer sqlDB.Close() + + dep, err := store.NewOrchStore(sqlDB).AddDependency(ctx, store.AddDependencyInput{ + RunID: opts.runID, + TaskID: opts.taskID, + DependsOnTaskID: opts.dependsOn, + }) + if err != nil { + return err + } + + resp := protocol.Success{ + OK: true, + Command: "dep add", + Data: map[string]any{ + "dependency": dep, + }, + } + if root.json { + return protocol.WriteJSON(cmd.OutOrStdout(), resp) + } + + _, err = fmt.Fprintf(cmd.OutOrStdout(), "added dependency %s -> %s\n", dep.TaskID, dep.DependsOnTaskID) + return err + }, + } + + cmd.Flags().StringVar(&opts.runID, "run", "", "Run ID") + cmd.Flags().StringVar(&opts.taskID, "task", "", "Task ID") + cmd.Flags().StringVar(&opts.dependsOn, "depends-on", "", "Dependency task ID") + _ = cmd.MarkFlagRequired("run") + _ = cmd.MarkFlagRequired("task") + _ = cmd.MarkFlagRequired("depends-on") + + return cmd +} diff --git a/internal/cli/orch/dispatch.go b/internal/cli/orch/dispatch.go new file mode 100644 index 0000000..a6c327a --- /dev/null +++ b/internal/cli/orch/dispatch.go @@ -0,0 +1,94 @@ +package orch + +import ( + "fmt" + + "ai-workflow-skill/internal/protocol" + "ai-workflow-skill/internal/store" + + "github.com/spf13/cobra" +) + +type dispatchOptions struct { + runID string + taskID string + toAgent string + body string + bodyFile string + baseRef string + workspaceRoot string + strictWorktree bool +} + +func newDispatchCmd(root *rootOptions) *cobra.Command { + opts := &dispatchOptions{} + + cmd := &cobra.Command{ + Use: "dispatch", + Short: "Dispatch a ready task to a worker through inbox", + RunE: func(cmd *cobra.Command, args []string) error { + if opts.workspaceRoot != "" || opts.strictWorktree { + return protocol.InvalidInput("worktree dispatch is not implemented yet", nil) + } + + body, err := resolveBodyValue(opts.body, opts.bodyFile) + if err != nil { + return err + } + + ctx := cmd.Context() + sqlDB, err := openOrchDB(ctx, root.dbPath) + if err != nil { + return err + } + defer sqlDB.Close() + + result, err := store.NewOrchStore(sqlDB).DispatchTask(ctx, store.DispatchInput{ + RunID: opts.runID, + TaskID: opts.taskID, + ToAgent: opts.toAgent, + Body: body, + BaseRef: opts.baseRef, + }) + if err != nil { + return err + } + + resp := protocol.Success{ + OK: true, + Command: "dispatch", + Data: map[string]any{ + "task": result.Task, + "attempt": result.Attempt, + "thread": result.Thread, + "message": result.Message, + }, + } + if root.json { + return protocol.WriteJSON(cmd.OutOrStdout(), resp) + } + + _, err = fmt.Fprintf( + cmd.OutOrStdout(), + "dispatched task %s to %s as thread %s\n", + result.Task.TaskID, + result.Attempt.AssignedTo, + result.Attempt.ThreadID, + ) + return err + }, + } + + cmd.Flags().StringVar(&opts.runID, "run", "", "Run ID") + cmd.Flags().StringVar(&opts.taskID, "task", "", "Task ID") + cmd.Flags().StringVar(&opts.toAgent, "to", "", "Worker agent override") + cmd.Flags().StringVar(&opts.body, "body", "", "Task message body") + cmd.Flags().StringVar(&opts.bodyFile, "body-file", "", "Read task message body from file") + cmd.Flags().StringVar(&opts.baseRef, "base-ref", "", "Optional base ref to record on the attempt") + cmd.Flags().StringVar(&opts.workspaceRoot, "workspace-root", "", "Workspace root for worktree dispatch") + cmd.Flags().BoolVar(&opts.strictWorktree, "strict-worktree", false, "Require strict worktree setup") + _ = cmd.MarkFlagRequired("run") + _ = cmd.MarkFlagRequired("task") + + return cmd +} diff --git a/internal/cli/orch/execute.go b/internal/cli/orch/execute.go new file mode 100644 index 0000000..5805455 --- /dev/null +++ b/internal/cli/orch/execute.go @@ -0,0 +1,113 @@ +package orch + +import ( + "errors" + "fmt" + "io" + "strings" + + "ai-workflow-skill/internal/protocol" + "ai-workflow-skill/internal/store" +) + +func Execute(args []string, stdout, stderr io.Writer) int { + cmd := NewRootCmd() + cmd.SetOut(stdout) + cmd.SetErr(stderr) + cmd.SetArgs(args) + + if err := cmd.Execute(); err != nil { + jsonOutput := hasJSONFlag(args) + renderError(stdout, stderr, jsonOutput, err) + return exitCodeForError(err) + } + + return 0 +} + +func exitCodeForError(err error) int { + var cliErr *protocol.CLIError + if errors.As(err, &cliErr) { + return cliErr.ExitCode + } + + switch { + case isUsageError(err): + return 30 + case errors.Is(err, store.ErrLeaseConflict): + return 20 + case errors.Is(err, store.ErrRunNotFound), errors.Is(err, store.ErrTaskNotFound), errors.Is(err, store.ErrThreadNotFound), errors.Is(err, store.ErrMessageNotFound): + return 40 + case errors.Is(err, store.ErrInvalidInput), errors.Is(err, store.ErrInvalidState), errors.Is(err, store.ErrNoActiveLease): + return 30 + default: + return 50 + } +} + +func errorCodeForError(err error) string { + var cliErr *protocol.CLIError + if errors.As(err, &cliErr) { + return cliErr.Code + } + + switch { + case isUsageError(err): + return "invalid_input" + case errors.Is(err, store.ErrLeaseConflict): + return "conflict" + case errors.Is(err, store.ErrRunNotFound), errors.Is(err, store.ErrTaskNotFound), errors.Is(err, store.ErrThreadNotFound), errors.Is(err, store.ErrMessageNotFound): + return "not_found" + case errors.Is(err, store.ErrInvalidInput): + return "invalid_input" + case errors.Is(err, store.ErrInvalidState), errors.Is(err, store.ErrNoActiveLease): + return "invalid_state" + default: + return "internal_error" + } +} + +func renderError(stdout, stderr io.Writer, jsonOutput bool, err error) { + message := errorMessage(err) + if jsonOutput { + _ = protocol.WriteJSON(stdout, protocol.Error{ + OK: false, + Error: protocol.ErrorPayload{ + Code: errorCodeForError(err), + Message: message, + }, + }) + return + } + + _, _ = fmt.Fprintln(stderr, message) +} + +func errorMessage(err error) string { + var cliErr *protocol.CLIError + if errors.As(err, &cliErr) { + return cliErr.Message + } + return err.Error() +} + +func hasJSONFlag(args []string) bool { + for _, arg := range args { + if arg == "--json" { + return true + } + if strings.HasPrefix(arg, "--json=") { + return !strings.HasSuffix(arg, "=false") + } + } + return false +} + +func isUsageError(err error) bool { + message := err.Error() + return strings.HasPrefix(message, "required flag(s)") || + strings.HasPrefix(message, "unknown flag:") || + strings.HasPrefix(message, "unknown command ") || + strings.Contains(message, " accepts ") || + strings.Contains(message, "invalid argument ") +} diff --git a/internal/cli/orch/integration_test.go b/internal/cli/orch/integration_test.go new file mode 100644 index 0000000..71c4827 --- /dev/null +++ b/internal/cli/orch/integration_test.go @@ -0,0 +1,506 @@ +package orch + +import ( + "path/filepath" + "testing" +) + +func TestOrchRunDispatchReconcileLifecycle(t *testing.T) { + t.Parallel() + + dbPath := filepath.Join(t.TempDir(), "coord.db") + + runOrchCommand( + t, + "--db", dbPath, + "--json", + "run", "init", + "--run", "run_blog_001", + "--goal", "Build blog MVP", + "--summary", "Public blog plus admin CRUD", + ) + + taskOut := runOrchCommand( + t, + "--db", dbPath, + "--json", + "task", "add", + "--run", "run_blog_001", + "--task", "T1", + "--title", "Implement retry policy", + "--summary", "Add retry policy to HTTP client", + "--default-to", "worker-a", + ) + + var taskResp map[string]any + mustDecodeJSON(t, taskOut, &taskResp) + if got := nestedString(t, taskResp, "data", "task", "status"); got != "ready" { + t.Fatalf("expected new task to become ready, got %q", got) + } + + readyOut := runOrchCommand( + t, + "--db", dbPath, + "--json", + "ready", + "--run", "run_blog_001", + ) + + var readyResp map[string]any + mustDecodeJSON(t, readyOut, &readyResp) + readyTasks := nestedArray(t, readyResp, "data", "tasks") + if len(readyTasks) != 1 { + t.Fatalf("expected one ready task, got %#v", readyTasks) + } + readyTask, ok := readyTasks[0].(map[string]any) + if !ok { + t.Fatalf("expected ready task object, got %#v", readyTasks[0]) + } + if taskID, _ := readyTask["task_id"].(string); taskID != "T1" { + t.Fatalf("expected ready task T1, got %#v", readyTask["task_id"]) + } + + dispatchOut := runOrchCommand( + t, + "--db", dbPath, + "--json", + "dispatch", + "--run", "run_blog_001", + "--task", "T1", + "--body", "Implement retry handling for the HTTP client.", + ) + + var dispatchResp map[string]any + mustDecodeJSON(t, dispatchOut, &dispatchResp) + if got := nestedString(t, dispatchResp, "data", "task", "status"); got != "dispatched" { + t.Fatalf("expected dispatched task, got %q", got) + } + threadID := nestedString(t, dispatchResp, "data", "attempt", "thread_id") + + runInboxCommand( + t, + "--db", dbPath, + "--json", + "claim", + "--agent", "worker-a", + "--thread", threadID, + ) + runInboxCommand( + t, + "--db", dbPath, + "--json", + "update", + "--agent", "worker-a", + "--thread", threadID, + "--status", "in_progress", + "--summary", "Implementation started", + ) + + reconcileOut := runOrchCommand( + t, + "--db", dbPath, + "--json", + "reconcile", + "--run", "run_blog_001", + ) + + var reconcileResp map[string]any + mustDecodeJSON(t, reconcileOut, &reconcileResp) + updatedTasks := nestedArray(t, reconcileResp, "data", "updated_tasks") + if len(updatedTasks) != 1 { + t.Fatalf("expected one updated task after running reconcile, got %#v", updatedTasks) + } + runningTask, ok := updatedTasks[0].(map[string]any) + if !ok { + t.Fatalf("expected updated task object, got %#v", updatedTasks[0]) + } + if status, _ := runningTask["status"].(string); status != "running" { + t.Fatalf("expected running task after reconcile, got %#v", runningTask["status"]) + } + + runInboxCommand( + t, + "--db", dbPath, + "--json", + "done", + "--agent", "worker-a", + "--thread", threadID, + "--summary", "Retry policy implemented", + "--body", "The HTTP client now retries transient failures.", + ) + + reconcileDoneOut := runOrchCommand( + t, + "--db", dbPath, + "--json", + "reconcile", + "--run", "run_blog_001", + ) + + var reconcileDoneResp map[string]any + mustDecodeJSON(t, reconcileDoneOut, &reconcileDoneResp) + updatedTasks = nestedArray(t, reconcileDoneResp, "data", "updated_tasks") + if len(updatedTasks) != 1 { + t.Fatalf("expected one updated task after done reconcile, got %#v", updatedTasks) + } + doneTask, ok := updatedTasks[0].(map[string]any) + if !ok { + t.Fatalf("expected updated task object, got %#v", updatedTasks[0]) + } + if status, _ := doneTask["status"].(string); status != "done" { + t.Fatalf("expected done task after reconcile, got %#v", doneTask["status"]) + } + + statusOut := runOrchCommand( + t, + "--db", dbPath, + "--json", + "status", + "--run", "run_blog_001", + ) + + var statusResp map[string]any + mustDecodeJSON(t, statusOut, &statusResp) + if got := nestedString(t, statusResp, "data", "run", "status"); got != "done" { + t.Fatalf("expected run status done, got %q", got) + } + tasks := nestedArray(t, statusResp, "data", "tasks") + if len(tasks) != 1 { + t.Fatalf("expected one task in status response, got %#v", tasks) + } + task, ok := tasks[0].(map[string]any) + if !ok { + t.Fatalf("expected task object, got %#v", tasks[0]) + } + if got, _ := task["status"].(string); got != "done" { + t.Fatalf("expected status task done, got %#v", task["status"]) + } +} + +func TestOrchDependencyBlockedAndAnswerFlow(t *testing.T) { + t.Parallel() + + dbPath := filepath.Join(t.TempDir(), "coord.db") + + runOrchCommand( + t, + "--db", dbPath, + "--json", + "run", "init", + "--run", "run_blog_002", + "--goal", "Build dependency-aware workflow", + ) + + runOrchCommand( + t, + "--db", dbPath, + "--json", + "task", "add", + "--run", "run_blog_002", + "--task", "T1", + "--title", "Build backend", + "--summary", "Implement backend APIs", + "--default-to", "worker-a", + ) + runOrchCommand( + t, + "--db", dbPath, + "--json", + "task", "add", + "--run", "run_blog_002", + "--task", "T2", + "--title", "Build frontend", + "--summary", "Implement frontend flows", + "--default-to", "worker-b", + ) + runOrchCommand( + t, + "--db", dbPath, + "--json", + "dep", "add", + "--run", "run_blog_002", + "--task", "T2", + "--depends-on", "T1", + ) + + readyOut := runOrchCommand( + t, + "--db", dbPath, + "--json", + "ready", + "--run", "run_blog_002", + ) + + var readyResp map[string]any + mustDecodeJSON(t, readyOut, &readyResp) + readyTasks := nestedArray(t, readyResp, "data", "tasks") + if len(readyTasks) != 1 { + t.Fatalf("expected only dependency-free task ready, got %#v", readyTasks) + } + readyTask, ok := readyTasks[0].(map[string]any) + if !ok { + t.Fatalf("expected ready task object, got %#v", readyTasks[0]) + } + if taskID, _ := readyTask["task_id"].(string); taskID != "T1" { + t.Fatalf("expected T1 ready before dependency clears, got %#v", readyTask["task_id"]) + } + + dispatchBackendOut := runOrchCommand( + t, + "--db", dbPath, + "--json", + "dispatch", + "--run", "run_blog_002", + "--task", "T1", + ) + + var dispatchBackendResp map[string]any + mustDecodeJSON(t, dispatchBackendOut, &dispatchBackendResp) + threadBackend := nestedString(t, dispatchBackendResp, "data", "attempt", "thread_id") + + runInboxCommand( + t, + "--db", dbPath, + "--json", + "claim", + "--agent", "worker-a", + "--thread", threadBackend, + ) + runInboxCommand( + t, + "--db", dbPath, + "--json", + "done", + "--agent", "worker-a", + "--thread", threadBackend, + "--summary", "Backend complete", + ) + runOrchCommand( + t, + "--db", dbPath, + "--json", + "reconcile", + "--run", "run_blog_002", + ) + + readyAfterDoneOut := runOrchCommand( + t, + "--db", dbPath, + "--json", + "ready", + "--run", "run_blog_002", + ) + + var readyAfterDoneResp map[string]any + mustDecodeJSON(t, readyAfterDoneOut, &readyAfterDoneResp) + readyTasks = nestedArray(t, readyAfterDoneResp, "data", "tasks") + if len(readyTasks) != 1 { + t.Fatalf("expected dependent task to become ready, got %#v", readyTasks) + } + readyTask, ok = readyTasks[0].(map[string]any) + if !ok { + t.Fatalf("expected ready task object, got %#v", readyTasks[0]) + } + if taskID, _ := readyTask["task_id"].(string); taskID != "T2" { + t.Fatalf("expected T2 ready after T1 completion, got %#v", readyTask["task_id"]) + } + + dispatchFrontendOut := runOrchCommand( + t, + "--db", dbPath, + "--json", + "dispatch", + "--run", "run_blog_002", + "--task", "T2", + ) + + var dispatchFrontendResp map[string]any + mustDecodeJSON(t, dispatchFrontendOut, &dispatchFrontendResp) + threadFrontend := nestedString(t, dispatchFrontendResp, "data", "attempt", "thread_id") + + runInboxCommand( + t, + "--db", dbPath, + "--json", + "claim", + "--agent", "worker-b", + "--thread", threadFrontend, + ) + runInboxCommand( + t, + "--db", dbPath, + "--json", + "update", + "--agent", "worker-b", + "--thread", threadFrontend, + "--status", "blocked", + "--summary", "Need logging decision", + "--payload-json", `{"question":"stdout or stderr?"}`, + ) + runOrchCommand( + t, + "--db", dbPath, + "--json", + "reconcile", + "--run", "run_blog_002", + ) + + blockedOut := runOrchCommand( + t, + "--db", dbPath, + "--json", + "blocked", + "--run", "run_blog_002", + ) + + var blockedResp map[string]any + mustDecodeJSON(t, blockedOut, &blockedResp) + blockedTasks := nestedArray(t, blockedResp, "data", "blocked") + if len(blockedTasks) != 1 { + t.Fatalf("expected one blocked task, got %#v", blockedTasks) + } + blockedTask, ok := blockedTasks[0].(map[string]any) + if !ok { + t.Fatalf("expected blocked task object, got %#v", blockedTasks[0]) + } + question, ok := blockedTask["question"].(map[string]any) + if !ok { + t.Fatalf("expected blocked question object, got %#v", blockedTask["question"]) + } + if kind, _ := question["kind"].(string); kind != "question" { + t.Fatalf("expected blocked question kind, got %#v", question["kind"]) + } + + answerOut := runOrchCommand( + t, + "--db", dbPath, + "--json", + "answer", + "--run", "run_blog_002", + "--task", "T2", + "--body", "Use stdout for MVP.", + ) + + var answerResp map[string]any + mustDecodeJSON(t, answerOut, &answerResp) + if got := nestedString(t, answerResp, "data", "message", "kind"); got != "answer" { + t.Fatalf("expected answer message kind, got %q", got) + } + + showOut := runInboxCommand( + t, + "--db", dbPath, + "--json", + "show", + "--thread", threadFrontend, + ) + + var showResp map[string]any + mustDecodeJSON(t, showOut, &showResp) + messages := nestedArray(t, showResp, "data", "messages") + if len(messages) < 4 { + t.Fatalf("expected answer to append a message, got %#v", messages) + } + lastMessage, ok := messages[len(messages)-1].(map[string]any) + if !ok { + t.Fatalf("expected last message object, got %#v", messages[len(messages)-1]) + } + if kind, _ := lastMessage["kind"].(string); kind != "answer" { + t.Fatalf("expected latest message to be answer, got %#v", lastMessage["kind"]) + } + + runInboxCommand( + t, + "--db", dbPath, + "--json", + "update", + "--agent", "worker-b", + "--thread", threadFrontend, + "--status", "in_progress", + "--summary", "Decision applied", + ) + runInboxCommand( + t, + "--db", dbPath, + "--json", + "done", + "--agent", "worker-b", + "--thread", threadFrontend, + "--summary", "Frontend complete", + ) + runOrchCommand( + t, + "--db", dbPath, + "--json", + "reconcile", + "--run", "run_blog_002", + ) + + statusOut := runOrchCommand( + t, + "--db", dbPath, + "--json", + "status", + "--run", "run_blog_002", + ) + + var statusResp map[string]any + mustDecodeJSON(t, statusOut, &statusResp) + if got := nestedString(t, statusResp, "data", "run", "status"); got != "done" { + t.Fatalf("expected run status done after both tasks, got %q", got) + } +} + +func TestOrchDispatchRejectsNonReadyTask(t *testing.T) { + t.Parallel() + + dbPath := filepath.Join(t.TempDir(), "coord.db") + + runOrchCommand( + t, + "--db", dbPath, + "--json", + "run", "init", + "--run", "run_blog_003", + "--goal", "Validate ready gating", + ) + runOrchCommand( + t, + "--db", dbPath, + "--json", + "task", "add", + "--run", "run_blog_003", + "--task", "T1", + "--title", "Backend", + ) + runOrchCommand( + t, + "--db", dbPath, + "--json", + "task", "add", + "--run", "run_blog_003", + "--task", "T2", + "--title", "Frontend", + ) + runOrchCommand( + t, + "--db", dbPath, + "--json", + "dep", "add", + "--run", "run_blog_003", + "--task", "T2", + "--depends-on", "T1", + ) + + stdout, _, exitCode := executeOrchCommand( + "--db", dbPath, + "--json", + "dispatch", + "--run", "run_blog_003", + "--task", "T2", + ) + if exitCode != 30 { + t.Fatalf("expected invalid_state exit code 30, got %d\nstdout:\n%s", exitCode, stdout) + } + assertErrorJSON(t, stdout, "invalid_state") +} diff --git a/internal/cli/orch/ready.go b/internal/cli/orch/ready.go new file mode 100644 index 0000000..4a55b8b --- /dev/null +++ b/internal/cli/orch/ready.go @@ -0,0 +1,69 @@ +package orch + +import ( + "fmt" + + "ai-workflow-skill/internal/protocol" + "ai-workflow-skill/internal/store" + + "github.com/spf13/cobra" +) + +type readyOptions struct { + runID string + limit int +} + +func newReadyCmd(root *rootOptions) *cobra.Command { + opts := &readyOptions{} + + cmd := &cobra.Command{ + Use: "ready", + Short: "List tasks that are ready for dispatch", + RunE: func(cmd *cobra.Command, args []string) error { + ctx := cmd.Context() + + sqlDB, err := openOrchDB(ctx, root.dbPath) + if err != nil { + return err + } + defer sqlDB.Close() + + tasks, err := store.NewOrchStore(sqlDB).ListReadyTasks(ctx, store.ListReadyInput{ + RunID: opts.runID, + Limit: opts.limit, + }) + if err != nil { + return err + } + + resp := protocol.Success{ + OK: true, + Command: "ready", + Data: map[string]any{ + "tasks": tasks, + }, + } + if root.json { + return protocol.WriteJSON(cmd.OutOrStdout(), resp) + } + + if len(tasks) == 0 { + _, err = fmt.Fprintln(cmd.OutOrStdout(), "no ready tasks") + return err + } + for _, task := range tasks { + if _, err := fmt.Fprintf(cmd.OutOrStdout(), "%s\t%s\t%s\n", task.TaskID, task.Priority, task.Title); err != nil { + return err + } + } + return nil + }, + } + + cmd.Flags().StringVar(&opts.runID, "run", "", "Run ID") + cmd.Flags().IntVar(&opts.limit, "limit", 20, "Maximum number of tasks to list") + _ = cmd.MarkFlagRequired("run") + + return cmd +} diff --git a/internal/cli/orch/reconcile.go b/internal/cli/orch/reconcile.go new file mode 100644 index 0000000..ba07374 --- /dev/null +++ b/internal/cli/orch/reconcile.go @@ -0,0 +1,58 @@ +package orch + +import ( + "fmt" + + "ai-workflow-skill/internal/protocol" + "ai-workflow-skill/internal/store" + + "github.com/spf13/cobra" +) + +type reconcileOptions struct { + runID string +} + +func newReconcileCmd(root *rootOptions) *cobra.Command { + opts := &reconcileOptions{} + + cmd := &cobra.Command{ + Use: "reconcile", + Short: "Reconcile inbox thread state back into orch task state", + RunE: func(cmd *cobra.Command, args []string) error { + ctx := cmd.Context() + + sqlDB, err := openOrchDB(ctx, root.dbPath) + if err != nil { + return err + } + defer sqlDB.Close() + + result, err := store.NewOrchStore(sqlDB).ReconcileRun(ctx, opts.runID) + if err != nil { + return err + } + + resp := protocol.Success{ + OK: true, + Command: "reconcile", + Data: map[string]any{ + "run": result.Run, + "task_counts": result.TaskCounts, + "updated_tasks": result.UpdatedTasks, + }, + } + if root.json { + return protocol.WriteJSON(cmd.OutOrStdout(), resp) + } + + _, err = fmt.Fprintf(cmd.OutOrStdout(), "reconciled run %s (%d updated tasks)\n", result.Run.RunID, len(result.UpdatedTasks)) + return err + }, + } + + cmd.Flags().StringVar(&opts.runID, "run", "", "Run ID") + _ = cmd.MarkFlagRequired("run") + + return cmd +} diff --git a/internal/cli/orch/root.go b/internal/cli/orch/root.go index 82efd7f..79120ab 100644 --- a/internal/cli/orch/root.go +++ b/internal/cli/orch/root.go @@ -13,14 +13,24 @@ func NewRootCmd() *cobra.Command { opts := &rootOptions{} cmd := &cobra.Command{ - Use: "orch", - Short: "Leader-facing scheduler and control plane", + Use: "orch", + Short: "Leader-facing scheduler and control plane", + SilenceErrors: true, + SilenceUsage: true, } cmd.PersistentFlags().StringVar(&opts.dbPath, "db", ".agents/coord.db", "SQLite database path") cmd.PersistentFlags().BoolVar(&opts.json, "json", false, "Emit machine-readable JSON") - cmd.AddCommand(newRunCmd()) + cmd.AddCommand(newRunCmd(opts)) + cmd.AddCommand(newTaskCmd(opts)) + cmd.AddCommand(newDepCmd(opts)) + cmd.AddCommand(newReadyCmd(opts)) + cmd.AddCommand(newDispatchCmd(opts)) + cmd.AddCommand(newReconcileCmd(opts)) + cmd.AddCommand(newBlockedCmd(opts)) + cmd.AddCommand(newAnswerCmd(opts)) + cmd.AddCommand(newStatusCmd(opts)) return cmd } diff --git a/internal/cli/orch/run.go b/internal/cli/orch/run.go index 96f7abf..fed6eec 100644 --- a/internal/cli/orch/run.go +++ b/internal/cli/orch/run.go @@ -1,20 +1,124 @@ package orch -import "github.com/spf13/cobra" +import ( + "fmt" -func newRunCmd() *cobra.Command { + "ai-workflow-skill/internal/protocol" + "ai-workflow-skill/internal/store" + + "github.com/spf13/cobra" +) + +type runInitOptions struct { + runID string + goal string + summary string +} + +type runShowOptions struct { + runID string +} + +func newRunCmd(root *rootOptions) *cobra.Command { cmd := &cobra.Command{ Use: "run", Short: "Run management commands", } - cmd.AddCommand(&cobra.Command{ - Use: "init", - Short: "Stub for future run initialization", - RunE: func(cmd *cobra.Command, args []string) error { - return cmd.Help() - }, - }) + cmd.AddCommand(newRunInitCmd(root)) + cmd.AddCommand(newRunShowCmd(root)) + + return cmd +} + +func newRunInitCmd(root *rootOptions) *cobra.Command { + opts := &runInitOptions{} + + cmd := &cobra.Command{ + Use: "init", + Short: "Create a new orchestration run", + RunE: func(cmd *cobra.Command, args []string) error { + ctx := cmd.Context() + + sqlDB, err := openOrchDB(ctx, root.dbPath) + if err != nil { + return err + } + defer sqlDB.Close() + + run, err := store.NewOrchStore(sqlDB).CreateRun(ctx, store.CreateRunInput{ + RunID: opts.runID, + Goal: opts.goal, + Summary: opts.summary, + }) + if err != nil { + return err + } + + resp := protocol.Success{ + OK: true, + Command: "run init", + Data: map[string]any{ + "run": run, + }, + } + if root.json { + return protocol.WriteJSON(cmd.OutOrStdout(), resp) + } + + _, err = fmt.Fprintf(cmd.OutOrStdout(), "created run %s\n", run.RunID) + return err + }, + } + + cmd.Flags().StringVar(&opts.runID, "run", "", "Run ID") + cmd.Flags().StringVar(&opts.goal, "goal", "", "Run goal") + cmd.Flags().StringVar(&opts.summary, "summary", "", "Run summary") + _ = cmd.MarkFlagRequired("run") + _ = cmd.MarkFlagRequired("goal") + + return cmd +} + +func newRunShowCmd(root *rootOptions) *cobra.Command { + opts := &runShowOptions{} + + cmd := &cobra.Command{ + Use: "show", + Short: "Show run metadata and aggregate state", + RunE: func(cmd *cobra.Command, args []string) error { + ctx := cmd.Context() + + sqlDB, err := openOrchDB(ctx, root.dbPath) + if err != nil { + return err + } + defer sqlDB.Close() + + overview, err := store.NewOrchStore(sqlDB).GetRunOverview(ctx, opts.runID) + if err != nil { + return err + } + + resp := protocol.Success{ + OK: true, + Command: "run show", + Data: map[string]any{ + "run": overview.Run, + "task_counts": overview.TaskCounts, + }, + } + if root.json { + return protocol.WriteJSON(cmd.OutOrStdout(), resp) + } + + _, err = fmt.Fprintf(cmd.OutOrStdout(), "run %s status %s\n", overview.Run.RunID, overview.Run.Status) + return err + }, + } + + cmd.Flags().StringVar(&opts.runID, "run", "", "Run ID") + _ = cmd.MarkFlagRequired("run") return cmd } diff --git a/internal/cli/orch/status.go b/internal/cli/orch/status.go new file mode 100644 index 0000000..91d45d8 --- /dev/null +++ b/internal/cli/orch/status.go @@ -0,0 +1,65 @@ +package orch + +import ( + "fmt" + + "ai-workflow-skill/internal/protocol" + "ai-workflow-skill/internal/store" + + "github.com/spf13/cobra" +) + +type statusOptions struct { + runID string +} + +func newStatusCmd(root *rootOptions) *cobra.Command { + opts := &statusOptions{} + + cmd := &cobra.Command{ + Use: "status", + Short: "Show task state summary for the run", + RunE: func(cmd *cobra.Command, args []string) error { + ctx := cmd.Context() + + sqlDB, err := openOrchDB(ctx, root.dbPath) + if err != nil { + return err + } + defer sqlDB.Close() + + overview, err := store.NewOrchStore(sqlDB).GetRunOverview(ctx, opts.runID) + if err != nil { + return err + } + + resp := protocol.Success{ + OK: true, + Command: "status", + Data: map[string]any{ + "run": overview.Run, + "task_counts": overview.TaskCounts, + "tasks": overview.Tasks, + }, + } + if root.json { + return protocol.WriteJSON(cmd.OutOrStdout(), resp) + } + + if _, err := fmt.Fprintf(cmd.OutOrStdout(), "run %s status %s\n", overview.Run.RunID, overview.Run.Status); err != nil { + return err + } + for _, task := range overview.Tasks { + if _, err := fmt.Fprintf(cmd.OutOrStdout(), "%s\t%s\t%s\n", task.TaskID, task.Status, task.Title); err != nil { + return err + } + } + return nil + }, + } + + cmd.Flags().StringVar(&opts.runID, "run", "", "Run ID") + _ = cmd.MarkFlagRequired("run") + + return cmd +} diff --git a/internal/cli/orch/task.go b/internal/cli/orch/task.go new file mode 100644 index 0000000..0a9e843 --- /dev/null +++ b/internal/cli/orch/task.go @@ -0,0 +1,88 @@ +package orch + +import ( + "fmt" + + "ai-workflow-skill/internal/protocol" + "ai-workflow-skill/internal/store" + + "github.com/spf13/cobra" +) + +type taskAddOptions struct { + runID string + taskID string + title string + summary string + defaultTo string + acceptanceJSON string + priority string +} + +func newTaskCmd(root *rootOptions) *cobra.Command { + cmd := &cobra.Command{ + Use: "task", + Short: "Task management commands", + } + + cmd.AddCommand(newTaskAddCmd(root)) + return cmd +} + +func newTaskAddCmd(root *rootOptions) *cobra.Command { + opts := &taskAddOptions{} + + cmd := &cobra.Command{ + Use: "add", + Short: "Add a task to a run", + RunE: func(cmd *cobra.Command, args []string) error { + ctx := cmd.Context() + + sqlDB, err := openOrchDB(ctx, root.dbPath) + if err != nil { + return err + } + defer sqlDB.Close() + + task, err := store.NewOrchStore(sqlDB).AddTask(ctx, store.AddTaskInput{ + RunID: opts.runID, + TaskID: opts.taskID, + Title: opts.title, + Summary: opts.summary, + DefaultTo: opts.defaultTo, + AcceptanceJSON: opts.acceptanceJSON, + Priority: opts.priority, + }) + if err != nil { + return err + } + + resp := protocol.Success{ + OK: true, + Command: "task add", + Data: map[string]any{ + "task": task, + }, + } + if root.json { + return protocol.WriteJSON(cmd.OutOrStdout(), resp) + } + + _, err = fmt.Fprintf(cmd.OutOrStdout(), "added task %s to run %s\n", task.TaskID, task.RunID) + return err + }, + } + + cmd.Flags().StringVar(&opts.runID, "run", "", "Run ID") + cmd.Flags().StringVar(&opts.taskID, "task", "", "Task ID") + cmd.Flags().StringVar(&opts.title, "title", "", "Task title") + cmd.Flags().StringVar(&opts.summary, "summary", "", "Task summary") + cmd.Flags().StringVar(&opts.defaultTo, "default-to", "", "Default worker agent") + cmd.Flags().StringVar(&opts.acceptanceJSON, "acceptance-json", "", "Acceptance criteria JSON") + cmd.Flags().StringVar(&opts.priority, "priority", "normal", "Task priority") + _ = cmd.MarkFlagRequired("run") + _ = cmd.MarkFlagRequired("task") + _ = cmd.MarkFlagRequired("title") + + return cmd +} diff --git a/internal/cli/orch/test_helpers_test.go b/internal/cli/orch/test_helpers_test.go new file mode 100644 index 0000000..32b1cd1 --- /dev/null +++ b/internal/cli/orch/test_helpers_test.go @@ -0,0 +1,109 @@ +package orch + +import ( + "bytes" + "encoding/json" + "testing" + + inboxcli "ai-workflow-skill/internal/cli/inbox" +) + +func runOrchCommand(t *testing.T, args ...string) string { + t.Helper() + + stdout, stderr, exitCode := executeOrchCommand(args...) + if exitCode != 0 { + t.Fatalf("execute orch command %v: exit=%d\nstderr:\n%s\nstdout:\n%s", args, exitCode, stderr, stdout) + } + + return stdout +} + +func executeOrchCommand(args ...string) (string, string, int) { + var stdout bytes.Buffer + var stderr bytes.Buffer + exitCode := Execute(args, &stdout, &stderr) + return stdout.String(), stderr.String(), exitCode +} + +func runInboxCommand(t *testing.T, args ...string) string { + t.Helper() + + stdout, stderr, exitCode := executeInboxCommand(args...) + if exitCode != 0 { + t.Fatalf("execute inbox command %v: exit=%d\nstderr:\n%s\nstdout:\n%s", args, exitCode, stderr, stdout) + } + + return stdout +} + +func executeInboxCommand(args ...string) (string, string, int) { + var stdout bytes.Buffer + var stderr bytes.Buffer + exitCode := inboxcli.Execute(args, &stdout, &stderr) + return stdout.String(), stderr.String(), exitCode +} + +func mustDecodeJSON(t *testing.T, raw string, target any) { + t.Helper() + + if err := json.Unmarshal([]byte(raw), target); err != nil { + t.Fatalf("decode json %q: %v", raw, err) + } +} + +func nestedString(t *testing.T, value map[string]any, keys ...string) string { + t.Helper() + + current := nestedValue(t, value, keys...) + str, ok := current.(string) + if !ok { + t.Fatalf("expected string at %v, got %#v", keys, current) + } + return str +} + +func nestedValue(t *testing.T, value map[string]any, keys ...string) any { + t.Helper() + + var current any = value + for _, key := range keys { + obj, ok := current.(map[string]any) + if !ok { + t.Fatalf("expected object at %q in %v, got %#v", key, keys, current) + } + current, ok = obj[key] + if !ok { + t.Fatalf("missing key %q in %v", key, keys) + } + } + return current +} + +func nestedArray(t *testing.T, value map[string]any, keys ...string) []any { + t.Helper() + + current := nestedValue(t, value, keys...) + items, ok := current.([]any) + if !ok { + t.Fatalf("expected array at %v, got %#v", keys, current) + } + return items +} + +func assertErrorJSON(t *testing.T, raw string, expectedCode string) { + t.Helper() + + var payload map[string]any + mustDecodeJSON(t, raw, &payload) + if ok, _ := payload["ok"].(bool); ok { + t.Fatalf("expected ok=false error payload, got %#v", payload) + } + errorValue, ok := payload["error"].(map[string]any) + if !ok { + t.Fatalf("expected error object, got %#v", payload["error"]) + } + if code, _ := errorValue["code"].(string); code != expectedCode { + t.Fatalf("expected error code %q, got %#v", expectedCode, errorValue["code"]) + } +} diff --git a/internal/store/orch.go b/internal/store/orch.go new file mode 100644 index 0000000..7d0f50d --- /dev/null +++ b/internal/store/orch.go @@ -0,0 +1,1639 @@ +package store + +import ( + "bytes" + "context" + "database/sql" + "encoding/json" + "errors" + "fmt" + "strings" + "time" +) + +var ErrRunNotFound = errors.New("run not found") +var ErrTaskNotFound = errors.New("task not found") + +type OrchStore struct { + db *sql.DB +} + +type Run struct { + RunID string `json:"run_id"` + Goal string `json:"goal"` + Summary string `json:"summary"` + Status string `json:"status"` + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` +} + +type Task struct { + RunID string `json:"run_id"` + TaskID string `json:"task_id"` + Title string `json:"title"` + Summary string `json:"summary"` + Status string `json:"status"` + DefaultTo string `json:"default_to,omitempty"` + Priority string `json:"priority"` + AcceptanceJSON json.RawMessage `json:"acceptance_json"` + LatestAttemptNo int `json:"latest_attempt_no,omitempty"` + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` +} + +type TaskDependency struct { + RunID string `json:"run_id"` + TaskID string `json:"task_id"` + DependsOnTaskID string `json:"depends_on_task_id"` +} + +type TaskAttempt struct { + RunID string `json:"run_id"` + TaskID string `json:"task_id"` + AttemptNo int `json:"attempt_no"` + AssignedTo string `json:"assigned_to"` + ThreadID string `json:"thread_id"` + BaseRef string `json:"base_ref,omitempty"` + BaseCommit string `json:"base_commit,omitempty"` + BranchName string `json:"branch_name,omitempty"` + WorktreePath string `json:"worktree_path,omitempty"` + WorkspaceStatus string `json:"workspace_status,omitempty"` + ResultCommit string `json:"result_commit,omitempty"` + Status string `json:"status"` + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` +} + +type RunOverview struct { + Run Run `json:"run"` + TaskCounts map[string]int `json:"task_counts"` + Tasks []Task `json:"tasks,omitempty"` +} + +type CreateRunInput struct { + RunID string + Goal string + Summary string +} + +type AddTaskInput struct { + RunID string + TaskID string + Title string + Summary string + DefaultTo string + AcceptanceJSON string + Priority string +} + +type AddDependencyInput struct { + RunID string + TaskID string + DependsOnTaskID string +} + +type ListReadyInput struct { + RunID string + Limit int +} + +type DispatchInput struct { + RunID string + TaskID string + ToAgent string + Body string + BaseRef string +} + +type DispatchResult struct { + Task Task `json:"task"` + Attempt TaskAttempt `json:"attempt"` + Thread Thread `json:"thread"` + Message Message `json:"message"` +} + +type ReconcileResult struct { + Run Run `json:"run"` + TaskCounts map[string]int `json:"task_counts"` + UpdatedTasks []Task `json:"updated_tasks"` +} + +type BlockedTask struct { + Task Task `json:"task"` + Attempt TaskAttempt `json:"attempt"` + Question Message `json:"question"` +} + +type AnswerInput struct { + RunID string + TaskID string + Body string + PayloadJSON string +} + +type AnswerResult struct { + Task Task `json:"task"` + Attempt TaskAttempt `json:"attempt"` + Thread Thread `json:"thread"` + Message Message `json:"message"` +} + +func NewOrchStore(db *sql.DB) *OrchStore { + return &OrchStore{db: db} +} + +func (s *OrchStore) CreateRun(ctx context.Context, input CreateRunInput) (Run, error) { + runID := strings.TrimSpace(input.RunID) + goal := strings.TrimSpace(input.Goal) + if runID == "" { + return Run{}, fmt.Errorf("%w: run id is required", ErrInvalidInput) + } + if goal == "" { + return Run{}, fmt.Errorf("%w: goal is required", ErrInvalidInput) + } + + now := nowUTC() + run := Run{ + RunID: runID, + Goal: goal, + Summary: strings.TrimSpace(input.Summary), + Status: "active", + CreatedAt: now, + UpdatedAt: now, + } + + tx, err := s.db.BeginTx(ctx, nil) + if err != nil { + return Run{}, fmt.Errorf("begin create run transaction: %w", err) + } + defer tx.Rollback() + + _, err = tx.ExecContext( + ctx, + `INSERT INTO runs (run_id, goal, summary, status, created_at, updated_at) + VALUES (?, ?, ?, ?, ?, ?)`, + run.RunID, + run.Goal, + run.Summary, + run.Status, + formatTime(run.CreatedAt), + formatTime(run.UpdatedAt), + ) + if err != nil { + if isUniqueConstraintError(err) { + return Run{}, fmt.Errorf("%w: run %s already exists", ErrInvalidState, run.RunID) + } + return Run{}, fmt.Errorf("insert run: %w", err) + } + + if err := insertEvent(ctx, tx, eventInput{ + RunID: run.RunID, + TaskID: "", + Source: "orch", + EventType: "run_initialized", + Summary: defaultString(run.Summary, run.Goal), + PayloadJSON: marshalJSON(map[string]any{"goal": run.Goal, "summary": run.Summary}), + CreatedAt: now, + }); err != nil { + return Run{}, err + } + + if err := tx.Commit(); err != nil { + return Run{}, fmt.Errorf("commit create run transaction: %w", err) + } + + return run, nil +} + +func (s *OrchStore) GetRun(ctx context.Context, runID string) (Run, error) { + return selectRun(ctx, s.db, runID) +} + +func (s *OrchStore) AddTask(ctx context.Context, input AddTaskInput) (Task, error) { + if strings.TrimSpace(input.RunID) == "" { + return Task{}, fmt.Errorf("%w: run id is required", ErrInvalidInput) + } + if strings.TrimSpace(input.TaskID) == "" { + return Task{}, fmt.Errorf("%w: task id is required", ErrInvalidInput) + } + if strings.TrimSpace(input.Title) == "" { + return Task{}, fmt.Errorf("%w: title is required", ErrInvalidInput) + } + + priority, err := normalizePriority(input.Priority) + if err != nil { + return Task{}, err + } + acceptanceJSON, err := validateAndNormalizeJSONDefault("acceptance-json", input.AcceptanceJSON, "[]") + if err != nil { + return Task{}, err + } + + now := nowUTC() + tx, err := s.db.BeginTx(ctx, nil) + if err != nil { + return Task{}, fmt.Errorf("begin add task transaction: %w", err) + } + defer tx.Rollback() + + if _, err := selectRun(ctx, tx, input.RunID); err != nil { + return Task{}, err + } + + _, err = tx.ExecContext( + ctx, + `INSERT INTO tasks ( + run_id, task_id, title, summary, status, default_to, priority, + acceptance_json, latest_attempt_no, created_at, updated_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, NULL, ?, ?)`, + input.RunID, + input.TaskID, + input.Title, + input.Summary, + "planned", + nullIfEmpty(input.DefaultTo), + priority, + acceptanceJSON, + formatTime(now), + formatTime(now), + ) + if err != nil { + if isUniqueConstraintError(err) { + return Task{}, fmt.Errorf("%w: task %s already exists in run %s", ErrInvalidState, input.TaskID, input.RunID) + } + return Task{}, fmt.Errorf("insert task: %w", err) + } + + if err := insertEvent(ctx, tx, eventInput{ + RunID: input.RunID, + TaskID: input.TaskID, + Source: "orch", + EventType: "task_added", + Summary: input.Title, + PayloadJSON: marshalJSON(map[string]any{"title": input.Title, "priority": priority}), + CreatedAt: now, + }); err != nil { + return Task{}, err + } + + if err := refreshReadyStates(ctx, tx, input.RunID, now); err != nil { + return Task{}, err + } + if err := updateRunAggregateStatus(ctx, tx, input.RunID, now); err != nil { + return Task{}, err + } + + task, err := selectTask(ctx, tx, input.RunID, input.TaskID) + if err != nil { + return Task{}, err + } + + if err := tx.Commit(); err != nil { + return Task{}, fmt.Errorf("commit add task transaction: %w", err) + } + + return task, nil +} + +func (s *OrchStore) AddDependency(ctx context.Context, input AddDependencyInput) (TaskDependency, error) { + if strings.TrimSpace(input.RunID) == "" { + return TaskDependency{}, fmt.Errorf("%w: run id is required", ErrInvalidInput) + } + if strings.TrimSpace(input.TaskID) == "" { + return TaskDependency{}, fmt.Errorf("%w: task id is required", ErrInvalidInput) + } + if strings.TrimSpace(input.DependsOnTaskID) == "" { + return TaskDependency{}, fmt.Errorf("%w: depends-on task id is required", ErrInvalidInput) + } + if input.TaskID == input.DependsOnTaskID { + return TaskDependency{}, fmt.Errorf("%w: task cannot depend on itself", ErrInvalidInput) + } + + now := nowUTC() + tx, err := s.db.BeginTx(ctx, nil) + if err != nil { + return TaskDependency{}, fmt.Errorf("begin add dependency transaction: %w", err) + } + defer tx.Rollback() + + if _, err := selectRun(ctx, tx, input.RunID); err != nil { + return TaskDependency{}, err + } + if _, err := selectTask(ctx, tx, input.RunID, input.TaskID); err != nil { + return TaskDependency{}, err + } + if _, err := selectTask(ctx, tx, input.RunID, input.DependsOnTaskID); err != nil { + return TaskDependency{}, err + } + + _, err = tx.ExecContext( + ctx, + `INSERT INTO task_dependencies (run_id, task_id, depends_on_task_id) + VALUES (?, ?, ?)`, + input.RunID, + input.TaskID, + input.DependsOnTaskID, + ) + if err != nil { + if isUniqueConstraintError(err) { + return TaskDependency{}, fmt.Errorf("%w: dependency %s -> %s already exists", ErrInvalidState, input.TaskID, input.DependsOnTaskID) + } + return TaskDependency{}, fmt.Errorf("insert dependency: %w", err) + } + + if err := insertEvent(ctx, tx, eventInput{ + RunID: input.RunID, + TaskID: input.TaskID, + Source: "orch", + EventType: "task_dependency_added", + Summary: fmt.Sprintf("%s depends on %s", input.TaskID, input.DependsOnTaskID), + PayloadJSON: marshalJSON(map[string]any{"depends_on_task_id": input.DependsOnTaskID}), + CreatedAt: now, + }); err != nil { + return TaskDependency{}, err + } + + if err := refreshReadyStates(ctx, tx, input.RunID, now); err != nil { + return TaskDependency{}, err + } + if err := updateRunAggregateStatus(ctx, tx, input.RunID, now); err != nil { + return TaskDependency{}, err + } + + if err := tx.Commit(); err != nil { + return TaskDependency{}, fmt.Errorf("commit add dependency transaction: %w", err) + } + + return TaskDependency{ + RunID: input.RunID, + TaskID: input.TaskID, + DependsOnTaskID: input.DependsOnTaskID, + }, nil +} + +func (s *OrchStore) ListReadyTasks(ctx context.Context, input ListReadyInput) ([]Task, error) { + if strings.TrimSpace(input.RunID) == "" { + return nil, fmt.Errorf("%w: run id is required", ErrInvalidInput) + } + + limit := input.Limit + if limit <= 0 { + limit = 20 + } + + tx, err := s.db.BeginTx(ctx, nil) + if err != nil { + return nil, fmt.Errorf("begin list ready transaction: %w", err) + } + defer tx.Rollback() + + if _, err := selectRun(ctx, tx, input.RunID); err != nil { + return nil, err + } + if err := refreshReadyStates(ctx, tx, input.RunID, nowUTC()); err != nil { + return nil, err + } + if err := updateRunAggregateStatus(ctx, tx, input.RunID, nowUTC()); err != nil { + return nil, err + } + + rows, err := tx.QueryContext( + ctx, + `SELECT + run_id, task_id, title, summary, status, default_to, priority, + acceptance_json, latest_attempt_no, created_at, updated_at + FROM tasks + WHERE run_id = ? AND status = 'ready' + ORDER BY CASE priority + WHEN 'high' THEN 0 + WHEN 'normal' THEN 1 + ELSE 2 + END, created_at ASC + LIMIT ?`, + input.RunID, + limit, + ) + if err != nil { + return nil, fmt.Errorf("query ready tasks: %w", err) + } + defer rows.Close() + + var tasks []Task + for rows.Next() { + task, err := scanTask(rows) + if err != nil { + return nil, err + } + tasks = append(tasks, task) + } + if err := rows.Err(); err != nil { + return nil, fmt.Errorf("iterate ready tasks: %w", err) + } + + if err := tx.Commit(); err != nil { + return nil, fmt.Errorf("commit list ready transaction: %w", err) + } + + return tasks, nil +} + +func (s *OrchStore) DispatchTask(ctx context.Context, input DispatchInput) (DispatchResult, error) { + if strings.TrimSpace(input.RunID) == "" { + return DispatchResult{}, fmt.Errorf("%w: run id is required", ErrInvalidInput) + } + if strings.TrimSpace(input.TaskID) == "" { + return DispatchResult{}, fmt.Errorf("%w: task id is required", ErrInvalidInput) + } + + now := nowUTC() + tx, err := s.db.BeginTx(ctx, nil) + if err != nil { + return DispatchResult{}, fmt.Errorf("begin dispatch transaction: %w", err) + } + defer tx.Rollback() + + if _, err := selectRun(ctx, tx, input.RunID); err != nil { + return DispatchResult{}, err + } + if err := refreshReadyStates(ctx, tx, input.RunID, now); err != nil { + return DispatchResult{}, err + } + + task, err := selectTask(ctx, tx, input.RunID, input.TaskID) + if err != nil { + return DispatchResult{}, err + } + if task.Status != "ready" { + return DispatchResult{}, fmt.Errorf("%w: task %s is not ready for dispatch", ErrInvalidState, task.TaskID) + } + + assignedTo := defaultString(strings.TrimSpace(input.ToAgent), task.DefaultTo) + if assignedTo == "" { + return DispatchResult{}, fmt.Errorf("%w: dispatch target agent is required", ErrInvalidInput) + } + + attemptNo := task.LatestAttemptNo + 1 + threadID := newID("thr") + messageID := newID("msg") + payloadJSON := buildDispatchPayload(task, attemptNo, input.BaseRef) + thread := Thread{ + ThreadID: threadID, + RunID: task.RunID, + TaskID: task.TaskID, + Subject: task.Title, + CreatedBy: "orch", + AssignedTo: assignedTo, + Status: "pending", + Priority: task.Priority, + LatestMessageID: messageID, + CreatedAt: now, + UpdatedAt: now, + } + + _, err = tx.ExecContext( + ctx, + `INSERT INTO threads ( + thread_id, run_id, task_id, subject, created_by, assigned_to, status, + priority, latest_message_id, created_at, updated_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, + thread.ThreadID, + thread.RunID, + thread.TaskID, + thread.Subject, + thread.CreatedBy, + thread.AssignedTo, + thread.Status, + thread.Priority, + thread.LatestMessageID, + formatTime(thread.CreatedAt), + formatTime(thread.UpdatedAt), + ) + if err != nil { + return DispatchResult{}, fmt.Errorf("insert dispatch thread: %w", err) + } + + message := Message{ + MessageID: messageID, + ThreadID: threadID, + FromAgent: "orch", + ToAgent: assignedTo, + Kind: "task", + Summary: defaultString(task.Summary, task.Title), + Body: input.Body, + PayloadJSON: json.RawMessage(payloadJSON), + CreatedAt: now, + } + if err := insertMessage(ctx, tx, message); err != nil { + return DispatchResult{}, err + } + if err := insertEvent(ctx, tx, eventInput{ + RunID: thread.RunID, + TaskID: thread.TaskID, + ThreadID: thread.ThreadID, + Source: "inbox", + EventType: "thread_created", + MessageID: message.MessageID, + Summary: message.Summary, + PayloadJSON: payloadJSON, + CreatedAt: now, + }); err != nil { + return DispatchResult{}, err + } + + attempt := TaskAttempt{ + RunID: task.RunID, + TaskID: task.TaskID, + AttemptNo: attemptNo, + AssignedTo: assignedTo, + ThreadID: threadID, + BaseRef: strings.TrimSpace(input.BaseRef), + Status: "dispatched", + CreatedAt: now, + UpdatedAt: now, + } + _, err = tx.ExecContext( + ctx, + `INSERT INTO task_attempts ( + run_id, task_id, attempt_no, assigned_to, thread_id, base_ref, base_commit, + branch_name, worktree_path, workspace_status, result_commit, status, + created_at, updated_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, + attempt.RunID, + attempt.TaskID, + attempt.AttemptNo, + attempt.AssignedTo, + attempt.ThreadID, + nullIfEmpty(attempt.BaseRef), + nil, + nil, + nil, + nil, + nil, + attempt.Status, + formatTime(attempt.CreatedAt), + formatTime(attempt.UpdatedAt), + ) + if err != nil { + return DispatchResult{}, fmt.Errorf("insert task attempt: %w", err) + } + + _, err = tx.ExecContext( + ctx, + `UPDATE tasks + SET status = ?, latest_attempt_no = ?, updated_at = ? + WHERE run_id = ? AND task_id = ?`, + "dispatched", + attempt.AttemptNo, + formatTime(now), + task.RunID, + task.TaskID, + ) + if err != nil { + return DispatchResult{}, fmt.Errorf("update task dispatch status: %w", err) + } + + if err := insertEvent(ctx, tx, eventInput{ + RunID: task.RunID, + TaskID: task.TaskID, + ThreadID: thread.ThreadID, + Source: "orch", + EventType: "task_dispatched", + MessageID: message.MessageID, + Summary: message.Summary, + PayloadJSON: payloadJSON, + CreatedAt: now, + }); err != nil { + return DispatchResult{}, err + } + + if err := updateRunAggregateStatus(ctx, tx, task.RunID, now); err != nil { + return DispatchResult{}, err + } + + if err := tx.Commit(); err != nil { + return DispatchResult{}, fmt.Errorf("commit dispatch transaction: %w", err) + } + + task.Status = "dispatched" + task.LatestAttemptNo = attempt.AttemptNo + task.UpdatedAt = now + + return DispatchResult{ + Task: task, + Attempt: attempt, + Thread: thread, + Message: message, + }, nil +} + +func (s *OrchStore) ReconcileRun(ctx context.Context, runID string) (ReconcileResult, error) { + if strings.TrimSpace(runID) == "" { + return ReconcileResult{}, fmt.Errorf("%w: run id is required", ErrInvalidInput) + } + + now := nowUTC() + tx, err := s.db.BeginTx(ctx, nil) + if err != nil { + return ReconcileResult{}, fmt.Errorf("begin reconcile transaction: %w", err) + } + defer tx.Rollback() + + if _, err := selectRun(ctx, tx, runID); err != nil { + return ReconcileResult{}, err + } + + rows, err := tx.QueryContext( + ctx, + `SELECT + t.task_id, + t.status, + a.attempt_no, + a.status, + a.thread_id, + th.status + FROM tasks t + JOIN task_attempts a + ON a.run_id = t.run_id + AND a.task_id = t.task_id + AND a.attempt_no = t.latest_attempt_no + JOIN threads th ON th.thread_id = a.thread_id + WHERE t.run_id = ? + AND t.latest_attempt_no IS NOT NULL`, + runID, + ) + if err != nil { + return ReconcileResult{}, fmt.Errorf("query reconcile candidates: %w", err) + } + defer rows.Close() + + var updatedIDs []string + for rows.Next() { + var ( + taskID string + taskStatus string + attemptNo int + attemptStatus string + threadID string + threadStatus string + ) + if err := rows.Scan(&taskID, &taskStatus, &attemptNo, &attemptStatus, &threadID, &threadStatus); err != nil { + return ReconcileResult{}, fmt.Errorf("scan reconcile candidate: %w", err) + } + + nextStatus := reconcileTaskStatus(threadStatus) + if nextStatus == "" { + continue + } + if nextStatus == taskStatus && nextStatus == attemptStatus { + continue + } + + _, err = tx.ExecContext( + ctx, + `UPDATE tasks + SET status = ?, updated_at = ? + WHERE run_id = ? AND task_id = ?`, + nextStatus, + formatTime(now), + runID, + taskID, + ) + if err != nil { + return ReconcileResult{}, fmt.Errorf("update reconciled task status: %w", err) + } + _, err = tx.ExecContext( + ctx, + `UPDATE task_attempts + SET status = ?, updated_at = ? + WHERE run_id = ? AND task_id = ? AND attempt_no = ?`, + nextStatus, + formatTime(now), + runID, + taskID, + attemptNo, + ) + if err != nil { + return ReconcileResult{}, fmt.Errorf("update reconciled attempt status: %w", err) + } + + if err := insertEvent(ctx, tx, eventInput{ + RunID: runID, + TaskID: taskID, + ThreadID: threadID, + Source: "orch", + EventType: "task_" + nextStatus, + Summary: fmt.Sprintf("%s -> %s", taskID, nextStatus), + PayloadJSON: marshalJSON(map[string]any{ + "thread_id": threadID, + "thread_status": threadStatus, + "previous_status": taskStatus, + "previous_attempt": attemptStatus, + }), + CreatedAt: now, + }); err != nil { + return ReconcileResult{}, err + } + + updatedIDs = append(updatedIDs, taskID) + } + if err := rows.Err(); err != nil { + return ReconcileResult{}, fmt.Errorf("iterate reconcile candidates: %w", err) + } + + if err := refreshReadyStates(ctx, tx, runID, now); err != nil { + return ReconcileResult{}, err + } + if err := updateRunAggregateStatus(ctx, tx, runID, now); err != nil { + return ReconcileResult{}, err + } + + run, err := selectRun(ctx, tx, runID) + if err != nil { + return ReconcileResult{}, err + } + taskCounts, err := collectTaskCounts(ctx, tx, runID) + if err != nil { + return ReconcileResult{}, err + } + + updatedTasks := make([]Task, 0, len(updatedIDs)) + for _, taskID := range updatedIDs { + task, err := selectTask(ctx, tx, runID, taskID) + if err != nil { + return ReconcileResult{}, err + } + updatedTasks = append(updatedTasks, task) + } + + if err := tx.Commit(); err != nil { + return ReconcileResult{}, fmt.Errorf("commit reconcile transaction: %w", err) + } + + return ReconcileResult{ + Run: run, + TaskCounts: taskCounts, + UpdatedTasks: updatedTasks, + }, nil +} + +func (s *OrchStore) ListBlockedTasks(ctx context.Context, runID string) ([]BlockedTask, error) { + if strings.TrimSpace(runID) == "" { + return nil, fmt.Errorf("%w: run id is required", ErrInvalidInput) + } + + tx, err := s.db.BeginTx(ctx, nil) + if err != nil { + return nil, fmt.Errorf("begin list blocked transaction: %w", err) + } + defer tx.Rollback() + + if _, err := selectRun(ctx, tx, runID); err != nil { + return nil, err + } + + rows, err := tx.QueryContext( + ctx, + `SELECT + t.run_id, t.task_id, t.title, t.summary, t.status, t.default_to, t.priority, + t.acceptance_json, t.latest_attempt_no, t.created_at, t.updated_at, + a.run_id, a.task_id, a.attempt_no, a.assigned_to, a.thread_id, a.base_ref, + a.base_commit, a.branch_name, a.worktree_path, a.workspace_status, + a.result_commit, a.status, a.created_at, a.updated_at + FROM tasks t + JOIN task_attempts a + ON a.run_id = t.run_id + AND a.task_id = t.task_id + AND a.attempt_no = t.latest_attempt_no + WHERE t.run_id = ? + AND t.status = 'blocked' + ORDER BY t.updated_at ASC`, + runID, + ) + if err != nil { + return nil, fmt.Errorf("query blocked tasks: %w", err) + } + defer rows.Close() + + var blocked []BlockedTask + for rows.Next() { + task, attempt, err := scanTaskAndAttempt(rows) + if err != nil { + return nil, err + } + question, err := selectLatestQuestionMessage(ctx, tx, attempt.ThreadID) + if err != nil { + return nil, err + } + blocked = append(blocked, BlockedTask{ + Task: task, + Attempt: attempt, + Question: question, + }) + } + if err := rows.Err(); err != nil { + return nil, fmt.Errorf("iterate blocked tasks: %w", err) + } + + if err := tx.Commit(); err != nil { + return nil, fmt.Errorf("commit list blocked transaction: %w", err) + } + + return blocked, nil +} + +func (s *OrchStore) AnswerTask(ctx context.Context, input AnswerInput) (AnswerResult, error) { + if strings.TrimSpace(input.RunID) == "" { + return AnswerResult{}, fmt.Errorf("%w: run id is required", ErrInvalidInput) + } + if strings.TrimSpace(input.TaskID) == "" { + return AnswerResult{}, fmt.Errorf("%w: task id is required", ErrInvalidInput) + } + + payloadJSON, err := validateAndNormalizeJSON("payload-json", input.PayloadJSON) + if err != nil { + return AnswerResult{}, err + } + if strings.TrimSpace(input.Body) == "" && payloadJSON == "{}" { + return AnswerResult{}, fmt.Errorf("%w: body or payload-json is required", ErrInvalidInput) + } + + now := nowUTC() + tx, err := s.db.BeginTx(ctx, nil) + if err != nil { + return AnswerResult{}, fmt.Errorf("begin answer transaction: %w", err) + } + defer tx.Rollback() + + task, err := selectTask(ctx, tx, input.RunID, input.TaskID) + if err != nil { + return AnswerResult{}, err + } + if task.Status != "blocked" { + return AnswerResult{}, fmt.Errorf("%w: task %s is not blocked", ErrInvalidState, task.TaskID) + } + if task.LatestAttemptNo == 0 { + return AnswerResult{}, fmt.Errorf("%w: task %s has no active attempt", ErrInvalidState, task.TaskID) + } + + attempt, err := selectAttempt(ctx, tx, input.RunID, input.TaskID, task.LatestAttemptNo) + if err != nil { + return AnswerResult{}, err + } + thread, err := selectThread(ctx, tx, attempt.ThreadID) + if err != nil { + return AnswerResult{}, err + } + if isTerminalStatus(thread.Status) { + return AnswerResult{}, fmt.Errorf("%w: thread %s is already terminal", ErrInvalidState, thread.ThreadID) + } + + message := Message{ + MessageID: newID("msg"), + ThreadID: thread.ThreadID, + FromAgent: "orch", + ToAgent: attempt.AssignedTo, + Kind: "answer", + Summary: summarizeAnswer(input.Body), + Body: input.Body, + PayloadJSON: json.RawMessage(payloadJSON), + CreatedAt: now, + } + if err := insertMessage(ctx, tx, message); err != nil { + return AnswerResult{}, err + } + if err := updateThreadState(ctx, tx, thread.ThreadID, thread.Status, thread.AssignedTo, message.MessageID, now); err != nil { + return AnswerResult{}, err + } + if err := insertEvent(ctx, tx, eventInput{ + RunID: thread.RunID, + TaskID: thread.TaskID, + ThreadID: thread.ThreadID, + Source: "inbox", + EventType: "thread_reply", + MessageID: message.MessageID, + Summary: message.Summary, + PayloadJSON: payloadJSON, + CreatedAt: now, + }); err != nil { + return AnswerResult{}, err + } + if err := insertEvent(ctx, tx, eventInput{ + RunID: task.RunID, + TaskID: task.TaskID, + ThreadID: thread.ThreadID, + Source: "orch", + EventType: "task_answered", + MessageID: message.MessageID, + Summary: message.Summary, + PayloadJSON: payloadJSON, + CreatedAt: now, + }); err != nil { + return AnswerResult{}, err + } + + _, err = tx.ExecContext( + ctx, + `UPDATE tasks + SET updated_at = ? + WHERE run_id = ? AND task_id = ?`, + formatTime(now), + task.RunID, + task.TaskID, + ) + if err != nil { + return AnswerResult{}, fmt.Errorf("touch answered task: %w", err) + } + _, err = tx.ExecContext( + ctx, + `UPDATE task_attempts + SET updated_at = ? + WHERE run_id = ? AND task_id = ? AND attempt_no = ?`, + formatTime(now), + attempt.RunID, + attempt.TaskID, + attempt.AttemptNo, + ) + if err != nil { + return AnswerResult{}, fmt.Errorf("touch answered attempt: %w", err) + } + if err := updateRunAggregateStatus(ctx, tx, task.RunID, now); err != nil { + return AnswerResult{}, err + } + + task.UpdatedAt = now + attempt.UpdatedAt = now + thread.LatestMessageID = message.MessageID + thread.UpdatedAt = now + + if err := tx.Commit(); err != nil { + return AnswerResult{}, fmt.Errorf("commit answer transaction: %w", err) + } + + return AnswerResult{ + Task: task, + Attempt: attempt, + Thread: thread, + Message: message, + }, nil +} + +func (s *OrchStore) GetRunOverview(ctx context.Context, runID string) (RunOverview, error) { + if strings.TrimSpace(runID) == "" { + return RunOverview{}, fmt.Errorf("%w: run id is required", ErrInvalidInput) + } + + now := nowUTC() + tx, err := s.db.BeginTx(ctx, nil) + if err != nil { + return RunOverview{}, fmt.Errorf("begin run overview transaction: %w", err) + } + defer tx.Rollback() + + if _, err := selectRun(ctx, tx, runID); err != nil { + return RunOverview{}, err + } + if err := refreshReadyStates(ctx, tx, runID, now); err != nil { + return RunOverview{}, err + } + if err := updateRunAggregateStatus(ctx, tx, runID, now); err != nil { + return RunOverview{}, err + } + + run, err := selectRun(ctx, tx, runID) + if err != nil { + return RunOverview{}, err + } + taskCounts, err := collectTaskCounts(ctx, tx, runID) + if err != nil { + return RunOverview{}, err + } + tasks, err := listTasksForRun(ctx, tx, runID) + if err != nil { + return RunOverview{}, err + } + + if err := tx.Commit(); err != nil { + return RunOverview{}, fmt.Errorf("commit run overview transaction: %w", err) + } + + return RunOverview{ + Run: run, + TaskCounts: taskCounts, + Tasks: tasks, + }, nil +} + +func listTasksForRun(ctx context.Context, db queryRowsContexter, runID string) ([]Task, error) { + rows, err := db.QueryContext( + ctx, + `SELECT + run_id, task_id, title, summary, status, default_to, priority, + acceptance_json, latest_attempt_no, created_at, updated_at + FROM tasks + WHERE run_id = ? + ORDER BY created_at ASC`, + runID, + ) + if err != nil { + return nil, fmt.Errorf("query tasks for run: %w", err) + } + defer rows.Close() + + var tasks []Task + for rows.Next() { + task, err := scanTask(rows) + if err != nil { + return nil, err + } + tasks = append(tasks, task) + } + if err := rows.Err(); err != nil { + return nil, fmt.Errorf("iterate tasks for run: %w", err) + } + return tasks, nil +} + +type queryRowsContexter interface { + QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error) +} + +func scanRun(scanner threadScanner) (Run, error) { + var ( + run Run + createdAt, updated string + ) + + if err := scanner.Scan( + &run.RunID, + &run.Goal, + &run.Summary, + &run.Status, + &createdAt, + &updated, + ); err != nil { + return Run{}, fmt.Errorf("scan run: %w", err) + } + + run.CreatedAt = parseTime(createdAt) + run.UpdatedAt = parseTime(updated) + return run, nil +} + +func scanTask(scanner threadScanner) (Task, error) { + var ( + task Task + defaultTo sql.NullString + latestAttempt sql.NullInt64 + acceptanceJSON string + createdAt, updatedAt string + ) + + if err := scanner.Scan( + &task.RunID, + &task.TaskID, + &task.Title, + &task.Summary, + &task.Status, + &defaultTo, + &task.Priority, + &acceptanceJSON, + &latestAttempt, + &createdAt, + &updatedAt, + ); err != nil { + return Task{}, fmt.Errorf("scan task: %w", err) + } + + task.DefaultTo = defaultTo.String + task.AcceptanceJSON = json.RawMessage(acceptanceJSON) + if latestAttempt.Valid { + task.LatestAttemptNo = int(latestAttempt.Int64) + } + task.CreatedAt = parseTime(createdAt) + task.UpdatedAt = parseTime(updatedAt) + return task, nil +} + +func scanAttempt(scanner threadScanner) (TaskAttempt, error) { + var ( + attempt TaskAttempt + baseRef sql.NullString + baseCommit sql.NullString + branchName sql.NullString + worktreePath sql.NullString + workspaceStatus sql.NullString + resultCommit sql.NullString + createdAt, updated string + ) + + if err := scanner.Scan( + &attempt.RunID, + &attempt.TaskID, + &attempt.AttemptNo, + &attempt.AssignedTo, + &attempt.ThreadID, + &baseRef, + &baseCommit, + &branchName, + &worktreePath, + &workspaceStatus, + &resultCommit, + &attempt.Status, + &createdAt, + &updated, + ); err != nil { + return TaskAttempt{}, fmt.Errorf("scan attempt: %w", err) + } + + attempt.BaseRef = baseRef.String + attempt.BaseCommit = baseCommit.String + attempt.BranchName = branchName.String + attempt.WorktreePath = worktreePath.String + attempt.WorkspaceStatus = workspaceStatus.String + attempt.ResultCommit = resultCommit.String + attempt.CreatedAt = parseTime(createdAt) + attempt.UpdatedAt = parseTime(updated) + return attempt, nil +} + +func scanTaskAndAttempt(scanner threadScanner) (Task, TaskAttempt, error) { + var ( + task Task + taskDefaultTo sql.NullString + taskLatestAttempt sql.NullInt64 + taskAcceptanceJSON string + taskCreatedAt string + taskUpdatedAt string + attempt TaskAttempt + attemptBaseRef sql.NullString + attemptBaseCommit sql.NullString + attemptBranchName sql.NullString + attemptWorktreePath sql.NullString + attemptWorkspaceState sql.NullString + attemptResultCommit sql.NullString + attemptCreatedAt string + attemptUpdatedAt string + ) + + if err := scanner.Scan( + &task.RunID, + &task.TaskID, + &task.Title, + &task.Summary, + &task.Status, + &taskDefaultTo, + &task.Priority, + &taskAcceptanceJSON, + &taskLatestAttempt, + &taskCreatedAt, + &taskUpdatedAt, + &attempt.RunID, + &attempt.TaskID, + &attempt.AttemptNo, + &attempt.AssignedTo, + &attempt.ThreadID, + &attemptBaseRef, + &attemptBaseCommit, + &attemptBranchName, + &attemptWorktreePath, + &attemptWorkspaceState, + &attemptResultCommit, + &attempt.Status, + &attemptCreatedAt, + &attemptUpdatedAt, + ); err != nil { + return Task{}, TaskAttempt{}, fmt.Errorf("scan task and attempt: %w", err) + } + + task.DefaultTo = taskDefaultTo.String + task.AcceptanceJSON = json.RawMessage(taskAcceptanceJSON) + if taskLatestAttempt.Valid { + task.LatestAttemptNo = int(taskLatestAttempt.Int64) + } + task.CreatedAt = parseTime(taskCreatedAt) + task.UpdatedAt = parseTime(taskUpdatedAt) + + attempt.BaseRef = attemptBaseRef.String + attempt.BaseCommit = attemptBaseCommit.String + attempt.BranchName = attemptBranchName.String + attempt.WorktreePath = attemptWorktreePath.String + attempt.WorkspaceStatus = attemptWorkspaceState.String + attempt.ResultCommit = attemptResultCommit.String + attempt.CreatedAt = parseTime(attemptCreatedAt) + attempt.UpdatedAt = parseTime(attemptUpdatedAt) + + return task, attempt, nil +} + +func selectRun(ctx context.Context, db queryRower, runID string) (Run, error) { + row := db.QueryRowContext( + ctx, + `SELECT run_id, goal, summary, status, created_at, updated_at + FROM runs + WHERE run_id = ?`, + runID, + ) + run, err := scanRun(row) + if errors.Is(err, sql.ErrNoRows) { + return Run{}, fmt.Errorf("%w: %s", ErrRunNotFound, runID) + } + return run, err +} + +func selectTask(ctx context.Context, db queryRower, runID, taskID string) (Task, error) { + row := db.QueryRowContext( + ctx, + `SELECT + run_id, task_id, title, summary, status, default_to, priority, + acceptance_json, latest_attempt_no, created_at, updated_at + FROM tasks + WHERE run_id = ? AND task_id = ?`, + runID, + taskID, + ) + task, err := scanTask(row) + if errors.Is(err, sql.ErrNoRows) { + return Task{}, fmt.Errorf("%w: %s/%s", ErrTaskNotFound, runID, taskID) + } + return task, err +} + +func selectAttempt(ctx context.Context, db queryRower, runID, taskID string, attemptNo int) (TaskAttempt, error) { + row := db.QueryRowContext( + ctx, + `SELECT + run_id, task_id, attempt_no, assigned_to, thread_id, base_ref, base_commit, + branch_name, worktree_path, workspace_status, result_commit, status, + created_at, updated_at + FROM task_attempts + WHERE run_id = ? AND task_id = ? AND attempt_no = ?`, + runID, + taskID, + attemptNo, + ) + attempt, err := scanAttempt(row) + if errors.Is(err, sql.ErrNoRows) { + return TaskAttempt{}, fmt.Errorf("%w: attempt %s/%s/%d not found", ErrInvalidState, runID, taskID, attemptNo) + } + return attempt, err +} + +func selectLatestQuestionMessage(ctx context.Context, db queryRowsAndRower, threadID string) (Message, error) { + row := db.QueryRowContext( + ctx, + `SELECT + message_id, thread_id, from_agent, to_agent, kind, summary, body, + payload_json, created_at + FROM messages + WHERE thread_id = ? AND kind = 'question' + ORDER BY created_at DESC + LIMIT 1`, + threadID, + ) + message, err := scanMessage(row) + if errors.Is(err, sql.ErrNoRows) { + return Message{}, fmt.Errorf("%w: blocked thread %s has no question message", ErrInvalidState, threadID) + } + if err != nil { + return Message{}, err + } + artifactsByMessageID, err := loadArtifactsForMessageIDsFromQueryer(ctx, db, []string{message.MessageID}) + if err != nil { + return Message{}, err + } + message.Artifacts = artifactsByMessageID[message.MessageID] + return message, nil +} + +type queryRowsAndRower interface { + queryRower + QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error) +} + +func loadArtifactsForMessageIDsFromQueryer(ctx context.Context, db queryRowsContexter, messageIDs []string) (map[string][]Artifact, error) { + result := make(map[string][]Artifact) + if len(messageIDs) == 0 { + return result, nil + } + + args := make([]any, 0, len(messageIDs)) + for _, messageID := range messageIDs { + args = append(args, messageID) + } + + rows, err := db.QueryContext( + ctx, + `SELECT + artifact_id, message_id, path, kind, metadata_json, created_at + FROM artifacts + WHERE message_id IN (`+placeholders(len(messageIDs))+`) + ORDER BY created_at ASC`, + args..., + ) + if err != nil { + return nil, fmt.Errorf("query artifacts: %w", err) + } + defer rows.Close() + + for rows.Next() { + artifact, err := scanArtifact(rows) + if err != nil { + return nil, err + } + result[artifact.MessageID] = append(result[artifact.MessageID], artifact) + } + + if err := rows.Err(); err != nil { + return nil, fmt.Errorf("iterate artifacts: %w", err) + } + + return result, nil +} + +func refreshReadyStates(ctx context.Context, tx *sql.Tx, runID string, now time.Time) error { + rows, err := tx.QueryContext( + ctx, + `SELECT task_id, status, title + FROM tasks + WHERE run_id = ? + AND status IN ('planned', 'ready')`, + runID, + ) + if err != nil { + return fmt.Errorf("query tasks for readiness refresh: %w", err) + } + defer rows.Close() + + type readinessRow struct { + taskID string + status string + title string + } + + var tasks []readinessRow + for rows.Next() { + var row readinessRow + if err := rows.Scan(&row.taskID, &row.status, &row.title); err != nil { + return fmt.Errorf("scan readiness refresh row: %w", err) + } + tasks = append(tasks, row) + } + if err := rows.Err(); err != nil { + return fmt.Errorf("iterate readiness refresh rows: %w", err) + } + + for _, task := range tasks { + ready, err := dependenciesSatisfied(ctx, tx, runID, task.taskID) + if err != nil { + return err + } + + desired := "planned" + if ready { + desired = "ready" + } + if desired == task.status { + continue + } + + _, err = tx.ExecContext( + ctx, + `UPDATE tasks + SET status = ?, updated_at = ? + WHERE run_id = ? AND task_id = ?`, + desired, + formatTime(now), + runID, + task.taskID, + ) + if err != nil { + return fmt.Errorf("update task readiness: %w", err) + } + + if desired == "ready" { + if err := insertEvent(ctx, tx, eventInput{ + RunID: runID, + TaskID: task.taskID, + Source: "orch", + EventType: "task_ready", + Summary: defaultString(task.title, task.taskID), + PayloadJSON: marshalJSON(map[string]any{"task_id": task.taskID}), + CreatedAt: now, + }); err != nil { + return err + } + } + } + + return nil +} + +func dependenciesSatisfied(ctx context.Context, tx *sql.Tx, runID, taskID string) (bool, error) { + var pendingCount int + err := tx.QueryRowContext( + ctx, + `SELECT COUNT(*) + FROM task_dependencies d + JOIN tasks dep + ON dep.run_id = d.run_id + AND dep.task_id = d.depends_on_task_id + WHERE d.run_id = ? + AND d.task_id = ? + AND dep.status <> 'done'`, + runID, + taskID, + ).Scan(&pendingCount) + if err != nil { + return false, fmt.Errorf("query dependency readiness: %w", err) + } + return pendingCount == 0, nil +} + +func updateRunAggregateStatus(ctx context.Context, tx *sql.Tx, runID string, now time.Time) error { + counts, err := collectTaskCounts(ctx, tx, runID) + if err != nil { + return err + } + nextStatus := deriveRunStatus(counts) + + _, err = tx.ExecContext( + ctx, + `UPDATE runs + SET status = ?, updated_at = ? + WHERE run_id = ?`, + nextStatus, + formatTime(now), + runID, + ) + if err != nil { + return fmt.Errorf("update run aggregate status: %w", err) + } + return nil +} + +func collectTaskCounts(ctx context.Context, db queryRowsContexter, runID string) (map[string]int, error) { + rows, err := db.QueryContext( + ctx, + `SELECT status, COUNT(*) + FROM tasks + WHERE run_id = ? + GROUP BY status`, + runID, + ) + if err != nil { + return nil, fmt.Errorf("query task counts: %w", err) + } + defer rows.Close() + + counts := make(map[string]int) + for rows.Next() { + var ( + status string + count int + ) + if err := rows.Scan(&status, &count); err != nil { + return nil, fmt.Errorf("scan task count: %w", err) + } + counts[status] = count + } + if err := rows.Err(); err != nil { + return nil, fmt.Errorf("iterate task counts: %w", err) + } + + return counts, nil +} + +func deriveRunStatus(counts map[string]int) string { + total := 0 + for _, count := range counts { + total += count + } + if total == 0 { + return "active" + } + if counts["blocked"] > 0 { + return "blocked" + } + if counts["failed"] > 0 { + return "failed" + } + if counts["running"] > 0 || counts["dispatched"] > 0 { + return "running" + } + if counts["ready"] > 0 { + return "ready" + } + if counts["planned"] > 0 { + return "planned" + } + if counts["done"] > 0 { + return "done" + } + if counts["cancelled"] == total { + return "cancelled" + } + return "active" +} + +func reconcileTaskStatus(threadStatus string) string { + switch threadStatus { + case "pending": + return "dispatched" + case "claimed", "in_progress": + return "running" + case "blocked": + return "blocked" + case "done": + return "done" + case "failed": + return "failed" + case "cancelled": + return "cancelled" + default: + return "" + } +} + +func normalizePriority(priority string) (string, error) { + priority = defaultString(strings.TrimSpace(priority), "normal") + switch priority { + case "low", "normal", "high": + return priority, nil + default: + return "", fmt.Errorf("%w: priority must be one of low, normal, high", ErrInvalidInput) + } +} + +func validateAndNormalizeJSONDefault(fieldName, value, defaultValue string) (string, error) { + normalized := strings.TrimSpace(value) + if normalized == "" { + normalized = defaultValue + } + if !json.Valid([]byte(normalized)) { + return "", fmt.Errorf("%w: %s must be valid JSON", ErrInvalidInput, fieldName) + } + + var compact bytes.Buffer + if err := json.Compact(&compact, []byte(normalized)); err != nil { + return "", fmt.Errorf("%w: %s must be valid JSON", ErrInvalidInput, fieldName) + } + return compact.String(), nil +} + +func buildDispatchPayload(task Task, attemptNo int, baseRef string) string { + payload := map[string]any{ + "run_id": task.RunID, + "task_id": task.TaskID, + "attempt_no": attemptNo, + "title": task.Title, + "summary": task.Summary, + "priority": task.Priority, + } + + if len(task.AcceptanceJSON) > 0 { + var acceptance any + if err := json.Unmarshal(task.AcceptanceJSON, &acceptance); err == nil { + payload["acceptance"] = acceptance + } + } + if strings.TrimSpace(baseRef) != "" { + payload["base_ref"] = strings.TrimSpace(baseRef) + } + + return marshalJSON(payload) +} + +func marshalJSON(v any) string { + data, err := json.Marshal(v) + if err != nil { + return "{}" + } + return string(data) +} + +func nullIfEmpty(value string) any { + if strings.TrimSpace(value) == "" { + return nil + } + return value +} + +func summarizeAnswer(body string) string { + body = strings.TrimSpace(body) + if body == "" { + return "task answer" + } + line := body + if idx := strings.IndexByte(line, '\n'); idx >= 0 { + line = line[:idx] + } + line = strings.TrimSpace(line) + if line == "" { + return "task answer" + } + return line +} + +func isUniqueConstraintError(err error) bool { + return strings.Contains(strings.ToLower(err.Error()), "unique constraint failed") +}