test: add inbox command integration coverage
This commit is contained in:
@@ -0,0 +1,221 @@
|
||||
package inbox
|
||||
|
||||
import (
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
type waitReplyCommandResult struct {
|
||||
stdout string
|
||||
stderr string
|
||||
exit int
|
||||
}
|
||||
|
||||
func TestWaitReplyWakesOnAnswerAfterMessage(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
dbPath := filepath.Join(t.TempDir(), "coord.db")
|
||||
threadID, blockedMessageID := seedBlockedThreadForWaitReply(t, dbPath)
|
||||
|
||||
waitCh := make(chan waitReplyCommandResult, 1)
|
||||
go func() {
|
||||
stdout, stderr, exitCode := executeInboxCommand(
|
||||
"--db", dbPath,
|
||||
"--agent", "worker-c",
|
||||
"--json",
|
||||
"wait-reply",
|
||||
"--thread", threadID,
|
||||
"--after-message", blockedMessageID,
|
||||
"--timeout-seconds", "2",
|
||||
)
|
||||
waitCh <- waitReplyCommandResult{stdout: stdout, stderr: stderr, exit: exitCode}
|
||||
}()
|
||||
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
|
||||
runInboxCommand(
|
||||
t,
|
||||
"--db", dbPath,
|
||||
"--json",
|
||||
"reply",
|
||||
"--from", "leader",
|
||||
"--to", "worker-c",
|
||||
"--thread", threadID,
|
||||
"--summary", "Redirect to login",
|
||||
"--body", "Redirect guests to login for the MVP.",
|
||||
)
|
||||
|
||||
var waitResult waitReplyCommandResult
|
||||
select {
|
||||
case waitResult = <-waitCh:
|
||||
case <-time.After(3 * time.Second):
|
||||
t.Fatal("wait-reply command did not return")
|
||||
}
|
||||
if waitResult.exit != 0 {
|
||||
t.Fatalf("wait-reply failed with exit=%d\nstderr:\n%s\nstdout:\n%s", waitResult.exit, waitResult.stderr, waitResult.stdout)
|
||||
}
|
||||
|
||||
var waitResp map[string]any
|
||||
mustDecodeJSON(t, waitResult.stdout, &waitResp)
|
||||
if woke, ok := nestedValue(t, waitResp, "data", "woke").(bool); !ok || !woke {
|
||||
t.Fatalf("expected wait-reply wake, got %#v", nestedValue(t, waitResp, "data", "woke"))
|
||||
}
|
||||
if kind := nestedString(t, waitResp, "data", "message", "kind"); kind != "answer" {
|
||||
t.Fatalf("expected answer wake message, got %q", kind)
|
||||
}
|
||||
}
|
||||
|
||||
func TestWaitReplyCanStartFromAfterEvent(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
dbPath := filepath.Join(t.TempDir(), "coord.db")
|
||||
threadID, blockedMessageID := seedBlockedThreadForWaitReply(t, dbPath)
|
||||
|
||||
runInboxCommand(
|
||||
t,
|
||||
"--db", dbPath,
|
||||
"--json",
|
||||
"reply",
|
||||
"--from", "leader",
|
||||
"--to", "worker-c",
|
||||
"--thread", threadID,
|
||||
"--summary", "First answer",
|
||||
"--body", "First reply payload.",
|
||||
)
|
||||
|
||||
firstWaitOut := runInboxCommand(
|
||||
t,
|
||||
"--db", dbPath,
|
||||
"--agent", "worker-c",
|
||||
"--json",
|
||||
"wait-reply",
|
||||
"--thread", threadID,
|
||||
"--after-message", blockedMessageID,
|
||||
"--timeout-seconds", "2",
|
||||
)
|
||||
var firstWaitResp map[string]any
|
||||
mustDecodeJSON(t, firstWaitOut, &firstWaitResp)
|
||||
firstEventIDFloat, ok := nestedValue(t, firstWaitResp, "data", "next_event_id").(float64)
|
||||
if !ok {
|
||||
t.Fatalf("expected numeric next_event_id, got %#v", nestedValue(t, firstWaitResp, "data", "next_event_id"))
|
||||
}
|
||||
firstEventID := int64(firstEventIDFloat)
|
||||
|
||||
waitCh := make(chan waitReplyCommandResult, 1)
|
||||
go func() {
|
||||
stdout, stderr, exitCode := executeInboxCommand(
|
||||
"--db", dbPath,
|
||||
"--agent", "worker-c",
|
||||
"--json",
|
||||
"wait-reply",
|
||||
"--thread", threadID,
|
||||
"--after-event", strconv.FormatInt(firstEventID, 10),
|
||||
"--timeout-seconds", "2",
|
||||
)
|
||||
waitCh <- waitReplyCommandResult{stdout: stdout, stderr: stderr, exit: exitCode}
|
||||
}()
|
||||
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
|
||||
runInboxCommand(
|
||||
t,
|
||||
"--db", dbPath,
|
||||
"--json",
|
||||
"reply",
|
||||
"--from", "leader",
|
||||
"--to", "worker-c",
|
||||
"--thread", threadID,
|
||||
"--summary", "Second answer",
|
||||
"--body", "Second reply payload.",
|
||||
)
|
||||
|
||||
var waitResult waitReplyCommandResult
|
||||
select {
|
||||
case waitResult = <-waitCh:
|
||||
case <-time.After(3 * time.Second):
|
||||
t.Fatal("wait-reply after-event command did not return")
|
||||
}
|
||||
if waitResult.exit != 0 {
|
||||
t.Fatalf("wait-reply after-event failed with exit=%d\nstderr:\n%s\nstdout:\n%s", waitResult.exit, waitResult.stderr, waitResult.stdout)
|
||||
}
|
||||
|
||||
var waitResp map[string]any
|
||||
mustDecodeJSON(t, waitResult.stdout, &waitResp)
|
||||
if got := nestedString(t, waitResp, "data", "message", "summary"); got != "Second answer" {
|
||||
t.Fatalf("expected second answer wake message, got %q", got)
|
||||
}
|
||||
secondEventIDFloat, ok := nestedValue(t, waitResp, "data", "next_event_id").(float64)
|
||||
if !ok {
|
||||
t.Fatalf("expected numeric next_event_id, got %#v", nestedValue(t, waitResp, "data", "next_event_id"))
|
||||
}
|
||||
if int64(secondEventIDFloat) <= firstEventID {
|
||||
t.Fatalf("expected second event id > first event id, got %d <= %d", int64(secondEventIDFloat), firstEventID)
|
||||
}
|
||||
}
|
||||
|
||||
func TestWaitReplyTimesOutWhenNoReply(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
dbPath := filepath.Join(t.TempDir(), "coord.db")
|
||||
threadID, _ := seedBlockedThreadForWaitReply(t, dbPath)
|
||||
|
||||
stdout, _, exitCode := executeInboxCommand(
|
||||
"--db", dbPath,
|
||||
"--agent", "worker-c",
|
||||
"--json",
|
||||
"wait-reply",
|
||||
"--thread", threadID,
|
||||
"--timeout-seconds", "1",
|
||||
)
|
||||
if exitCode != 10 {
|
||||
t.Fatalf("expected wait-reply timeout exit code 10, got %d with %s", exitCode, stdout)
|
||||
}
|
||||
assertErrorJSON(t, stdout, "no_matching_work")
|
||||
}
|
||||
|
||||
func seedBlockedThreadForWaitReply(t *testing.T, dbPath string) (threadID string, blockedMessageID string) {
|
||||
t.Helper()
|
||||
|
||||
runInboxCommand(t, "--db", dbPath, "--json", "init")
|
||||
|
||||
sendOut := runInboxCommand(
|
||||
t,
|
||||
"--db", dbPath,
|
||||
"--json",
|
||||
"send",
|
||||
"--from", "leader",
|
||||
"--to", "worker-c",
|
||||
"--subject", "Investigate auth edge case",
|
||||
"--summary", "Check auth redirect behavior",
|
||||
)
|
||||
var sendResp map[string]any
|
||||
mustDecodeJSON(t, sendOut, &sendResp)
|
||||
threadID = nestedString(t, sendResp, "data", "thread", "thread_id")
|
||||
|
||||
runInboxCommand(
|
||||
t,
|
||||
"--db", dbPath,
|
||||
"--json",
|
||||
"claim",
|
||||
"--agent", "worker-c",
|
||||
"--thread", threadID,
|
||||
)
|
||||
|
||||
blockedOut := runInboxCommand(
|
||||
t,
|
||||
"--db", dbPath,
|
||||
"--json",
|
||||
"update",
|
||||
"--agent", "worker-c",
|
||||
"--thread", threadID,
|
||||
"--status", "blocked",
|
||||
"--summary", "Need policy decision",
|
||||
)
|
||||
var blockedResp map[string]any
|
||||
mustDecodeJSON(t, blockedOut, &blockedResp)
|
||||
blockedMessageID = nestedString(t, blockedResp, "data", "message", "message_id")
|
||||
return threadID, blockedMessageID
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user