Add orch control commands

This commit is contained in:
2026-03-19 14:21:20 +08:00
parent f1785b314f
commit ae272855f6
10 changed files with 1644 additions and 43 deletions
+715 -29
View File
@@ -9,6 +9,8 @@ import (
"fmt"
"strings"
"time"
"ai-workflow-skill/internal/protocol"
)
var ErrRunNotFound = errors.New("run not found")
@@ -173,6 +175,65 @@ type AnswerResult struct {
Message Message `json:"message"`
}
type RetryInput struct {
RunID string
TaskID string
ToAgent string
Body string
PrepareWorkspace DispatchWorkspacePreparer
}
type RetryResult struct {
Task Task `json:"task"`
Attempt TaskAttempt `json:"attempt"`
Thread Thread `json:"thread"`
Message Message `json:"message"`
PreviousAttempt TaskAttempt `json:"previous_attempt"`
}
type ReassignInput struct {
RunID string
TaskID string
ToAgent string
Reason string
PrepareWorkspace DispatchWorkspacePreparer
}
type ReassignResult struct {
Task Task `json:"task"`
Attempt TaskAttempt `json:"attempt"`
Thread Thread `json:"thread"`
Message Message `json:"message"`
PreviousAttempt TaskAttempt `json:"previous_attempt"`
}
type CancelControlInput struct {
RunID string
TaskID string
Reason string
}
type CancelResult struct {
Run Run `json:"run"`
CancelledTasks []Task `json:"cancelled_tasks"`
}
type CleanupInput struct {
RunID string
TaskID string
AttemptNo int
AllCompleted bool
Force bool
}
type CleanupCandidate struct {
Attempt TaskAttempt `json:"attempt"`
}
type CleanupRecord struct {
Attempt TaskAttempt `json:"attempt"`
}
func NewOrchStore(db *sql.DB) *OrchStore {
return &OrchStore{db: db}
}
@@ -502,30 +563,542 @@ func (s *OrchStore) DispatchTask(ctx context.Context, input DispatchInput) (Disp
return DispatchResult{}, fmt.Errorf("%w: task %s is not ready for dispatch", ErrInvalidState, task.TaskID)
}
assignedTo := defaultString(strings.TrimSpace(input.ToAgent), task.DefaultTo)
result, finalizeWorkspace, err := s.dispatchTaskTx(ctx, tx, task, strings.TrimSpace(input.ToAgent), input.Body, strings.TrimSpace(input.BaseRef), input.PrepareWorkspace, now)
if err != nil {
return DispatchResult{}, err
}
workspaceCommitted := false
defer func() {
finalizeWorkspace(workspaceCommitted)
}()
if err := updateRunAggregateStatus(ctx, tx, task.RunID, now); err != nil {
return DispatchResult{}, err
}
if err := tx.Commit(); err != nil {
return DispatchResult{}, fmt.Errorf("commit dispatch transaction: %w", err)
}
workspaceCommitted = true
return result, nil
}
func (s *OrchStore) GetTaskWithLatestAttempt(ctx context.Context, runID, taskID string) (Task, *TaskAttempt, error) {
task, err := selectTask(ctx, s.db, runID, taskID)
if err != nil {
return Task{}, nil, err
}
if task.LatestAttemptNo == 0 {
return task, nil, nil
}
attempt, err := selectAttempt(ctx, s.db, runID, taskID, task.LatestAttemptNo)
if err != nil {
return Task{}, nil, err
}
return task, &attempt, nil
}
func (s *OrchStore) RetryTask(ctx context.Context, input RetryInput) (RetryResult, error) {
if strings.TrimSpace(input.RunID) == "" {
return RetryResult{}, fmt.Errorf("%w: run id is required", ErrInvalidInput)
}
if strings.TrimSpace(input.TaskID) == "" {
return RetryResult{}, fmt.Errorf("%w: task id is required", ErrInvalidInput)
}
now := nowUTC()
tx, err := s.db.BeginTx(ctx, nil)
if err != nil {
return RetryResult{}, fmt.Errorf("begin retry transaction: %w", err)
}
defer tx.Rollback()
if _, err := selectRun(ctx, tx, input.RunID); err != nil {
return RetryResult{}, err
}
task, err := selectTask(ctx, tx, input.RunID, input.TaskID)
if err != nil {
return RetryResult{}, err
}
if task.Status != "failed" {
return RetryResult{}, fmt.Errorf("%w: task %s is not failed", ErrInvalidState, task.TaskID)
}
if task.LatestAttemptNo == 0 {
return RetryResult{}, fmt.Errorf("%w: task %s has no attempt to retry", ErrInvalidState, task.TaskID)
}
previousAttempt, err := selectAttempt(ctx, tx, task.RunID, task.TaskID, task.LatestAttemptNo)
if err != nil {
return RetryResult{}, err
}
result, finalizeWorkspace, err := s.dispatchTaskTx(
ctx,
tx,
task,
strings.TrimSpace(input.ToAgent),
input.Body,
defaultString(previousAttempt.BaseRef, previousAttempt.BaseCommit),
input.PrepareWorkspace,
now,
)
if err != nil {
return RetryResult{}, err
}
workspaceCommitted := false
defer func() {
finalizeWorkspace(workspaceCommitted)
}()
_, err = tx.ExecContext(
ctx,
`UPDATE task_attempts
SET workspace_status = CASE
WHEN workspace_status = 'cleaned' THEN workspace_status
ELSE ?
END,
updated_at = ?
WHERE run_id = ? AND task_id = ? AND attempt_no = ?`,
"abandoned",
formatTime(now),
previousAttempt.RunID,
previousAttempt.TaskID,
previousAttempt.AttemptNo,
)
if err != nil {
return RetryResult{}, fmt.Errorf("mark previous retry attempt abandoned: %w", err)
}
if err := insertEvent(ctx, tx, eventInput{
RunID: task.RunID,
TaskID: task.TaskID,
ThreadID: result.Thread.ThreadID,
Source: "orch",
EventType: "task_retried",
MessageID: result.Message.MessageID,
Summary: result.Message.Summary,
PayloadJSON: marshalJSON(map[string]any{
"previous_attempt_no": previousAttempt.AttemptNo,
"previous_thread_id": previousAttempt.ThreadID,
"attempt_no": result.Attempt.AttemptNo,
"thread_id": result.Attempt.ThreadID,
}),
CreatedAt: now,
}); err != nil {
return RetryResult{}, err
}
if err := updateRunAggregateStatus(ctx, tx, task.RunID, now); err != nil {
return RetryResult{}, err
}
if err := tx.Commit(); err != nil {
return RetryResult{}, fmt.Errorf("commit retry transaction: %w", err)
}
workspaceCommitted = true
return RetryResult{
Task: result.Task,
Attempt: result.Attempt,
Thread: result.Thread,
Message: result.Message,
PreviousAttempt: previousAttempt,
}, nil
}
func (s *OrchStore) ReassignTask(ctx context.Context, input ReassignInput) (ReassignResult, error) {
if strings.TrimSpace(input.RunID) == "" {
return ReassignResult{}, fmt.Errorf("%w: run id is required", ErrInvalidInput)
}
if strings.TrimSpace(input.TaskID) == "" {
return ReassignResult{}, fmt.Errorf("%w: task id is required", ErrInvalidInput)
}
if strings.TrimSpace(input.ToAgent) == "" {
return ReassignResult{}, fmt.Errorf("%w: destination agent is required", ErrInvalidInput)
}
now := nowUTC()
tx, err := s.db.BeginTx(ctx, nil)
if err != nil {
return ReassignResult{}, fmt.Errorf("begin reassign transaction: %w", err)
}
defer tx.Rollback()
if _, err := selectRun(ctx, tx, input.RunID); err != nil {
return ReassignResult{}, err
}
task, err := selectTask(ctx, tx, input.RunID, input.TaskID)
if err != nil {
return ReassignResult{}, err
}
if task.Status != "blocked" && task.Status != "failed" {
return ReassignResult{}, fmt.Errorf("%w: task %s is not blocked or failed", ErrInvalidState, task.TaskID)
}
if task.LatestAttemptNo == 0 {
return ReassignResult{}, fmt.Errorf("%w: task %s has no attempt to reassign", ErrInvalidState, task.TaskID)
}
previousAttempt, err := selectAttempt(ctx, tx, task.RunID, task.TaskID, task.LatestAttemptNo)
if err != nil {
return ReassignResult{}, err
}
if task.Status == "blocked" && previousAttempt.ThreadID != "" {
thread, err := selectThread(ctx, tx, previousAttempt.ThreadID)
if err != nil && !errors.Is(err, ErrThreadNotFound) {
return ReassignResult{}, err
}
if err == nil && !isTerminalStatus(thread.Status) {
if err := cancelThreadTx(ctx, tx, thread, defaultString(input.Reason, "task reassigned"), now); err != nil {
return ReassignResult{}, err
}
}
_, err = tx.ExecContext(
ctx,
`UPDATE task_attempts
SET status = ?, workspace_status = CASE
WHEN workspace_status = 'cleaned' THEN workspace_status
ELSE ?
END,
updated_at = ?
WHERE run_id = ? AND task_id = ? AND attempt_no = ?`,
"cancelled",
"abandoned",
formatTime(now),
previousAttempt.RunID,
previousAttempt.TaskID,
previousAttempt.AttemptNo,
)
if err != nil {
return ReassignResult{}, fmt.Errorf("mark previous blocked attempt abandoned: %w", err)
}
} else {
_, err = tx.ExecContext(
ctx,
`UPDATE task_attempts
SET workspace_status = CASE
WHEN workspace_status = 'cleaned' THEN workspace_status
ELSE ?
END,
updated_at = ?
WHERE run_id = ? AND task_id = ? AND attempt_no = ?`,
"abandoned",
formatTime(now),
previousAttempt.RunID,
previousAttempt.TaskID,
previousAttempt.AttemptNo,
)
if err != nil {
return ReassignResult{}, fmt.Errorf("mark previous attempt abandoned: %w", err)
}
}
result, finalizeWorkspace, err := s.dispatchTaskTx(
ctx,
tx,
task,
strings.TrimSpace(input.ToAgent),
input.Reason,
defaultString(previousAttempt.BaseRef, previousAttempt.BaseCommit),
input.PrepareWorkspace,
now,
)
if err != nil {
return ReassignResult{}, err
}
workspaceCommitted := false
defer func() {
finalizeWorkspace(workspaceCommitted)
}()
if err := insertEvent(ctx, tx, eventInput{
RunID: task.RunID,
TaskID: task.TaskID,
ThreadID: result.Thread.ThreadID,
Source: "orch",
EventType: "task_reassigned",
MessageID: result.Message.MessageID,
Summary: defaultString(input.Reason, result.Message.Summary),
PayloadJSON: marshalJSON(map[string]any{
"previous_attempt_no": previousAttempt.AttemptNo,
"previous_thread_id": previousAttempt.ThreadID,
"from_agent": previousAttempt.AssignedTo,
"to_agent": result.Attempt.AssignedTo,
"attempt_no": result.Attempt.AttemptNo,
"thread_id": result.Attempt.ThreadID,
}),
CreatedAt: now,
}); err != nil {
return ReassignResult{}, err
}
if err := updateRunAggregateStatus(ctx, tx, task.RunID, now); err != nil {
return ReassignResult{}, err
}
if err := tx.Commit(); err != nil {
return ReassignResult{}, fmt.Errorf("commit reassign transaction: %w", err)
}
workspaceCommitted = true
return ReassignResult{
Task: result.Task,
Attempt: result.Attempt,
Thread: result.Thread,
Message: result.Message,
PreviousAttempt: previousAttempt,
}, nil
}
func (s *OrchStore) Cancel(ctx context.Context, input CancelControlInput) (CancelResult, error) {
if strings.TrimSpace(input.RunID) == "" {
return CancelResult{}, fmt.Errorf("%w: run id is required", ErrInvalidInput)
}
now := nowUTC()
tx, err := s.db.BeginTx(ctx, nil)
if err != nil {
return CancelResult{}, fmt.Errorf("begin cancel transaction: %w", err)
}
defer tx.Rollback()
run, err := selectRun(ctx, tx, input.RunID)
if err != nil {
return CancelResult{}, err
}
var tasks []Task
if strings.TrimSpace(input.TaskID) != "" {
task, err := selectTask(ctx, tx, input.RunID, input.TaskID)
if err != nil {
return CancelResult{}, err
}
tasks = append(tasks, task)
} else {
tasks, err = listTasksForRun(ctx, tx, input.RunID)
if err != nil {
return CancelResult{}, err
}
}
cancelledTasks := make([]Task, 0, len(tasks))
for _, task := range tasks {
if task.Status == "cancelled" {
if strings.TrimSpace(input.TaskID) != "" {
return CancelResult{}, fmt.Errorf("%w: task %s is already cancelled", ErrInvalidState, task.TaskID)
}
continue
}
cancelledTask, err := cancelTaskTx(ctx, tx, task, defaultString(input.Reason, "task cancelled"), now)
if err != nil {
return CancelResult{}, err
}
cancelledTasks = append(cancelledTasks, cancelledTask)
}
if len(cancelledTasks) == 0 && len(tasks) == 0 {
_, err = tx.ExecContext(
ctx,
`UPDATE runs SET status = ?, updated_at = ? WHERE run_id = ?`,
"cancelled",
formatTime(now),
run.RunID,
)
if err != nil {
return CancelResult{}, fmt.Errorf("cancel empty run: %w", err)
}
}
if err := insertEvent(ctx, tx, eventInput{
RunID: run.RunID,
Source: "orch",
EventType: "run_cancelled",
Summary: defaultString(input.Reason, "run cancelled"),
PayloadJSON: marshalJSON(map[string]any{
"task_id": input.TaskID,
"reason": input.Reason,
}),
CreatedAt: now,
}); err != nil {
return CancelResult{}, err
}
if err := updateRunAggregateStatus(ctx, tx, run.RunID, now); err != nil {
return CancelResult{}, err
}
run, err = selectRun(ctx, tx, run.RunID)
if err != nil {
return CancelResult{}, err
}
if err := tx.Commit(); err != nil {
return CancelResult{}, fmt.Errorf("commit cancel transaction: %w", err)
}
return CancelResult{
Run: run,
CancelledTasks: cancelledTasks,
}, nil
}
func (s *OrchStore) ListCleanupCandidates(ctx context.Context, input CleanupInput) ([]CleanupCandidate, error) {
if strings.TrimSpace(input.RunID) == "" {
return nil, fmt.Errorf("%w: run id is required", ErrInvalidInput)
}
if input.AttemptNo > 0 && strings.TrimSpace(input.TaskID) == "" {
return nil, fmt.Errorf("%w: task id is required when attempt is specified", ErrInvalidInput)
}
if !input.AllCompleted && strings.TrimSpace(input.TaskID) == "" && input.AttemptNo == 0 {
return nil, fmt.Errorf("%w: specify --task, --attempt, or --all-completed", ErrInvalidInput)
}
if _, err := s.GetRun(ctx, input.RunID); err != nil {
return nil, err
}
conditions := []string{"run_id = ?", "worktree_path <> ''", "workspace_status <> 'cleaned'"}
args := []any{input.RunID}
if strings.TrimSpace(input.TaskID) != "" {
conditions = append(conditions, "task_id = ?")
args = append(args, strings.TrimSpace(input.TaskID))
}
if input.AttemptNo > 0 {
conditions = append(conditions, "attempt_no = ?")
args = append(args, input.AttemptNo)
}
if !input.Force {
conditions = append(conditions, "workspace_status IN (?, ?)")
args = append(args, "completed", "abandoned")
}
query := `SELECT
run_id, task_id, attempt_no, assigned_to, thread_id, base_ref, base_commit,
branch_name, worktree_path, workspace_status, result_commit, status,
created_at, updated_at
FROM task_attempts
WHERE ` + strings.Join(conditions, " AND ") + `
ORDER BY run_id, task_id, attempt_no ASC`
rows, err := s.db.QueryContext(ctx, query, args...)
if err != nil {
return nil, fmt.Errorf("query cleanup candidates: %w", err)
}
defer rows.Close()
var candidates []CleanupCandidate
for rows.Next() {
attempt, err := scanAttempt(rows)
if err != nil {
return nil, err
}
candidates = append(candidates, CleanupCandidate{Attempt: attempt})
}
if err := rows.Err(); err != nil {
return nil, fmt.Errorf("iterate cleanup candidates: %w", err)
}
if len(candidates) == 0 {
return nil, protocol.NoMatchingWork("no cleanup candidates matched the requested filters")
}
return candidates, nil
}
func (s *OrchStore) MarkAttemptsCleaned(ctx context.Context, records []CleanupRecord) ([]TaskAttempt, error) {
if len(records) == 0 {
return nil, nil
}
now := nowUTC()
tx, err := s.db.BeginTx(ctx, nil)
if err != nil {
return nil, fmt.Errorf("begin cleanup commit transaction: %w", err)
}
defer tx.Rollback()
cleaned := make([]TaskAttempt, 0, len(records))
for _, record := range records {
attempt := record.Attempt
_, err := tx.ExecContext(
ctx,
`UPDATE task_attempts
SET workspace_status = ?, updated_at = ?
WHERE run_id = ? AND task_id = ? AND attempt_no = ?`,
"cleaned",
formatTime(now),
attempt.RunID,
attempt.TaskID,
attempt.AttemptNo,
)
if err != nil {
return nil, fmt.Errorf("mark attempt cleaned: %w", err)
}
if err := insertEvent(ctx, tx, eventInput{
RunID: attempt.RunID,
TaskID: attempt.TaskID,
ThreadID: attempt.ThreadID,
Source: "orch",
EventType: "workspace_cleaned",
Summary: fmt.Sprintf("cleaned workspace for %s/%s attempt %d", attempt.RunID, attempt.TaskID, attempt.AttemptNo),
PayloadJSON: marshalJSON(map[string]any{
"attempt_no": attempt.AttemptNo,
"worktree_path": attempt.WorktreePath,
}),
CreatedAt: now,
}); err != nil {
return nil, err
}
attempt.WorkspaceStatus = "cleaned"
attempt.UpdatedAt = now
cleaned = append(cleaned, attempt)
}
if err := tx.Commit(); err != nil {
return nil, fmt.Errorf("commit cleanup transaction: %w", err)
}
return cleaned, nil
}
func (s *OrchStore) dispatchTaskTx(
ctx context.Context,
tx *sql.Tx,
task Task,
toAgent string,
body string,
baseRef string,
prepareWorkspace DispatchWorkspacePreparer,
now time.Time,
) (DispatchResult, func(bool), error) {
assignedTo := defaultString(strings.TrimSpace(toAgent), task.DefaultTo)
if assignedTo == "" {
return DispatchResult{}, fmt.Errorf("%w: dispatch target agent is required", ErrInvalidInput)
return DispatchResult{}, nil, fmt.Errorf("%w: dispatch target agent is required", ErrInvalidInput)
}
attemptNo := task.LatestAttemptNo + 1
workspace := DispatchWorkspace{
BaseRef: strings.TrimSpace(input.BaseRef),
BaseRef: strings.TrimSpace(baseRef),
}
cleanupWorkspace := func() {}
workspaceCommitted := false
if input.PrepareWorkspace != nil {
workspace, cleanupWorkspace, err = input.PrepareWorkspace(task, attemptNo)
finalizeWorkspace := func(success bool) {}
if prepareWorkspace != nil {
cleanupWorkspace := func() {}
var err error
workspace, cleanupWorkspace, err = prepareWorkspace(task, attemptNo)
if err != nil {
return DispatchResult{}, err
return DispatchResult{}, nil, err
}
if cleanupWorkspace == nil {
cleanupWorkspace = func() {}
}
defer func() {
if !workspaceCommitted {
finalizeWorkspace = func(success bool) {
if !success {
cleanupWorkspace()
}
}()
}
}
threadID := newID("thr")
@@ -545,7 +1118,7 @@ func (s *OrchStore) DispatchTask(ctx context.Context, input DispatchInput) (Disp
UpdatedAt: now,
}
_, err = tx.ExecContext(
_, err := tx.ExecContext(
ctx,
`INSERT INTO threads (
thread_id, run_id, task_id, subject, created_by, assigned_to, status,
@@ -564,7 +1137,7 @@ func (s *OrchStore) DispatchTask(ctx context.Context, input DispatchInput) (Disp
formatTime(thread.UpdatedAt),
)
if err != nil {
return DispatchResult{}, fmt.Errorf("insert dispatch thread: %w", err)
return DispatchResult{}, finalizeWorkspace, fmt.Errorf("insert dispatch thread: %w", err)
}
message := Message{
@@ -574,12 +1147,12 @@ func (s *OrchStore) DispatchTask(ctx context.Context, input DispatchInput) (Disp
ToAgent: assignedTo,
Kind: "task",
Summary: defaultString(task.Summary, task.Title),
Body: input.Body,
Body: body,
PayloadJSON: json.RawMessage(payloadJSON),
CreatedAt: now,
}
if err := insertMessage(ctx, tx, message); err != nil {
return DispatchResult{}, err
return DispatchResult{}, finalizeWorkspace, err
}
if err := insertEvent(ctx, tx, eventInput{
RunID: thread.RunID,
@@ -592,7 +1165,7 @@ func (s *OrchStore) DispatchTask(ctx context.Context, input DispatchInput) (Disp
PayloadJSON: payloadJSON,
CreatedAt: now,
}); err != nil {
return DispatchResult{}, err
return DispatchResult{}, finalizeWorkspace, err
}
attempt := TaskAttempt{
@@ -633,7 +1206,7 @@ func (s *OrchStore) DispatchTask(ctx context.Context, input DispatchInput) (Disp
formatTime(attempt.UpdatedAt),
)
if err != nil {
return DispatchResult{}, fmt.Errorf("insert task attempt: %w", err)
return DispatchResult{}, finalizeWorkspace, fmt.Errorf("insert task attempt: %w", err)
}
_, err = tx.ExecContext(
@@ -648,7 +1221,7 @@ func (s *OrchStore) DispatchTask(ctx context.Context, input DispatchInput) (Disp
task.TaskID,
)
if err != nil {
return DispatchResult{}, fmt.Errorf("update task dispatch status: %w", err)
return DispatchResult{}, finalizeWorkspace, fmt.Errorf("update task dispatch status: %w", err)
}
if err := insertEvent(ctx, tx, eventInput{
@@ -662,18 +1235,9 @@ func (s *OrchStore) DispatchTask(ctx context.Context, input DispatchInput) (Disp
PayloadJSON: payloadJSON,
CreatedAt: now,
}); err != nil {
return DispatchResult{}, err
return DispatchResult{}, finalizeWorkspace, err
}
if err := updateRunAggregateStatus(ctx, tx, task.RunID, now); err != nil {
return DispatchResult{}, err
}
if err := tx.Commit(); err != nil {
return DispatchResult{}, fmt.Errorf("commit dispatch transaction: %w", err)
}
workspaceCommitted = true
task.Status = "dispatched"
task.LatestAttemptNo = attempt.AttemptNo
task.UpdatedAt = now
@@ -683,7 +1247,129 @@ func (s *OrchStore) DispatchTask(ctx context.Context, input DispatchInput) (Disp
Attempt: attempt,
Thread: thread,
Message: message,
}, nil
}, finalizeWorkspace, nil
}
func cancelTaskTx(ctx context.Context, tx *sql.Tx, task Task, reason string, now time.Time) (Task, error) {
if task.LatestAttemptNo > 0 {
attempt, err := selectAttempt(ctx, tx, task.RunID, task.TaskID, task.LatestAttemptNo)
if err != nil {
return Task{}, err
}
if attempt.ThreadID != "" {
thread, err := selectThread(ctx, tx, attempt.ThreadID)
if err != nil && !errors.Is(err, ErrThreadNotFound) {
return Task{}, err
}
if err == nil && !isTerminalStatus(thread.Status) {
if err := cancelThreadTx(ctx, tx, thread, reason, now); err != nil {
return Task{}, err
}
}
}
attemptStatus := attempt.Status
if attemptStatus != "done" && attemptStatus != "failed" && attemptStatus != "cancelled" {
attemptStatus = "cancelled"
}
workspaceStatus := attempt.WorkspaceStatus
if workspaceStatus != "cleaned" {
workspaceStatus = "abandoned"
}
_, err = tx.ExecContext(
ctx,
`UPDATE task_attempts
SET status = ?, workspace_status = ?, updated_at = ?
WHERE run_id = ? AND task_id = ? AND attempt_no = ?`,
attemptStatus,
nullIfEmpty(workspaceStatus),
formatTime(now),
attempt.RunID,
attempt.TaskID,
attempt.AttemptNo,
)
if err != nil {
return Task{}, fmt.Errorf("update cancelled attempt: %w", err)
}
}
_, err := tx.ExecContext(
ctx,
`UPDATE tasks
SET status = ?, updated_at = ?
WHERE run_id = ? AND task_id = ?`,
"cancelled",
formatTime(now),
task.RunID,
task.TaskID,
)
if err != nil {
return Task{}, fmt.Errorf("update cancelled task: %w", err)
}
if err := insertEvent(ctx, tx, eventInput{
RunID: task.RunID,
TaskID: task.TaskID,
Source: "orch",
EventType: "task_cancelled",
Summary: defaultString(reason, "task cancelled"),
PayloadJSON: marshalJSON(map[string]any{"reason": reason}),
CreatedAt: now,
}); err != nil {
return Task{}, err
}
task.Status = "cancelled"
task.UpdatedAt = now
return task, nil
}
func cancelThreadTx(ctx context.Context, tx *sql.Tx, thread Thread, reason string, now time.Time) error {
messageID := newID("msg")
summary := defaultString(reason, "thread cancelled")
message := Message{
MessageID: messageID,
ThreadID: thread.ThreadID,
FromAgent: "orch",
ToAgent: thread.AssignedTo,
Kind: "control",
Summary: summary,
Body: reason,
PayloadJSON: json.RawMessage(`{}`),
CreatedAt: now,
}
if err := insertMessage(ctx, tx, message); err != nil {
return err
}
if err := updateThreadState(ctx, tx, thread.ThreadID, "cancelled", thread.AssignedTo, message.MessageID, now); err != nil {
return err
}
if _, err := tx.ExecContext(
ctx,
`UPDATE leases
SET released_at = ?
WHERE thread_id = ?
AND released_at IS NULL`,
formatTime(now),
thread.ThreadID,
); err != nil {
return fmt.Errorf("release lease on orch cancel: %w", err)
}
if err := insertEvent(ctx, tx, eventInput{
RunID: thread.RunID,
TaskID: thread.TaskID,
ThreadID: thread.ThreadID,
Source: "inbox",
EventType: "thread_cancelled",
MessageID: message.MessageID,
Summary: message.Summary,
PayloadJSON: string(message.PayloadJSON),
CreatedAt: now,
}); err != nil {
return err
}
return nil
}
func (s *OrchStore) ReconcileRun(ctx context.Context, runID string) (ReconcileResult, error) {