From 3d4915c5cf19f60bc403fe7904305865480182f5 Mon Sep 17 00:00:00 2001 From: kurihada Date: Tue, 24 Mar 2026 01:43:51 +0800 Subject: [PATCH] Wake default waiters on verifying events --- docs/orch-cli.md | 4 +- packages/coord-core/store/orch.go | 14 +- .../internal/cli/orch/integration_test.go | 126 ++++++++++++++++++ .../orch-runtime/internal/cli/orch/wait.go | 6 +- 4 files changed, 143 insertions(+), 7 deletions(-) diff --git a/docs/orch-cli.md b/docs/orch-cli.md index 4544df2..8d4ebb7 100644 --- a/docs/orch-cli.md +++ b/docs/orch-cli.md @@ -312,7 +312,7 @@ This is the normal wait primitive for the interactive leader. Suggested flags: - `--run RUN_ID` -- `--for task_ready,task_blocked,task_done,task_failed` +- `--for task_ready,task_blocked,task_verifying,task_done,task_failed` - `--after-event EVENT_ID` - `--timeout-seconds N` @@ -690,7 +690,7 @@ orch dep add --run blog_mvp_001 --task T2 --depends-on T1 --json orch ready --run blog_mvp_001 --json orch dispatch --run blog_mvp_001 --task T1 --execution-mode code --to foundation-worker --base-ref main --workspace-root .orch/worktrees --body-file tasks/t1.md --json orch reconcile --run blog_mvp_001 --json -orch wait --run blog_mvp_001 --for task_blocked,task_done,task_failed --after-event 0 --timeout-seconds 900 --json +orch wait --run blog_mvp_001 --for task_blocked,task_verifying,task_done,task_failed --after-event 0 --timeout-seconds 900 --json orch blocked --run blog_mvp_001 --json orch answer --run blog_mvp_001 --task T2 --body "MVP supports draft and published only." --json orch retry --run blog_mvp_001 --task T7a --to backend-worker --body "Retry after fixing the contract mismatch." --json diff --git a/packages/coord-core/store/orch.go b/packages/coord-core/store/orch.go index e7a4fb5..c458809 100644 --- a/packages/coord-core/store/orch.go +++ b/packages/coord-core/store/orch.go @@ -207,6 +207,16 @@ type WaitResult struct { Events []RunEvent `json:"events,omitempty"` } +const DefaultWaitEventTypesCSV = "task_ready,task_blocked,task_verifying,task_done,task_failed" + +var defaultWaitEventTypes = []string{ + "task_ready", + "task_blocked", + "task_verifying", + "task_done", + "task_failed", +} + type DispatchWorkspace struct { ExecutionMode string `json:"execution_mode,omitempty"` BaseRef string `json:"base_ref,omitempty"` @@ -3267,7 +3277,7 @@ func normalizePriority(priority string) (string, error) { func normalizeWaitEventTypes(eventTypes []string) []string { if len(eventTypes) == 0 { - return []string{"task_ready", "task_blocked", "task_done", "task_failed"} + return append([]string(nil), defaultWaitEventTypes...) } normalized := make([]string, 0, len(eventTypes)) @@ -3284,7 +3294,7 @@ func normalizeWaitEventTypes(eventTypes []string) []string { normalized = append(normalized, eventType) } if len(normalized) == 0 { - return []string{"task_ready", "task_blocked", "task_done", "task_failed"} + return append([]string(nil), defaultWaitEventTypes...) } return normalized } diff --git a/packages/orch-runtime/internal/cli/orch/integration_test.go b/packages/orch-runtime/internal/cli/orch/integration_test.go index 9982355..721cce2 100644 --- a/packages/orch-runtime/internal/cli/orch/integration_test.go +++ b/packages/orch-runtime/internal/cli/orch/integration_test.go @@ -2,6 +2,7 @@ package orch import ( "database/sql" + "fmt" "os" "path/filepath" "strings" @@ -1151,6 +1152,131 @@ func TestOrchWaitTimesOutWithoutMatchingEvent(t *testing.T) { } } +func TestOrchWaitDefaultsWakeOnVerifyingEvent(t *testing.T) { + t.Parallel() + + dbPath := filepath.Join(t.TempDir(), "coord.db") + + runOrchCommand( + t, + "--db", dbPath, + "--json", + "run", "init", + "--run", "run_blog_wait_003", + "--goal", "Validate default wait wakes on verifying", + ) + runOrchCommand( + t, + "--db", dbPath, + "--json", + "task", "add", + "--run", "run_blog_wait_003", + "--task", "T1", + "--title", "Implement gated task", + "--default-to", "worker-a", + "--required-check", "lint", + ) + + dispatchOut := runOrchCommand( + t, + "--db", dbPath, + "--json", + "dispatch", + "--run", "run_blog_wait_003", + "--task", "T1", + "--execution-mode", "analysis", + ) + + var dispatchResp map[string]any + mustDecodeJSON(t, dispatchOut, &dispatchResp) + threadID := nestedString(t, dispatchResp, "data", "attempt", "thread_id") + + sqlDB, err := openOrchDB(t.Context(), dbPath) + if err != nil { + t.Fatalf("open orch db: %v", err) + } + defer sqlDB.Close() + + var afterEventID int64 + if err := sqlDB.QueryRowContext( + t.Context(), + `SELECT COALESCE(MAX(event_id), 0) + FROM events + WHERE run_id = ?`, + "run_blog_wait_003", + ).Scan(&afterEventID); err != nil { + t.Fatalf("query latest event id: %v", err) + } + + type waitResult struct { + stdout string + stderr string + exitCode int + } + resultCh := make(chan waitResult, 1) + go func() { + stdout, stderr, exitCode := executeOrchCommand( + "--db", dbPath, + "--json", + "wait", + "--run", "run_blog_wait_003", + "--after-event", fmt.Sprintf("%d", afterEventID), + "--timeout-seconds", "5", + ) + resultCh <- waitResult{stdout: stdout, stderr: stderr, exitCode: exitCode} + }() + + time.Sleep(200 * time.Millisecond) + + runInboxCommandEventually( + t, + "--db", dbPath, + "--json", + "claim", + "--agent", "worker-a", + "--thread", threadID, + ) + runInboxCommandEventually( + t, + "--db", dbPath, + "--json", + "done", + "--agent", "worker-a", + "--thread", threadID, + "--summary", "Implementation finished", + "--body", "Ready for verification.", + ) + + select { + case result := <-resultCh: + if result.exitCode != 0 { + t.Fatalf("wait exited with %d\nstderr:\n%s\nstdout:\n%s", result.exitCode, result.stderr, result.stdout) + } + + var waitResp map[string]any + mustDecodeJSON(t, result.stdout, &waitResp) + if woke, _ := nestedValue(t, waitResp, "data", "woke").(bool); !woke { + t.Fatalf("expected wait to wake, got %#v", waitResp) + } + events := nestedArray(t, waitResp, "data", "events") + if len(events) != 1 { + t.Fatalf("expected one wait event, got %#v", events) + } + event, ok := events[0].(map[string]any) + if !ok { + t.Fatalf("expected wait event object, got %#v", events[0]) + } + if got, _ := event["type"].(string); got != "task_verifying" { + t.Fatalf("expected task_verifying event, got %#v", event["type"]) + } + if got, _ := event["task_id"].(string); got != "T1" { + t.Fatalf("expected task_verifying for T1, got %#v", event["task_id"]) + } + case <-time.After(3 * time.Second): + t.Fatal("timed out waiting for default orch wait result") + } +} + func TestOrchRetryCreatesNewAttempt(t *testing.T) { t.Parallel() diff --git a/packages/orch-runtime/internal/cli/orch/wait.go b/packages/orch-runtime/internal/cli/orch/wait.go index e77287a..04f7f7a 100644 --- a/packages/orch-runtime/internal/cli/orch/wait.go +++ b/packages/orch-runtime/internal/cli/orch/wait.go @@ -26,10 +26,10 @@ func newWaitCmd(root *rootOptions) *cobra.Command { Short: "Block until matching run-scoped task events become available", Long: helpLong( "Use wait as the leader-side blocking primitive.", - "Instead of polling with manual sleep loops, wait blocks until later matching task events exist for the run, such as ready, blocked, done, or failed.", + "Instead of polling with manual sleep loops, wait blocks until later matching task events exist for the run, such as ready, blocked, verifying, done, or failed.", "Use --after-event when resuming from a known cursor so you do not reprocess earlier events.", ), - Example: ` orch --db .agents/coord.db wait --run blog_mvp_001 --for task_blocked,task_done --after-event 0 --timeout-seconds 900`, + Example: ` orch --db .agents/coord.db wait --run blog_mvp_001 --for task_blocked,task_verifying,task_done --after-event 0 --timeout-seconds 900`, RunE: func(cmd *cobra.Command, args []string) error { ctx := cmd.Context() @@ -77,7 +77,7 @@ func newWaitCmd(root *rootOptions) *cobra.Command { } cmd.Flags().StringVar(&opts.runID, "run", "", "Run ID") - cmd.Flags().StringVar(&opts.eventTypesRaw, "for", "task_ready,task_blocked,task_done,task_failed", "Comma-separated event types to wait for") + cmd.Flags().StringVar(&opts.eventTypesRaw, "for", store.DefaultWaitEventTypesCSV, "Comma-separated event types to wait for") cmd.Flags().Int64Var(&opts.afterEventID, "after-event", 0, "Only wait for events after this event ID") cmd.Flags().IntVar(&opts.timeoutSeconds, "timeout-seconds", 0, "Maximum time to wait before timing out") _ = cmd.MarkFlagRequired("run")