Add spec-aware orch tasks and verification gates

This commit is contained in:
2026-03-23 14:05:10 +08:00
parent 4d8c90eb26
commit 9f9b66330c
22 changed files with 1696 additions and 55 deletions
+802 -29
View File
@@ -30,17 +30,58 @@ type Run struct {
}
type Task struct {
RunID string `json:"run_id"`
TaskID string `json:"task_id"`
Title string `json:"title"`
Summary string `json:"summary"`
Status string `json:"status"`
DefaultTo string `json:"default_to,omitempty"`
Priority string `json:"priority"`
AcceptanceJSON json.RawMessage `json:"acceptance_json"`
LatestAttemptNo int `json:"latest_attempt_no,omitempty"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
RunID string `json:"run_id"`
TaskID string `json:"task_id"`
Title string `json:"title"`
Summary string `json:"summary"`
Status string `json:"status"`
DefaultTo string `json:"default_to,omitempty"`
Priority string `json:"priority"`
AcceptanceJSON json.RawMessage `json:"acceptance_json"`
LatestAttemptNo int `json:"latest_attempt_no,omitempty"`
Spec *TaskSpec `json:"spec,omitempty"`
Gate *VerificationGate `json:"gate,omitempty"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
type TaskSpec struct {
RunID string `json:"run_id"`
TaskID string `json:"task_id"`
SpecFile string `json:"spec_file,omitempty"`
SpecSHA string `json:"spec_sha,omitempty"`
SpecBody string `json:"spec_body,omitempty"`
CheckProfile string `json:"check_profile,omitempty"`
RequiredChecks []string `json:"required_checks,omitempty"`
AllowedPaths []string `json:"allowed_paths,omitempty"`
BlockedPaths []string `json:"blocked_paths,omitempty"`
MetadataJSON json.RawMessage `json:"metadata_json,omitempty"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
type TaskCheckRun struct {
RunID string `json:"run_id"`
TaskID string `json:"task_id"`
AttemptNo int `json:"attempt_no"`
CheckName string `json:"check_name"`
Status string `json:"status"`
Summary string `json:"summary"`
Body string `json:"body,omitempty"`
MetadataJSON json.RawMessage `json:"metadata_json,omitempty"`
RecordedBy string `json:"recorded_by,omitempty"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
type VerificationGate struct {
Status string `json:"status"`
AttemptNo int `json:"attempt_no,omitempty"`
CheckProfile string `json:"check_profile,omitempty"`
RequiredChecks []string `json:"required_checks,omitempty"`
PendingChecks []string `json:"pending_checks,omitempty"`
FailedChecks []string `json:"failed_checks,omitempty"`
Checks []TaskCheckRun `json:"checks,omitempty"`
}
type TaskDependency struct {
@@ -99,6 +140,14 @@ type AddTaskInput struct {
DefaultTo string
AcceptanceJSON string
Priority string
SpecFile string
SpecSHA string
SpecBody string
CheckProfile string
RequiredChecks []string
AllowedPaths []string
BlockedPaths []string
MetadataJSON string
}
type AddDependencyInput struct {
@@ -189,6 +238,38 @@ type AnswerResult struct {
Message Message `json:"message"`
}
type VerifyRecordInput struct {
RunID string
TaskID string
AttemptNo int
CheckName string
Status string
Summary string
Body string
MetadataJSON string
RecordedBy string
}
type VerifyRecordResult struct {
Task Task `json:"task"`
Attempt TaskAttempt `json:"attempt"`
Check TaskCheckRun `json:"check"`
Gate *VerificationGate `json:"gate,omitempty"`
}
type VerificationStatusInput struct {
RunID string
TaskID string
AttemptNo int
}
type VerificationStatusResult struct {
Task Task `json:"task"`
Attempt *TaskAttempt `json:"attempt,omitempty"`
Spec *TaskSpec `json:"spec,omitempty"`
Gate *VerificationGate `json:"gate,omitempty"`
}
type RetryInput struct {
RunID string
TaskID string
@@ -338,6 +419,25 @@ func (s *OrchStore) AddTask(ctx context.Context, input AddTaskInput) (Task, erro
if err != nil {
return Task{}, err
}
specMetadataJSON, err := validateAndNormalizeJSON("metadata-json", input.MetadataJSON)
if err != nil {
return Task{}, err
}
requiredChecks := normalizeStringList(input.RequiredChecks)
allowedPaths := normalizeStringList(input.AllowedPaths)
blockedPaths := normalizeStringList(input.BlockedPaths)
requiredChecksJSON, err := marshalStringList("required-check", requiredChecks)
if err != nil {
return Task{}, err
}
allowedPathsJSON, err := marshalStringList("allowed-path", allowedPaths)
if err != nil {
return Task{}, err
}
blockedPathsJSON, err := marshalStringList("blocked-path", blockedPaths)
if err != nil {
return Task{}, err
}
now := nowUTC()
tx, err := s.db.BeginTx(ctx, nil)
@@ -375,17 +475,48 @@ func (s *OrchStore) AddTask(ctx context.Context, input AddTaskInput) (Task, erro
}
if err := insertEvent(ctx, tx, eventInput{
RunID: input.RunID,
TaskID: input.TaskID,
Source: "orch",
EventType: "task_added",
Summary: input.Title,
PayloadJSON: marshalJSON(map[string]any{"title": input.Title, "priority": priority}),
CreatedAt: now,
RunID: input.RunID,
TaskID: input.TaskID,
Source: "orch",
EventType: "task_added",
Summary: input.Title,
PayloadJSON: marshalJSON(map[string]any{
"title": input.Title,
"priority": priority,
"spec_file": strings.TrimSpace(input.SpecFile),
"check_profile": strings.TrimSpace(input.CheckProfile),
}),
CreatedAt: now,
}); err != nil {
return Task{}, err
}
if shouldPersistTaskSpec(input, specMetadataJSON, requiredChecks, allowedPaths, blockedPaths) {
_, err = tx.ExecContext(
ctx,
`INSERT INTO task_specs (
run_id, task_id, spec_file, spec_sha, spec_body, check_profile,
required_checks_json, allowed_paths_json, blocked_paths_json,
metadata_json, created_at, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
input.RunID,
input.TaskID,
strings.TrimSpace(input.SpecFile),
strings.TrimSpace(input.SpecSHA),
input.SpecBody,
strings.TrimSpace(input.CheckProfile),
requiredChecksJSON,
allowedPathsJSON,
blockedPathsJSON,
specMetadataJSON,
formatTime(now),
formatTime(now),
)
if err != nil {
return Task{}, fmt.Errorf("insert task spec: %w", err)
}
}
if err := refreshReadyStates(ctx, tx, input.RunID, now); err != nil {
return Task{}, err
}
@@ -397,6 +528,9 @@ func (s *OrchStore) AddTask(ctx context.Context, input AddTaskInput) (Task, erro
if err != nil {
return Task{}, err
}
if err := attachTaskHarnessData(ctx, tx, &task, false); err != nil {
return Task{}, err
}
if err := tx.Commit(); err != nil {
return Task{}, fmt.Errorf("commit add task transaction: %w", err)
@@ -534,6 +668,9 @@ func (s *OrchStore) ListReadyTasks(ctx context.Context, input ListReadyInput) ([
if err != nil {
return nil, err
}
if err := attachTaskHarnessData(ctx, tx, &task, false); err != nil {
return nil, err
}
tasks = append(tasks, task)
}
if err := rows.Err(); err != nil {
@@ -602,6 +739,9 @@ func (s *OrchStore) GetTaskWithLatestAttempt(ctx context.Context, runID, taskID
if err != nil {
return Task{}, nil, err
}
if err := attachTaskHarnessData(ctx, s.db, &task, false); err != nil {
return Task{}, nil, err
}
if task.LatestAttemptNo == 0 {
return task, nil, nil
}
@@ -1118,6 +1258,11 @@ func (s *OrchStore) dispatchTaskTx(
threadID := newID("thr")
messageID := newID("msg")
spec, err := selectTaskSpec(ctx, tx, task.RunID, task.TaskID, true)
if err != nil {
return DispatchResult{}, finalizeWorkspace, err
}
task.Spec = spec
payloadJSON := buildDispatchPayload(task, attemptNo, workspace)
thread := Thread{
ThreadID: threadID,
@@ -1133,7 +1278,7 @@ func (s *OrchStore) dispatchTaskTx(
UpdatedAt: now,
}
_, err := tx.ExecContext(
_, err = tx.ExecContext(
ctx,
`INSERT INTO threads (
thread_id, run_id, task_id, subject, created_by, assigned_to, status,
@@ -1441,7 +1586,10 @@ func (s *OrchStore) ReconcileRun(ctx context.Context, runID string) (ReconcileRe
return ReconcileResult{}, fmt.Errorf("scan reconcile candidate: %w", err)
}
nextStatus := reconcileTaskStatus(threadStatus)
nextStatus, err := reconcileTaskStatus(ctx, tx, runID, taskID, attemptNo, taskStatus, threadStatus)
if err != nil {
return ReconcileResult{}, err
}
if nextStatus == "" {
continue
}
@@ -1535,6 +1683,9 @@ func (s *OrchStore) ReconcileRun(ctx context.Context, runID string) (ReconcileRe
if err != nil {
return ReconcileResult{}, err
}
if err := attachTaskHarnessData(ctx, tx, &task, false); err != nil {
return ReconcileResult{}, err
}
updatedTasks = append(updatedTasks, task)
}
@@ -1593,6 +1744,9 @@ func (s *OrchStore) ListBlockedTasks(ctx context.Context, runID string) ([]Block
if err != nil {
return nil, err
}
if err := attachTaskHarnessData(ctx, tx, &task, false); err != nil {
return nil, err
}
question, err := selectLatestQuestionMessage(ctx, tx, attempt.ThreadID)
if err != nil {
return nil, err
@@ -1750,6 +1904,268 @@ func (s *OrchStore) AnswerTask(ctx context.Context, input AnswerInput) (AnswerRe
}, nil
}
func (s *OrchStore) RecordCheck(ctx context.Context, input VerifyRecordInput) (VerifyRecordResult, error) {
if strings.TrimSpace(input.RunID) == "" {
return VerifyRecordResult{}, fmt.Errorf("%w: run id is required", ErrInvalidInput)
}
if strings.TrimSpace(input.TaskID) == "" {
return VerifyRecordResult{}, fmt.Errorf("%w: task id is required", ErrInvalidInput)
}
checkName := strings.TrimSpace(input.CheckName)
if checkName == "" {
return VerifyRecordResult{}, fmt.Errorf("%w: check name is required", ErrInvalidInput)
}
checkStatus, err := normalizeCheckStatus(input.Status)
if err != nil {
return VerifyRecordResult{}, err
}
metadataJSON, err := validateAndNormalizeJSON("metadata-json", input.MetadataJSON)
if err != nil {
return VerifyRecordResult{}, err
}
now := nowUTC()
tx, err := s.db.BeginTx(ctx, nil)
if err != nil {
return VerifyRecordResult{}, fmt.Errorf("begin record check transaction: %w", err)
}
defer tx.Rollback()
task, err := selectTask(ctx, tx, input.RunID, input.TaskID)
if err != nil {
return VerifyRecordResult{}, err
}
if task.LatestAttemptNo == 0 {
return VerifyRecordResult{}, fmt.Errorf("%w: task %s has no attempt to verify", ErrInvalidState, task.TaskID)
}
if task.Status != "verifying" && task.Status != "failed" && task.Status != "done" {
return VerifyRecordResult{}, fmt.Errorf("%w: task %s is not ready for verification recording", ErrInvalidState, task.TaskID)
}
attemptNo := input.AttemptNo
if attemptNo == 0 {
attemptNo = task.LatestAttemptNo
}
if attemptNo != task.LatestAttemptNo {
return VerifyRecordResult{}, fmt.Errorf("%w: can only record verification for the latest attempt", ErrInvalidState)
}
attempt, err := selectAttempt(ctx, tx, input.RunID, input.TaskID, attemptNo)
if err != nil {
return VerifyRecordResult{}, err
}
checkRun := TaskCheckRun{
RunID: input.RunID,
TaskID: input.TaskID,
AttemptNo: attempt.AttemptNo,
CheckName: checkName,
Status: checkStatus,
Summary: strings.TrimSpace(input.Summary),
Body: input.Body,
MetadataJSON: json.RawMessage(metadataJSON),
RecordedBy: defaultString(strings.TrimSpace(input.RecordedBy), "orch"),
CreatedAt: now,
UpdatedAt: now,
}
_, err = tx.ExecContext(
ctx,
`INSERT INTO check_runs (
run_id, task_id, attempt_no, check_name, status, summary, body,
metadata_json, recorded_by, created_at, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(run_id, task_id, attempt_no, check_name) DO UPDATE SET
status = excluded.status,
summary = excluded.summary,
body = excluded.body,
metadata_json = excluded.metadata_json,
recorded_by = excluded.recorded_by,
updated_at = excluded.updated_at`,
checkRun.RunID,
checkRun.TaskID,
checkRun.AttemptNo,
checkRun.CheckName,
checkRun.Status,
checkRun.Summary,
checkRun.Body,
string(checkRun.MetadataJSON),
checkRun.RecordedBy,
formatTime(checkRun.CreatedAt),
formatTime(checkRun.UpdatedAt),
)
if err != nil {
return VerifyRecordResult{}, fmt.Errorf("upsert check run: %w", err)
}
checkRun, err = selectCheckRun(ctx, tx, checkRun.RunID, checkRun.TaskID, checkRun.AttemptNo, checkRun.CheckName)
if err != nil {
return VerifyRecordResult{}, err
}
if err := insertEvent(ctx, tx, eventInput{
RunID: task.RunID,
TaskID: task.TaskID,
ThreadID: attempt.ThreadID,
Source: "orch",
EventType: "task_verification_recorded",
Summary: defaultString(checkRun.Summary, fmt.Sprintf("%s: %s", checkRun.CheckName, checkRun.Status)),
PayloadJSON: marshalJSON(map[string]any{
"attempt_no": attempt.AttemptNo,
"check_name": checkRun.CheckName,
"status": checkRun.Status,
}),
CreatedAt: now,
}); err != nil {
return VerifyRecordResult{}, err
}
gate, err := buildVerificationGate(ctx, tx, task.RunID, task.TaskID, attempt.AttemptNo)
if err != nil {
return VerifyRecordResult{}, err
}
nextStatus := task.Status
switch {
case gate == nil:
nextStatus = task.Status
case gate.Status == "failed":
nextStatus = "failed"
case gate.Status == "passed":
nextStatus = "done"
default:
nextStatus = "verifying"
}
if nextStatus != task.Status {
_, err = tx.ExecContext(
ctx,
`UPDATE tasks
SET status = ?, updated_at = ?
WHERE run_id = ? AND task_id = ?`,
nextStatus,
formatTime(now),
task.RunID,
task.TaskID,
)
if err != nil {
return VerifyRecordResult{}, fmt.Errorf("update verified task status: %w", err)
}
if err := insertEvent(ctx, tx, eventInput{
RunID: task.RunID,
TaskID: task.TaskID,
ThreadID: attempt.ThreadID,
Source: "orch",
EventType: "task_" + nextStatus,
Summary: verificationSummary(nextStatus, gate, checkRun),
PayloadJSON: marshalJSON(map[string]any{
"attempt_no": attempt.AttemptNo,
"check_name": checkRun.CheckName,
"status": checkRun.Status,
}),
CreatedAt: now,
}); err != nil {
return VerifyRecordResult{}, err
}
task.Status = nextStatus
task.UpdatedAt = now
}
if nextStatus != attempt.Status {
_, err = tx.ExecContext(
ctx,
`UPDATE task_attempts
SET status = ?, updated_at = ?
WHERE run_id = ? AND task_id = ? AND attempt_no = ?`,
nextStatus,
formatTime(now),
attempt.RunID,
attempt.TaskID,
attempt.AttemptNo,
)
if err != nil {
return VerifyRecordResult{}, fmt.Errorf("update verified attempt status: %w", err)
}
attempt.Status = nextStatus
attempt.UpdatedAt = now
}
if err := updateRunAggregateStatus(ctx, tx, task.RunID, now); err != nil {
return VerifyRecordResult{}, err
}
if err := attachTaskHarnessData(ctx, tx, &task, false); err != nil {
return VerifyRecordResult{}, err
}
if task.Gate != nil {
gate = task.Gate
}
if err := tx.Commit(); err != nil {
return VerifyRecordResult{}, fmt.Errorf("commit record check transaction: %w", err)
}
return VerifyRecordResult{
Task: task,
Attempt: attempt,
Check: checkRun,
Gate: gate,
}, nil
}
func (s *OrchStore) GetVerificationStatus(ctx context.Context, input VerificationStatusInput) (VerificationStatusResult, error) {
if strings.TrimSpace(input.RunID) == "" {
return VerificationStatusResult{}, fmt.Errorf("%w: run id is required", ErrInvalidInput)
}
if strings.TrimSpace(input.TaskID) == "" {
return VerificationStatusResult{}, fmt.Errorf("%w: task id is required", ErrInvalidInput)
}
task, err := selectTask(ctx, s.db, input.RunID, input.TaskID)
if err != nil {
return VerificationStatusResult{}, err
}
spec, err := selectTaskSpec(ctx, s.db, input.RunID, input.TaskID, false)
if err != nil {
return VerificationStatusResult{}, err
}
task.Spec = spec
var attempt *TaskAttempt
var gate *VerificationGate
if task.LatestAttemptNo > 0 {
attemptNo := input.AttemptNo
if attemptNo == 0 {
attemptNo = task.LatestAttemptNo
}
record, err := selectAttempt(ctx, s.db, input.RunID, input.TaskID, attemptNo)
if err != nil {
return VerificationStatusResult{}, err
}
attempt = &record
gate, err = buildVerificationGate(ctx, s.db, input.RunID, input.TaskID, attemptNo)
if err != nil {
return VerificationStatusResult{}, err
}
}
if gate == nil && spec != nil && len(spec.RequiredChecks) > 0 {
gate = &VerificationGate{
Status: "pending",
CheckProfile: spec.CheckProfile,
RequiredChecks: append([]string(nil), spec.RequiredChecks...),
PendingChecks: append([]string(nil), spec.RequiredChecks...),
}
}
task.Gate = gate
return VerificationStatusResult{
Task: task,
Attempt: attempt,
Spec: spec,
Gate: gate,
}, nil
}
func (s *OrchStore) GetRunOverview(ctx context.Context, runID string) (RunOverview, error) {
if strings.TrimSpace(runID) == "" {
return RunOverview{}, fmt.Errorf("%w: run id is required", ErrInvalidInput)
@@ -1924,7 +2340,7 @@ func (s *OrchStore) WaitForEvents(ctx context.Context, input WaitInput) (WaitRes
}
}
func listTasksForRun(ctx context.Context, db queryRowsContexter, runID string) ([]Task, error) {
func listTasksForRun(ctx context.Context, db queryRowsAndRower, runID string) ([]Task, error) {
rows, err := db.QueryContext(
ctx,
`SELECT
@@ -1946,6 +2362,9 @@ func listTasksForRun(ctx context.Context, db queryRowsContexter, runID string) (
if err != nil {
return nil, err
}
if err := attachTaskHarnessData(ctx, db, &task, false); err != nil {
return nil, err
}
tasks = append(tasks, task)
}
if err := rows.Err(); err != nil {
@@ -2147,6 +2566,82 @@ func scanAttempt(scanner threadScanner) (TaskAttempt, error) {
return attempt, nil
}
func scanTaskSpec(scanner threadScanner) (TaskSpec, error) {
var (
spec TaskSpec
requiredChecksJSON, allowedPathsJSON string
blockedPathsJSON, metadataJSON string
createdAt, updatedAt string
)
if err := scanner.Scan(
&spec.RunID,
&spec.TaskID,
&spec.SpecFile,
&spec.SpecSHA,
&spec.SpecBody,
&spec.CheckProfile,
&requiredChecksJSON,
&allowedPathsJSON,
&blockedPathsJSON,
&metadataJSON,
&createdAt,
&updatedAt,
); err != nil {
return TaskSpec{}, fmt.Errorf("scan task spec: %w", err)
}
requiredChecks, err := unmarshalStringList(requiredChecksJSON)
if err != nil {
return TaskSpec{}, err
}
allowedPaths, err := unmarshalStringList(allowedPathsJSON)
if err != nil {
return TaskSpec{}, err
}
blockedPaths, err := unmarshalStringList(blockedPathsJSON)
if err != nil {
return TaskSpec{}, err
}
spec.RequiredChecks = requiredChecks
spec.AllowedPaths = allowedPaths
spec.BlockedPaths = blockedPaths
spec.MetadataJSON = json.RawMessage(metadataJSON)
spec.CreatedAt = parseTime(createdAt)
spec.UpdatedAt = parseTime(updatedAt)
return spec, nil
}
func scanTaskCheckRun(scanner threadScanner) (TaskCheckRun, error) {
var (
check TaskCheckRun
metadataJSON string
createdAt, updatedAt string
)
if err := scanner.Scan(
&check.RunID,
&check.TaskID,
&check.AttemptNo,
&check.CheckName,
&check.Status,
&check.Summary,
&check.Body,
&metadataJSON,
&check.RecordedBy,
&createdAt,
&updatedAt,
); err != nil {
return TaskCheckRun{}, fmt.Errorf("scan task check run: %w", err)
}
check.MetadataJSON = json.RawMessage(metadataJSON)
check.CreatedAt = parseTime(createdAt)
check.UpdatedAt = parseTime(updatedAt)
return check, nil
}
func scanTaskAndAttempt(scanner threadScanner) (Task, TaskAttempt, error) {
var (
task Task
@@ -2269,6 +2764,85 @@ func selectAttempt(ctx context.Context, db queryRower, runID, taskID string, att
return attempt, err
}
func selectTaskSpec(ctx context.Context, db queryRower, runID, taskID string, includeBody bool) (*TaskSpec, error) {
columns := `run_id, task_id, spec_file, spec_sha, spec_body, check_profile,
required_checks_json, allowed_paths_json, blocked_paths_json, metadata_json,
created_at, updated_at`
if !includeBody {
columns = `run_id, task_id, spec_file, spec_sha, '' AS spec_body, check_profile,
required_checks_json, allowed_paths_json, blocked_paths_json, metadata_json,
created_at, updated_at`
}
row := db.QueryRowContext(
ctx,
`SELECT `+columns+`
FROM task_specs
WHERE run_id = ? AND task_id = ?`,
runID,
taskID,
)
spec, err := scanTaskSpec(row)
if errors.Is(err, sql.ErrNoRows) {
return nil, nil
}
if err != nil {
return nil, err
}
return &spec, nil
}
func selectCheckRuns(ctx context.Context, db queryRowsContexter, runID, taskID string, attemptNo int) ([]TaskCheckRun, error) {
rows, err := db.QueryContext(
ctx,
`SELECT
run_id, task_id, attempt_no, check_name, status, summary, body,
metadata_json, recorded_by, created_at, updated_at
FROM check_runs
WHERE run_id = ? AND task_id = ? AND attempt_no = ?
ORDER BY check_name ASC`,
runID,
taskID,
attemptNo,
)
if err != nil {
return nil, fmt.Errorf("query check runs: %w", err)
}
defer rows.Close()
var checks []TaskCheckRun
for rows.Next() {
check, err := scanTaskCheckRun(rows)
if err != nil {
return nil, err
}
checks = append(checks, check)
}
if err := rows.Err(); err != nil {
return nil, fmt.Errorf("iterate check runs: %w", err)
}
return checks, nil
}
func selectCheckRun(ctx context.Context, db queryRower, runID, taskID string, attemptNo int, checkName string) (TaskCheckRun, error) {
row := db.QueryRowContext(
ctx,
`SELECT
run_id, task_id, attempt_no, check_name, status, summary, body,
metadata_json, recorded_by, created_at, updated_at
FROM check_runs
WHERE run_id = ? AND task_id = ? AND attempt_no = ? AND check_name = ?`,
runID,
taskID,
attemptNo,
checkName,
)
check, err := scanTaskCheckRun(row)
if errors.Is(err, sql.ErrNoRows) {
return TaskCheckRun{}, fmt.Errorf("%w: check %s for %s/%s attempt %d not found", ErrInvalidState, checkName, runID, taskID, attemptNo)
}
return check, err
}
func selectLatestQuestionMessage(ctx context.Context, db queryRowsAndRower, threadID string) (Message, error) {
row := db.QueryRowContext(
ctx,
@@ -2368,6 +2942,93 @@ func loadArtifactsForMessageIDsFromQueryer(ctx context.Context, db queryRowsCont
return result, nil
}
func attachTaskHarnessData(ctx context.Context, db queryRowsAndRower, task *Task, includeSpecBody bool) error {
if task == nil {
return nil
}
spec, err := selectTaskSpec(ctx, db, task.RunID, task.TaskID, includeSpecBody)
if err != nil {
return err
}
task.Spec = spec
gate, err := buildVerificationGate(ctx, db, task.RunID, task.TaskID, task.LatestAttemptNo)
if err != nil {
return err
}
if gate == nil && spec != nil && len(spec.RequiredChecks) > 0 {
gate = &VerificationGate{
Status: "pending",
CheckProfile: spec.CheckProfile,
RequiredChecks: append([]string(nil), spec.RequiredChecks...),
PendingChecks: append([]string(nil), spec.RequiredChecks...),
}
}
task.Gate = gate
return nil
}
func buildVerificationGate(ctx context.Context, db queryRowsAndRower, runID, taskID string, attemptNo int) (*VerificationGate, error) {
spec, err := selectTaskSpec(ctx, db, runID, taskID, false)
if err != nil {
return nil, err
}
if spec == nil || len(spec.RequiredChecks) == 0 {
return nil, nil
}
gate := &VerificationGate{
Status: "pending",
AttemptNo: attemptNo,
CheckProfile: spec.CheckProfile,
RequiredChecks: append([]string(nil), spec.RequiredChecks...),
PendingChecks: append([]string(nil), spec.RequiredChecks...),
}
if attemptNo == 0 {
return gate, nil
}
checks, err := selectCheckRuns(ctx, db, runID, taskID, attemptNo)
if err != nil {
return nil, err
}
gate.Checks = checks
checkByName := make(map[string]TaskCheckRun, len(checks))
for _, check := range checks {
checkByName[check.CheckName] = check
}
pending := make([]string, 0, len(spec.RequiredChecks))
failed := make([]string, 0)
for _, checkName := range spec.RequiredChecks {
check, ok := checkByName[checkName]
if !ok || check.Status == "skipped" {
pending = append(pending, checkName)
continue
}
if check.Status == "failed" {
failed = append(failed, checkName)
}
if check.Status != "passed" && check.Status != "failed" {
pending = append(pending, checkName)
}
}
gate.PendingChecks = pending
gate.FailedChecks = failed
switch {
case len(failed) > 0:
gate.Status = "failed"
case len(pending) == 0:
gate.Status = "passed"
default:
gate.Status = "pending"
}
return gate, nil
}
func refreshReadyStates(ctx context.Context, tx *sql.Tx, runID string, now time.Time) error {
rows, err := tx.QueryContext(
ctx,
@@ -2538,6 +3199,9 @@ func deriveRunStatus(counts map[string]int) string {
if counts["running"] > 0 || counts["dispatched"] > 0 {
return "running"
}
if counts["verifying"] > 0 {
return "verifying"
}
if counts["ready"] > 0 {
return "ready"
}
@@ -2553,22 +3217,34 @@ func deriveRunStatus(counts map[string]int) string {
return "active"
}
func reconcileTaskStatus(threadStatus string) string {
func reconcileTaskStatus(ctx context.Context, db queryRowsAndRower, runID, taskID string, attemptNo int, currentTaskStatus, threadStatus string) (string, error) {
switch threadStatus {
case "pending":
return "dispatched"
return "dispatched", nil
case "claimed", "in_progress":
return "running"
return "running", nil
case "blocked":
return "blocked"
return "blocked", nil
case "done":
return "done"
gate, err := buildVerificationGate(ctx, db, runID, taskID, attemptNo)
if err != nil {
return "", err
}
if gate != nil {
switch currentTaskStatus {
case "done", "failed":
return currentTaskStatus, nil
default:
return "verifying", nil
}
}
return "done", nil
case "failed":
return "failed"
return "failed", nil
case "cancelled":
return "cancelled"
return "cancelled", nil
default:
return ""
return "", nil
}
}
@@ -2622,6 +3298,83 @@ func validateAndNormalizeJSONDefault(fieldName, value, defaultValue string) (str
return compact.String(), nil
}
func normalizeStringList(values []string) []string {
normalized := make([]string, 0, len(values))
seen := make(map[string]struct{}, len(values))
for _, value := range values {
value = strings.TrimSpace(value)
if value == "" {
continue
}
if _, ok := seen[value]; ok {
continue
}
seen[value] = struct{}{}
normalized = append(normalized, value)
}
return normalized
}
func marshalStringList(fieldName string, values []string) (string, error) {
encoded, err := json.Marshal(values)
if err != nil {
return "", fmt.Errorf("%w: %s must be serializable", ErrInvalidInput, fieldName)
}
return string(encoded), nil
}
func unmarshalStringList(raw string) ([]string, error) {
raw = strings.TrimSpace(raw)
if raw == "" {
raw = "[]"
}
var values []string
if err := json.Unmarshal([]byte(raw), &values); err != nil {
return nil, fmt.Errorf("%w: invalid string list JSON", ErrInvalidInput)
}
return normalizeStringList(values), nil
}
func shouldPersistTaskSpec(input AddTaskInput, metadataJSON string, requiredChecks, allowedPaths, blockedPaths []string) bool {
return strings.TrimSpace(input.SpecFile) != "" ||
strings.TrimSpace(input.SpecSHA) != "" ||
strings.TrimSpace(input.SpecBody) != "" ||
strings.TrimSpace(input.CheckProfile) != "" ||
len(requiredChecks) > 0 ||
len(allowedPaths) > 0 ||
len(blockedPaths) > 0 ||
strings.TrimSpace(metadataJSON) != "" && strings.TrimSpace(metadataJSON) != "{}"
}
func normalizeCheckStatus(status string) (string, error) {
status = strings.TrimSpace(status)
switch status {
case "passed", "failed", "skipped":
return status, nil
default:
return "", fmt.Errorf("%w: check status must be one of passed, failed, skipped", ErrInvalidInput)
}
}
func verificationSummary(nextStatus string, gate *VerificationGate, check TaskCheckRun) string {
switch nextStatus {
case "done":
return fmt.Sprintf("verification passed after %s", check.CheckName)
case "failed":
if gate != nil && len(gate.FailedChecks) > 0 {
return fmt.Sprintf("verification failed: %s", strings.Join(gate.FailedChecks, ", "))
}
return fmt.Sprintf("verification failed after %s", check.CheckName)
case "verifying":
if gate != nil && len(gate.PendingChecks) > 0 {
return fmt.Sprintf("waiting on checks: %s", strings.Join(gate.PendingChecks, ", "))
}
return fmt.Sprintf("recorded verification for %s", check.CheckName)
default:
return defaultString(check.Summary, fmt.Sprintf("%s: %s", check.CheckName, check.Status))
}
}
func buildDispatchPayload(task Task, attemptNo int, workspace DispatchWorkspace) string {
payload := map[string]any{
"run_id": task.RunID,
@@ -2638,6 +3391,26 @@ func buildDispatchPayload(task Task, attemptNo int, workspace DispatchWorkspace)
payload["acceptance"] = acceptance
}
}
if task.Spec != nil {
specPayload := map[string]any{
"file": task.Spec.SpecFile,
"sha": task.Spec.SpecSHA,
"check_profile": task.Spec.CheckProfile,
"required_checks": task.Spec.RequiredChecks,
"allowed_paths": task.Spec.AllowedPaths,
"blocked_paths": task.Spec.BlockedPaths,
}
if strings.TrimSpace(task.Spec.SpecBody) != "" {
specPayload["body"] = task.Spec.SpecBody
}
if len(task.Spec.MetadataJSON) > 0 {
var metadata any
if err := json.Unmarshal(task.Spec.MetadataJSON, &metadata); err == nil {
specPayload["metadata"] = metadata
}
}
payload["spec"] = specPayload
}
if strings.TrimSpace(workspace.ExecutionMode) != "" {
payload["execution_mode"] = strings.TrimSpace(workspace.ExecutionMode)
}