orch: require explicit dispatch execution mode

This commit is contained in:
2026-03-20 19:27:30 +08:00
parent 7840b2767f
commit 5859ff219e
43 changed files with 277 additions and 312 deletions
+6 -1
View File
@@ -159,6 +159,7 @@ type WaitResult struct {
}
type DispatchWorkspace struct {
ExecutionMode string `json:"execution_mode,omitempty"`
BaseRef string `json:"base_ref,omitempty"`
BaseCommit string `json:"base_commit,omitempty"`
BranchName string `json:"branch_name,omitempty"`
@@ -1094,7 +1095,8 @@ func (s *OrchStore) dispatchTaskTx(
attemptNo := task.LatestAttemptNo + 1
workspace := DispatchWorkspace{
BaseRef: strings.TrimSpace(baseRef),
ExecutionMode: "analysis",
BaseRef: strings.TrimSpace(baseRef),
}
finalizeWorkspace := func(success bool) {}
if prepareWorkspace != nil {
@@ -2636,6 +2638,9 @@ func buildDispatchPayload(task Task, attemptNo int, workspace DispatchWorkspace)
payload["acceptance"] = acceptance
}
}
if strings.TrimSpace(workspace.ExecutionMode) != "" {
payload["execution_mode"] = strings.TrimSpace(workspace.ExecutionMode)
}
if strings.TrimSpace(workspace.BaseRef) != "" {
payload["base_ref"] = strings.TrimSpace(workspace.BaseRef)
}
@@ -182,6 +182,7 @@ func seedBlockedTaskForAnswerCleanupEdgeTests(t *testing.T, dbPath, runID, taskI
"dispatch",
"--run", runID,
"--task", taskID,
"--execution-mode", "analysis",
)
var dispatchResp map[string]any
@@ -89,6 +89,7 @@ func TestOrchDispatchCreatesAttemptAndThreadForReadyTask(t *testing.T) {
"dispatch",
"--run", "run_blog_dispatch_001",
"--task", "T1",
"--execution-mode", "analysis",
"--body", "Implement retry handling for the HTTP client.",
)
@@ -167,6 +168,7 @@ func TestOrchBlockedListsLatestQuestionForBlockedTask(t *testing.T) {
"dispatch",
"--run", "run_blog_blocked_001",
"--task", "T1",
"--execution-mode", "analysis",
)
var firstDispatchResp map[string]any
@@ -205,6 +207,7 @@ func TestOrchBlockedListsLatestQuestionForBlockedTask(t *testing.T) {
"dispatch",
"--run", "run_blog_blocked_001",
"--task", "T2",
"--execution-mode", "analysis",
)
var secondDispatchResp map[string]any
@@ -305,6 +308,7 @@ func TestOrchStatusReturnsRunSummaryAndTaskList(t *testing.T) {
"dispatch",
"--run", "run_blog_status_001",
"--task", "T1",
"--execution-mode", "analysis",
"--body", "Implement retry handling for the HTTP client.",
)
@@ -420,6 +424,7 @@ func TestOrchStatusAutoReconcilesAndIncludesBlockedContext(t *testing.T) {
"dispatch",
"--run", "run_blog_status_002",
"--task", "T1",
"--execution-mode", "analysis",
"--body", "Implement retry handling for the HTTP client.",
)
@@ -522,6 +527,7 @@ func TestOrchReconcileMapsFailedThreadToTerminalTaskState(t *testing.T) {
"dispatch",
"--run", "run_blog_reconcile_001",
"--task", "T1",
"--execution-mode", "analysis",
"--body", "Implement retry handling for the HTTP client.",
)
@@ -595,7 +601,7 @@ func TestOrchReconcileMapsFailedThreadToTerminalTaskState(t *testing.T) {
}
}
func TestOrchWorkflowStrictWorktreeDispatchToCleanup(t *testing.T) {
func TestOrchWorkflowCodeModeDispatchToCleanup(t *testing.T) {
t.Parallel()
dbPath := filepath.Join(t.TempDir(), "coord.db")
@@ -627,9 +633,9 @@ func TestOrchWorkflowStrictWorktreeDispatchToCleanup(t *testing.T) {
"dispatch",
"--run", "run_blog_workflow_worktree_001",
"--task", "T1",
"--execution-mode", "code",
"--repo-path", repoPath,
"--workspace-root", ".orch/worktrees",
"--strict-worktree",
"--body", "Implement inside isolated worktree.",
)
@@ -10,15 +10,15 @@ import (
)
type dispatchOptions struct {
runID string
taskID string
toAgent string
body string
bodyFile string
baseRef string
repoPath string
workspaceRoot string
strictWorktree bool
runID string
taskID string
toAgent string
body string
bodyFile string
executionMode string
baseRef string
repoPath string
workspaceRoot string
}
func newDispatchCmd(root *rootOptions) *cobra.Command {
@@ -28,6 +28,11 @@ func newDispatchCmd(root *rootOptions) *cobra.Command {
Use: "dispatch",
Short: "Dispatch a ready task to a worker through inbox",
RunE: func(cmd *cobra.Command, args []string) error {
normalizedOpts, err := normalizeDispatchOptions(*opts)
if err != nil {
return err
}
body, err := resolveBodyValue(opts.body, opts.bodyFile)
if err != nil {
return err
@@ -41,12 +46,12 @@ func newDispatchCmd(root *rootOptions) *cobra.Command {
defer sqlDB.Close()
result, err := store.NewOrchStore(sqlDB).DispatchTask(ctx, store.DispatchInput{
RunID: opts.runID,
TaskID: opts.taskID,
ToAgent: opts.toAgent,
RunID: normalizedOpts.runID,
TaskID: normalizedOpts.taskID,
ToAgent: normalizedOpts.toAgent,
Body: body,
BaseRef: opts.baseRef,
PrepareWorkspace: newDispatchWorkspacePreparer(cmd, *opts),
BaseRef: normalizedOpts.baseRef,
PrepareWorkspace: newDispatchWorkspacePreparer(cmd, normalizedOpts),
})
if err != nil {
return err
@@ -82,12 +87,13 @@ func newDispatchCmd(root *rootOptions) *cobra.Command {
cmd.Flags().StringVar(&opts.toAgent, "to", "", "Worker agent override")
cmd.Flags().StringVar(&opts.body, "body", "", "Task message body")
cmd.Flags().StringVar(&opts.bodyFile, "body-file", "", "Read task message body from file")
cmd.Flags().StringVar(&opts.executionMode, "execution-mode", "", "Execution mode: analysis or code")
cmd.Flags().StringVar(&opts.baseRef, "base-ref", "", "Optional base ref to record on the attempt")
cmd.Flags().StringVar(&opts.repoPath, "repo-path", "", "Source repository path for worktree dispatch")
cmd.Flags().StringVar(&opts.workspaceRoot, "workspace-root", "", "Workspace root for worktree dispatch")
cmd.Flags().BoolVar(&opts.strictWorktree, "strict-worktree", false, "Require strict worktree setup")
_ = cmd.MarkFlagRequired("run")
_ = cmd.MarkFlagRequired("task")
_ = cmd.MarkFlagRequired("execution-mode")
return cmd
}
@@ -71,6 +71,7 @@ func TestOrchRunDispatchReconcileLifecycle(t *testing.T) {
"dispatch",
"--run", "run_blog_001",
"--task", "T1",
"--execution-mode", "analysis",
"--body", "Implement retry handling for the HTTP client.",
)
@@ -256,6 +257,7 @@ func TestOrchDependencyBlockedAndAnswerFlow(t *testing.T) {
"dispatch",
"--run", "run_blog_002",
"--task", "T1",
"--execution-mode", "analysis",
)
var dispatchBackendResp map[string]any
@@ -316,6 +318,7 @@ func TestOrchDependencyBlockedAndAnswerFlow(t *testing.T) {
"dispatch",
"--run", "run_blog_002",
"--task", "T2",
"--execution-mode", "analysis",
)
var dispatchFrontendResp map[string]any
@@ -502,6 +505,7 @@ func TestOrchDispatchRejectsNonReadyTask(t *testing.T) {
"dispatch",
"--run", "run_blog_003",
"--task", "T2",
"--execution-mode", "analysis",
)
if exitCode != 30 {
t.Fatalf("expected invalid_state exit code 30, got %d\nstdout:\n%s", exitCode, stdout)
@@ -509,7 +513,7 @@ func TestOrchDispatchRejectsNonReadyTask(t *testing.T) {
assertErrorJSON(t, stdout, "invalid_state")
}
func TestOrchDispatchCreatesStrictWorktree(t *testing.T) {
func TestOrchDispatchCodeModeCreatesWorktree(t *testing.T) {
t.Parallel()
dbPath := filepath.Join(t.TempDir(), "coord.db")
@@ -541,9 +545,9 @@ func TestOrchDispatchCreatesStrictWorktree(t *testing.T) {
"dispatch",
"--run", "run_blog_worktree_001",
"--task", "T1",
"--execution-mode", "code",
"--repo-path", repoPath,
"--workspace-root", ".orch/worktrees",
"--strict-worktree",
"--body", "Implement inside isolated worktree.",
)
@@ -628,7 +632,7 @@ func TestOrchDispatchCreatesStrictWorktree(t *testing.T) {
}
}
func TestOrchStrictWorktreeRejectsDirtyRepoWithoutBaseRef(t *testing.T) {
func TestOrchDispatchCodeModeRejectsDirtyRepoWithoutBaseRef(t *testing.T) {
t.Parallel()
dbPath := filepath.Join(t.TempDir(), "coord.db")
@@ -663,9 +667,9 @@ func TestOrchStrictWorktreeRejectsDirtyRepoWithoutBaseRef(t *testing.T) {
"dispatch",
"--run", "run_blog_worktree_002",
"--task", "T1",
"--execution-mode", "code",
"--repo-path", repoPath,
"--workspace-root", ".orch/worktrees",
"--strict-worktree",
)
if exitCode != 30 {
t.Fatalf("expected invalid_state exit code 30, got %d\nstdout:\n%s", exitCode, stdout)
@@ -677,7 +681,7 @@ func TestOrchStrictWorktreeRejectsDirtyRepoWithoutBaseRef(t *testing.T) {
}
}
func TestOrchStrictWorktreeAllowsExplicitBaseRefOnDirtyRepo(t *testing.T) {
func TestOrchDispatchCodeModeAllowsExplicitBaseRefOnDirtyRepo(t *testing.T) {
t.Parallel()
dbPath := filepath.Join(t.TempDir(), "coord.db")
@@ -714,9 +718,9 @@ func TestOrchStrictWorktreeAllowsExplicitBaseRefOnDirtyRepo(t *testing.T) {
"dispatch",
"--run", "run_blog_worktree_003",
"--task", "T1",
"--execution-mode", "code",
"--repo-path", repoPath,
"--workspace-root", ".orch/worktrees",
"--strict-worktree",
"--base-ref", "HEAD",
)
@@ -730,9 +734,8 @@ func TestOrchStrictWorktreeAllowsExplicitBaseRefOnDirtyRepo(t *testing.T) {
}
}
func TestOrchDispatchAutoEnablesWorktreeForCodeLikeTask(t *testing.T) {
func TestOrchDispatchRequiresExplicitExecutionMode(t *testing.T) {
dbPath := filepath.Join(t.TempDir(), "coord.db")
repoPath := initGitRepo(t)
runOrchCommand(
t,
@@ -750,40 +753,26 @@ func TestOrchDispatchAutoEnablesWorktreeForCodeLikeTask(t *testing.T) {
"--run", "run_blog_auto_worktree_001",
"--task", "T1",
"--title", "Implement backend API",
"--default-to", "backend-worker",
"--default-to", "worker-a",
)
dispatchOut := runOrchCommand(
t,
stdout, _, exitCode := executeOrchCommand(
"--db", dbPath,
"--json",
"dispatch",
"--run", "run_blog_auto_worktree_001",
"--task", "T1",
"--repo-path", repoPath,
)
var dispatchResp map[string]any
mustDecodeJSON(t, dispatchOut, &dispatchResp)
attempt := nestedValue(t, dispatchResp, "data", "attempt").(map[string]any)
worktreePath, _ := attempt["worktree_path"].(string)
if worktreePath == "" {
t.Fatalf("expected auto-detected code task to allocate a worktree, got %#v", attempt)
}
if got, _ := attempt["workspace_status"].(string); got != "created" {
t.Fatalf("expected created workspace status, got %#v", attempt["workspace_status"])
}
if _, err := os.Stat(worktreePath); err != nil {
t.Fatalf("stat auto worktree path %s: %v", worktreePath, err)
if exitCode != 30 {
t.Fatalf("expected invalid_input exit code 30, got %d\nstdout:\n%s", exitCode, stdout)
}
assertErrorJSON(t, stdout, "invalid_input")
}
func TestOrchDispatchDoesNotAutoEnableWorktreeForNonCodeTask(t *testing.T) {
func TestOrchDispatchAnalysisModeSkipsWorktree(t *testing.T) {
t.Parallel()
dbPath := filepath.Join(t.TempDir(), "coord.db")
repoPath := initGitRepo(t)
runOrchCommand(
t,
@@ -812,17 +801,23 @@ func TestOrchDispatchDoesNotAutoEnableWorktreeForNonCodeTask(t *testing.T) {
"dispatch",
"--run", "run_blog_auto_worktree_002",
"--task", "T1",
"--repo-path", repoPath,
"--execution-mode", "analysis",
)
var dispatchResp map[string]any
mustDecodeJSON(t, dispatchOut, &dispatchResp)
attempt := nestedValue(t, dispatchResp, "data", "attempt").(map[string]any)
if got, _ := attempt["worktree_path"].(string); got != "" {
t.Fatalf("expected non-code task to stay on non-worktree path, got %#v", attempt["worktree_path"])
t.Fatalf("expected analysis task to stay on non-worktree path, got %#v", attempt["worktree_path"])
}
if got, _ := attempt["workspace_status"].(string); got != "" {
t.Fatalf("expected no workspace status for non-code task, got %#v", attempt["workspace_status"])
t.Fatalf("expected no workspace status for analysis task, got %#v", attempt["workspace_status"])
}
message := nestedValue(t, dispatchResp, "data", "message").(map[string]any)
payload := message["payload_json"].(map[string]any)
if got, _ := payload["execution_mode"].(string); got != "analysis" {
t.Fatalf("expected analysis execution mode in payload, got %#v", payload["execution_mode"])
}
}
@@ -857,6 +852,7 @@ func TestOrchWaitWakesOnBlockedEvent(t *testing.T) {
"dispatch",
"--run", "run_blog_wait_001",
"--task", "T1",
"--execution-mode", "analysis",
)
var dispatchResp map[string]any
@@ -877,7 +873,7 @@ func TestOrchWaitWakesOnBlockedEvent(t *testing.T) {
"--run", "run_blog_wait_001",
"--for", "task_blocked",
"--after-event", "0",
"--timeout-seconds", "2",
"--timeout-seconds", "5",
)
resultCh <- waitResult{stdout: stdout, stderr: stderr, exitCode: exitCode}
}()
@@ -1010,9 +1006,9 @@ func TestOrchRetryCreatesNewAttempt(t *testing.T) {
"dispatch",
"--run", "run_blog_retry_001",
"--task", "T1",
"--execution-mode", "code",
"--repo-path", repoPath,
"--workspace-root", ".orch/worktrees",
"--strict-worktree",
)
var dispatchResp map[string]any
@@ -1108,9 +1104,9 @@ func TestOrchReassignCancelsOldThreadAndDispatchesNewAttempt(t *testing.T) {
"dispatch",
"--run", "run_blog_reassign_001",
"--task", "T1",
"--execution-mode", "code",
"--repo-path", repoPath,
"--workspace-root", ".orch/worktrees",
"--strict-worktree",
)
var dispatchResp map[string]any
@@ -1224,6 +1220,7 @@ func TestOrchCancelTaskAndRun(t *testing.T) {
"dispatch",
"--run", "run_blog_cancel_001",
"--task", "T1",
"--execution-mode", "analysis",
)
var dispatchResp map[string]any
@@ -1344,9 +1341,9 @@ func TestOrchCleanupRemovesCompletedWorktree(t *testing.T) {
"dispatch",
"--run", "run_blog_cleanup_001",
"--task", "T1",
"--execution-mode", "code",
"--repo-path", repoPath,
"--workspace-root", ".orch/worktrees",
"--strict-worktree",
)
var dispatchResp map[string]any
@@ -2,7 +2,6 @@ package orch
import (
"context"
"encoding/json"
"fmt"
"os"
"os/exec"
@@ -15,21 +14,25 @@ import (
"github.com/spf13/cobra"
)
const (
executionModeAnalysis = "analysis"
executionModeCode = "code"
)
func newDispatchWorkspacePreparer(cmd *cobra.Command, opts dispatchOptions) store.DispatchWorkspacePreparer {
ctx := cmd.Context()
return func(task store.Task, attemptNo int) (store.DispatchWorkspace, func(), error) {
effectiveOpts, useWorktree := resolveDispatchWorktreeOptions(task, opts)
if !useWorktree {
return store.DispatchWorkspace{}, func() {}, nil
if opts.executionMode == executionModeAnalysis {
return store.DispatchWorkspace{ExecutionMode: executionModeAnalysis}, func() {}, nil
}
return provisionDispatchWorkspace(ctx, effectiveOpts, task, attemptNo)
return provisionDispatchWorkspace(ctx, opts, task, attemptNo)
}
}
func newAttemptReuseWorkspacePreparer(cmd *cobra.Command, task store.Task, attempt *store.TaskAttempt) store.DispatchWorkspacePreparer {
if attempt == nil || attempt.WorktreePath == "" {
return nil
return newDispatchWorkspacePreparer(cmd, dispatchOptions{executionMode: executionModeAnalysis})
}
workspaceRoot, ok := deriveWorkspaceRootFromAttempt(task.RunID, task.TaskID, attempt.WorktreePath)
@@ -43,114 +46,49 @@ func newAttemptReuseWorkspacePreparer(cmd *cobra.Command, task store.Task, attem
}
opts := dispatchOptions{
repoPath: attempt.WorktreePath,
workspaceRoot: workspaceRoot,
strictWorktree: true,
baseRef: baseRef,
executionMode: executionModeCode,
repoPath: attempt.WorktreePath,
workspaceRoot: workspaceRoot,
baseRef: baseRef,
}
return newDispatchWorkspacePreparer(cmd, opts)
}
func dispatchUsesWorktree(opts dispatchOptions) bool {
return strings.TrimSpace(opts.workspaceRoot) != "" ||
opts.strictWorktree
}
func resolveDispatchWorktreeOptions(task store.Task, opts dispatchOptions) (dispatchOptions, bool) {
if dispatchUsesWorktree(opts) {
return opts, true
}
if !taskLooksLikeCodeWork(task) {
return opts, false
func normalizeDispatchOptions(opts dispatchOptions) (dispatchOptions, error) {
mode, err := normalizeExecutionMode(opts.executionMode)
if err != nil {
return dispatchOptions{}, err
}
auto := opts
auto.strictWorktree = true
return auto, true
}
normalized := opts
normalized.executionMode = mode
func taskLooksLikeCodeWork(task store.Task) bool {
if acceptanceJSONLooksCodeLike(task.AcceptanceJSON) {
return true
}
return roleLooksCodeLike(task.DefaultTo)
}
func acceptanceJSONLooksCodeLike(raw json.RawMessage) bool {
if len(raw) == 0 {
return false
}
var value any
if err := json.Unmarshal(raw, &value); err != nil {
return false
}
return acceptanceValueLooksCodeLike(value)
}
func acceptanceValueLooksCodeLike(value any) bool {
switch typed := value.(type) {
case map[string]any:
for key, raw := range typed {
lowerKey := strings.ToLower(strings.TrimSpace(key))
switch lowerKey {
case "code", "code_task", "writes_code", "worktree":
if boolValue, ok := raw.(bool); ok && boolValue {
return true
}
case "kind", "task_type", "mode", "type":
if stringValue, ok := raw.(string); ok && isCodeLikeMarker(stringValue) {
return true
}
}
if acceptanceValueLooksCodeLike(raw) {
return true
}
if mode == executionModeAnalysis {
if strings.TrimSpace(opts.repoPath) != "" {
return dispatchOptions{}, protocol.InvalidInput("repo-path is only valid with --execution-mode code", nil)
}
case []any:
for _, item := range typed {
if acceptanceValueLooksCodeLike(item) {
return true
}
if strings.TrimSpace(opts.workspaceRoot) != "" {
return dispatchOptions{}, protocol.InvalidInput("workspace-root is only valid with --execution-mode code", nil)
}
case string:
return isCodeLikeMarker(typed)
}
return false
}
func roleLooksCodeLike(role string) bool {
role = strings.ToLower(strings.TrimSpace(role))
if role == "" {
return false
}
for _, token := range splitIdentifierTokens(role) {
switch token {
case "backend", "frontend", "front", "admin", "ui", "fullstack", "foundation", "db", "database", "mobile", "ios", "android", "web", "platform", "infra", "api":
return true
if strings.TrimSpace(opts.baseRef) != "" {
return dispatchOptions{}, protocol.InvalidInput("base-ref is only valid with --execution-mode code", nil)
}
}
return false
return normalized, nil
}
func isCodeLikeMarker(value string) bool {
value = strings.ToLower(strings.TrimSpace(value))
switch value {
case "code", "code_task", "code-task", "code-change", "code_change", "implementation", "patch", "diff", "repo":
return true
func normalizeExecutionMode(value string) (string, error) {
mode := strings.ToLower(strings.TrimSpace(value))
switch mode {
case executionModeAnalysis, executionModeCode:
return mode, nil
default:
return false
return "", protocol.InvalidInput("execution-mode must be one of: analysis, code", nil)
}
}
func splitIdentifierTokens(value string) []string {
return strings.FieldsFunc(value, func(r rune) bool {
return !((r >= 'a' && r <= 'z') || (r >= '0' && r <= '9'))
})
}
func provisionDispatchWorkspace(ctx context.Context, opts dispatchOptions, task store.Task, attemptNo int) (store.DispatchWorkspace, func(), error) {
repoRoot, err := resolveRepoRoot(ctx, opts.repoPath)
if err != nil {
@@ -162,7 +100,7 @@ func provisionDispatchWorkspace(ctx context.Context, opts dispatchOptions, task
return store.DispatchWorkspace{}, nil, err
}
baseRef, baseCommit, err := resolveDispatchBase(ctx, repoRoot, workspaceRoot, opts.baseRef, opts.strictWorktree)
baseRef, baseCommit, err := resolveDispatchBase(ctx, repoRoot, workspaceRoot, opts.baseRef, true)
if err != nil {
return store.DispatchWorkspace{}, nil, err
}
@@ -190,6 +128,7 @@ func provisionDispatchWorkspace(ctx context.Context, opts dispatchOptions, task
}
return store.DispatchWorkspace{
ExecutionMode: executionModeCode,
BaseRef: baseRef,
BaseCommit: baseCommit,
BranchName: branchName,