package inbox import ( "context" "os" "path/filepath" "strings" "testing" ) func releaseLeaseForLifecycleTest(t *testing.T, dbPath, threadID string) { t.Helper() sqlDB, err := openInboxDB(context.Background(), dbPath) if err != nil { t.Fatalf("open db: %v", err) } defer sqlDB.Close() if _, err := sqlDB.ExecContext(context.Background(), `UPDATE leases SET released_at = expires_at WHERE thread_id = ?`, threadID); err != nil { t.Fatalf("release lease: %v", err) } } func assertInboxErrorMessageContains(t *testing.T, raw string, want string) { t.Helper() var payload map[string]any mustDecodeJSON(t, raw, &payload) errorValue, ok := payload["error"].(map[string]any) if !ok { t.Fatalf("expected error object, got %#v", payload["error"]) } message, _ := errorValue["message"].(string) if !strings.Contains(message, want) { t.Fatalf("expected error message to contain %q, got %q", want, message) } } // TestClaimRejectsNonPendingThread verifies claim rejects threads that are no longer pending. func TestClaimRejectsNonPendingThread(t *testing.T) { t.Parallel() dbPath := initCommandTestDB(t) threadID := sendPendingThread(t, dbPath, "leader", "worker-a", "Claimed already", "Trigger invalid state") runInboxCommand( t, "--db", dbPath, "--json", "claim", "--agent", "worker-a", "--thread", threadID, ) releaseLeaseForLifecycleTest(t, dbPath, threadID) stdout, _, exitCode := executeInboxCommand( "--db", dbPath, "--json", "claim", "--agent", "worker-a", "--thread", threadID, ) if exitCode != 30 { t.Fatalf("expected exit code 30, got %d with output %s", exitCode, stdout) } assertErrorJSON(t, stdout, "invalid_state") assertInboxErrorMessageContains(t, stdout, "is not pending") } // TestInboxLifecycleCommandsRespectRootAgentAndPlainTextOutput verifies lifecycle commands accept the root agent flag and render text output. func TestInboxLifecycleCommandsRespectRootAgentAndPlainTextOutput(t *testing.T) { t.Parallel() t.Run("claim", func(t *testing.T) { dbPath := initCommandTestDB(t) threadID := sendPendingThread(t, dbPath, "leader", "worker-a", "Claim root agent", "Use root agent") stdout, stderr, exitCode := executeInboxCommand( "--db", dbPath, "--agent", "worker-a", "claim", "--thread", threadID, ) if exitCode != 0 { t.Fatalf("expected exit 0, got %d\nstderr:\n%s\nstdout:\n%s", exitCode, stderr, stdout) } if !strings.Contains(stdout, "claimed thread "+threadID) { t.Fatalf("expected claim output to mention thread %s, got %q", threadID, stdout) } }) t.Run("renew", func(t *testing.T) { dbPath := initCommandTestDB(t) threadID := sendPendingThread(t, dbPath, "leader", "worker-b", "Renew root agent", "Use root agent") runInboxCommand(t, "--db", dbPath, "--json", "claim", "--agent", "worker-b", "--thread", threadID) stdout, stderr, exitCode := executeInboxCommand( "--db", dbPath, "--agent", "worker-b", "renew", "--thread", threadID, ) if exitCode != 0 { t.Fatalf("expected exit 0, got %d\nstderr:\n%s\nstdout:\n%s", exitCode, stderr, stdout) } if !strings.Contains(stdout, "renewed lease on thread "+threadID) { t.Fatalf("expected renew output to mention thread %s, got %q", threadID, stdout) } }) t.Run("update", func(t *testing.T) { dbPath := filepath.Join(t.TempDir(), "coord.db") threadID := seedClaimedThreadForInboxTests(t, dbPath, "leader", "worker-c", "worker-c") stdout, stderr, exitCode := executeInboxCommand( "--db", dbPath, "--agent", "worker-c", "update", "--thread", threadID, "--status", "in_progress", "--summary", "Started from root agent", ) if exitCode != 0 { t.Fatalf("expected exit 0, got %d\nstderr:\n%s\nstdout:\n%s", exitCode, stderr, stdout) } if !strings.Contains(stdout, "updated thread "+threadID+" to in_progress") { t.Fatalf("expected update output to mention thread %s, got %q", threadID, stdout) } }) t.Run("done", func(t *testing.T) { dbPath := filepath.Join(t.TempDir(), "coord.db") threadID := seedClaimedThreadForInboxTests(t, dbPath, "leader", "worker-d", "worker-d") stdout, stderr, exitCode := executeInboxCommand( "--db", dbPath, "--agent", "worker-d", "done", "--thread", threadID, "--summary", "Finished via root agent", ) if exitCode != 0 { t.Fatalf("expected exit 0, got %d\nstderr:\n%s\nstdout:\n%s", exitCode, stderr, stdout) } if !strings.Contains(stdout, "done thread "+threadID) { t.Fatalf("expected done output to mention thread %s, got %q", threadID, stdout) } }) t.Run("fail", func(t *testing.T) { dbPath := filepath.Join(t.TempDir(), "coord.db") threadID := seedClaimedThreadForInboxTests(t, dbPath, "leader", "worker-e", "worker-e") stdout, stderr, exitCode := executeInboxCommand( "--db", dbPath, "--agent", "worker-e", "fail", "--thread", threadID, "--summary", "Failed via root agent", ) if exitCode != 0 { t.Fatalf("expected exit 0, got %d\nstderr:\n%s\nstdout:\n%s", exitCode, stderr, stdout) } if !strings.Contains(stdout, "fail thread "+threadID) { t.Fatalf("expected fail output to mention thread %s, got %q", threadID, stdout) } }) t.Run("cancel", func(t *testing.T) { dbPath := initCommandTestDB(t) threadID := sendPendingThread(t, dbPath, "leader", "worker-f", "Cancel root agent", "Use root agent") stdout, stderr, exitCode := executeInboxCommand( "--db", dbPath, "--agent", "leader", "cancel", "--thread", threadID, "--reason", "Task superseded", ) if exitCode != 0 { t.Fatalf("expected exit 0, got %d\nstderr:\n%s\nstdout:\n%s", exitCode, stderr, stdout) } if !strings.Contains(stdout, "cancelled thread "+threadID) { t.Fatalf("expected cancel output to mention thread %s, got %q", threadID, stdout) } }) } // TestInboxLifecycleCommandHelpContracts verifies claim, renew, done, fail, and cancel help text explains their contracts. func TestInboxLifecycleCommandHelpContracts(t *testing.T) { t.Parallel() cases := []struct { name string args []string want []string }{ { name: "claim", args: []string{"claim", "--help"}, want: []string{"Only one active lease may exist per thread at a time.", "Constraints:", "--lease-seconds 1800"}, }, { name: "renew", args: []string{"renew", "--help"}, want: []string{"renew only applies when the caller already owns the active lease.", "Constraints:", "--lease-seconds 1800"}, }, { name: "done", args: []string{"done", "--help"}, want: []string{"done is a terminal operation", "Constraints:", "--summary \"Implemented retry policy\""}, }, { name: "fail", args: []string{"fail", "--help"}, want: []string{"fail is a terminal operation", "Constraints:", "retry, reassignment, or cancellation"}, }, { name: "cancel", args: []string{"cancel", "--help"}, want: []string{"leader or operator action", "Constraints:", "--reason \"Task superseded by a new plan.\""}, }, } for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { stdout, stderr, exitCode := executeInboxCommand(tc.args...) if exitCode != 0 { t.Fatalf("expected help exit 0, got %d\nstderr:\n%s\nstdout:\n%s", exitCode, stderr, stdout) } combined := stdout + stderr for _, want := range tc.want { if !strings.Contains(combined, want) { t.Fatalf("expected help for %s to contain %q, got:\n%s", tc.name, want, combined) } } }) } } // TestRenewRejectsMissingAndTerminalThread verifies renew reports not found and terminal-thread errors. func TestRenewRejectsMissingAndTerminalThread(t *testing.T) { t.Parallel() t.Run("missing thread", func(t *testing.T) { dbPath := initCommandTestDB(t) stdout, _, exitCode := executeInboxCommand( "--db", dbPath, "--json", "renew", "--agent", "worker-a", "--thread", "thr_missing", ) if exitCode != 40 { t.Fatalf("expected exit code 40, got %d with output %s", exitCode, stdout) } assertErrorJSON(t, stdout, "not_found") }) t.Run("terminal thread", func(t *testing.T) { dbPath := filepath.Join(t.TempDir(), "coord.db") threadID := seedClaimedThreadForInboxTests(t, dbPath, "leader", "worker-a", "worker-a") runInboxCommand(t, "--db", dbPath, "--json", "done", "--agent", "worker-a", "--thread", threadID, "--summary", "Finished") stdout, _, exitCode := executeInboxCommand( "--db", dbPath, "--json", "renew", "--agent", "worker-a", "--thread", threadID, ) if exitCode != 30 { t.Fatalf("expected exit code 30, got %d with output %s", exitCode, stdout) } assertErrorJSON(t, stdout, "invalid_state") assertInboxErrorMessageContains(t, stdout, "already terminal") }) } // TestUpdateRejectsMissingAndTerminalThread verifies update rejects unknown or terminal threads. func TestUpdateRejectsMissingAndTerminalThread(t *testing.T) { t.Parallel() t.Run("missing thread", func(t *testing.T) { dbPath := initCommandTestDB(t) stdout, _, exitCode := executeInboxCommand( "--db", dbPath, "--json", "update", "--agent", "worker-a", "--thread", "thr_missing", "--status", "in_progress", "--summary", "Started", ) if exitCode != 40 { t.Fatalf("expected exit code 40, got %d with output %s", exitCode, stdout) } assertErrorJSON(t, stdout, "not_found") }) t.Run("terminal thread", func(t *testing.T) { dbPath := filepath.Join(t.TempDir(), "coord.db") threadID := seedClaimedThreadForInboxTests(t, dbPath, "leader", "worker-a", "worker-a") runInboxCommand(t, "--db", dbPath, "--json", "done", "--agent", "worker-a", "--thread", threadID, "--summary", "Finished") stdout, _, exitCode := executeInboxCommand( "--db", dbPath, "--json", "update", "--agent", "worker-a", "--thread", threadID, "--status", "blocked", "--summary", "Need answer", ) if exitCode != 30 { t.Fatalf("expected exit code 30, got %d with output %s", exitCode, stdout) } assertErrorJSON(t, stdout, "invalid_state") assertInboxErrorMessageContains(t, stdout, "already terminal") }) } // TestUpdateRejectsInvalidBodyFileInputs verifies update rejects mutually exclusive or unreadable body-file inputs. func TestUpdateRejectsInvalidBodyFileInputs(t *testing.T) { t.Parallel() dbPath := filepath.Join(t.TempDir(), "coord.db") threadID := seedClaimedThreadForInboxTests(t, dbPath, "leader", "worker-a", "worker-a") t.Run("body and body-file", func(t *testing.T) { tempDir := t.TempDir() bodyPath := filepath.Join(tempDir, "body.txt") if err := os.WriteFile(bodyPath, []byte("from file"), 0o644); err != nil { t.Fatalf("write body file: %v", err) } stdout, _, exitCode := executeInboxCommand( "--db", dbPath, "--json", "update", "--agent", "worker-a", "--thread", threadID, "--status", "in_progress", "--summary", "Started", "--body", "inline", "--body-file", bodyPath, ) if exitCode != 30 { t.Fatalf("expected exit code 30, got %d with output %s", exitCode, stdout) } assertErrorJSON(t, stdout, "invalid_input") assertInboxErrorMessageContains(t, stdout, "mutually exclusive") }) t.Run("unreadable body-file", func(t *testing.T) { stdout, _, exitCode := executeInboxCommand( "--db", dbPath, "--json", "update", "--agent", "worker-a", "--thread", threadID, "--status", "in_progress", "--summary", "Started", "--body-file", filepath.Join(t.TempDir(), "missing.txt"), ) if exitCode != 30 { t.Fatalf("expected exit code 30, got %d with output %s", exitCode, stdout) } assertErrorJSON(t, stdout, "invalid_input") assertInboxErrorMessageContains(t, stdout, "failed to read body-file") }) } // TestUpdateRejectsInvalidArtifactFlagCombinations verifies update validates artifact flag combinations before command execution. func TestUpdateRejectsInvalidArtifactFlagCombinations(t *testing.T) { t.Parallel() dbPath := filepath.Join(t.TempDir(), "coord.db") threadID := seedClaimedThreadForInboxTests(t, dbPath, "leader", "worker-a", "worker-a") cases := []struct { name string args []string want string }{ { name: "kind without artifact", args: []string{"--artifact-kind", "log"}, want: "require at least one artifact path", }, { name: "empty artifact path", args: []string{"--artifact", " "}, want: "artifact path cannot be empty", }, { name: "artifact-kind count mismatch", args: []string{"--artifact", "a.log", "--artifact", "b.log", "--artifact-kind", "log", "--artifact-kind", "brief", "--artifact-kind", "extra"}, want: "artifact-kind must be specified once or once per artifact", }, } for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { args := []string{ "--db", dbPath, "--json", "update", "--agent", "worker-a", "--thread", threadID, "--status", "in_progress", "--summary", "Started", } args = append(args, tc.args...) stdout, _, exitCode := executeInboxCommand(args...) if exitCode != 30 { t.Fatalf("expected exit code 30, got %d with output %s", exitCode, stdout) } assertErrorJSON(t, stdout, "invalid_input") assertInboxErrorMessageContains(t, stdout, tc.want) }) } } // TestDoneAndFailRejectMissingThread verifies terminal commands reject unknown threads. func TestDoneAndFailRejectMissingThread(t *testing.T) { t.Parallel() for _, command := range []string{"done", "fail"} { t.Run(command, func(t *testing.T) { dbPath := initCommandTestDB(t) stdout, _, exitCode := executeInboxCommand( "--db", dbPath, "--json", command, "--agent", "worker-a", "--thread", "thr_missing", "--summary", "Missing thread", ) if exitCode != 40 { t.Fatalf("expected exit code 40, got %d with output %s", exitCode, stdout) } assertErrorJSON(t, stdout, "not_found") }) } } // TestDoneAndFailRejectInvalidBodyFileInputs verifies terminal commands reject bad body-file inputs. func TestDoneAndFailRejectInvalidBodyFileInputs(t *testing.T) { t.Parallel() cases := []struct { command string agent string }{ {command: "done", agent: "worker-a"}, {command: "fail", agent: "worker-b"}, } for _, tc := range cases { t.Run(tc.command, func(t *testing.T) { dbPath := filepath.Join(t.TempDir(), "coord.db") threadID := seedClaimedThreadForInboxTests(t, dbPath, "leader", tc.agent, tc.agent) bodyPath := filepath.Join(t.TempDir(), "body.txt") if err := os.WriteFile(bodyPath, []byte("from file"), 0o644); err != nil { t.Fatalf("write body file: %v", err) } stdout, _, exitCode := executeInboxCommand( "--db", dbPath, "--json", tc.command, "--agent", tc.agent, "--thread", threadID, "--summary", "Finished", "--body", "inline", "--body-file", bodyPath, ) if exitCode != 30 { t.Fatalf("expected exit code 30, got %d with output %s", exitCode, stdout) } assertErrorJSON(t, stdout, "invalid_input") assertInboxErrorMessageContains(t, stdout, "mutually exclusive") stdout, _, exitCode = executeInboxCommand( "--db", dbPath, "--json", tc.command, "--agent", tc.agent, "--thread", threadID, "--summary", "Finished", "--body-file", filepath.Join(t.TempDir(), "missing.txt"), ) if exitCode != 30 { t.Fatalf("expected exit code 30, got %d with output %s", exitCode, stdout) } assertErrorJSON(t, stdout, "invalid_input") assertInboxErrorMessageContains(t, stdout, "failed to read body-file") }) } } // TestDoneAndFailRejectInvalidArtifactFlagCombinations verifies terminal commands validate artifact flag combinations. func TestDoneAndFailRejectInvalidArtifactFlagCombinations(t *testing.T) { t.Parallel() cases := []struct { command string agent string args []string want string }{ { command: "done", agent: "worker-a", args: []string{"--artifact-kind", "brief"}, want: "require at least one artifact path", }, { command: "fail", agent: "worker-b", args: []string{"--artifact", "a.log", "--artifact", "b.log", "--artifact-metadata-json", `{}`, "--artifact-metadata-json", `{}`, "--artifact-metadata-json", `{}`}, want: "artifact-metadata-json must be specified once or once per artifact", }, } for _, tc := range cases { t.Run(tc.command, func(t *testing.T) { dbPath := filepath.Join(t.TempDir(), "coord.db") threadID := seedClaimedThreadForInboxTests(t, dbPath, "leader", tc.agent, tc.agent) args := []string{ "--db", dbPath, "--json", tc.command, "--agent", tc.agent, "--thread", threadID, "--summary", "Finished", } args = append(args, tc.args...) stdout, _, exitCode := executeInboxCommand(args...) if exitCode != 30 { t.Fatalf("expected exit code 30, got %d with output %s", exitCode, stdout) } assertErrorJSON(t, stdout, "invalid_input") assertInboxErrorMessageContains(t, stdout, tc.want) }) } } // TestCancelRejectsTerminalThread verifies cancel rejects threads that are already terminal. func TestCancelRejectsTerminalThread(t *testing.T) { t.Parallel() dbPath := filepath.Join(t.TempDir(), "coord.db") threadID := seedClaimedThreadForInboxTests(t, dbPath, "leader", "worker-a", "worker-a") runInboxCommand(t, "--db", dbPath, "--json", "done", "--agent", "worker-a", "--thread", threadID, "--summary", "Finished") stdout, _, exitCode := executeInboxCommand( "--db", dbPath, "--json", "cancel", "--agent", "leader", "--thread", threadID, ) if exitCode != 30 { t.Fatalf("expected exit code 30, got %d with output %s", exitCode, stdout) } assertErrorJSON(t, stdout, "invalid_state") assertInboxErrorMessageContains(t, stdout, "already terminal") } // TestCancelUsesDefaultReasonWhenOmitted verifies cancel falls back to the default summary when no reason is supplied. func TestCancelUsesDefaultReasonWhenOmitted(t *testing.T) { t.Parallel() dbPath := initCommandTestDB(t) threadID := sendPendingThread(t, dbPath, "leader", "worker-a", "Default cancel reason", "Check default summary") runInboxCommand( t, "--db", dbPath, "--json", "cancel", "--agent", "leader", "--thread", threadID, ) showOut := runInboxCommand(t, "--db", dbPath, "--json", "show", "--thread", threadID) var showResp map[string]any mustDecodeJSON(t, showOut, &showResp) lastMessage := lastThreadMessageFromShow(t, showResp) if got := lastMessage["summary"]; got != "thread cancelled" { t.Fatalf("expected default cancel summary, got %#v", got) } if got := lastMessage["body"]; got != "" { t.Fatalf("expected empty cancel body when reason omitted, got %#v", got) } } // TestCancelRejectsInvalidArtifactFlagCombinations verifies cancel validates artifact flag combinations. func TestCancelRejectsInvalidArtifactFlagCombinations(t *testing.T) { t.Parallel() dbPath := initCommandTestDB(t) threadID := sendPendingThread(t, dbPath, "leader", "worker-a", "Cancel invalid artifact", "Check artifact validation") stdout, _, exitCode := executeInboxCommand( "--db", dbPath, "--json", "cancel", "--agent", "leader", "--thread", threadID, "--artifact-metadata-json", `{}`, ) if exitCode != 30 { t.Fatalf("expected exit code 30, got %d with output %s", exitCode, stdout) } assertErrorJSON(t, stdout, "invalid_input") assertInboxErrorMessageContains(t, stdout, "require at least one artifact path") }