Wake default waiters on verifying events
This commit is contained in:
@@ -2,6 +2,7 @@ package orch
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
@@ -1151,6 +1152,131 @@ func TestOrchWaitTimesOutWithoutMatchingEvent(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestOrchWaitDefaultsWakeOnVerifyingEvent(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
dbPath := filepath.Join(t.TempDir(), "coord.db")
|
||||
|
||||
runOrchCommand(
|
||||
t,
|
||||
"--db", dbPath,
|
||||
"--json",
|
||||
"run", "init",
|
||||
"--run", "run_blog_wait_003",
|
||||
"--goal", "Validate default wait wakes on verifying",
|
||||
)
|
||||
runOrchCommand(
|
||||
t,
|
||||
"--db", dbPath,
|
||||
"--json",
|
||||
"task", "add",
|
||||
"--run", "run_blog_wait_003",
|
||||
"--task", "T1",
|
||||
"--title", "Implement gated task",
|
||||
"--default-to", "worker-a",
|
||||
"--required-check", "lint",
|
||||
)
|
||||
|
||||
dispatchOut := runOrchCommand(
|
||||
t,
|
||||
"--db", dbPath,
|
||||
"--json",
|
||||
"dispatch",
|
||||
"--run", "run_blog_wait_003",
|
||||
"--task", "T1",
|
||||
"--execution-mode", "analysis",
|
||||
)
|
||||
|
||||
var dispatchResp map[string]any
|
||||
mustDecodeJSON(t, dispatchOut, &dispatchResp)
|
||||
threadID := nestedString(t, dispatchResp, "data", "attempt", "thread_id")
|
||||
|
||||
sqlDB, err := openOrchDB(t.Context(), dbPath)
|
||||
if err != nil {
|
||||
t.Fatalf("open orch db: %v", err)
|
||||
}
|
||||
defer sqlDB.Close()
|
||||
|
||||
var afterEventID int64
|
||||
if err := sqlDB.QueryRowContext(
|
||||
t.Context(),
|
||||
`SELECT COALESCE(MAX(event_id), 0)
|
||||
FROM events
|
||||
WHERE run_id = ?`,
|
||||
"run_blog_wait_003",
|
||||
).Scan(&afterEventID); err != nil {
|
||||
t.Fatalf("query latest event id: %v", err)
|
||||
}
|
||||
|
||||
type waitResult struct {
|
||||
stdout string
|
||||
stderr string
|
||||
exitCode int
|
||||
}
|
||||
resultCh := make(chan waitResult, 1)
|
||||
go func() {
|
||||
stdout, stderr, exitCode := executeOrchCommand(
|
||||
"--db", dbPath,
|
||||
"--json",
|
||||
"wait",
|
||||
"--run", "run_blog_wait_003",
|
||||
"--after-event", fmt.Sprintf("%d", afterEventID),
|
||||
"--timeout-seconds", "5",
|
||||
)
|
||||
resultCh <- waitResult{stdout: stdout, stderr: stderr, exitCode: exitCode}
|
||||
}()
|
||||
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
|
||||
runInboxCommandEventually(
|
||||
t,
|
||||
"--db", dbPath,
|
||||
"--json",
|
||||
"claim",
|
||||
"--agent", "worker-a",
|
||||
"--thread", threadID,
|
||||
)
|
||||
runInboxCommandEventually(
|
||||
t,
|
||||
"--db", dbPath,
|
||||
"--json",
|
||||
"done",
|
||||
"--agent", "worker-a",
|
||||
"--thread", threadID,
|
||||
"--summary", "Implementation finished",
|
||||
"--body", "Ready for verification.",
|
||||
)
|
||||
|
||||
select {
|
||||
case result := <-resultCh:
|
||||
if result.exitCode != 0 {
|
||||
t.Fatalf("wait exited with %d\nstderr:\n%s\nstdout:\n%s", result.exitCode, result.stderr, result.stdout)
|
||||
}
|
||||
|
||||
var waitResp map[string]any
|
||||
mustDecodeJSON(t, result.stdout, &waitResp)
|
||||
if woke, _ := nestedValue(t, waitResp, "data", "woke").(bool); !woke {
|
||||
t.Fatalf("expected wait to wake, got %#v", waitResp)
|
||||
}
|
||||
events := nestedArray(t, waitResp, "data", "events")
|
||||
if len(events) != 1 {
|
||||
t.Fatalf("expected one wait event, got %#v", events)
|
||||
}
|
||||
event, ok := events[0].(map[string]any)
|
||||
if !ok {
|
||||
t.Fatalf("expected wait event object, got %#v", events[0])
|
||||
}
|
||||
if got, _ := event["type"].(string); got != "task_verifying" {
|
||||
t.Fatalf("expected task_verifying event, got %#v", event["type"])
|
||||
}
|
||||
if got, _ := event["task_id"].(string); got != "T1" {
|
||||
t.Fatalf("expected task_verifying for T1, got %#v", event["task_id"])
|
||||
}
|
||||
case <-time.After(3 * time.Second):
|
||||
t.Fatal("timed out waiting for default orch wait result")
|
||||
}
|
||||
}
|
||||
|
||||
func TestOrchRetryCreatesNewAttempt(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
|
||||
@@ -26,10 +26,10 @@ func newWaitCmd(root *rootOptions) *cobra.Command {
|
||||
Short: "Block until matching run-scoped task events become available",
|
||||
Long: helpLong(
|
||||
"Use wait as the leader-side blocking primitive.",
|
||||
"Instead of polling with manual sleep loops, wait blocks until later matching task events exist for the run, such as ready, blocked, done, or failed.",
|
||||
"Instead of polling with manual sleep loops, wait blocks until later matching task events exist for the run, such as ready, blocked, verifying, done, or failed.",
|
||||
"Use --after-event when resuming from a known cursor so you do not reprocess earlier events.",
|
||||
),
|
||||
Example: ` orch --db .agents/coord.db wait --run blog_mvp_001 --for task_blocked,task_done --after-event 0 --timeout-seconds 900`,
|
||||
Example: ` orch --db .agents/coord.db wait --run blog_mvp_001 --for task_blocked,task_verifying,task_done --after-event 0 --timeout-seconds 900`,
|
||||
RunE: func(cmd *cobra.Command, args []string) error {
|
||||
ctx := cmd.Context()
|
||||
|
||||
@@ -77,7 +77,7 @@ func newWaitCmd(root *rootOptions) *cobra.Command {
|
||||
}
|
||||
|
||||
cmd.Flags().StringVar(&opts.runID, "run", "", "Run ID")
|
||||
cmd.Flags().StringVar(&opts.eventTypesRaw, "for", "task_ready,task_blocked,task_done,task_failed", "Comma-separated event types to wait for")
|
||||
cmd.Flags().StringVar(&opts.eventTypesRaw, "for", store.DefaultWaitEventTypesCSV, "Comma-separated event types to wait for")
|
||||
cmd.Flags().Int64Var(&opts.afterEventID, "after-event", 0, "Only wait for events after this event ID")
|
||||
cmd.Flags().IntVar(&opts.timeoutSeconds, "timeout-seconds", 0, "Maximum time to wait before timing out")
|
||||
_ = cmd.MarkFlagRequired("run")
|
||||
|
||||
Reference in New Issue
Block a user