Add council review wait command

This commit is contained in:
2026-03-19 15:13:34 +08:00
parent c1beacb703
commit dd6a0e31b6
7 changed files with 522 additions and 10 deletions
+1
View File
@@ -9,5 +9,6 @@ func newCouncilCmd(root *rootOptions) *cobra.Command {
}
cmd.AddCommand(newCouncilStartCmd(root))
cmd.AddCommand(newCouncilWaitCmd(root))
return cmd
}
+69
View File
@@ -0,0 +1,69 @@
package orch
import (
"fmt"
"time"
"ai-workflow-skill/internal/protocol"
"ai-workflow-skill/internal/store"
"github.com/spf13/cobra"
)
type councilWaitOptions struct {
runID string
timeoutSeconds int
}
func newCouncilWaitCmd(root *rootOptions) *cobra.Command {
opts := &councilWaitOptions{}
cmd := &cobra.Command{
Use: "wait",
Short: "Block until all council reviewers complete or timeout is reached",
RunE: func(cmd *cobra.Command, args []string) error {
ctx := cmd.Context()
sqlDB, err := openOrchDB(ctx, root.dbPath)
if err != nil {
return err
}
defer sqlDB.Close()
result, err := store.NewOrchStore(sqlDB).WaitForCouncil(ctx, store.CouncilWaitInput{
RunID: opts.runID,
Timeout: time.Duration(opts.timeoutSeconds) * time.Second,
})
if err != nil {
return err
}
resp := protocol.Success{
OK: true,
Command: "council wait",
Data: map[string]any{
"run_id": result.RunID,
"woke": result.Woke,
"all_complete": result.AllComplete,
"reviewers": result.ReviewerStatuses,
},
}
if root.json {
return protocol.WriteJSON(cmd.OutOrStdout(), resp)
}
if !result.Woke {
_, err = fmt.Fprintf(cmd.OutOrStdout(), "council wait timed out for run %s\n", result.RunID)
return err
}
_, err = fmt.Fprintf(cmd.OutOrStdout(), "all council reviewers completed for run %s\n", result.RunID)
return err
},
}
cmd.Flags().StringVar(&opts.runID, "run", "", "Council run ID")
cmd.Flags().IntVar(&opts.timeoutSeconds, "timeout-seconds", 0, "Maximum time to wait before timing out")
_ = cmd.MarkFlagRequired("run")
return cmd
}
+195 -1
View File
@@ -4,6 +4,7 @@ import (
"database/sql"
"os"
"path/filepath"
"strings"
"testing"
"time"
)
@@ -891,7 +892,7 @@ func TestOrchWaitWakesOnBlockedEvent(t *testing.T) {
"--agent", "worker-a",
"--thread", threadID,
)
runInboxCommand(
runInboxCommandEventually(
t,
"--db", dbPath,
"--json",
@@ -1540,3 +1541,196 @@ func TestOrchCouncilStartDispatchesThreeReviewers(t *testing.T) {
t.Fatalf("expected three council tasks, got %#v", tasks)
}
}
func TestOrchCouncilWaitWakesWhenAllReviewersComplete(t *testing.T) {
t.Parallel()
dbPath := filepath.Join(t.TempDir(), "coord.db")
startOut := runOrchCommand(
t,
"--db", dbPath,
"--json",
"council", "start",
"--run", "council_blog_wait_001",
"--target", "Review the current blog architecture.",
)
var startResp map[string]any
mustDecodeJSON(t, startOut, &startResp)
reviewers := nestedArray(t, startResp, "data", "reviewers")
for _, item := range reviewers {
reviewer, ok := item.(map[string]any)
if !ok {
t.Fatalf("expected reviewer object, got %#v", item)
}
taskID, _ := reviewer["task_id"].(string)
statusOut := runOrchCommand(
t,
"--db", dbPath,
"--json",
"status",
"--run", "council_blog_wait_001",
)
var statusResp map[string]any
mustDecodeJSON(t, statusOut, &statusResp)
tasks := nestedArray(t, statusResp, "data", "tasks")
var threadID string
for _, taskItem := range tasks {
task, ok := taskItem.(map[string]any)
if !ok {
t.Fatalf("expected task object, got %#v", taskItem)
}
if task["task_id"] == taskID {
taskStatus := runOrchCommand(
t,
"--db", dbPath,
"--json",
"status",
"--run", "council_blog_wait_001",
)
var taskStatusResp map[string]any
mustDecodeJSON(t, taskStatus, &taskStatusResp)
statusTasks := nestedArray(t, taskStatusResp, "data", "tasks")
for _, statusTaskItem := range statusTasks {
statusTask, ok := statusTaskItem.(map[string]any)
if !ok {
t.Fatalf("expected status task object, got %#v", statusTaskItem)
}
if statusTask["task_id"] == taskID {
break
}
}
}
}
sqlDB, err := openOrchDB(t.Context(), dbPath)
if err != nil {
t.Fatalf("open orch db: %v", err)
}
if err := sqlDB.QueryRowContext(
t.Context(),
`SELECT thread_id
FROM task_attempts
WHERE run_id = ? AND task_id = ? AND attempt_no = 1`,
"council_blog_wait_001",
taskID,
).Scan(&threadID); err != nil {
sqlDB.Close()
t.Fatalf("query council reviewer thread id: %v", err)
}
sqlDB.Close()
runInboxCommand(
t,
"--db", dbPath,
"--json",
"claim",
"--agent", reviewer["reviewer_role"].(string),
"--thread", threadID,
)
runInboxCommand(
t,
"--db", dbPath,
"--json",
"done",
"--agent", reviewer["reviewer_role"].(string),
"--thread", threadID,
"--summary", "Review complete",
)
}
waitOut := runOrchCommand(
t,
"--db", dbPath,
"--json",
"council", "wait",
"--run", "council_blog_wait_001",
"--timeout-seconds", "2",
)
var waitResp map[string]any
mustDecodeJSON(t, waitOut, &waitResp)
if woke, _ := nestedValue(t, waitResp, "data", "woke").(bool); !woke {
t.Fatalf("expected council wait to wake, got %#v", waitResp)
}
if allComplete, _ := nestedValue(t, waitResp, "data", "all_complete").(bool); !allComplete {
t.Fatalf("expected all reviewers complete, got %#v", waitResp)
}
reviewers = nestedArray(t, waitResp, "data", "reviewers")
if len(reviewers) != 3 {
t.Fatalf("expected three council reviewer statuses, got %#v", reviewers)
}
for _, item := range reviewers {
reviewer, ok := item.(map[string]any)
if !ok {
t.Fatalf("expected reviewer object, got %#v", item)
}
if got, _ := reviewer["status"].(string); got != "done" {
t.Fatalf("expected done reviewer status, got %#v", reviewer["status"])
}
}
}
func TestOrchCouncilWaitTimesOutWhenReviewersIncomplete(t *testing.T) {
t.Parallel()
dbPath := filepath.Join(t.TempDir(), "coord.db")
runOrchCommand(
t,
"--db", dbPath,
"--json",
"council", "start",
"--run", "council_blog_wait_002",
"--target", "Review the current blog architecture.",
)
waitOut := runOrchCommand(
t,
"--db", dbPath,
"--json",
"council", "wait",
"--run", "council_blog_wait_002",
"--timeout-seconds", "1",
)
var waitResp map[string]any
mustDecodeJSON(t, waitOut, &waitResp)
if woke, _ := nestedValue(t, waitResp, "data", "woke").(bool); woke {
t.Fatalf("expected council wait timeout, got %#v", waitResp)
}
if allComplete, _ := nestedValue(t, waitResp, "data", "all_complete").(bool); allComplete {
t.Fatalf("expected incomplete reviewer set on timeout, got %#v", waitResp)
}
reviewers := nestedArray(t, waitResp, "data", "reviewers")
if len(reviewers) != 3 {
t.Fatalf("expected three reviewer statuses on timeout, got %#v", reviewers)
}
}
func runInboxCommandEventually(t *testing.T, args ...string) string {
t.Helper()
deadline := time.Now().Add(2 * time.Second)
var lastStdout, lastStderr string
var lastExit int
for {
lastStdout, lastStderr, lastExit = executeInboxCommand(args...)
if lastExit == 0 {
return lastStdout
}
if time.Now().After(deadline) || !isSQLiteBusyPayload(lastStdout) {
t.Fatalf("execute inbox command %v: exit=%d\nstderr:\n%s\nstdout:\n%s", args, lastExit, lastStderr, lastStdout)
}
time.Sleep(25 * time.Millisecond)
}
}
func isSQLiteBusyPayload(stdout string) bool {
return strings.Contains(strings.ToLower(stdout), "sqlite_busy") ||
strings.Contains(strings.ToLower(stdout), "database is locked")
}