From dd6a0e31b6a6b387e70ee8c2cf427beda1527186 Mon Sep 17 00:00:00 2001 From: kurihada Date: Thu, 19 Mar 2026 15:13:34 +0800 Subject: [PATCH] Add council review wait command --- docs/council-review.md | 5 + docs/implementation-roadmap.md | 21 ++- docs/roadmaps/archive/orch-council-wait.md | 61 +++++++ internal/cli/orch/council.go | 1 + internal/cli/orch/council_wait.go | 69 ++++++++ internal/cli/orch/integration_test.go | 196 ++++++++++++++++++++- internal/store/council.go | 179 +++++++++++++++++++ 7 files changed, 522 insertions(+), 10 deletions(-) create mode 100644 docs/roadmaps/archive/orch-council-wait.md create mode 100644 internal/cli/orch/council_wait.go diff --git a/docs/council-review.md b/docs/council-review.md index 993cc8f..9f9edff 100644 --- a/docs/council-review.md +++ b/docs/council-review.md @@ -143,6 +143,11 @@ Suggested flags: - `--run RUN_ID` - `--timeout-seconds N` +Behavior: + +- waits until all three reviewer tasks reach terminal task states +- returns current reviewer task statuses on both wake and timeout + ### `orch council tally` Group similar suggestions and count supporting reviewers. diff --git a/docs/implementation-roadmap.md b/docs/implementation-roadmap.md index 1d23c98..a63c10a 100644 --- a/docs/implementation-roadmap.md +++ b/docs/implementation-roadmap.md @@ -29,9 +29,10 @@ As of now: - `orch dispatch` now supports `--repo-path`, `--workspace-root`, and `--strict-worktree`, auto-enables strict worktree mode for code-like tasks inferred from task metadata, resolves committed base revisions, records workspace metadata on attempts, and writes that metadata into inbox task payloads - `orch wait` now blocks on run-scoped task events and reconciles inbox state while polling so leader waits can wake on worker progress without manual sleep loops - `orch council start` now creates a dedicated council run, persists council target input metadata, and dispatches the three fixed reviewer roles through the existing scheduler -- automated integration tests now cover the main `orch` scheduler slice, including dependency gating, dispatch, blocked-answer flow, retry, reassign, cancel, cleanup, strict worktree creation, automatic code-task worktree enablement, dirty-repo rejection rules, wait wake/timeout behavior, and council start dispatch +- `orch council wait` now blocks until the three reviewer tasks reach terminal states or a timeout is reached +- automated integration tests now cover the main `orch` scheduler slice, including dependency gating, dispatch, blocked-answer flow, retry, reassign, cancel, cleanup, strict worktree creation, automatic code-task worktree enablement, dirty-repo rejection rules, wait wake/timeout behavior, and council start/wait behavior -This means the project now has a working `orch` core scheduler with automatic worktree selection for code-like tasks, strict worktree-backed dispatch, the main leader-side control loop, and the first council workflow slice. +This means the project now has a working `orch` core scheduler with automatic worktree selection for code-like tasks, strict worktree-backed dispatch, the main leader-side control loop, and the first two council workflow slices. ## Source Of Truth @@ -74,9 +75,9 @@ Current implementation status: - `Milestone 4: Orch Core Scheduling` is complete for the current non-worktree scheduler scope - `Milestone 5: Strict Worktree Support` is complete - `Milestone 6: Waiting Primitives` is complete -- `Milestone 7: Council Review` is partially complete through `orch council start` +- `Milestone 7: Council Review` is partially complete through `orch council start` and `orch council wait` -The next practical coding target is the next `Milestone 7` slice: `orch council wait`. +The next practical coding target is the next `Milestone 7` slice: `orch council tally`. ### Milestone 1: Go Skeleton @@ -341,12 +342,13 @@ Completed so far: - council-specific storage now includes run metadata, reviewer assignment rows, reviewer findings/groups tables, and persisted council input references - `orch council start` +- `orch council wait` - council start creates a dedicated run, stores council target input metadata, creates reviewer tasks `CR1` through `CR3`, and dispatches the fixed reviewer roles `architecture-reviewer`, `implementation-reviewer`, and `risk-reviewer` -- CLI integration tests cover council start dispatch and metadata persistence +- council wait blocks until all three reviewer tasks reach terminal states or timeout +- CLI integration tests cover council start dispatch, metadata persistence, and council wait wake/timeout behavior Remaining: -- `orch council wait` - `orch council tally` - `orch council report` @@ -354,11 +356,11 @@ Remaining: If a new agent is taking over now, the next concrete step should be: -1. continue `Milestone 7: Council Review` with `orch council wait` -2. define the council completion check that determines when all reviewer outputs are ready for tally +1. continue `Milestone 7: Council Review` with `orch council tally` +2. define how reviewer findings are parsed from completed reviewer outputs into `council_findings` 3. keep the authored inbox test-plan set in `docs/tests/inbox/` synchronized if CLI behavior changes during further `orch` work -The inbox implementation and its human-readable test-plan set are already in place, and `orch` now supports the main scheduler loop plus the first council workflow slice, so the next meaningful project step is finishing council wait, tally, and report. +The inbox implementation and its human-readable test-plan set are already in place, and `orch` now supports the main scheduler loop plus council start/wait, so the next meaningful project step is parsing reviewer outputs and implementing tally/report. ## Recommended Driver Choices @@ -385,6 +387,7 @@ Completed so far: - orch wait wake and timeout coverage - orch retry, reassign, cancel, and cleanup coverage - orch council start dispatch and persistence coverage +- orch council wait wake and timeout coverage Still recommended before the codebase grows too much: diff --git a/docs/roadmaps/archive/orch-council-wait.md b/docs/roadmaps/archive/orch-council-wait.md new file mode 100644 index 0000000..be8e283 --- /dev/null +++ b/docs/roadmaps/archive/orch-council-wait.md @@ -0,0 +1,61 @@ +# Orch Council Wait + +## Status + +- `completed` + +## Owner + +- codex + +## Started At + +- `2026-03-19` + +## Goal + +- implement `orch council wait` so the leader can block until all three reviewer tasks finish or a timeout is reached + +## Scope + +- add council wait status queries and blocking wait behavior on top of existing council reviewer rows +- add CLI support for `orch council wait` +- add integration tests for wake and timeout behavior +- update the implementation roadmap and archive this workstream when complete + +## Checklist + +- [x] inspect council wait docs and current council storage +- [x] implement council wait store and CLI command +- [x] add integration coverage for council wait wake and timeout +- [x] run `go test ./...` +- [x] update `docs/implementation-roadmap.md` +- [x] archive this roadmap with a completion summary + +## Files + +- `docs/roadmaps/archive/orch-council-wait.md` +- `docs/implementation-roadmap.md` +- `internal/store/council.go` +- `internal/cli/orch/council.go` +- `internal/cli/orch/council_wait.go` +- `internal/cli/orch/integration_test.go` + +## Decisions + +- treat reviewer completion as all council reviewer tasks reaching terminal task states +- reuse the existing `orch wait`/reconcile polling model rather than introducing a separate push mechanism for councils + +## Blockers + +- none + +## Next Step + +- implement `orch council tally` by parsing completed reviewer outputs into `council_findings` and grouping similar proposals + +## Completion Summary + +- `orch council wait` now blocks until all council reviewer tasks reach terminal task states or a timeout is reached +- wait responses return reviewer task statuses on both wake and timeout paths +- integration tests now cover council wait wake and timeout behavior diff --git a/internal/cli/orch/council.go b/internal/cli/orch/council.go index 0bcd0f0..951de10 100644 --- a/internal/cli/orch/council.go +++ b/internal/cli/orch/council.go @@ -9,5 +9,6 @@ func newCouncilCmd(root *rootOptions) *cobra.Command { } cmd.AddCommand(newCouncilStartCmd(root)) + cmd.AddCommand(newCouncilWaitCmd(root)) return cmd } diff --git a/internal/cli/orch/council_wait.go b/internal/cli/orch/council_wait.go new file mode 100644 index 0000000..a6b9130 --- /dev/null +++ b/internal/cli/orch/council_wait.go @@ -0,0 +1,69 @@ +package orch + +import ( + "fmt" + "time" + + "ai-workflow-skill/internal/protocol" + "ai-workflow-skill/internal/store" + + "github.com/spf13/cobra" +) + +type councilWaitOptions struct { + runID string + timeoutSeconds int +} + +func newCouncilWaitCmd(root *rootOptions) *cobra.Command { + opts := &councilWaitOptions{} + + cmd := &cobra.Command{ + Use: "wait", + Short: "Block until all council reviewers complete or timeout is reached", + RunE: func(cmd *cobra.Command, args []string) error { + ctx := cmd.Context() + + sqlDB, err := openOrchDB(ctx, root.dbPath) + if err != nil { + return err + } + defer sqlDB.Close() + + result, err := store.NewOrchStore(sqlDB).WaitForCouncil(ctx, store.CouncilWaitInput{ + RunID: opts.runID, + Timeout: time.Duration(opts.timeoutSeconds) * time.Second, + }) + if err != nil { + return err + } + + resp := protocol.Success{ + OK: true, + Command: "council wait", + Data: map[string]any{ + "run_id": result.RunID, + "woke": result.Woke, + "all_complete": result.AllComplete, + "reviewers": result.ReviewerStatuses, + }, + } + if root.json { + return protocol.WriteJSON(cmd.OutOrStdout(), resp) + } + + if !result.Woke { + _, err = fmt.Fprintf(cmd.OutOrStdout(), "council wait timed out for run %s\n", result.RunID) + return err + } + _, err = fmt.Fprintf(cmd.OutOrStdout(), "all council reviewers completed for run %s\n", result.RunID) + return err + }, + } + + cmd.Flags().StringVar(&opts.runID, "run", "", "Council run ID") + cmd.Flags().IntVar(&opts.timeoutSeconds, "timeout-seconds", 0, "Maximum time to wait before timing out") + _ = cmd.MarkFlagRequired("run") + + return cmd +} diff --git a/internal/cli/orch/integration_test.go b/internal/cli/orch/integration_test.go index 836d849..a887a4e 100644 --- a/internal/cli/orch/integration_test.go +++ b/internal/cli/orch/integration_test.go @@ -4,6 +4,7 @@ import ( "database/sql" "os" "path/filepath" + "strings" "testing" "time" ) @@ -891,7 +892,7 @@ func TestOrchWaitWakesOnBlockedEvent(t *testing.T) { "--agent", "worker-a", "--thread", threadID, ) - runInboxCommand( + runInboxCommandEventually( t, "--db", dbPath, "--json", @@ -1540,3 +1541,196 @@ func TestOrchCouncilStartDispatchesThreeReviewers(t *testing.T) { t.Fatalf("expected three council tasks, got %#v", tasks) } } + +func TestOrchCouncilWaitWakesWhenAllReviewersComplete(t *testing.T) { + t.Parallel() + + dbPath := filepath.Join(t.TempDir(), "coord.db") + + startOut := runOrchCommand( + t, + "--db", dbPath, + "--json", + "council", "start", + "--run", "council_blog_wait_001", + "--target", "Review the current blog architecture.", + ) + + var startResp map[string]any + mustDecodeJSON(t, startOut, &startResp) + reviewers := nestedArray(t, startResp, "data", "reviewers") + + for _, item := range reviewers { + reviewer, ok := item.(map[string]any) + if !ok { + t.Fatalf("expected reviewer object, got %#v", item) + } + taskID, _ := reviewer["task_id"].(string) + + statusOut := runOrchCommand( + t, + "--db", dbPath, + "--json", + "status", + "--run", "council_blog_wait_001", + ) + var statusResp map[string]any + mustDecodeJSON(t, statusOut, &statusResp) + tasks := nestedArray(t, statusResp, "data", "tasks") + + var threadID string + for _, taskItem := range tasks { + task, ok := taskItem.(map[string]any) + if !ok { + t.Fatalf("expected task object, got %#v", taskItem) + } + if task["task_id"] == taskID { + taskStatus := runOrchCommand( + t, + "--db", dbPath, + "--json", + "status", + "--run", "council_blog_wait_001", + ) + var taskStatusResp map[string]any + mustDecodeJSON(t, taskStatus, &taskStatusResp) + statusTasks := nestedArray(t, taskStatusResp, "data", "tasks") + for _, statusTaskItem := range statusTasks { + statusTask, ok := statusTaskItem.(map[string]any) + if !ok { + t.Fatalf("expected status task object, got %#v", statusTaskItem) + } + if statusTask["task_id"] == taskID { + break + } + } + } + } + + sqlDB, err := openOrchDB(t.Context(), dbPath) + if err != nil { + t.Fatalf("open orch db: %v", err) + } + if err := sqlDB.QueryRowContext( + t.Context(), + `SELECT thread_id + FROM task_attempts + WHERE run_id = ? AND task_id = ? AND attempt_no = 1`, + "council_blog_wait_001", + taskID, + ).Scan(&threadID); err != nil { + sqlDB.Close() + t.Fatalf("query council reviewer thread id: %v", err) + } + sqlDB.Close() + + runInboxCommand( + t, + "--db", dbPath, + "--json", + "claim", + "--agent", reviewer["reviewer_role"].(string), + "--thread", threadID, + ) + runInboxCommand( + t, + "--db", dbPath, + "--json", + "done", + "--agent", reviewer["reviewer_role"].(string), + "--thread", threadID, + "--summary", "Review complete", + ) + } + + waitOut := runOrchCommand( + t, + "--db", dbPath, + "--json", + "council", "wait", + "--run", "council_blog_wait_001", + "--timeout-seconds", "2", + ) + + var waitResp map[string]any + mustDecodeJSON(t, waitOut, &waitResp) + if woke, _ := nestedValue(t, waitResp, "data", "woke").(bool); !woke { + t.Fatalf("expected council wait to wake, got %#v", waitResp) + } + if allComplete, _ := nestedValue(t, waitResp, "data", "all_complete").(bool); !allComplete { + t.Fatalf("expected all reviewers complete, got %#v", waitResp) + } + reviewers = nestedArray(t, waitResp, "data", "reviewers") + if len(reviewers) != 3 { + t.Fatalf("expected three council reviewer statuses, got %#v", reviewers) + } + for _, item := range reviewers { + reviewer, ok := item.(map[string]any) + if !ok { + t.Fatalf("expected reviewer object, got %#v", item) + } + if got, _ := reviewer["status"].(string); got != "done" { + t.Fatalf("expected done reviewer status, got %#v", reviewer["status"]) + } + } +} + +func TestOrchCouncilWaitTimesOutWhenReviewersIncomplete(t *testing.T) { + t.Parallel() + + dbPath := filepath.Join(t.TempDir(), "coord.db") + + runOrchCommand( + t, + "--db", dbPath, + "--json", + "council", "start", + "--run", "council_blog_wait_002", + "--target", "Review the current blog architecture.", + ) + + waitOut := runOrchCommand( + t, + "--db", dbPath, + "--json", + "council", "wait", + "--run", "council_blog_wait_002", + "--timeout-seconds", "1", + ) + + var waitResp map[string]any + mustDecodeJSON(t, waitOut, &waitResp) + if woke, _ := nestedValue(t, waitResp, "data", "woke").(bool); woke { + t.Fatalf("expected council wait timeout, got %#v", waitResp) + } + if allComplete, _ := nestedValue(t, waitResp, "data", "all_complete").(bool); allComplete { + t.Fatalf("expected incomplete reviewer set on timeout, got %#v", waitResp) + } + reviewers := nestedArray(t, waitResp, "data", "reviewers") + if len(reviewers) != 3 { + t.Fatalf("expected three reviewer statuses on timeout, got %#v", reviewers) + } +} + +func runInboxCommandEventually(t *testing.T, args ...string) string { + t.Helper() + + deadline := time.Now().Add(2 * time.Second) + var lastStdout, lastStderr string + var lastExit int + for { + lastStdout, lastStderr, lastExit = executeInboxCommand(args...) + if lastExit == 0 { + return lastStdout + } + if time.Now().After(deadline) || !isSQLiteBusyPayload(lastStdout) { + t.Fatalf("execute inbox command %v: exit=%d\nstderr:\n%s\nstdout:\n%s", args, lastExit, lastStderr, lastStdout) + } + time.Sleep(25 * time.Millisecond) + } +} + +func isSQLiteBusyPayload(stdout string) bool { + return strings.Contains(strings.ToLower(stdout), "sqlite_busy") || + strings.Contains(strings.ToLower(stdout), "database is locked") +} diff --git a/internal/store/council.go b/internal/store/council.go index 08d5b2a..6df3129 100644 --- a/internal/store/council.go +++ b/internal/store/council.go @@ -2,9 +2,11 @@ package store import ( "context" + "database/sql" "errors" "fmt" "strings" + "time" ) var councilReviewerRoles = []string{ @@ -53,6 +55,18 @@ type CouncilStartResult struct { Reviewers []CouncilReviewer `json:"reviewers"` } +type CouncilWaitInput struct { + RunID string + Timeout time.Duration +} + +type CouncilWaitResult struct { + Woke bool `json:"woke"` + RunID string `json:"run_id"` + AllComplete bool `json:"all_complete"` + ReviewerStatuses []CouncilReviewer `json:"reviewers"` +} + func (s *OrchStore) StartCouncil(ctx context.Context, input CouncilStartInput) (CouncilStartResult, error) { runID := strings.TrimSpace(input.RunID) if runID == "" { @@ -442,3 +456,168 @@ func boolToInt(value bool) int { } return 0 } + +func (s *OrchStore) WaitForCouncil(ctx context.Context, input CouncilWaitInput) (CouncilWaitResult, error) { + runID := strings.TrimSpace(input.RunID) + if runID == "" { + return CouncilWaitResult{}, fmt.Errorf("%w: run id is required", ErrInvalidInput) + } + + if _, err := s.GetCouncilRun(ctx, runID); err != nil { + return CouncilWaitResult{}, err + } + + waitCtx := ctx + cancel := func() {} + if input.Timeout > 0 { + waitCtx, cancel = context.WithTimeout(ctx, input.Timeout) + } + defer cancel() + + for { + reviewers, allComplete, err := s.GetCouncilReviewerStatuses(waitCtx, runID) + if err != nil { + if isDeadlineExceeded(waitCtx) { + return CouncilWaitResult{ + Woke: false, + RunID: runID, + AllComplete: false, + ReviewerStatuses: reviewers, + }, nil + } + return CouncilWaitResult{}, err + } + if allComplete { + return CouncilWaitResult{ + Woke: true, + RunID: runID, + AllComplete: true, + ReviewerStatuses: reviewers, + }, nil + } + + if _, err := s.ReconcileRun(waitCtx, runID); err != nil { + if isSQLiteBusyError(err) { + ok, waitErr := waitForNextPoll(waitCtx, 25*time.Millisecond) + if waitErr != nil { + if errors.Is(waitErr, context.DeadlineExceeded) { + reviewers, _, _ := s.GetCouncilReviewerStatuses(ctx, runID) + return CouncilWaitResult{ + Woke: false, + RunID: runID, + AllComplete: false, + ReviewerStatuses: reviewers, + }, nil + } + return CouncilWaitResult{}, waitErr + } + if !ok { + reviewers, _, _ := s.GetCouncilReviewerStatuses(ctx, runID) + return CouncilWaitResult{ + Woke: false, + RunID: runID, + AllComplete: false, + ReviewerStatuses: reviewers, + }, nil + } + continue + } + if isDeadlineExceeded(waitCtx) { + reviewers, _, _ := s.GetCouncilReviewerStatuses(ctx, runID) + return CouncilWaitResult{ + Woke: false, + RunID: runID, + AllComplete: false, + ReviewerStatuses: reviewers, + }, nil + } + return CouncilWaitResult{}, err + } + + ok, err := waitForNextPoll(waitCtx, 200*time.Millisecond) + if err != nil { + if errors.Is(err, context.DeadlineExceeded) { + reviewers, _, _ := s.GetCouncilReviewerStatuses(ctx, runID) + return CouncilWaitResult{ + Woke: false, + RunID: runID, + AllComplete: false, + ReviewerStatuses: reviewers, + }, nil + } + return CouncilWaitResult{}, err + } + if !ok { + reviewers, _, _ := s.GetCouncilReviewerStatuses(ctx, runID) + return CouncilWaitResult{ + Woke: false, + RunID: runID, + AllComplete: false, + ReviewerStatuses: reviewers, + }, nil + } + } +} + +func (s *OrchStore) GetCouncilRun(ctx context.Context, runID string) (CouncilRun, error) { + row := s.db.QueryRowContext( + ctx, + `SELECT run_id, mode, target_type, output_mode, only_unanimous + FROM council_runs + WHERE run_id = ?`, + runID, + ) + + var ( + run CouncilRun + onlyUnanimous int + ) + err := row.Scan(&run.RunID, &run.Mode, &run.TargetType, &run.OutputMode, &onlyUnanimous) + if errors.Is(err, sql.ErrNoRows) { + return CouncilRun{}, fmt.Errorf("%w: council run %s not found", ErrRunNotFound, runID) + } + if err != nil { + return CouncilRun{}, fmt.Errorf("scan council run: %w", err) + } + run.OnlyUnanimous = onlyUnanimous != 0 + return run, nil +} + +func (s *OrchStore) GetCouncilReviewerStatuses(ctx context.Context, runID string) ([]CouncilReviewer, bool, error) { + rows, err := s.db.QueryContext( + ctx, + `SELECT cr.reviewer_role, cr.task_id, t.status + FROM council_reviewers cr + JOIN tasks t + ON t.run_id = cr.run_id + AND t.task_id = cr.task_id + WHERE cr.run_id = ? + ORDER BY cr.reviewer_role ASC`, + runID, + ) + if err != nil { + return nil, false, fmt.Errorf("query council reviewer statuses: %w", err) + } + defer rows.Close() + + reviewers := make([]CouncilReviewer, 0, len(councilReviewerRoles)) + allComplete := true + for rows.Next() { + var reviewer CouncilReviewer + if err := rows.Scan(&reviewer.ReviewerRole, &reviewer.TaskID, &reviewer.Status); err != nil { + return nil, false, fmt.Errorf("scan council reviewer status: %w", err) + } + if reviewer.Status != "done" && reviewer.Status != "failed" && reviewer.Status != "cancelled" { + allComplete = false + } + reviewers = append(reviewers, reviewer) + } + if err := rows.Err(); err != nil { + return nil, false, fmt.Errorf("iterate council reviewer statuses: %w", err) + } + if len(reviewers) == 0 { + return nil, false, fmt.Errorf("%w: council reviewers for run %s not found", ErrRunNotFound, runID) + } + + return reviewers, allComplete, nil +}