diff --git a/docs/implementation-roadmap.md b/docs/implementation-roadmap.md index f4a212a..5781fec 100644 --- a/docs/implementation-roadmap.md +++ b/docs/implementation-roadmap.md @@ -19,7 +19,7 @@ As of now: - shared SQLite schema initialization exists - `inbox` is implemented end-to-end, including send/fetch/claim/renew/update/reply/done/fail/cancel/list/show/watch/wait-reply - `inbox` supports blocking waits, lease renewal, unread fetches backed by per-agent read cursors, `--body-file`, artifact attachments, and structured JSON errors with stable exit codes -- integration tests cover the main inbox lifecycle, wait/watch flows, artifact persistence, and JSON error contracts +- integration tests now cover each implemented inbox command, plus the main inbox workflows, wait/watch flows, artifact persistence, unread behavior, and JSON error contracts - a human-readable inbox command test-plan set has been authored under `docs/tests/inbox/` - `orch` currently exists as a command skeleton only - no scheduler workflows have been implemented yet @@ -289,10 +289,14 @@ Reason: ## Suggested Early Tests -Add these tests before the codebase grows too much: +Completed so far: - schema init test -- inbox thread lifecycle test +- inbox command-level CLI integration coverage aligned to `docs/tests/inbox/` +- inbox workflow lifecycle coverage + +Still recommended before the codebase grows too much: + - single-task orch dispatch and reconcile test - worktree path generation test - council tally grouping test diff --git a/internal/cli/inbox/cancel_integration_test.go b/internal/cli/inbox/cancel_integration_test.go new file mode 100644 index 0000000..0197d4d --- /dev/null +++ b/internal/cli/inbox/cancel_integration_test.go @@ -0,0 +1,148 @@ +package inbox + +import ( + "os" + "path/filepath" + "testing" +) + +func TestCancelMarksThreadCancelled(t *testing.T) { + t.Parallel() + + dbPath := filepath.Join(t.TempDir(), "coord.db") + runInboxCommand(t, "--db", dbPath, "--json", "init") + + sendOut := runInboxCommand( + t, + "--db", dbPath, + "--json", + "send", + "--from", "leader", + "--to", "worker-a", + "--subject", "Implement cancellation", + "--summary", "Initial request", + ) + + var sendResp map[string]any + mustDecodeJSON(t, sendOut, &sendResp) + threadID := nestedString(t, sendResp, "data", "thread", "thread_id") + + cancelOut := runInboxCommand( + t, + "--db", dbPath, + "--json", + "cancel", + "--agent", "leader", + "--thread", threadID, + "--reason", "Task superseded by a larger refactor", + ) + + var cancelResp map[string]any + mustDecodeJSON(t, cancelOut, &cancelResp) + if status := nestedString(t, cancelResp, "data", "thread", "status"); status != "cancelled" { + t.Fatalf("expected cancelled thread, got %q", status) + } + if kind := nestedString(t, cancelResp, "data", "message", "kind"); kind != "control" { + t.Fatalf("expected control message, got %q", kind) + } +} + +func TestCancelPersistsReasonAndArtifact(t *testing.T) { + t.Parallel() + + tempDir := t.TempDir() + dbPath := filepath.Join(tempDir, "coord.db") + cancelPath := filepath.Join(tempDir, "cancel.md") + if err := os.WriteFile(cancelPath, []byte("Cancelled by product decision"), 0o644); err != nil { + t.Fatalf("write cancel artifact: %v", err) + } + + runInboxCommand(t, "--db", dbPath, "--json", "init") + + sendOut := runInboxCommand( + t, + "--db", dbPath, + "--json", + "send", + "--from", "leader", + "--to", "worker-a", + "--subject", "Implement cancellation", + "--summary", "Initial request", + ) + + var sendResp map[string]any + mustDecodeJSON(t, sendOut, &sendResp) + threadID := nestedString(t, sendResp, "data", "thread", "thread_id") + + runInboxCommand( + t, + "--db", dbPath, + "--json", + "cancel", + "--agent", "leader", + "--thread", threadID, + "--reason", "Task superseded by a larger refactor", + "--artifact", cancelPath, + "--artifact-kind", "brief", + ) + + showOut := runInboxCommand( + t, + "--db", dbPath, + "--json", + "show", + "--thread", threadID, + ) + var showResp map[string]any + mustDecodeJSON(t, showOut, &showResp) + + messages, ok := nestedValue(t, showResp, "data", "messages").([]any) + if !ok || len(messages) == 0 { + t.Fatalf("expected non-empty message history, got %#v", nestedValue(t, showResp, "data", "messages")) + } + + lastMessage, ok := messages[len(messages)-1].(map[string]any) + if !ok { + t.Fatalf("expected message object, got %#v", messages[len(messages)-1]) + } + if got := lastMessage["summary"]; got != "Task superseded by a larger refactor" { + t.Fatalf("expected cancel summary, got %#v", got) + } + if got := lastMessage["body"]; got != "Task superseded by a larger refactor" { + t.Fatalf("expected cancel body, got %#v", got) + } + artifacts, ok := lastMessage["artifacts"].([]any) + if !ok || len(artifacts) != 1 { + t.Fatalf("expected one cancel artifact, got %#v", lastMessage["artifacts"]) + } + artifact, ok := artifacts[0].(map[string]any) + if !ok { + t.Fatalf("expected artifact object, got %#v", artifacts[0]) + } + if got := artifact["path"]; got != cancelPath { + t.Fatalf("expected artifact path %q, got %#v", cancelPath, got) + } + if got := artifact["kind"]; got != "brief" { + t.Fatalf("expected artifact kind brief, got %#v", got) + } +} + +func TestCancelRejectsWhenThreadMissing(t *testing.T) { + t.Parallel() + + dbPath := filepath.Join(t.TempDir(), "coord.db") + runInboxCommand(t, "--db", dbPath, "--json", "init") + + stdout, _, exitCode := executeInboxCommand( + "--db", dbPath, + "--agent", "leader", + "--json", + "cancel", + "--thread", "thr_missing", + ) + if exitCode != 40 { + t.Fatalf("expected not-found exit code 40, got %d with %s", exitCode, stdout) + } + assertErrorJSON(t, stdout, "not_found") +} + diff --git a/internal/cli/inbox/claim_integration_test.go b/internal/cli/inbox/claim_integration_test.go new file mode 100644 index 0000000..2ee5875 --- /dev/null +++ b/internal/cli/inbox/claim_integration_test.go @@ -0,0 +1,112 @@ +package inbox + +import "testing" + +func TestClaimAcquiresThreadLease(t *testing.T) { + t.Parallel() + + dbPath := initCommandTestDB(t) + threadID := sendPendingThread(t, dbPath, "leader", "worker-a", "Race claim", "Claim this task") + + claimOut := runInboxCommand( + t, + "--db", dbPath, + "--json", + "claim", + "--agent", "worker-a", + "--thread", threadID, + "--lease-seconds", "300", + ) + + var claimResp map[string]any + mustDecodeJSON(t, claimOut, &claimResp) + if got := nestedString(t, claimResp, "data", "thread", "status"); got != "claimed" { + t.Fatalf("expected claimed status, got %q", got) + } + if got := nestedString(t, claimResp, "data", "thread", "assigned_to"); got != "worker-a" { + t.Fatalf("expected assigned_to worker-a, got %q", got) + } + if got := nestedString(t, claimResp, "data", "message", "kind"); got != "event" { + t.Fatalf("expected event message kind, got %q", got) + } + if got := nestedString(t, claimResp, "data", "message", "summary"); got != "thread claimed" { + t.Fatalf("expected summary thread claimed, got %q", got) + } +} + +func TestClaimRejectsWhenThreadMissing(t *testing.T) { + t.Parallel() + + dbPath := initCommandTestDB(t) + stdout, _, exitCode := executeInboxCommand( + "--db", dbPath, + "--json", + "claim", + "--agent", "worker-z", + "--thread", "thr_missing", + ) + if exitCode != 40 { + t.Fatalf("expected exit code 40, got %d", exitCode) + } + assertErrorJSON(t, stdout, "not_found") +} + +func TestClaimRejectsWhenThreadAlreadyClaimed(t *testing.T) { + t.Parallel() + + dbPath := initCommandTestDB(t) + threadID := sendPendingThread(t, dbPath, "leader", "worker-z", "Claimed task", "Already claimed") + + runInboxCommand( + t, + "--db", dbPath, + "--json", + "claim", + "--agent", "worker-z", + "--thread", threadID, + ) + + stdout, _, exitCode := executeInboxCommand( + "--db", dbPath, + "--json", + "claim", + "--agent", "worker-y", + "--thread", threadID, + ) + if exitCode != 20 { + t.Fatalf("expected exit code 20, got %d", exitCode) + } + assertErrorJSON(t, stdout, "lease_conflict") +} + +func TestClaimRecordsRequestedLeaseDuration(t *testing.T) { + t.Parallel() + + dbPath := initCommandTestDB(t) + threadID := sendPendingThread(t, dbPath, "leader", "worker-a", "Lease payload", "Verify lease payload") + + claimOut := runInboxCommand( + t, + "--db", dbPath, + "--json", + "claim", + "--agent", "worker-a", + "--thread", threadID, + "--lease-seconds", "300", + ) + + var claimResp map[string]any + mustDecodeJSON(t, claimOut, &claimResp) + payload, ok := nestedValue(t, claimResp, "data", "message", "payload_json").(map[string]any) + if !ok { + t.Fatalf("expected payload_json object, got %#v", nestedValue(t, claimResp, "data", "message", "payload_json")) + } + leaseSeconds, ok := payload["lease_seconds"].(float64) + if !ok || int(leaseSeconds) != 300 { + t.Fatalf("expected lease_seconds 300, got %#v", payload["lease_seconds"]) + } + leaseToken, _ := payload["lease_token"].(string) + if leaseToken == "" { + t.Fatalf("expected non-empty lease_token, got %#v", payload["lease_token"]) + } +} diff --git a/internal/cli/inbox/done_integration_test.go b/internal/cli/inbox/done_integration_test.go new file mode 100644 index 0000000..f0041e9 --- /dev/null +++ b/internal/cli/inbox/done_integration_test.go @@ -0,0 +1,140 @@ +package inbox + +import ( + "os" + "path/filepath" + "testing" +) + +func TestDoneMarksThreadTerminal(t *testing.T) { + t.Parallel() + + dbPath := filepath.Join(t.TempDir(), "coord.db") + threadID := seedClaimedThreadForInboxTests(t, dbPath, "leader", "worker-a", "worker-a") + + doneOut := runInboxCommand( + t, + "--db", dbPath, + "--json", + "done", + "--agent", "worker-a", + "--thread", threadID, + "--summary", "Retry policy implemented", + "--body", "The HTTP client now retries the selected transient failures.", + ) + + var doneResp map[string]any + mustDecodeJSON(t, doneOut, &doneResp) + if status := nestedString(t, doneResp, "data", "thread", "status"); status != "done" { + t.Fatalf("expected done thread status, got %q", status) + } + if kind := nestedString(t, doneResp, "data", "message", "kind"); kind != "result" { + t.Fatalf("expected result message kind, got %q", kind) + } +} + +func TestDonePersistsResultBodyAndArtifact(t *testing.T) { + t.Parallel() + + tempDir := t.TempDir() + dbPath := filepath.Join(tempDir, "coord.db") + resultPath := filepath.Join(tempDir, "result.md") + body := "Result from body file." + if err := os.WriteFile(resultPath, []byte(body), 0o644); err != nil { + t.Fatalf("write result file: %v", err) + } + + threadID := seedClaimedThreadForInboxTests(t, dbPath, "leader", "worker-a", "worker-a") + + runInboxCommand( + t, + "--db", dbPath, + "--json", + "done", + "--agent", "worker-a", + "--thread", threadID, + "--summary", "Retry policy implemented", + "--body-file", resultPath, + "--artifact", resultPath, + "--artifact-kind", "report", + ) + + showOut := runInboxCommand( + t, + "--db", dbPath, + "--json", + "show", + "--thread", threadID, + ) + var showResp map[string]any + mustDecodeJSON(t, showOut, &showResp) + lastMessage := lastThreadMessageFromShow(t, showResp) + + if gotBody, _ := lastMessage["body"].(string); gotBody != body { + t.Fatalf("expected body %q, got %#v", body, lastMessage["body"]) + } + artifacts, ok := lastMessage["artifacts"].([]any) + if !ok || len(artifacts) != 1 { + t.Fatalf("expected one artifact, got %#v", lastMessage["artifacts"]) + } + artifact, ok := artifacts[0].(map[string]any) + if !ok { + t.Fatalf("expected artifact object, got %#v", artifacts[0]) + } + if gotPath, _ := artifact["path"].(string); gotPath != resultPath { + t.Fatalf("expected artifact path %q, got %#v", resultPath, artifact["path"]) + } + if gotKind, _ := artifact["kind"].(string); gotKind != "report" { + t.Fatalf("expected artifact kind report, got %#v", artifact["kind"]) + } +} + +func TestDoneRejectsNonOwner(t *testing.T) { + t.Parallel() + + dbPath := filepath.Join(t.TempDir(), "coord.db") + threadID := seedClaimedThreadForInboxTests(t, dbPath, "leader", "worker-a", "worker-a") + + stdout, _, exitCode := executeInboxCommand( + "--db", dbPath, + "--json", + "done", + "--agent", "worker-b", + "--thread", threadID, + "--summary", "Retry policy implemented", + ) + if exitCode != 20 { + t.Fatalf("expected exit code 20, got %d with output %s", exitCode, stdout) + } + assertErrorJSON(t, stdout, "lease_conflict") +} + +func TestDoneRejectsOnTerminalThread(t *testing.T) { + t.Parallel() + + dbPath := filepath.Join(t.TempDir(), "coord.db") + threadID := seedClaimedThreadForInboxTests(t, dbPath, "leader", "worker-a", "worker-a") + + runInboxCommand( + t, + "--db", dbPath, + "--json", + "done", + "--agent", "worker-a", + "--thread", threadID, + "--summary", "Retry policy implemented", + ) + + stdout, _, exitCode := executeInboxCommand( + "--db", dbPath, + "--json", + "done", + "--agent", "worker-a", + "--thread", threadID, + "--summary", "Retry policy implemented", + ) + if exitCode != 30 { + t.Fatalf("expected exit code 30, got %d with output %s", exitCode, stdout) + } + assertErrorJSON(t, stdout, "invalid_state") +} diff --git a/internal/cli/inbox/fail_integration_test.go b/internal/cli/inbox/fail_integration_test.go new file mode 100644 index 0000000..62bf9d1 --- /dev/null +++ b/internal/cli/inbox/fail_integration_test.go @@ -0,0 +1,143 @@ +package inbox + +import ( + "os" + "path/filepath" + "testing" +) + +func TestFailMarksThreadFailed(t *testing.T) { + t.Parallel() + + dbPath := filepath.Join(t.TempDir(), "coord.db") + threadID := seedClaimedThreadForInboxTests(t, dbPath, "leader", "worker-b", "worker-b") + + failOut := runInboxCommand( + t, + "--db", dbPath, + "--json", + "fail", + "--agent", "worker-b", + "--thread", threadID, + "--summary", "Migration failed", + "--body", "The migration cannot proceed because the prior schema is inconsistent.", + ) + + var failResp map[string]any + mustDecodeJSON(t, failOut, &failResp) + if status := nestedString(t, failResp, "data", "thread", "status"); status != "failed" { + t.Fatalf("expected failed thread status, got %q", status) + } + if kind := nestedString(t, failResp, "data", "message", "kind"); kind != "result" { + t.Fatalf("expected result message kind, got %q", kind) + } + if toAgent := nestedString(t, failResp, "data", "message", "to_agent"); toAgent != "leader" { + t.Fatalf("expected message to_agent leader, got %q", toAgent) + } +} + +func TestFailPersistsFailureBodyAndArtifact(t *testing.T) { + t.Parallel() + + tempDir := t.TempDir() + dbPath := filepath.Join(tempDir, "coord.db") + failurePath := filepath.Join(tempDir, "failure.md") + body := "Failure details from file." + if err := os.WriteFile(failurePath, []byte(body), 0o644); err != nil { + t.Fatalf("write failure file: %v", err) + } + + threadID := seedClaimedThreadForInboxTests(t, dbPath, "leader", "worker-b", "worker-b") + + runInboxCommand( + t, + "--db", dbPath, + "--json", + "fail", + "--agent", "worker-b", + "--thread", threadID, + "--summary", "Migration failed", + "--body-file", failurePath, + "--artifact", failurePath, + "--artifact-kind", "report", + ) + + showOut := runInboxCommand( + t, + "--db", dbPath, + "--json", + "show", + "--thread", threadID, + ) + var showResp map[string]any + mustDecodeJSON(t, showOut, &showResp) + lastMessage := lastThreadMessageFromShow(t, showResp) + + if gotBody, _ := lastMessage["body"].(string); gotBody != body { + t.Fatalf("expected body %q, got %#v", body, lastMessage["body"]) + } + artifacts, ok := lastMessage["artifacts"].([]any) + if !ok || len(artifacts) != 1 { + t.Fatalf("expected one artifact, got %#v", lastMessage["artifacts"]) + } + artifact, ok := artifacts[0].(map[string]any) + if !ok { + t.Fatalf("expected artifact object, got %#v", artifacts[0]) + } + if gotPath, _ := artifact["path"].(string); gotPath != failurePath { + t.Fatalf("expected artifact path %q, got %#v", failurePath, artifact["path"]) + } + if gotKind, _ := artifact["kind"].(string); gotKind != "report" { + t.Fatalf("expected artifact kind report, got %#v", artifact["kind"]) + } +} + +func TestFailRejectsNonOwner(t *testing.T) { + t.Parallel() + + dbPath := filepath.Join(t.TempDir(), "coord.db") + threadID := seedClaimedThreadForInboxTests(t, dbPath, "leader", "worker-b", "worker-b") + + stdout, _, exitCode := executeInboxCommand( + "--db", dbPath, + "--json", + "fail", + "--agent", "worker-x", + "--thread", threadID, + "--summary", "Migration failed", + ) + if exitCode != 20 { + t.Fatalf("expected exit code 20, got %d with output %s", exitCode, stdout) + } + assertErrorJSON(t, stdout, "lease_conflict") +} + +func TestFailRejectsOnTerminalThread(t *testing.T) { + t.Parallel() + + dbPath := filepath.Join(t.TempDir(), "coord.db") + threadID := seedClaimedThreadForInboxTests(t, dbPath, "leader", "worker-b", "worker-b") + + runInboxCommand( + t, + "--db", dbPath, + "--json", + "fail", + "--agent", "worker-b", + "--thread", threadID, + "--summary", "Migration failed", + ) + + stdout, _, exitCode := executeInboxCommand( + "--db", dbPath, + "--json", + "fail", + "--agent", "worker-b", + "--thread", threadID, + "--summary", "Migration failed", + ) + if exitCode != 30 { + t.Fatalf("expected exit code 30, got %d with output %s", exitCode, stdout) + } + assertErrorJSON(t, stdout, "invalid_state") +} diff --git a/internal/cli/inbox/fetch_integration_test.go b/internal/cli/inbox/fetch_integration_test.go new file mode 100644 index 0000000..efdbf4e --- /dev/null +++ b/internal/cli/inbox/fetch_integration_test.go @@ -0,0 +1,187 @@ +package inbox + +import "testing" + +func TestFetchReturnsPendingThreadForTargetAgent(t *testing.T) { + t.Parallel() + + dbPath := initCommandTestDB(t) + sendPendingThread(t, dbPath, "leader", "worker-a", "Implement task", "Create API endpoint") + + fetchOut := runInboxCommand( + t, + "--db", dbPath, + "--json", + "fetch", + "--agent", "worker-a", + "--status", "pending", + ) + + var fetchResp map[string]any + mustDecodeJSON(t, fetchOut, &fetchResp) + threads, ok := nestedValue(t, fetchResp, "data", "threads").([]any) + if !ok || len(threads) < 1 { + t.Fatalf("expected at least one fetched thread, got %#v", nestedValue(t, fetchResp, "data", "threads")) + } + thread, ok := threads[0].(map[string]any) + if !ok { + t.Fatalf("expected thread object, got %#v", threads[0]) + } + if got, _ := thread["assigned_to"].(string); got != "worker-a" { + t.Fatalf("expected assigned_to worker-a, got %#v", thread["assigned_to"]) + } + if got, _ := thread["status"].(string); got != "pending" { + t.Fatalf("expected pending status, got %#v", thread["status"]) + } +} + +func TestFetchRespectsStatusAndLimitFilters(t *testing.T) { + t.Parallel() + + dbPath := initCommandTestDB(t) + sendPendingThread(t, dbPath, "leader", "worker-a", "Task A", "Pending task") + blockedThreadID := sendPendingThread(t, dbPath, "leader", "worker-a", "Task B", "Blocked task") + + runInboxCommand( + t, + "--db", dbPath, + "--json", + "claim", + "--agent", "worker-a", + "--thread", blockedThreadID, + ) + runInboxCommand( + t, + "--db", dbPath, + "--json", + "update", + "--agent", "worker-a", + "--thread", blockedThreadID, + "--status", "blocked", + "--summary", "Need decision", + "--payload-json", `{"question":"continue?"}`, + ) + + fetchOut := runInboxCommand( + t, + "--db", dbPath, + "--json", + "fetch", + "--agent", "worker-a", + "--status", "pending,blocked", + "--limit", "1", + ) + + var fetchResp map[string]any + mustDecodeJSON(t, fetchOut, &fetchResp) + threads, ok := nestedValue(t, fetchResp, "data", "threads").([]any) + if !ok { + t.Fatalf("expected threads array, got %#v", nestedValue(t, fetchResp, "data", "threads")) + } + if len(threads) > 1 { + t.Fatalf("expected at most one thread with limit=1, got %d", len(threads)) + } + if len(threads) == 0 { + t.Fatalf("expected one thread with status filter, got empty result") + } + thread, ok := threads[0].(map[string]any) + if !ok { + t.Fatalf("expected thread object, got %#v", threads[0]) + } + status, _ := thread["status"].(string) + if status != "pending" && status != "blocked" { + t.Fatalf("expected pending or blocked status, got %#v", thread["status"]) + } +} + +func TestFetchUnreadUsesReadCursor(t *testing.T) { + t.Parallel() + + dbPath := initCommandTestDB(t) + threadID := sendPendingThread(t, dbPath, "leader", "worker-e", "Review navbar copy", "Check top nav wording") + + firstFetchOut := runInboxCommand( + t, + "--db", dbPath, + "--json", + "fetch", + "--agent", "worker-e", + "--status", "pending", + "--unread", + ) + + var firstFetchResp map[string]any + mustDecodeJSON(t, firstFetchOut, &firstFetchResp) + firstThreads, ok := nestedValue(t, firstFetchResp, "data", "threads").([]any) + if !ok || len(firstThreads) != 1 { + t.Fatalf("expected one unread thread before mark-read, got %#v", nestedValue(t, firstFetchResp, "data", "threads")) + } + + runInboxCommand( + t, + "--db", dbPath, + "--agent", "worker-e", + "--json", + "show", + "--thread", threadID, + "--mark-read", + ) + + stdout, _, exitCode := executeInboxCommand( + "--db", dbPath, + "--json", + "fetch", + "--agent", "worker-e", + "--status", "pending", + "--unread", + ) + if exitCode != 10 { + t.Fatalf("expected unread fetch to return no_matching_work after mark-read, got exit=%d stdout=%s", exitCode, stdout) + } + assertErrorJSON(t, stdout, "no_matching_work") + + runInboxCommand( + t, + "--db", dbPath, + "--json", + "send", + "--from", "leader", + "--to", "worker-e", + "--thread", threadID, + "--summary", "Use sentence case", + "--body", "Keep the nav labels in sentence case.", + ) + + thirdFetchOut := runInboxCommand( + t, + "--db", dbPath, + "--json", + "fetch", + "--agent", "worker-e", + "--status", "pending", + "--unread", + ) + var thirdFetchResp map[string]any + mustDecodeJSON(t, thirdFetchOut, &thirdFetchResp) + thirdThreads, ok := nestedValue(t, thirdFetchResp, "data", "threads").([]any) + if !ok || len(thirdThreads) != 1 { + t.Fatalf("expected unread thread to reappear after new message, got %#v", nestedValue(t, thirdFetchResp, "data", "threads")) + } +} + +func TestFetchReturnsNoMatchingWorkWhenEmpty(t *testing.T) { + t.Parallel() + + dbPath := initCommandTestDB(t) + stdout, _, exitCode := executeInboxCommand( + "--db", dbPath, + "--json", + "fetch", + "--agent", "worker-z", + "--status", "pending", + ) + if exitCode != 10 { + t.Fatalf("expected exit code 10, got %d", exitCode) + } + assertErrorJSON(t, stdout, "no_matching_work") +} diff --git a/internal/cli/inbox/init_integration_test.go b/internal/cli/inbox/init_integration_test.go new file mode 100644 index 0000000..99b8bb0 --- /dev/null +++ b/internal/cli/inbox/init_integration_test.go @@ -0,0 +1,83 @@ +package inbox + +import ( + "path/filepath" + "testing" +) + +func TestInitCreatesSchemaOnEmptyDB(t *testing.T) { + t.Parallel() + + dbPath := filepath.Join(t.TempDir(), "coord.db") + initOut := runInboxCommand(t, "--db", dbPath, "--json", "init") + + var initResp map[string]any + mustDecodeJSON(t, initOut, &initResp) + + if ok, _ := initResp["ok"].(bool); !ok { + t.Fatalf("expected ok=true, got %#v", initResp) + } + if cmd, _ := initResp["command"].(string); cmd != "init" { + t.Fatalf("expected command init, got %#v", initResp["command"]) + } + if got := nestedString(t, initResp, "data", "db_path"); got != dbPath { + t.Fatalf("expected db_path %q, got %q", dbPath, got) + } + if got := nestedString(t, initResp, "data", "status"); got != "initialized" { + t.Fatalf("expected initialized status, got %q", got) + } +} + +func TestInitIsIdempotentOnExistingDB(t *testing.T) { + t.Parallel() + + dbPath := filepath.Join(t.TempDir(), "coord.db") + + firstOut := runInboxCommand(t, "--db", dbPath, "--json", "init") + secondOut := runInboxCommand(t, "--db", dbPath, "--json", "init") + + var firstResp map[string]any + var secondResp map[string]any + mustDecodeJSON(t, firstOut, &firstResp) + mustDecodeJSON(t, secondOut, &secondResp) + + if got := nestedString(t, firstResp, "data", "status"); got != "initialized" { + t.Fatalf("expected first init status initialized, got %q", got) + } + if got := nestedString(t, secondResp, "data", "status"); got != "initialized" { + t.Fatalf("expected second init status initialized, got %q", got) + } + if got := nestedString(t, firstResp, "data", "db_path"); got != dbPath { + t.Fatalf("expected first db_path %q, got %q", dbPath, got) + } + if got := nestedString(t, secondResp, "data", "db_path"); got != dbPath { + t.Fatalf("expected second db_path %q, got %q", dbPath, got) + } +} + +func initCommandTestDB(t *testing.T) string { + t.Helper() + + dbPath := filepath.Join(t.TempDir(), "coord.db") + runInboxCommand(t, "--db", dbPath, "--json", "init") + return dbPath +} + +func sendPendingThread(t *testing.T, dbPath, from, to, subject, summary string) string { + t.Helper() + + sendOut := runInboxCommand( + t, + "--db", dbPath, + "--json", + "send", + "--from", from, + "--to", to, + "--subject", subject, + "--summary", summary, + ) + + var sendResp map[string]any + mustDecodeJSON(t, sendOut, &sendResp) + return nestedString(t, sendResp, "data", "thread", "thread_id") +} diff --git a/internal/cli/inbox/integration_test.go b/internal/cli/inbox/integration_test.go index 59eddc6..da84f05 100644 --- a/internal/cli/inbox/integration_test.go +++ b/internal/cli/inbox/integration_test.go @@ -1,8 +1,6 @@ package inbox import ( - "bytes" - "encoding/json" "os" "path/filepath" "testing" @@ -734,74 +732,3 @@ func TestInboxJSONErrorsAndExitCodes(t *testing.T) { } assertErrorJSON(t, stdout, "invalid_input") } - -func runInboxCommand(t *testing.T, args ...string) string { - t.Helper() - - stdout, stderr, exitCode := executeInboxCommand(args...) - if exitCode != 0 { - t.Fatalf("execute inbox command %v: exit=%d\nstderr:\n%s\nstdout:\n%s", args, exitCode, stderr, stdout) - } - - return stdout -} - -func executeInboxCommand(args ...string) (string, string, int) { - var stdout bytes.Buffer - var stderr bytes.Buffer - exitCode := Execute(args, &stdout, &stderr) - return stdout.String(), stderr.String(), exitCode -} - -func mustDecodeJSON(t *testing.T, raw string, target any) { - t.Helper() - - if err := json.Unmarshal([]byte(raw), target); err != nil { - t.Fatalf("decode json %q: %v", raw, err) - } -} - -func nestedString(t *testing.T, value map[string]any, keys ...string) string { - t.Helper() - - current := nestedValue(t, value, keys...) - str, ok := current.(string) - if !ok { - t.Fatalf("expected string at %v, got %#v", keys, current) - } - return str -} - -func nestedValue(t *testing.T, value map[string]any, keys ...string) any { - t.Helper() - - var current any = value - for _, key := range keys { - obj, ok := current.(map[string]any) - if !ok { - t.Fatalf("expected object at %q in %v, got %#v", key, keys, current) - } - current, ok = obj[key] - if !ok { - t.Fatalf("missing key %q in %v", key, keys) - } - } - return current -} - -func assertErrorJSON(t *testing.T, raw string, expectedCode string) { - t.Helper() - - var payload map[string]any - mustDecodeJSON(t, raw, &payload) - if ok, _ := payload["ok"].(bool); ok { - t.Fatalf("expected ok=false error payload, got %#v", payload) - } - errorValue, ok := payload["error"].(map[string]any) - if !ok { - t.Fatalf("expected error object, got %#v", payload["error"]) - } - if code, _ := errorValue["code"].(string); code != expectedCode { - t.Fatalf("expected error code %q, got %#v", expectedCode, errorValue["code"]) - } -} diff --git a/internal/cli/inbox/list_integration_test.go b/internal/cli/inbox/list_integration_test.go new file mode 100644 index 0000000..cd6081e --- /dev/null +++ b/internal/cli/inbox/list_integration_test.go @@ -0,0 +1,183 @@ +package inbox + +import ( + "path/filepath" + "testing" + "time" +) + +func TestListFiltersByStatus(t *testing.T) { + t.Parallel() + + dbPath := filepath.Join(t.TempDir(), "coord.db") + runInboxCommand(t, "--db", dbPath, "--json", "init") + + createThreadForList(t, dbPath, "leader", "worker-d", "Task pending", "Pending task") + blockedThreadID := createThreadForList(t, dbPath, "leader", "worker-d", "Task blocked", "Blocked task") + runInboxCommand(t, "--db", dbPath, "--json", "claim", "--agent", "worker-d", "--thread", blockedThreadID) + runInboxCommand( + t, + "--db", dbPath, + "--json", + "update", + "--agent", "worker-d", + "--thread", blockedThreadID, + "--status", "blocked", + "--summary", "Need policy decision", + ) + + listOut := runInboxCommand( + t, + "--db", dbPath, + "--json", + "list", + "--agent", "worker-d", + "--status", "pending,blocked", + ) + + var listResp map[string]any + mustDecodeJSON(t, listOut, &listResp) + threads, ok := nestedValue(t, listResp, "data", "threads").([]any) + if !ok || len(threads) < 2 { + t.Fatalf("expected at least two matching threads, got %#v", nestedValue(t, listResp, "data", "threads")) + } + for _, raw := range threads { + thread, ok := raw.(map[string]any) + if !ok { + t.Fatalf("expected thread object, got %#v", raw) + } + status, _ := thread["status"].(string) + if status != "pending" && status != "blocked" { + t.Fatalf("expected status pending or blocked, got %#v", status) + } + } +} + +func TestListFiltersByCreatedBy(t *testing.T) { + t.Parallel() + + dbPath := filepath.Join(t.TempDir(), "coord.db") + runInboxCommand(t, "--db", dbPath, "--json", "init") + + createThreadForList(t, dbPath, "leader", "worker-d", "Leader task", "From leader") + createThreadForList(t, dbPath, "planner", "worker-d", "Planner task", "From planner") + + listOut := runInboxCommand( + t, + "--db", dbPath, + "--json", + "list", + "--created-by", "leader", + "--assigned-to", "worker-d", + ) + + var listResp map[string]any + mustDecodeJSON(t, listOut, &listResp) + threads, ok := nestedValue(t, listResp, "data", "threads").([]any) + if !ok || len(threads) == 0 { + t.Fatalf("expected matching leader-created threads, got %#v", nestedValue(t, listResp, "data", "threads")) + } + for _, raw := range threads { + thread, ok := raw.(map[string]any) + if !ok { + t.Fatalf("expected thread object, got %#v", raw) + } + if got := thread["created_by"]; got != "leader" { + t.Fatalf("expected created_by leader, got %#v", got) + } + } +} + +func TestListFiltersByAssignedTo(t *testing.T) { + t.Parallel() + + dbPath := filepath.Join(t.TempDir(), "coord.db") + runInboxCommand(t, "--db", dbPath, "--json", "init") + + createThreadForList(t, dbPath, "leader", "worker-d", "Worker D task", "For worker-d") + createThreadForList(t, dbPath, "leader", "worker-e", "Worker E task", "For worker-e") + + listOut := runInboxCommand( + t, + "--db", dbPath, + "--json", + "list", + "--assigned-to", "worker-d", + "--status", "pending", + ) + + var listResp map[string]any + mustDecodeJSON(t, listOut, &listResp) + threads, ok := nestedValue(t, listResp, "data", "threads").([]any) + if !ok || len(threads) == 0 { + t.Fatalf("expected assigned-to match, got %#v", nestedValue(t, listResp, "data", "threads")) + } + for _, raw := range threads { + thread, ok := raw.(map[string]any) + if !ok { + t.Fatalf("expected thread object, got %#v", raw) + } + if got := thread["assigned_to"]; got != "worker-d" { + t.Fatalf("expected assigned_to worker-d, got %#v", got) + } + if got := thread["status"]; got != "pending" { + t.Fatalf("expected pending status, got %#v", got) + } + } +} + +func TestListRespectsLimit(t *testing.T) { + t.Parallel() + + dbPath := filepath.Join(t.TempDir(), "coord.db") + runInboxCommand(t, "--db", dbPath, "--json", "init") + + createThreadForList(t, dbPath, "leader", "worker-d", "Task 1", "Earlier task") + time.Sleep(20 * time.Millisecond) + createThreadForList(t, dbPath, "leader", "worker-d", "Task 2", "Latest task") + + listOut := runInboxCommand( + t, + "--db", dbPath, + "--json", + "list", + "--assigned-to", "worker-d", + "--limit", "1", + ) + + var listResp map[string]any + mustDecodeJSON(t, listOut, &listResp) + threads, ok := nestedValue(t, listResp, "data", "threads").([]any) + if !ok { + t.Fatalf("expected threads array, got %#v", nestedValue(t, listResp, "data", "threads")) + } + if len(threads) != 1 { + t.Fatalf("expected exactly one row for limit=1, got %d", len(threads)) + } + thread, ok := threads[0].(map[string]any) + if !ok { + t.Fatalf("expected thread object, got %#v", threads[0]) + } + if got := thread["subject"]; got != "Task 2" { + t.Fatalf("expected latest thread subject Task 2, got %#v", got) + } +} + +func createThreadForList(t *testing.T, dbPath, from, to, subject, summary string) string { + t.Helper() + + sendOut := runInboxCommand( + t, + "--db", dbPath, + "--json", + "send", + "--from", from, + "--to", to, + "--subject", subject, + "--summary", summary, + ) + var sendResp map[string]any + mustDecodeJSON(t, sendOut, &sendResp) + return nestedString(t, sendResp, "data", "thread", "thread_id") +} + diff --git a/internal/cli/inbox/renew_integration_test.go b/internal/cli/inbox/renew_integration_test.go new file mode 100644 index 0000000..05ed1ee --- /dev/null +++ b/internal/cli/inbox/renew_integration_test.go @@ -0,0 +1,103 @@ +package inbox + +import "testing" + +func TestRenewExtendsActiveLease(t *testing.T) { + t.Parallel() + + dbPath := initCommandTestDB(t) + threadID := sendPendingThread(t, dbPath, "leader", "worker-c", "Renew lease", "Need renew coverage") + + runInboxCommand( + t, + "--db", dbPath, + "--json", + "claim", + "--agent", "worker-c", + "--thread", threadID, + "--lease-seconds", "300", + ) + + renewOut := runInboxCommand( + t, + "--db", dbPath, + "--json", + "renew", + "--agent", "worker-c", + "--thread", threadID, + "--lease-seconds", "600", + ) + + var renewResp map[string]any + mustDecodeJSON(t, renewOut, &renewResp) + if got := nestedString(t, renewResp, "data", "thread", "status"); got != "claimed" { + t.Fatalf("expected status to stay claimed, got %q", got) + } + if got := nestedString(t, renewResp, "data", "message", "kind"); got != "event" { + t.Fatalf("expected event message kind, got %q", got) + } + if got := nestedString(t, renewResp, "data", "message", "summary"); got != "lease renewed" { + t.Fatalf("expected lease renewed summary, got %q", got) + } + payload, ok := nestedValue(t, renewResp, "data", "message", "payload_json").(map[string]any) + if !ok { + t.Fatalf("expected payload_json object, got %#v", nestedValue(t, renewResp, "data", "message", "payload_json")) + } + leaseSeconds, ok := payload["lease_seconds"].(float64) + if !ok || int(leaseSeconds) != 600 { + t.Fatalf("expected lease_seconds 600, got %#v", payload["lease_seconds"]) + } + leaseToken, _ := payload["lease_token"].(string) + if leaseToken == "" { + t.Fatalf("expected non-empty lease_token, got %#v", payload["lease_token"]) + } +} + +func TestRenewRejectsNonOwner(t *testing.T) { + t.Parallel() + + dbPath := initCommandTestDB(t) + threadID := sendPendingThread(t, dbPath, "leader", "worker-c", "Renew non-owner", "Reject non-owner renew") + + runInboxCommand( + t, + "--db", dbPath, + "--json", + "claim", + "--agent", "worker-c", + "--thread", threadID, + ) + + stdout, _, exitCode := executeInboxCommand( + "--db", dbPath, + "--json", + "renew", + "--agent", "worker-x", + "--thread", threadID, + "--lease-seconds", "600", + ) + if exitCode != 20 { + t.Fatalf("expected exit code 20, got %d", exitCode) + } + assertErrorJSON(t, stdout, "lease_conflict") +} + +func TestRenewRejectsWithoutActiveLease(t *testing.T) { + t.Parallel() + + dbPath := initCommandTestDB(t) + threadID := sendPendingThread(t, dbPath, "leader", "worker-c", "Renew without lease", "Should fail without active lease") + + stdout, _, exitCode := executeInboxCommand( + "--db", dbPath, + "--json", + "renew", + "--agent", "worker-c", + "--thread", threadID, + "--lease-seconds", "600", + ) + if exitCode != 30 { + t.Fatalf("expected exit code 30, got %d", exitCode) + } + assertErrorJSON(t, stdout, "invalid_state") +} diff --git a/internal/cli/inbox/reply_integration_test.go b/internal/cli/inbox/reply_integration_test.go new file mode 100644 index 0000000..3f68a62 --- /dev/null +++ b/internal/cli/inbox/reply_integration_test.go @@ -0,0 +1,138 @@ +package inbox + +import ( + "os" + "path/filepath" + "testing" +) + +func TestReplyAddsAnswerMessage(t *testing.T) { + t.Parallel() + + dbPath := filepath.Join(t.TempDir(), "coord.db") + threadID := seedThreadForInboxTests(t, dbPath, "leader", "worker-a") + + replyOut := runInboxCommand( + t, + "--db", dbPath, + "--json", + "reply", + "--from", "leader", + "--to", "worker-a", + "--thread", threadID, + "--summary", "Retry read timeouts", + "--body", "Yes, include read timeouts in the retry policy.", + ) + + var replyResp map[string]any + mustDecodeJSON(t, replyOut, &replyResp) + if kind := nestedString(t, replyResp, "data", "message", "kind"); kind != "answer" { + t.Fatalf("expected answer message kind, got %q", kind) + } + if gotThreadID := nestedString(t, replyResp, "data", "thread", "thread_id"); gotThreadID != threadID { + t.Fatalf("expected thread_id %q, got %q", threadID, gotThreadID) + } + if status := nestedString(t, replyResp, "data", "thread", "status"); status != "pending" { + t.Fatalf("expected thread status pending, got %q", status) + } +} + +func TestReplySupportsControlKind(t *testing.T) { + t.Parallel() + + dbPath := filepath.Join(t.TempDir(), "coord.db") + threadID := seedThreadForInboxTests(t, dbPath, "leader", "worker-a") + + replyOut := runInboxCommand( + t, + "--db", dbPath, + "--json", + "reply", + "--from", "leader", + "--to", "worker-a", + "--thread", threadID, + "--kind", "control", + "--summary", "Pause rollout", + "--body", "Pause rollout until QA confirms the fix.", + ) + + var replyResp map[string]any + mustDecodeJSON(t, replyOut, &replyResp) + if kind := nestedString(t, replyResp, "data", "message", "kind"); kind != "control" { + t.Fatalf("expected control message kind, got %q", kind) + } +} + +func TestReplyAttachesArtifact(t *testing.T) { + t.Parallel() + + tempDir := t.TempDir() + dbPath := filepath.Join(tempDir, "coord.db") + decisionPath := filepath.Join(tempDir, "decision.md") + if err := os.WriteFile(decisionPath, []byte("Decision note."), 0o644); err != nil { + t.Fatalf("write decision file: %v", err) + } + + threadID := seedThreadForInboxTests(t, dbPath, "leader", "worker-a") + + replyOut := runInboxCommand( + t, + "--db", dbPath, + "--json", + "reply", + "--from", "leader", + "--to", "worker-a", + "--thread", threadID, + "--summary", "Retry read timeouts", + "--artifact", decisionPath, + "--artifact-kind", "brief", + "--artifact-metadata-json", `{"label":"decision"}`, + ) + + var replyResp map[string]any + mustDecodeJSON(t, replyOut, &replyResp) + artifactsValue := nestedValue(t, replyResp, "data", "message", "artifacts") + artifacts, ok := artifactsValue.([]any) + if !ok || len(artifacts) != 1 { + t.Fatalf("expected one artifact, got %#v", artifactsValue) + } + artifact, ok := artifacts[0].(map[string]any) + if !ok { + t.Fatalf("expected artifact object, got %#v", artifacts[0]) + } + if gotPath, _ := artifact["path"].(string); gotPath != decisionPath { + t.Fatalf("expected artifact path %q, got %#v", decisionPath, artifact["path"]) + } + if gotKind, _ := artifact["kind"].(string); gotKind != "brief" { + t.Fatalf("expected artifact kind brief, got %#v", artifact["kind"]) + } + metadata, ok := artifact["metadata_json"].(map[string]any) + if !ok { + t.Fatalf("expected metadata_json object, got %#v", artifact["metadata_json"]) + } + if gotLabel := metadata["label"]; gotLabel != "decision" { + t.Fatalf("expected metadata label decision, got %#v", gotLabel) + } +} + +func TestReplyRejectsInvalidPayloadJSON(t *testing.T) { + t.Parallel() + + dbPath := filepath.Join(t.TempDir(), "coord.db") + threadID := seedThreadForInboxTests(t, dbPath, "leader", "worker-a") + + stdout, _, exitCode := executeInboxCommand( + "--db", dbPath, + "--json", + "reply", + "--from", "leader", + "--to", "worker-a", + "--thread", threadID, + "--summary", "Retry read timeouts", + "--payload-json", "not-json", + ) + if exitCode != 30 { + t.Fatalf("expected exit code 30, got %d with output %s", exitCode, stdout) + } + assertErrorJSON(t, stdout, "invalid_input") +} diff --git a/internal/cli/inbox/send_integration_test.go b/internal/cli/inbox/send_integration_test.go new file mode 100644 index 0000000..ef167e4 --- /dev/null +++ b/internal/cli/inbox/send_integration_test.go @@ -0,0 +1,230 @@ +package inbox + +import ( + "os" + "path/filepath" + "testing" +) + +func TestSendCreatesNewThread(t *testing.T) { + t.Parallel() + + dbPath := initCommandTestDB(t) + + sendOut := runInboxCommand( + t, + "--db", dbPath, + "--json", + "send", + "--from", "leader", + "--to", "worker-a", + "--subject", "Implement feature X", + "--summary", "Add retry policy", + "--body", "Implement retry handling for the HTTP client.", + "--run", "run_blog_001", + "--task", "T1", + ) + + var sendResp map[string]any + mustDecodeJSON(t, sendOut, &sendResp) + + if got := nestedString(t, sendResp, "data", "thread", "thread_id"); got == "" { + t.Fatalf("expected thread_id, got empty") + } + if got := nestedString(t, sendResp, "data", "thread", "status"); got != "pending" { + t.Fatalf("expected pending status, got %q", got) + } + if got := nestedString(t, sendResp, "data", "thread", "created_by"); got != "leader" { + t.Fatalf("expected created_by leader, got %q", got) + } + if got := nestedString(t, sendResp, "data", "thread", "assigned_to"); got != "worker-a" { + t.Fatalf("expected assigned_to worker-a, got %q", got) + } + if got := nestedString(t, sendResp, "data", "message", "kind"); got != "task" { + t.Fatalf("expected message kind task, got %q", got) + } +} + +func TestSendAppendsMessageToExistingThread(t *testing.T) { + t.Parallel() + + dbPath := initCommandTestDB(t) + threadID := sendPendingThread(t, dbPath, "leader", "worker-d", "Build editor", "Create editor v1") + + appendOut := runInboxCommand( + t, + "--db", dbPath, + "--json", + "send", + "--from", "leader", + "--to", "worker-d", + "--thread", threadID, + "--summary", "Use a markdown editor", + "--body", "Prefer a textarea-based markdown editor for v1.", + ) + + var appendResp map[string]any + mustDecodeJSON(t, appendOut, &appendResp) + if got := nestedString(t, appendResp, "data", "thread", "thread_id"); got != threadID { + t.Fatalf("expected same thread_id %q, got %q", threadID, got) + } + if got := nestedString(t, appendResp, "data", "thread", "status"); got != "pending" { + t.Fatalf("expected thread status to stay pending, got %q", got) + } + + showOut := runInboxCommand( + t, + "--db", dbPath, + "--json", + "show", + "--thread", threadID, + ) + + var showResp map[string]any + mustDecodeJSON(t, showOut, &showResp) + messages, ok := nestedValue(t, showResp, "data", "messages").([]any) + if !ok || len(messages) != 2 { + t.Fatalf("expected two messages after append, got %#v", nestedValue(t, showResp, "data", "messages")) + } +} + +func TestSendReadsBodyFromBodyFile(t *testing.T) { + t.Parallel() + + tempDir := t.TempDir() + dbPath := filepath.Join(tempDir, "coord.db") + bodyPath := filepath.Join(tempDir, "task.md") + bodyContent := "Create the first editor screen.\nUse markdown syntax." + if err := os.WriteFile(bodyPath, []byte(bodyContent), 0o644); err != nil { + t.Fatalf("write body file: %v", err) + } + runInboxCommand(t, "--db", dbPath, "--json", "init") + + sendOut := runInboxCommand( + t, + "--db", dbPath, + "--json", + "send", + "--from", "leader", + "--to", "worker-d", + "--subject", "Build admin editor", + "--summary", "Create the first editor screen", + "--body-file", bodyPath, + ) + + var sendResp map[string]any + mustDecodeJSON(t, sendOut, &sendResp) + threadID := nestedString(t, sendResp, "data", "thread", "thread_id") + + showOut := runInboxCommand( + t, + "--db", dbPath, + "--json", + "show", + "--thread", threadID, + ) + + var showResp map[string]any + mustDecodeJSON(t, showOut, &showResp) + messages, ok := nestedValue(t, showResp, "data", "messages").([]any) + if !ok || len(messages) != 1 { + t.Fatalf("expected one message, got %#v", nestedValue(t, showResp, "data", "messages")) + } + message, ok := messages[0].(map[string]any) + if !ok { + t.Fatalf("expected message object, got %#v", messages[0]) + } + if got, _ := message["body"].(string); got != bodyContent { + t.Fatalf("expected body %q, got %#v", bodyContent, message["body"]) + } +} + +func TestSendAttachesArtifactWithMetadata(t *testing.T) { + t.Parallel() + + tempDir := t.TempDir() + dbPath := filepath.Join(tempDir, "coord.db") + artifactPath := filepath.Join(tempDir, "task.md") + if err := os.WriteFile(artifactPath, []byte("task brief"), 0o644); err != nil { + t.Fatalf("write artifact file: %v", err) + } + runInboxCommand(t, "--db", dbPath, "--json", "init") + + sendOut := runInboxCommand( + t, + "--db", dbPath, + "--json", + "send", + "--from", "leader", + "--to", "worker-d", + "--subject", "Build admin editor", + "--summary", "Create the first editor screen", + "--artifact", artifactPath, + "--artifact-kind", "brief", + "--artifact-metadata-json", `{"label":"task-brief"}`, + ) + + var sendResp map[string]any + mustDecodeJSON(t, sendOut, &sendResp) + + artifacts, ok := nestedValue(t, sendResp, "data", "message", "artifacts").([]any) + if !ok || len(artifacts) != 1 { + t.Fatalf("expected one artifact, got %#v", nestedValue(t, sendResp, "data", "message", "artifacts")) + } + artifact, ok := artifacts[0].(map[string]any) + if !ok { + t.Fatalf("expected artifact object, got %#v", artifacts[0]) + } + if got, _ := artifact["path"].(string); got != artifactPath { + t.Fatalf("expected artifact path %q, got %#v", artifactPath, artifact["path"]) + } + if got, _ := artifact["kind"].(string); got != "brief" { + t.Fatalf("expected artifact kind brief, got %#v", artifact["kind"]) + } + metadata, ok := artifact["metadata_json"].(map[string]any) + if !ok { + t.Fatalf("expected metadata_json object, got %#v", artifact["metadata_json"]) + } + if got, _ := metadata["label"].(string); got != "task-brief" { + t.Fatalf("expected metadata_json.label task-brief, got %#v", metadata["label"]) + } +} + +func TestSendRejectsInvalidPayloadJSON(t *testing.T) { + t.Parallel() + + dbPath := initCommandTestDB(t) + stdout, _, exitCode := executeInboxCommand( + "--db", dbPath, + "--json", + "send", + "--from", "leader", + "--to", "worker-z", + "--subject", "Invalid payload json", + "--payload-json", "not-json", + ) + if exitCode != 30 { + t.Fatalf("expected exit code 30, got %d", exitCode) + } + assertErrorJSON(t, stdout, "invalid_input") +} + +func TestSendRejectsInvalidArtifactMetadataJSON(t *testing.T) { + t.Parallel() + + dbPath := initCommandTestDB(t) + stdout, _, exitCode := executeInboxCommand( + "--db", dbPath, + "--json", + "send", + "--from", "leader", + "--to", "worker-z", + "--subject", "Invalid artifact json", + "--artifact", "/tmp/report.md", + "--artifact-metadata-json", "not-json", + ) + if exitCode != 30 { + t.Fatalf("expected exit code 30, got %d", exitCode) + } + assertErrorJSON(t, stdout, "invalid_input") +} diff --git a/internal/cli/inbox/show_integration_test.go b/internal/cli/inbox/show_integration_test.go new file mode 100644 index 0000000..f22868c --- /dev/null +++ b/internal/cli/inbox/show_integration_test.go @@ -0,0 +1,196 @@ +package inbox + +import ( + "os" + "path/filepath" + "testing" +) + +func TestShowReturnsThreadAndMessageHistory(t *testing.T) { + t.Parallel() + + dbPath := filepath.Join(t.TempDir(), "coord.db") + runInboxCommand(t, "--db", dbPath, "--json", "init") + + sendOut := runInboxCommand( + t, + "--db", dbPath, + "--json", + "send", + "--from", "leader", + "--to", "worker-a", + "--subject", "Implement feature X", + "--summary", "Initial request", + ) + var sendResp map[string]any + mustDecodeJSON(t, sendOut, &sendResp) + threadID := nestedString(t, sendResp, "data", "thread", "thread_id") + + runInboxCommand( + t, + "--db", dbPath, + "--json", + "send", + "--from", "leader", + "--to", "worker-a", + "--thread", threadID, + "--summary", "Follow-up request", + "--body", "Please include request logging.", + ) + + showOut := runInboxCommand(t, "--db", dbPath, "--json", "show", "--thread", threadID) + var showResp map[string]any + mustDecodeJSON(t, showOut, &showResp) + if got := nestedString(t, showResp, "data", "thread", "thread_id"); got != threadID { + t.Fatalf("expected thread %q, got %q", threadID, got) + } + + messages, ok := nestedValue(t, showResp, "data", "messages").([]any) + if !ok || len(messages) != 2 { + t.Fatalf("expected two ordered messages, got %#v", nestedValue(t, showResp, "data", "messages")) + } + first, ok := messages[0].(map[string]any) + if !ok { + t.Fatalf("expected first message object, got %#v", messages[0]) + } + second, ok := messages[1].(map[string]any) + if !ok { + t.Fatalf("expected second message object, got %#v", messages[1]) + } + if got := first["summary"]; got != "Initial request" { + t.Fatalf("expected first summary Initial request, got %#v", got) + } + if got := second["summary"]; got != "Follow-up request" { + t.Fatalf("expected second summary Follow-up request, got %#v", got) + } +} + +func TestShowIncludesArtifactsPerMessage(t *testing.T) { + t.Parallel() + + tempDir := t.TempDir() + dbPath := filepath.Join(tempDir, "coord.db") + artifactPath := filepath.Join(tempDir, "task.md") + if err := os.WriteFile(artifactPath, []byte("task brief"), 0o644); err != nil { + t.Fatalf("write artifact file: %v", err) + } + + runInboxCommand(t, "--db", dbPath, "--json", "init") + + sendOut := runInboxCommand( + t, + "--db", dbPath, + "--json", + "send", + "--from", "leader", + "--to", "worker-a", + "--subject", "Artifact task", + "--summary", "Attach brief", + "--artifact", artifactPath, + "--artifact-kind", "brief", + "--artifact-metadata-json", `{"label":"task-brief"}`, + ) + var sendResp map[string]any + mustDecodeJSON(t, sendOut, &sendResp) + threadID := nestedString(t, sendResp, "data", "thread", "thread_id") + + showOut := runInboxCommand(t, "--db", dbPath, "--json", "show", "--thread", threadID) + var showResp map[string]any + mustDecodeJSON(t, showOut, &showResp) + + messages, ok := nestedValue(t, showResp, "data", "messages").([]any) + if !ok || len(messages) == 0 { + t.Fatalf("expected messages with artifacts, got %#v", nestedValue(t, showResp, "data", "messages")) + } + first, ok := messages[0].(map[string]any) + if !ok { + t.Fatalf("expected message object, got %#v", messages[0]) + } + artifacts, ok := first["artifacts"].([]any) + if !ok || len(artifacts) != 1 { + t.Fatalf("expected one artifact, got %#v", first["artifacts"]) + } + artifact, ok := artifacts[0].(map[string]any) + if !ok { + t.Fatalf("expected artifact object, got %#v", artifacts[0]) + } + if got := artifact["path"]; got != artifactPath { + t.Fatalf("expected artifact path %q, got %#v", artifactPath, got) + } + if got := artifact["kind"]; got != "brief" { + t.Fatalf("expected artifact kind brief, got %#v", got) + } +} + +func TestShowMarkReadAdvancesReadCursor(t *testing.T) { + t.Parallel() + + dbPath := filepath.Join(t.TempDir(), "coord.db") + runInboxCommand(t, "--db", dbPath, "--json", "init") + + sendOut := runInboxCommand( + t, + "--db", dbPath, + "--json", + "send", + "--from", "leader", + "--to", "worker-e", + "--subject", "Review nav copy", + "--summary", "Check wording", + ) + var sendResp map[string]any + mustDecodeJSON(t, sendOut, &sendResp) + threadID := nestedString(t, sendResp, "data", "thread", "thread_id") + + runInboxCommand( + t, + "--db", dbPath, + "--json", + "fetch", + "--agent", "worker-e", + "--status", "pending", + "--unread", + ) + + runInboxCommand( + t, + "--db", dbPath, + "--agent", "worker-e", + "--json", + "show", + "--thread", threadID, + "--mark-read", + ) + + stdout, _, exitCode := executeInboxCommand( + "--db", dbPath, + "--json", + "fetch", + "--agent", "worker-e", + "--status", "pending", + "--unread", + ) + if exitCode != 10 { + t.Fatalf("expected unread fetch to be empty after mark-read, got exit=%d with %s", exitCode, stdout) + } + assertErrorJSON(t, stdout, "no_matching_work") +} + +func TestShowRejectsWhenThreadMissing(t *testing.T) { + t.Parallel() + + dbPath := filepath.Join(t.TempDir(), "coord.db") + runInboxCommand(t, "--db", dbPath, "--json", "init") + + stdout, _, exitCode := executeInboxCommand( + "--db", dbPath, + "--json", + "show", + "--thread", "thr_missing", + ) + if exitCode != 40 { + t.Fatalf("expected not-found exit code 40, got %d with %s", exitCode, stdout) + } + assertErrorJSON(t, stdout, "not_found") +} + diff --git a/internal/cli/inbox/test_helpers_test.go b/internal/cli/inbox/test_helpers_test.go new file mode 100644 index 0000000..518882c --- /dev/null +++ b/internal/cli/inbox/test_helpers_test.go @@ -0,0 +1,78 @@ +package inbox + +import ( + "bytes" + "encoding/json" + "testing" +) + +func runInboxCommand(t *testing.T, args ...string) string { + t.Helper() + + stdout, stderr, exitCode := executeInboxCommand(args...) + if exitCode != 0 { + t.Fatalf("execute inbox command %v: exit=%d\nstderr:\n%s\nstdout:\n%s", args, exitCode, stderr, stdout) + } + + return stdout +} + +func executeInboxCommand(args ...string) (string, string, int) { + var stdout bytes.Buffer + var stderr bytes.Buffer + exitCode := Execute(args, &stdout, &stderr) + return stdout.String(), stderr.String(), exitCode +} + +func mustDecodeJSON(t *testing.T, raw string, target any) { + t.Helper() + + if err := json.Unmarshal([]byte(raw), target); err != nil { + t.Fatalf("decode json %q: %v", raw, err) + } +} + +func nestedString(t *testing.T, value map[string]any, keys ...string) string { + t.Helper() + + current := nestedValue(t, value, keys...) + str, ok := current.(string) + if !ok { + t.Fatalf("expected string at %v, got %#v", keys, current) + } + return str +} + +func nestedValue(t *testing.T, value map[string]any, keys ...string) any { + t.Helper() + + var current any = value + for _, key := range keys { + obj, ok := current.(map[string]any) + if !ok { + t.Fatalf("expected object at %q in %v, got %#v", key, keys, current) + } + current, ok = obj[key] + if !ok { + t.Fatalf("missing key %q in %v", key, keys) + } + } + return current +} + +func assertErrorJSON(t *testing.T, raw string, expectedCode string) { + t.Helper() + + var payload map[string]any + mustDecodeJSON(t, raw, &payload) + if ok, _ := payload["ok"].(bool); ok { + t.Fatalf("expected ok=false error payload, got %#v", payload) + } + errorValue, ok := payload["error"].(map[string]any) + if !ok { + t.Fatalf("expected error object, got %#v", payload["error"]) + } + if code, _ := errorValue["code"].(string); code != expectedCode { + t.Fatalf("expected error code %q, got %#v", expectedCode, errorValue["code"]) + } +} diff --git a/internal/cli/inbox/update_integration_test.go b/internal/cli/inbox/update_integration_test.go new file mode 100644 index 0000000..b0185b8 --- /dev/null +++ b/internal/cli/inbox/update_integration_test.go @@ -0,0 +1,226 @@ +package inbox + +import ( + "os" + "path/filepath" + "testing" +) + +func seedThreadForInboxTests(t *testing.T, dbPath, from, to string) string { + t.Helper() + + runInboxCommand(t, "--db", dbPath, "--json", "init") + + sendOut := runInboxCommand( + t, + "--db", dbPath, + "--json", + "send", + "--from", from, + "--to", to, + "--subject", "Implement feature X", + "--summary", "Add retry policy", + ) + + var sendResp map[string]any + mustDecodeJSON(t, sendOut, &sendResp) + return nestedString(t, sendResp, "data", "thread", "thread_id") +} + +func seedClaimedThreadForInboxTests(t *testing.T, dbPath, from, to, claimer string) string { + t.Helper() + + threadID := seedThreadForInboxTests(t, dbPath, from, to) + runInboxCommand( + t, + "--db", dbPath, + "--json", + "claim", + "--agent", claimer, + "--thread", threadID, + "--lease-seconds", "300", + ) + return threadID +} + +func lastThreadMessageFromShow(t *testing.T, showResp map[string]any) map[string]any { + t.Helper() + + messagesValue := nestedValue(t, showResp, "data", "messages") + messages, ok := messagesValue.([]any) + if !ok || len(messages) == 0 { + t.Fatalf("expected non-empty messages, got %#v", messagesValue) + } + + lastMessage, ok := messages[len(messages)-1].(map[string]any) + if !ok { + t.Fatalf("expected message object, got %#v", messages[len(messages)-1]) + } + return lastMessage +} + +func TestUpdateMovesThreadToInProgress(t *testing.T) { + t.Parallel() + + dbPath := filepath.Join(t.TempDir(), "coord.db") + threadID := seedClaimedThreadForInboxTests(t, dbPath, "leader", "worker-a", "worker-a") + + updateOut := runInboxCommand( + t, + "--db", dbPath, + "--json", + "update", + "--agent", "worker-a", + "--thread", threadID, + "--status", "in_progress", + "--summary", "Implementation started", + "--body", "Scanning current HTTP client usage.", + ) + + var updateResp map[string]any + mustDecodeJSON(t, updateOut, &updateResp) + if status := nestedString(t, updateResp, "data", "thread", "status"); status != "in_progress" { + t.Fatalf("expected in_progress thread status, got %q", status) + } + if kind := nestedString(t, updateResp, "data", "message", "kind"); kind != "progress" { + t.Fatalf("expected progress message kind, got %q", kind) + } + if toAgent := nestedString(t, updateResp, "data", "message", "to_agent"); toAgent != "leader" { + t.Fatalf("expected message to_agent leader, got %q", toAgent) + } +} + +func TestUpdateMovesThreadToBlockedWithPayload(t *testing.T) { + t.Parallel() + + dbPath := filepath.Join(t.TempDir(), "coord.db") + threadID := seedClaimedThreadForInboxTests(t, dbPath, "leader", "worker-a", "worker-a") + + updateOut := runInboxCommand( + t, + "--db", dbPath, + "--json", + "update", + "--agent", "worker-a", + "--thread", threadID, + "--status", "blocked", + "--summary", "Need timeout decision", + "--payload-json", `{"question":"Should retries apply to read timeouts?"}`, + ) + + var updateResp map[string]any + mustDecodeJSON(t, updateOut, &updateResp) + if status := nestedString(t, updateResp, "data", "thread", "status"); status != "blocked" { + t.Fatalf("expected blocked thread status, got %q", status) + } + if kind := nestedString(t, updateResp, "data", "message", "kind"); kind != "question" { + t.Fatalf("expected question message kind, got %q", kind) + } + payload, ok := nestedValue(t, updateResp, "data", "message", "payload_json").(map[string]any) + if !ok { + t.Fatalf("expected payload_json object, got %#v", nestedValue(t, updateResp, "data", "message", "payload_json")) + } + if got := payload["question"]; got != "Should retries apply to read timeouts?" { + t.Fatalf("expected payload question, got %#v", got) + } +} + +func TestUpdateAcceptsBodyFileAndArtifact(t *testing.T) { + t.Parallel() + + tempDir := t.TempDir() + dbPath := filepath.Join(tempDir, "coord.db") + progressPath := filepath.Join(tempDir, "progress.md") + body := "Progress update from file." + if err := os.WriteFile(progressPath, []byte(body), 0o644); err != nil { + t.Fatalf("write progress file: %v", err) + } + + threadID := seedClaimedThreadForInboxTests(t, dbPath, "leader", "worker-a", "worker-a") + + runInboxCommand( + t, + "--db", dbPath, + "--json", + "update", + "--agent", "worker-a", + "--thread", threadID, + "--status", "in_progress", + "--summary", "Implementation started", + "--body-file", progressPath, + "--artifact", progressPath, + "--artifact-kind", "note", + ) + + showOut := runInboxCommand( + t, + "--db", dbPath, + "--json", + "show", + "--thread", threadID, + ) + var showResp map[string]any + mustDecodeJSON(t, showOut, &showResp) + lastMessage := lastThreadMessageFromShow(t, showResp) + + if gotBody, _ := lastMessage["body"].(string); gotBody != body { + t.Fatalf("expected body %q, got %#v", body, lastMessage["body"]) + } + artifacts, ok := lastMessage["artifacts"].([]any) + if !ok || len(artifacts) != 1 { + t.Fatalf("expected one artifact, got %#v", lastMessage["artifacts"]) + } + artifact, ok := artifacts[0].(map[string]any) + if !ok { + t.Fatalf("expected artifact object, got %#v", artifacts[0]) + } + if gotPath, _ := artifact["path"].(string); gotPath != progressPath { + t.Fatalf("expected artifact path %q, got %#v", progressPath, artifact["path"]) + } + if gotKind, _ := artifact["kind"].(string); gotKind != "note" { + t.Fatalf("expected artifact kind note, got %#v", artifact["kind"]) + } +} + +func TestUpdateRejectsInvalidPayloadJSON(t *testing.T) { + t.Parallel() + + dbPath := filepath.Join(t.TempDir(), "coord.db") + threadID := seedClaimedThreadForInboxTests(t, dbPath, "leader", "worker-a", "worker-a") + + stdout, _, exitCode := executeInboxCommand( + "--db", dbPath, + "--json", + "update", + "--agent", "worker-a", + "--thread", threadID, + "--status", "blocked", + "--summary", "Need timeout decision", + "--payload-json", "not-json", + ) + if exitCode != 30 { + t.Fatalf("expected exit code 30, got %d with output %s", exitCode, stdout) + } + assertErrorJSON(t, stdout, "invalid_input") +} + +func TestUpdateRejectsNonOwner(t *testing.T) { + t.Parallel() + + dbPath := filepath.Join(t.TempDir(), "coord.db") + threadID := seedClaimedThreadForInboxTests(t, dbPath, "leader", "worker-a", "worker-a") + + stdout, _, exitCode := executeInboxCommand( + "--db", dbPath, + "--json", + "update", + "--agent", "worker-b", + "--thread", threadID, + "--status", "in_progress", + "--summary", "Implementation started", + ) + if exitCode != 20 { + t.Fatalf("expected exit code 20, got %d with output %s", exitCode, stdout) + } + assertErrorJSON(t, stdout, "lease_conflict") +} diff --git a/internal/cli/inbox/wait_reply_integration_test.go b/internal/cli/inbox/wait_reply_integration_test.go new file mode 100644 index 0000000..a2b31a1 --- /dev/null +++ b/internal/cli/inbox/wait_reply_integration_test.go @@ -0,0 +1,221 @@ +package inbox + +import ( + "path/filepath" + "strconv" + "testing" + "time" +) + +type waitReplyCommandResult struct { + stdout string + stderr string + exit int +} + +func TestWaitReplyWakesOnAnswerAfterMessage(t *testing.T) { + t.Parallel() + + dbPath := filepath.Join(t.TempDir(), "coord.db") + threadID, blockedMessageID := seedBlockedThreadForWaitReply(t, dbPath) + + waitCh := make(chan waitReplyCommandResult, 1) + go func() { + stdout, stderr, exitCode := executeInboxCommand( + "--db", dbPath, + "--agent", "worker-c", + "--json", + "wait-reply", + "--thread", threadID, + "--after-message", blockedMessageID, + "--timeout-seconds", "2", + ) + waitCh <- waitReplyCommandResult{stdout: stdout, stderr: stderr, exit: exitCode} + }() + + time.Sleep(200 * time.Millisecond) + + runInboxCommand( + t, + "--db", dbPath, + "--json", + "reply", + "--from", "leader", + "--to", "worker-c", + "--thread", threadID, + "--summary", "Redirect to login", + "--body", "Redirect guests to login for the MVP.", + ) + + var waitResult waitReplyCommandResult + select { + case waitResult = <-waitCh: + case <-time.After(3 * time.Second): + t.Fatal("wait-reply command did not return") + } + if waitResult.exit != 0 { + t.Fatalf("wait-reply failed with exit=%d\nstderr:\n%s\nstdout:\n%s", waitResult.exit, waitResult.stderr, waitResult.stdout) + } + + var waitResp map[string]any + mustDecodeJSON(t, waitResult.stdout, &waitResp) + if woke, ok := nestedValue(t, waitResp, "data", "woke").(bool); !ok || !woke { + t.Fatalf("expected wait-reply wake, got %#v", nestedValue(t, waitResp, "data", "woke")) + } + if kind := nestedString(t, waitResp, "data", "message", "kind"); kind != "answer" { + t.Fatalf("expected answer wake message, got %q", kind) + } +} + +func TestWaitReplyCanStartFromAfterEvent(t *testing.T) { + t.Parallel() + + dbPath := filepath.Join(t.TempDir(), "coord.db") + threadID, blockedMessageID := seedBlockedThreadForWaitReply(t, dbPath) + + runInboxCommand( + t, + "--db", dbPath, + "--json", + "reply", + "--from", "leader", + "--to", "worker-c", + "--thread", threadID, + "--summary", "First answer", + "--body", "First reply payload.", + ) + + firstWaitOut := runInboxCommand( + t, + "--db", dbPath, + "--agent", "worker-c", + "--json", + "wait-reply", + "--thread", threadID, + "--after-message", blockedMessageID, + "--timeout-seconds", "2", + ) + var firstWaitResp map[string]any + mustDecodeJSON(t, firstWaitOut, &firstWaitResp) + firstEventIDFloat, ok := nestedValue(t, firstWaitResp, "data", "next_event_id").(float64) + if !ok { + t.Fatalf("expected numeric next_event_id, got %#v", nestedValue(t, firstWaitResp, "data", "next_event_id")) + } + firstEventID := int64(firstEventIDFloat) + + waitCh := make(chan waitReplyCommandResult, 1) + go func() { + stdout, stderr, exitCode := executeInboxCommand( + "--db", dbPath, + "--agent", "worker-c", + "--json", + "wait-reply", + "--thread", threadID, + "--after-event", strconv.FormatInt(firstEventID, 10), + "--timeout-seconds", "2", + ) + waitCh <- waitReplyCommandResult{stdout: stdout, stderr: stderr, exit: exitCode} + }() + + time.Sleep(200 * time.Millisecond) + + runInboxCommand( + t, + "--db", dbPath, + "--json", + "reply", + "--from", "leader", + "--to", "worker-c", + "--thread", threadID, + "--summary", "Second answer", + "--body", "Second reply payload.", + ) + + var waitResult waitReplyCommandResult + select { + case waitResult = <-waitCh: + case <-time.After(3 * time.Second): + t.Fatal("wait-reply after-event command did not return") + } + if waitResult.exit != 0 { + t.Fatalf("wait-reply after-event failed with exit=%d\nstderr:\n%s\nstdout:\n%s", waitResult.exit, waitResult.stderr, waitResult.stdout) + } + + var waitResp map[string]any + mustDecodeJSON(t, waitResult.stdout, &waitResp) + if got := nestedString(t, waitResp, "data", "message", "summary"); got != "Second answer" { + t.Fatalf("expected second answer wake message, got %q", got) + } + secondEventIDFloat, ok := nestedValue(t, waitResp, "data", "next_event_id").(float64) + if !ok { + t.Fatalf("expected numeric next_event_id, got %#v", nestedValue(t, waitResp, "data", "next_event_id")) + } + if int64(secondEventIDFloat) <= firstEventID { + t.Fatalf("expected second event id > first event id, got %d <= %d", int64(secondEventIDFloat), firstEventID) + } +} + +func TestWaitReplyTimesOutWhenNoReply(t *testing.T) { + t.Parallel() + + dbPath := filepath.Join(t.TempDir(), "coord.db") + threadID, _ := seedBlockedThreadForWaitReply(t, dbPath) + + stdout, _, exitCode := executeInboxCommand( + "--db", dbPath, + "--agent", "worker-c", + "--json", + "wait-reply", + "--thread", threadID, + "--timeout-seconds", "1", + ) + if exitCode != 10 { + t.Fatalf("expected wait-reply timeout exit code 10, got %d with %s", exitCode, stdout) + } + assertErrorJSON(t, stdout, "no_matching_work") +} + +func seedBlockedThreadForWaitReply(t *testing.T, dbPath string) (threadID string, blockedMessageID string) { + t.Helper() + + runInboxCommand(t, "--db", dbPath, "--json", "init") + + sendOut := runInboxCommand( + t, + "--db", dbPath, + "--json", + "send", + "--from", "leader", + "--to", "worker-c", + "--subject", "Investigate auth edge case", + "--summary", "Check auth redirect behavior", + ) + var sendResp map[string]any + mustDecodeJSON(t, sendOut, &sendResp) + threadID = nestedString(t, sendResp, "data", "thread", "thread_id") + + runInboxCommand( + t, + "--db", dbPath, + "--json", + "claim", + "--agent", "worker-c", + "--thread", threadID, + ) + + blockedOut := runInboxCommand( + t, + "--db", dbPath, + "--json", + "update", + "--agent", "worker-c", + "--thread", threadID, + "--status", "blocked", + "--summary", "Need policy decision", + ) + var blockedResp map[string]any + mustDecodeJSON(t, blockedOut, &blockedResp) + blockedMessageID = nestedString(t, blockedResp, "data", "message", "message_id") + return threadID, blockedMessageID +} + diff --git a/internal/cli/inbox/watch_integration_test.go b/internal/cli/inbox/watch_integration_test.go new file mode 100644 index 0000000..8d87eb5 --- /dev/null +++ b/internal/cli/inbox/watch_integration_test.go @@ -0,0 +1,171 @@ +package inbox + +import ( + "path/filepath" + "testing" + "time" +) + +type watchCommandResult struct { + stdout string + stderr string + exit int +} + +func TestWatchWakesOnMatchingThread(t *testing.T) { + t.Parallel() + + dbPath := filepath.Join(t.TempDir(), "coord.db") + runInboxCommand(t, "--db", dbPath, "--json", "init") + + watchCh := make(chan watchCommandResult, 1) + go func() { + stdout, stderr, exitCode := executeInboxCommand( + "--db", dbPath, + "--json", + "watch", + "--agent", "worker-d", + "--status", "pending", + "--timeout-seconds", "2", + ) + watchCh <- watchCommandResult{stdout: stdout, stderr: stderr, exit: exitCode} + }() + + time.Sleep(200 * time.Millisecond) + + sendOut := runInboxCommand( + t, + "--db", dbPath, + "--json", + "send", + "--from", "leader", + "--to", "worker-d", + "--subject", "Build admin editor", + "--summary", "Create the first editor screen", + ) + var sendResp map[string]any + mustDecodeJSON(t, sendOut, &sendResp) + threadID := nestedString(t, sendResp, "data", "thread", "thread_id") + + var watchResult watchCommandResult + select { + case watchResult = <-watchCh: + case <-time.After(3 * time.Second): + t.Fatal("watch command did not return") + } + if watchResult.exit != 0 { + t.Fatalf("watch failed with exit=%d\nstderr:\n%s\nstdout:\n%s", watchResult.exit, watchResult.stderr, watchResult.stdout) + } + + var watchResp map[string]any + mustDecodeJSON(t, watchResult.stdout, &watchResp) + if woke, ok := nestedValue(t, watchResp, "data", "woke").(bool); !ok || !woke { + t.Fatalf("expected watch to wake, got %#v", nestedValue(t, watchResp, "data", "woke")) + } + if got := nestedString(t, watchResp, "data", "thread", "thread_id"); got != threadID { + t.Fatalf("expected woken thread %q, got %q", threadID, got) + } + nextEventID, ok := nestedValue(t, watchResp, "data", "next_event_id").(float64) + if !ok { + t.Fatalf("expected numeric next_event_id, got %#v", nestedValue(t, watchResp, "data", "next_event_id")) + } + eventID, ok := nestedValue(t, watchResp, "data", "event", "event_id").(float64) + if !ok { + t.Fatalf("expected numeric event_id, got %#v", nestedValue(t, watchResp, "data", "event", "event_id")) + } + if nextEventID != eventID { + t.Fatalf("expected next_event_id == event.event_id, got %v vs %v", nextEventID, eventID) + } +} + +func TestWatchRespectsStatusFilter(t *testing.T) { + t.Parallel() + + dbPath := filepath.Join(t.TempDir(), "coord.db") + runInboxCommand(t, "--db", dbPath, "--json", "init") + + sendOut := runInboxCommand( + t, + "--db", dbPath, + "--json", + "send", + "--from", "leader", + "--to", "worker-c", + "--subject", "Investigate policy edge case", + "--summary", "Initial request", + ) + var sendResp map[string]any + mustDecodeJSON(t, sendOut, &sendResp) + threadID := nestedString(t, sendResp, "data", "thread", "thread_id") + + watchCh := make(chan watchCommandResult, 1) + go func() { + stdout, stderr, exitCode := executeInboxCommand( + "--db", dbPath, + "--json", + "watch", + "--agent", "worker-c", + "--status", "blocked", + "--timeout-seconds", "2", + ) + watchCh <- watchCommandResult{stdout: stdout, stderr: stderr, exit: exitCode} + }() + + time.Sleep(200 * time.Millisecond) + + runInboxCommand( + t, + "--db", dbPath, + "--json", + "claim", + "--agent", "worker-c", + "--thread", threadID, + ) + runInboxCommand( + t, + "--db", dbPath, + "--json", + "update", + "--agent", "worker-c", + "--thread", threadID, + "--status", "blocked", + "--summary", "Need policy decision", + ) + + var watchResult watchCommandResult + select { + case watchResult = <-watchCh: + case <-time.After(3 * time.Second): + t.Fatal("watch command did not return") + } + if watchResult.exit != 0 { + t.Fatalf("watch failed with exit=%d\nstderr:\n%s\nstdout:\n%s", watchResult.exit, watchResult.stderr, watchResult.stdout) + } + + var watchResp map[string]any + mustDecodeJSON(t, watchResult.stdout, &watchResp) + if status := nestedString(t, watchResp, "data", "thread", "status"); status != "blocked" { + t.Fatalf("expected blocked status wake, got %q", status) + } +} + +func TestWatchTimesOutWithNoActivity(t *testing.T) { + t.Parallel() + + dbPath := filepath.Join(t.TempDir(), "coord.db") + runInboxCommand(t, "--db", dbPath, "--json", "init") + + stdout, _, exitCode := executeInboxCommand( + "--db", dbPath, + "--json", + "watch", + "--agent", "worker-d", + "--status", "pending", + "--timeout-seconds", "1", + ) + if exitCode != 10 { + t.Fatalf("expected watch timeout exit code 10, got %d with %s", exitCode, stdout) + } + assertErrorJSON(t, stdout, "no_matching_work") +} +