Improve orch status reconciliation view

This commit is contained in:
2026-03-20 17:57:58 +08:00
parent 693a79345b
commit cf3c3cbe60
11 changed files with 374 additions and 21 deletions
+127
View File
@@ -72,6 +72,19 @@ type RunOverview struct {
Tasks []Task `json:"tasks,omitempty"`
}
// RunStatusTask is a single task entry in a RunStatusView. It embeds the Task
// itself and optionally carries context about the task's most recent attempt.
// Every pointer field is omitted from the JSON output when nil.
type RunStatusTask struct {
	Task

	// LatestAttempt is the attempt identified by the task's LatestAttemptNo;
	// nil when the task has no attempt yet.
	LatestAttempt *TaskAttempt `json:"latest_attempt,omitempty"`

	// LatestMessage is the most recent message on the latest attempt's
	// thread; nil when there is no attempt or the attempt has no thread.
	LatestMessage *Message `json:"latest_message,omitempty"`

	// BlockedQuestion is the latest question message on the latest attempt's
	// thread. It is only populated for tasks whose status is "blocked".
	BlockedQuestion *Message `json:"blocked_question,omitempty"`
}
// RunStatusView is the payload returned by OrchStore.GetRunStatusView: the
// run record, aggregated task counts, and one enriched entry per task.
type RunStatusView struct {
	// Run is the run record as read after the status refresh.
	Run Run `json:"run"`

	// TaskCounts holds the per-key task totals for the run.
	// NOTE(review): keys are presumably task status names — confirm against
	// collectTaskCounts.
	TaskCounts map[string]int `json:"task_counts"`

	// Tasks lists the run's tasks with their latest attempt/message context;
	// omitted from JSON when empty.
	Tasks []RunStatusTask `json:"tasks,omitempty"`
}
type CreateRunInput struct {
RunID string
Goal string
@@ -1781,6 +1794,52 @@ func (s *OrchStore) GetRunOverview(ctx context.Context, runID string) (RunOvervi
}, nil
}
// GetRunStatusView returns the status view for runID: the run record, its
// task counts, and every task enriched with latest-attempt context.
//
// A blank runID is rejected with ErrInvalidInput. All work happens inside a
// single transaction: the run is looked up first (so an unknown run fails
// before anything else runs), then refreshReadyStates and
// updateRunAggregateStatus are applied with one shared timestamp, and only
// afterwards are the run, counts and tasks read back. The transaction is
// committed before the view is returned; on any error the zero RunStatusView
// is returned and the deferred rollback discards the transaction.
func (s *OrchStore) GetRunStatusView(ctx context.Context, runID string) (RunStatusView, error) {
	var none RunStatusView

	if strings.TrimSpace(runID) == "" {
		return none, fmt.Errorf("%w: run id is required", ErrInvalidInput)
	}

	// One timestamp is shared by both refresh steps below.
	refreshedAt := nowUTC()

	tx, err := s.db.BeginTx(ctx, nil)
	if err != nil {
		return none, fmt.Errorf("begin run status transaction: %w", err)
	}
	// Rollback after a successful Commit has nothing left to undo, so its
	// result is intentionally not inspected.
	defer tx.Rollback()

	// Existence check: fail early before touching any task state.
	if _, err = selectRun(ctx, tx, runID); err != nil {
		return none, err
	}
	if err = refreshReadyStates(ctx, tx, runID, refreshedAt); err != nil {
		return none, err
	}
	if err = updateRunAggregateStatus(ctx, tx, runID, refreshedAt); err != nil {
		return none, err
	}

	// Read everything back only after the refresh so the view reflects it.
	var view RunStatusView
	if view.Run, err = selectRun(ctx, tx, runID); err != nil {
		return none, err
	}
	if view.TaskCounts, err = collectTaskCounts(ctx, tx, runID); err != nil {
		return none, err
	}
	if view.Tasks, err = listRunStatusTasks(ctx, tx, runID); err != nil {
		return none, err
	}

	if err = tx.Commit(); err != nil {
		return none, fmt.Errorf("commit run status transaction: %w", err)
	}
	return view, nil
}
func (s *OrchStore) WaitForEvents(ctx context.Context, input WaitInput) (WaitResult, error) {
if strings.TrimSpace(input.RunID) == "" {
return WaitResult{}, fmt.Errorf("%w: run id is required", ErrInvalidInput)
@@ -1893,6 +1952,47 @@ func listTasksForRun(ctx context.Context, db queryRowsContexter, runID string) (
return tasks, nil
}
// listRunStatusTasks loads every task of runID via listTasksForRun and wraps
// each one in a RunStatusTask carrying its latest attempt and message
// context. The first lookup error aborts the whole listing and is returned
// unchanged together with a nil slice.
func listRunStatusTasks(ctx context.Context, db queryRowsAndRower, runID string) ([]RunStatusTask, error) {
	tasks, err := listTasksForRun(ctx, db, runID)
	if err != nil {
		return nil, err
	}

	entries := make([]RunStatusTask, 0, len(tasks))
	for i := range tasks {
		entry, err := loadRunStatusTask(ctx, db, runID, tasks[i])
		if err != nil {
			return nil, err
		}
		entries = append(entries, entry)
	}
	return entries, nil
}

// loadRunStatusTask builds the RunStatusTask for a single task. Context is
// attached in layers, each depending on the previous one: the latest attempt
// (only when LatestAttemptNo > 0), the latest message on that attempt's
// thread (only when the thread ID is non-blank), and the latest question
// message (only when the task status is "blocked").
func loadRunStatusTask(ctx context.Context, db queryRowsAndRower, runID string, task Task) (RunStatusTask, error) {
	entry := RunStatusTask{Task: task}

	if task.LatestAttemptNo <= 0 {
		// Never attempted: nothing further to attach.
		return entry, nil
	}
	attempt, err := selectAttempt(ctx, db, runID, task.TaskID, task.LatestAttemptNo)
	if err != nil {
		return RunStatusTask{}, err
	}
	entry.LatestAttempt = &attempt

	if strings.TrimSpace(attempt.ThreadID) == "" {
		// No thread on the attempt means there are no messages to look up.
		return entry, nil
	}
	latest, err := selectLatestThreadMessage(ctx, db, attempt.ThreadID)
	if err != nil {
		return RunStatusTask{}, err
	}
	entry.LatestMessage = &latest

	if task.Status != "blocked" {
		return entry, nil
	}
	question, err := selectLatestQuestionMessage(ctx, db, attempt.ThreadID)
	if err != nil {
		return RunStatusTask{}, err
	}
	entry.BlockedQuestion = &question
	return entry, nil
}
func (s *OrchStore) findRunEventsAfter(ctx context.Context, runID string, afterEventID int64, eventTypes []string) ([]RunEvent, int64, bool, error) {
args := []any{runID, afterEventID}
query := `SELECT
@@ -2194,6 +2294,33 @@ func selectLatestQuestionMessage(ctx context.Context, db queryRowsAndRower, thre
return message, nil
}
// selectLatestThreadMessage returns the message of threadID with the greatest
// created_at value, with its artifacts attached.
//
// A thread without any messages yields an error wrapping ErrInvalidState;
// scan and artifact-loading errors are returned unchanged.
//
// NOTE(review): the query orders by created_at only, so which row wins when
// two messages share a timestamp is left to the database — confirm that
// created_at is precise enough for this to be unambiguous.
func selectLatestThreadMessage(ctx context.Context, db queryRowsAndRower, threadID string) (Message, error) {
	const latestMessageQuery = `SELECT
message_id, thread_id, from_agent, to_agent, kind, summary, body,
payload_json, created_at
FROM messages
WHERE thread_id = ?
ORDER BY created_at DESC
LIMIT 1`

	message, err := scanMessage(db.QueryRowContext(ctx, latestMessageQuery, threadID))
	switch {
	case errors.Is(err, sql.ErrNoRows):
		return Message{}, fmt.Errorf("%w: thread %s has no messages", ErrInvalidState, threadID)
	case err != nil:
		return Message{}, err
	}

	// Artifacts are loaded through the batch helper with a single-ID batch.
	artifacts, err := loadArtifactsForMessageIDsFromQueryer(ctx, db, []string{message.MessageID})
	if err != nil {
		return Message{}, err
	}
	message.Artifacts = artifacts[message.MessageID]
	return message, nil
}
type queryRowsAndRower interface {
queryRower
QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error)
@@ -375,6 +375,120 @@ func TestOrchStatusReturnsRunSummaryAndTaskList(t *testing.T) {
if got, _ := task["status"].(string); got != "done" {
t.Fatalf("expected task status done, got %#v", task["status"])
}
if got := nestedString(t, task, "latest_attempt", "assigned_to"); got != "worker-a" {
t.Fatalf("expected latest_attempt.assigned_to worker-a, got %q", got)
}
if got := nestedString(t, task, "latest_attempt", "status"); got != "done" {
t.Fatalf("expected latest_attempt.status done, got %q", got)
}
if got := nestedString(t, task, "latest_message", "kind"); got != "result" {
t.Fatalf("expected latest_message.kind result, got %q", got)
}
if got := nestedString(t, task, "latest_message", "summary"); got != "Retry policy implemented" {
t.Fatalf("expected latest_message.summary to match result summary, got %q", got)
}
}
// TestOrchStatusAutoReconcilesAndIncludesBlockedContext drives one task into
// the blocked state purely through the CLI (run init, task add, dispatch,
// inbox claim, inbox update --status blocked) and then checks that a plain
// `status` call reports the run as blocked and exposes the task's latest
// attempt, latest message and blocked question.
func TestOrchStatusAutoReconcilesAndIncludesBlockedContext(t *testing.T) {
	t.Parallel()

	const runID = "run_blog_status_002"
	dbPath := filepath.Join(t.TempDir(), "coord.db")

	// Arrange: a run with a single task assigned to worker-a by default.
	runOrchCommand(t, "--db", dbPath, "--json",
		"run", "init",
		"--run", runID,
		"--goal", "Build blog MVP",
	)
	runOrchCommand(t, "--db", dbPath, "--json",
		"task", "add",
		"--run", runID,
		"--task", "T1",
		"--title", "Implement retry policy",
		"--default-to", "worker-a",
	)

	// Dispatch the task and remember the thread the attempt was bound to.
	dispatchJSON := runOrchCommand(t, "--db", dbPath, "--json",
		"dispatch",
		"--run", runID,
		"--task", "T1",
		"--body", "Implement retry handling for the HTTP client.",
	)
	var dispatched map[string]any
	mustDecodeJSON(t, dispatchJSON, &dispatched)
	threadID := nestedString(t, dispatched, "data", "attempt", "thread_id")

	// The worker claims the thread and reports that it is blocked on a
	// question. No explicit reconcile is issued before querying status.
	runInboxCommand(t, "--db", dbPath, "--json",
		"claim",
		"--agent", "worker-a",
		"--thread", threadID,
	)
	runInboxCommand(t, "--db", dbPath, "--json",
		"update",
		"--agent", "worker-a",
		"--thread", threadID,
		"--status", "blocked",
		"--summary", "Need logging decision",
		"--payload-json", `{"question":"Should retry attempts be logged?"}`,
	)

	// Act: query status.
	statusJSON := runOrchCommand(t, "--db", dbPath, "--json",
		"status",
		"--run", runID,
	)
	var status map[string]any
	mustDecodeJSON(t, statusJSON, &status)

	// Assert: run-level status first, then the single task entry.
	if got := nestedString(t, status, "data", "run", "status"); got != "blocked" {
		t.Fatalf("expected run status blocked after status auto-reconcile, got %q", got)
	}

	taskList := nestedArray(t, status, "data", "tasks")
	if len(taskList) != 1 {
		t.Fatalf("expected one task in status response, got %#v", taskList)
	}
	task, isObject := taskList[0].(map[string]any)
	if !isObject {
		t.Fatalf("expected task object, got %#v", taskList[0])
	}

	if got := nestedString(t, task, "status"); got != "blocked" {
		t.Fatalf("expected task status blocked, got %q", got)
	}

	// Latest attempt mirrors the blocked state and the dispatched thread.
	if got := nestedString(t, task, "latest_attempt", "status"); got != "blocked" {
		t.Fatalf("expected latest_attempt.status blocked, got %q", got)
	}
	if got := nestedString(t, task, "latest_attempt", "thread_id"); got != threadID {
		t.Fatalf("expected latest_attempt.thread_id %q, got %q", threadID, got)
	}

	// The blocked update is the newest message on the thread...
	if got := nestedString(t, task, "latest_message", "kind"); got != "question" {
		t.Fatalf("expected latest_message.kind question, got %q", got)
	}
	if got := nestedString(t, task, "latest_message", "summary"); got != "Need logging decision" {
		t.Fatalf("expected latest_message.summary to match blocked update, got %q", got)
	}

	// ...and is also surfaced as the task's blocked question.
	if got := nestedString(t, task, "blocked_question", "summary"); got != "Need logging decision" {
		t.Fatalf("expected blocked_question.summary to match latest question, got %q", got)
	}
	if got := nestedString(t, task, "blocked_question", "kind"); got != "question" {
		t.Fatalf("expected blocked_question.kind question, got %q", got)
	}
}
func TestOrchReconcileMapsFailedThreadToTerminalTaskState(t *testing.T) {
@@ -28,7 +28,12 @@ func newStatusCmd(root *rootOptions) *cobra.Command {
}
defer sqlDB.Close()
overview, err := store.NewOrchStore(sqlDB).GetRunOverview(ctx, opts.runID)
orchStore := store.NewOrchStore(sqlDB)
if _, err := orchStore.ReconcileRun(ctx, opts.runID); err != nil {
return err
}
overview, err := orchStore.GetRunStatusView(ctx, opts.runID)
if err != nil {
return err
}