Add inbox and orch command contract tests
This commit is contained in:
@@ -0,0 +1,103 @@
|
||||
package inbox
|
||||
|
||||
import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
)
|
||||
|
||||
// TestReplyReadsBodyFromBodyFile verifies reply loads its message body from a body file.
|
||||
func TestReplyReadsBodyFromBodyFile(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
tempDir := t.TempDir()
|
||||
dbPath := filepath.Join(tempDir, "coord.db")
|
||||
bodyPath := filepath.Join(tempDir, "reply.md")
|
||||
body := "Decision note from body file."
|
||||
if err := os.WriteFile(bodyPath, []byte(body), 0o644); err != nil {
|
||||
t.Fatalf("write reply body file: %v", err)
|
||||
}
|
||||
|
||||
threadID := seedThreadForInboxTests(t, dbPath, "leader", "worker-a")
|
||||
|
||||
replyOut := runInboxCommand(
|
||||
t,
|
||||
"--db", dbPath,
|
||||
"--json",
|
||||
"reply",
|
||||
"--from", "leader",
|
||||
"--to", "worker-a",
|
||||
"--thread", threadID,
|
||||
"--summary", "Use the retry policy",
|
||||
"--body-file", bodyPath,
|
||||
)
|
||||
|
||||
var replyResp map[string]any
|
||||
mustDecodeJSON(t, replyOut, &replyResp)
|
||||
if got := nestedString(t, replyResp, "data", "message", "body"); got != body {
|
||||
t.Fatalf("expected reply body %q, got %q", body, got)
|
||||
}
|
||||
}
|
||||
|
||||
// TestReplyPersistsPayloadJSON verifies reply preserves payload JSON on a successful reply.
|
||||
func TestReplyPersistsPayloadJSON(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
dbPath := filepath.Join(t.TempDir(), "coord.db")
|
||||
threadID := seedThreadForInboxTests(t, dbPath, "leader", "worker-a")
|
||||
|
||||
replyOut := runInboxCommand(
|
||||
t,
|
||||
"--db", dbPath,
|
||||
"--json",
|
||||
"reply",
|
||||
"--from", "leader",
|
||||
"--to", "worker-a",
|
||||
"--thread", threadID,
|
||||
"--summary", "Retry read timeouts",
|
||||
"--payload-json", `{"decision":"retry","owner":"leader"}`,
|
||||
)
|
||||
|
||||
var replyResp map[string]any
|
||||
mustDecodeJSON(t, replyOut, &replyResp)
|
||||
payload, ok := nestedValue(t, replyResp, "data", "message", "payload_json").(map[string]any)
|
||||
if !ok {
|
||||
t.Fatalf("expected payload_json object, got %#v", nestedValue(t, replyResp, "data", "message", "payload_json"))
|
||||
}
|
||||
if got := payload["decision"]; got != "retry" {
|
||||
t.Fatalf("expected decision retry, got %#v", got)
|
||||
}
|
||||
if got := payload["owner"]; got != "leader" {
|
||||
t.Fatalf("expected owner leader, got %#v", got)
|
||||
}
|
||||
}
|
||||
|
||||
// TestReplyRejectsInvalidArtifactMetadataJSON verifies reply rejects malformed artifact metadata JSON.
|
||||
func TestReplyRejectsInvalidArtifactMetadataJSON(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
tempDir := t.TempDir()
|
||||
dbPath := filepath.Join(tempDir, "coord.db")
|
||||
artifactPath := filepath.Join(tempDir, "decision.md")
|
||||
if err := os.WriteFile(artifactPath, []byte("Decision note."), 0o644); err != nil {
|
||||
t.Fatalf("write artifact file: %v", err)
|
||||
}
|
||||
|
||||
threadID := seedThreadForInboxTests(t, dbPath, "leader", "worker-a")
|
||||
|
||||
stdout, _, exitCode := executeInboxCommand(
|
||||
"--db", dbPath,
|
||||
"--json",
|
||||
"reply",
|
||||
"--from", "leader",
|
||||
"--to", "worker-a",
|
||||
"--thread", threadID,
|
||||
"--summary", "Retry read timeouts",
|
||||
"--artifact", artifactPath,
|
||||
"--artifact-metadata-json", "not-json",
|
||||
)
|
||||
if exitCode != 30 {
|
||||
t.Fatalf("expected exit code 30, got %d with output %s", exitCode, stdout)
|
||||
}
|
||||
assertErrorJSON(t, stdout, "invalid_input")
|
||||
}
|
||||
@@ -0,0 +1,102 @@
|
||||
package inbox
|
||||
|
||||
import (
|
||||
"path/filepath"
|
||||
"testing"
|
||||
)
|
||||
|
||||
// TestSendSupportsExplicitKindPriorityAndPayload verifies send honors explicit kind, priority, and payload JSON.
|
||||
func TestSendSupportsExplicitKindPriorityAndPayload(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
dbPath := initCommandTestDB(t)
|
||||
|
||||
sendOut := runInboxCommand(
|
||||
t,
|
||||
"--db", dbPath,
|
||||
"--json",
|
||||
"send",
|
||||
"--from", "leader",
|
||||
"--to", "worker-a",
|
||||
"--subject", "Clarify auth behavior",
|
||||
"--summary", "Need redirect policy",
|
||||
"--kind", "question",
|
||||
"--priority", "high",
|
||||
"--payload-json", `{"topic":"auth","severity":"high"}`,
|
||||
)
|
||||
|
||||
var sendResp map[string]any
|
||||
mustDecodeJSON(t, sendOut, &sendResp)
|
||||
|
||||
if got := nestedString(t, sendResp, "data", "message", "kind"); got != "question" {
|
||||
t.Fatalf("expected message kind question, got %q", got)
|
||||
}
|
||||
if got := nestedString(t, sendResp, "data", "thread", "priority"); got != "high" {
|
||||
t.Fatalf("expected thread priority high, got %q", got)
|
||||
}
|
||||
payload, ok := nestedValue(t, sendResp, "data", "message", "payload_json").(map[string]any)
|
||||
if !ok {
|
||||
t.Fatalf("expected payload_json object, got %#v", nestedValue(t, sendResp, "data", "message", "payload_json"))
|
||||
}
|
||||
if got, _ := payload["topic"].(string); got != "auth" {
|
||||
t.Fatalf("expected payload topic auth, got %#v", payload["topic"])
|
||||
}
|
||||
if got, _ := payload["severity"].(string); got != "high" {
|
||||
t.Fatalf("expected payload severity high, got %#v", payload["severity"])
|
||||
}
|
||||
}
|
||||
|
||||
// TestReplySupportsQuestionAndProgressKinds verifies reply accepts the remaining documented non-default kinds.
|
||||
func TestReplySupportsQuestionAndProgressKinds(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
cases := []struct {
|
||||
name string
|
||||
kind string
|
||||
summary string
|
||||
body string
|
||||
}{
|
||||
{
|
||||
name: "question",
|
||||
kind: "question",
|
||||
summary: "Need product confirmation",
|
||||
body: "Should the guest redirect happen before the paywall?",
|
||||
},
|
||||
{
|
||||
name: "progress",
|
||||
kind: "progress",
|
||||
summary: "Investigating",
|
||||
body: "Checking the redirect and onboarding flows.",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range cases {
|
||||
tc := tc
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
dbPath := filepath.Join(t.TempDir(), "coord.db")
|
||||
threadID := seedThreadForInboxTests(t, dbPath, "leader", "worker-a")
|
||||
|
||||
replyOut := runInboxCommand(
|
||||
t,
|
||||
"--db", dbPath,
|
||||
"--json",
|
||||
"reply",
|
||||
"--from", "leader",
|
||||
"--to", "worker-a",
|
||||
"--thread", threadID,
|
||||
"--kind", tc.kind,
|
||||
"--summary", tc.summary,
|
||||
"--body", tc.body,
|
||||
)
|
||||
|
||||
var replyResp map[string]any
|
||||
mustDecodeJSON(t, replyOut, &replyResp)
|
||||
if got := nestedString(t, replyResp, "data", "message", "kind"); got != tc.kind {
|
||||
t.Fatalf("expected reply kind %q, got %q", tc.kind, got)
|
||||
}
|
||||
if got := nestedString(t, replyResp, "data", "message", "summary"); got != tc.summary {
|
||||
t.Fatalf("expected reply summary %q, got %q", tc.summary, got)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,165 @@
|
||||
package inbox
|
||||
|
||||
import (
|
||||
"path/filepath"
|
||||
"testing"
|
||||
)
|
||||
|
||||
// TestDonePersistsPayloadJSONOnFinalMessage verifies done persists payload JSON on the final result message.
|
||||
func TestDonePersistsPayloadJSONOnFinalMessage(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
dbPath := filepath.Join(t.TempDir(), "coord.db")
|
||||
threadID := seedClaimedThreadForInboxTests(t, dbPath, "leader", "worker-a", "worker-a")
|
||||
|
||||
runInboxCommand(
|
||||
t,
|
||||
"--db", dbPath,
|
||||
"--json",
|
||||
"done",
|
||||
"--agent", "worker-a",
|
||||
"--thread", threadID,
|
||||
"--summary", "Retry policy implemented",
|
||||
"--payload-json", `{"artifact":"report","lines":12}`,
|
||||
)
|
||||
|
||||
showOut := runInboxCommand(
|
||||
t,
|
||||
"--db", dbPath,
|
||||
"--json",
|
||||
"show",
|
||||
"--thread", threadID,
|
||||
)
|
||||
|
||||
var showResp map[string]any
|
||||
mustDecodeJSON(t, showOut, &showResp)
|
||||
lastMessage := lastThreadMessageFromShow(t, showResp)
|
||||
|
||||
payload, ok := lastMessage["payload_json"].(map[string]any)
|
||||
if !ok {
|
||||
t.Fatalf("expected payload_json object, got %#v", lastMessage["payload_json"])
|
||||
}
|
||||
if got, _ := payload["artifact"].(string); got != "report" {
|
||||
t.Fatalf("expected payload artifact report, got %#v", payload["artifact"])
|
||||
}
|
||||
if got, _ := payload["lines"].(float64); got != 12 {
|
||||
t.Fatalf("expected payload lines 12, got %#v", payload["lines"])
|
||||
}
|
||||
}
|
||||
|
||||
// TestFailPersistsPayloadJSONOnFailureMessage verifies fail persists payload JSON on the failure result message.
|
||||
func TestFailPersistsPayloadJSONOnFailureMessage(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
dbPath := filepath.Join(t.TempDir(), "coord.db")
|
||||
threadID := seedClaimedThreadForInboxTests(t, dbPath, "leader", "worker-b", "worker-b")
|
||||
|
||||
runInboxCommand(
|
||||
t,
|
||||
"--db", dbPath,
|
||||
"--json",
|
||||
"fail",
|
||||
"--agent", "worker-b",
|
||||
"--thread", threadID,
|
||||
"--summary", "Migration failed",
|
||||
"--payload-json", `{"code":"schema_mismatch","retryable":false}`,
|
||||
)
|
||||
|
||||
showOut := runInboxCommand(
|
||||
t,
|
||||
"--db", dbPath,
|
||||
"--json",
|
||||
"show",
|
||||
"--thread", threadID,
|
||||
)
|
||||
|
||||
var showResp map[string]any
|
||||
mustDecodeJSON(t, showOut, &showResp)
|
||||
lastMessage := lastThreadMessageFromShow(t, showResp)
|
||||
|
||||
payload, ok := lastMessage["payload_json"].(map[string]any)
|
||||
if !ok {
|
||||
t.Fatalf("expected payload_json object, got %#v", lastMessage["payload_json"])
|
||||
}
|
||||
if got, _ := payload["code"].(string); got != "schema_mismatch" {
|
||||
t.Fatalf("expected payload code schema_mismatch, got %#v", payload["code"])
|
||||
}
|
||||
if got, _ := payload["retryable"].(bool); got {
|
||||
t.Fatalf("expected retryable=false, got %#v", payload["retryable"])
|
||||
}
|
||||
}
|
||||
|
||||
// TestDoneRejectsInvalidArtifactMetadataJSONInput verifies done rejects malformed artifact metadata JSON.
|
||||
func TestDoneRejectsInvalidArtifactMetadataJSONInput(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
tempDir := t.TempDir()
|
||||
dbPath := filepath.Join(tempDir, "coord.db")
|
||||
threadID := seedClaimedThreadForInboxTests(t, dbPath, "leader", "worker-a", "worker-a")
|
||||
artifactPath := filepath.Join(tempDir, "result.md")
|
||||
|
||||
stdout, _, exitCode := executeInboxCommand(
|
||||
"--db", dbPath,
|
||||
"--json",
|
||||
"done",
|
||||
"--agent", "worker-a",
|
||||
"--thread", threadID,
|
||||
"--summary", "Retry policy implemented",
|
||||
"--artifact", artifactPath,
|
||||
"--artifact-metadata-json", "not-json",
|
||||
)
|
||||
if exitCode != 30 {
|
||||
t.Fatalf("expected exit code 30, got %d with output %s", exitCode, stdout)
|
||||
}
|
||||
assertErrorJSON(t, stdout, "invalid_input")
|
||||
}
|
||||
|
||||
// TestFailRejectsInvalidArtifactMetadataJSONInput verifies fail rejects malformed artifact metadata JSON.
|
||||
func TestFailRejectsInvalidArtifactMetadataJSONInput(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
tempDir := t.TempDir()
|
||||
dbPath := filepath.Join(tempDir, "coord.db")
|
||||
threadID := seedClaimedThreadForInboxTests(t, dbPath, "leader", "worker-b", "worker-b")
|
||||
artifactPath := filepath.Join(tempDir, "failure.md")
|
||||
|
||||
stdout, _, exitCode := executeInboxCommand(
|
||||
"--db", dbPath,
|
||||
"--json",
|
||||
"fail",
|
||||
"--agent", "worker-b",
|
||||
"--thread", threadID,
|
||||
"--summary", "Migration failed",
|
||||
"--artifact", artifactPath,
|
||||
"--artifact-metadata-json", "not-json",
|
||||
)
|
||||
if exitCode != 30 {
|
||||
t.Fatalf("expected exit code 30, got %d with output %s", exitCode, stdout)
|
||||
}
|
||||
assertErrorJSON(t, stdout, "invalid_input")
|
||||
}
|
||||
|
||||
// TestCancelRejectsInvalidArtifactMetadataJSONInput verifies cancel rejects malformed artifact metadata JSON.
|
||||
func TestCancelRejectsInvalidArtifactMetadataJSONInput(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
tempDir := t.TempDir()
|
||||
dbPath := filepath.Join(tempDir, "coord.db")
|
||||
threadID := seedThreadForInboxTests(t, dbPath, "leader", "worker-a")
|
||||
artifactPath := filepath.Join(tempDir, "cancel.md")
|
||||
|
||||
stdout, _, exitCode := executeInboxCommand(
|
||||
"--db", dbPath,
|
||||
"--json",
|
||||
"cancel",
|
||||
"--agent", "leader",
|
||||
"--thread", threadID,
|
||||
"--reason", "Task superseded by a larger refactor",
|
||||
"--artifact", artifactPath,
|
||||
"--artifact-metadata-json", "not-json",
|
||||
)
|
||||
if exitCode != 30 {
|
||||
t.Fatalf("expected exit code 30, got %d with output %s", exitCode, stdout)
|
||||
}
|
||||
assertErrorJSON(t, stdout, "invalid_input")
|
||||
}
|
||||
@@ -0,0 +1,72 @@
|
||||
package inbox
|
||||
|
||||
import (
|
||||
"path/filepath"
|
||||
"testing"
|
||||
)
|
||||
|
||||
// TestUpdateRejectsInvalidStatus verifies update rejects statuses outside the documented contract.
|
||||
func TestUpdateRejectsInvalidStatus(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
dbPath := filepath.Join(t.TempDir(), "coord.db")
|
||||
threadID := seedClaimedThreadForInboxTests(t, dbPath, "leader", "worker-a", "worker-a")
|
||||
|
||||
stdout, _, exitCode := executeInboxCommand(
|
||||
"--db", dbPath,
|
||||
"--json",
|
||||
"update",
|
||||
"--agent", "worker-a",
|
||||
"--thread", threadID,
|
||||
"--status", "pending",
|
||||
"--summary", "Unexpected status",
|
||||
)
|
||||
if exitCode != 30 {
|
||||
t.Fatalf("expected exit code 30, got %d with output %s", exitCode, stdout)
|
||||
}
|
||||
assertErrorJSON(t, stdout, "invalid_input")
|
||||
}
|
||||
|
||||
// TestDoneRejectsInvalidPayloadJSON verifies done rejects malformed payload JSON.
|
||||
func TestDoneRejectsInvalidPayloadJSON(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
dbPath := filepath.Join(t.TempDir(), "coord.db")
|
||||
threadID := seedClaimedThreadForInboxTests(t, dbPath, "leader", "worker-a", "worker-a")
|
||||
|
||||
stdout, _, exitCode := executeInboxCommand(
|
||||
"--db", dbPath,
|
||||
"--json",
|
||||
"done",
|
||||
"--agent", "worker-a",
|
||||
"--thread", threadID,
|
||||
"--summary", "Retry policy implemented",
|
||||
"--payload-json", "not-json",
|
||||
)
|
||||
if exitCode != 30 {
|
||||
t.Fatalf("expected exit code 30, got %d with output %s", exitCode, stdout)
|
||||
}
|
||||
assertErrorJSON(t, stdout, "invalid_input")
|
||||
}
|
||||
|
||||
// TestFailRejectsInvalidPayloadJSON verifies fail rejects malformed payload JSON.
|
||||
func TestFailRejectsInvalidPayloadJSON(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
dbPath := filepath.Join(t.TempDir(), "coord.db")
|
||||
threadID := seedClaimedThreadForInboxTests(t, dbPath, "leader", "worker-b", "worker-b")
|
||||
|
||||
stdout, _, exitCode := executeInboxCommand(
|
||||
"--db", dbPath,
|
||||
"--json",
|
||||
"fail",
|
||||
"--agent", "worker-b",
|
||||
"--thread", threadID,
|
||||
"--summary", "Migration failed",
|
||||
"--payload-json", "not-json",
|
||||
)
|
||||
if exitCode != 30 {
|
||||
t.Fatalf("expected exit code 30, got %d with output %s", exitCode, stdout)
|
||||
}
|
||||
assertErrorJSON(t, stdout, "invalid_input")
|
||||
}
|
||||
@@ -0,0 +1,98 @@
|
||||
package inbox
|
||||
|
||||
import (
|
||||
"path/filepath"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
// TestWatchWakesOnTerminalStatuses verifies watch can wake on done and failed status filters.
|
||||
func TestWatchWakesOnTerminalStatuses(t *testing.T) {
|
||||
cases := []struct {
|
||||
name string
|
||||
status string
|
||||
agent string
|
||||
finish func(t *testing.T, dbPath, threadID, agent string)
|
||||
}{
|
||||
{
|
||||
name: "done",
|
||||
status: "done",
|
||||
agent: "worker-d",
|
||||
finish: func(t *testing.T, dbPath, threadID, agent string) {
|
||||
runInboxCommand(
|
||||
t,
|
||||
"--db", dbPath,
|
||||
"--json",
|
||||
"done",
|
||||
"--agent", agent,
|
||||
"--thread", threadID,
|
||||
"--summary", "Task complete",
|
||||
)
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "failed",
|
||||
status: "failed",
|
||||
agent: "worker-f",
|
||||
finish: func(t *testing.T, dbPath, threadID, agent string) {
|
||||
runInboxCommand(
|
||||
t,
|
||||
"--db", dbPath,
|
||||
"--json",
|
||||
"fail",
|
||||
"--agent", agent,
|
||||
"--thread", threadID,
|
||||
"--summary", "Task failed",
|
||||
)
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range cases {
|
||||
tc := tc
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
dbPath := filepath.Join(t.TempDir(), "coord.db")
|
||||
runInboxCommand(t, "--db", dbPath, "--json", "init")
|
||||
threadID := sendPendingThread(t, dbPath, "leader", tc.agent, "Implement feature", "Initial request")
|
||||
|
||||
watchCh := make(chan watchCommandResult, 1)
|
||||
go func() {
|
||||
stdout, stderr, exitCode := executeInboxCommand(
|
||||
"--db", dbPath,
|
||||
"--json",
|
||||
"watch",
|
||||
"--agent", tc.agent,
|
||||
"--status", tc.status,
|
||||
"--timeout-seconds", "2",
|
||||
)
|
||||
watchCh <- watchCommandResult{stdout: stdout, stderr: stderr, exit: exitCode}
|
||||
}()
|
||||
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
|
||||
runInboxCommand(
|
||||
t,
|
||||
"--db", dbPath,
|
||||
"--json",
|
||||
"claim",
|
||||
"--agent", tc.agent,
|
||||
"--thread", threadID,
|
||||
)
|
||||
tc.finish(t, dbPath, threadID, tc.agent)
|
||||
|
||||
select {
|
||||
case result := <-watchCh:
|
||||
if result.exit != 0 {
|
||||
t.Fatalf("watch failed with exit=%d\nstderr:\n%s\nstdout:\n%s", result.exit, result.stderr, result.stdout)
|
||||
}
|
||||
var watchResp map[string]any
|
||||
mustDecodeJSON(t, result.stdout, &watchResp)
|
||||
if got := nestedString(t, watchResp, "data", "thread", "status"); got != tc.status {
|
||||
t.Fatalf("expected watch status %q, got %q", tc.status, got)
|
||||
}
|
||||
case <-time.After(3 * time.Second):
|
||||
t.Fatalf("watch command did not return for terminal status %s", tc.status)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,159 @@
|
||||
package inbox
|
||||
|
||||
import (
|
||||
"path/filepath"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
// TestUpdateRejectsInvalidArtifactMetadataJSONOnProgressUpdate verifies update rejects malformed artifact metadata JSON on progress updates.
|
||||
func TestUpdateRejectsInvalidArtifactMetadataJSONOnProgressUpdate(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
tempDir := t.TempDir()
|
||||
dbPath := filepath.Join(tempDir, "coord.db")
|
||||
artifactPath := filepath.Join(tempDir, "progress.md")
|
||||
threadID := seedClaimedThreadForInboxTests(t, dbPath, "leader", "worker-a", "worker-a")
|
||||
|
||||
stdout, _, exitCode := executeInboxCommand(
|
||||
"--db", dbPath,
|
||||
"--json",
|
||||
"update",
|
||||
"--agent", "worker-a",
|
||||
"--thread", threadID,
|
||||
"--status", "in_progress",
|
||||
"--summary", "Implementation started",
|
||||
"--artifact", artifactPath,
|
||||
"--artifact-metadata-json", "not-json",
|
||||
)
|
||||
if exitCode != 30 {
|
||||
t.Fatalf("expected exit code 30, got %d with output %s", exitCode, stdout)
|
||||
}
|
||||
assertErrorJSON(t, stdout, "invalid_input")
|
||||
}
|
||||
|
||||
// TestWaitReplyWakesOnResultKindFilter verifies wait-reply wakes on a result message when --kinds result is used.
|
||||
func TestWaitReplyWakesOnResultKindFilter(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
dbPath := filepath.Join(t.TempDir(), "coord.db")
|
||||
threadID, blockedMessageID := seedBlockedThreadForWaitReply(t, dbPath)
|
||||
|
||||
waitCh := make(chan waitReplyCommandResult, 1)
|
||||
go func() {
|
||||
stdout, stderr, exitCode := executeInboxCommand(
|
||||
"--db", dbPath,
|
||||
"--agent", "worker-c",
|
||||
"--json",
|
||||
"wait-reply",
|
||||
"--thread", threadID,
|
||||
"--after-message", blockedMessageID,
|
||||
"--kinds", "result",
|
||||
"--timeout-seconds", "2",
|
||||
)
|
||||
waitCh <- waitReplyCommandResult{stdout: stdout, stderr: stderr, exit: exitCode}
|
||||
}()
|
||||
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
|
||||
runInboxCommand(
|
||||
t,
|
||||
"--db", dbPath,
|
||||
"--json",
|
||||
"done",
|
||||
"--agent", "worker-c",
|
||||
"--thread", threadID,
|
||||
"--summary", "Worker completed after unblock",
|
||||
"--body", "Final result payload.",
|
||||
)
|
||||
|
||||
var waitResult waitReplyCommandResult
|
||||
select {
|
||||
case waitResult = <-waitCh:
|
||||
case <-time.After(3 * time.Second):
|
||||
t.Fatal("wait-reply result filter command did not return")
|
||||
}
|
||||
if waitResult.exit != 0 {
|
||||
t.Fatalf("wait-reply result filter failed with exit=%d\nstderr:\n%s\nstdout:\n%s", waitResult.exit, waitResult.stderr, waitResult.stdout)
|
||||
}
|
||||
|
||||
var waitResp map[string]any
|
||||
mustDecodeJSON(t, waitResult.stdout, &waitResp)
|
||||
if kind := nestedString(t, waitResp, "data", "message", "kind"); kind != "result" {
|
||||
t.Fatalf("expected result wake message, got %q", kind)
|
||||
}
|
||||
if got := nestedString(t, waitResp, "data", "message", "summary"); got != "Worker completed after unblock" {
|
||||
t.Fatalf("expected result summary, got %q", got)
|
||||
}
|
||||
}
|
||||
|
||||
// TestWaitReplyKindsFilterSkipsAnswerUntilControl verifies wait-reply ignores non-matching answers until a matching control message arrives.
|
||||
func TestWaitReplyKindsFilterSkipsAnswerUntilControl(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
dbPath := filepath.Join(t.TempDir(), "coord.db")
|
||||
threadID, blockedMessageID := seedBlockedThreadForWaitReply(t, dbPath)
|
||||
|
||||
waitCh := make(chan waitReplyCommandResult, 1)
|
||||
go func() {
|
||||
stdout, stderr, exitCode := executeInboxCommand(
|
||||
"--db", dbPath,
|
||||
"--agent", "worker-c",
|
||||
"--json",
|
||||
"wait-reply",
|
||||
"--thread", threadID,
|
||||
"--after-message", blockedMessageID,
|
||||
"--kinds", "control",
|
||||
"--timeout-seconds", "2",
|
||||
)
|
||||
waitCh <- waitReplyCommandResult{stdout: stdout, stderr: stderr, exit: exitCode}
|
||||
}()
|
||||
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
|
||||
runInboxCommand(
|
||||
t,
|
||||
"--db", dbPath,
|
||||
"--json",
|
||||
"reply",
|
||||
"--from", "leader",
|
||||
"--to", "worker-c",
|
||||
"--thread", threadID,
|
||||
"--summary", "Regular answer",
|
||||
"--body", "This should not wake the control-only waiter.",
|
||||
)
|
||||
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
|
||||
runInboxCommand(
|
||||
t,
|
||||
"--db", dbPath,
|
||||
"--json",
|
||||
"reply",
|
||||
"--from", "leader",
|
||||
"--to", "worker-c",
|
||||
"--thread", threadID,
|
||||
"--kind", "control",
|
||||
"--summary", "Pause work",
|
||||
"--body", "Pause work until product confirms the decision.",
|
||||
)
|
||||
|
||||
var waitResult waitReplyCommandResult
|
||||
select {
|
||||
case waitResult = <-waitCh:
|
||||
case <-time.After(3 * time.Second):
|
||||
t.Fatal("wait-reply control filter command did not return")
|
||||
}
|
||||
if waitResult.exit != 0 {
|
||||
t.Fatalf("wait-reply control filter failed with exit=%d\nstderr:\n%s\nstdout:\n%s", waitResult.exit, waitResult.stderr, waitResult.stdout)
|
||||
}
|
||||
|
||||
var waitResp map[string]any
|
||||
mustDecodeJSON(t, waitResult.stdout, &waitResp)
|
||||
if kind := nestedString(t, waitResp, "data", "message", "kind"); kind != "control" {
|
||||
t.Fatalf("expected control wake message, got %q", kind)
|
||||
}
|
||||
if got := nestedString(t, waitResp, "data", "message", "summary"); got != "Pause work" {
|
||||
t.Fatalf("expected control summary, got %q", got)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user