refactor(monorepo): extract inbox and orch runtimes

This commit is contained in:
2026-03-20 13:08:33 +08:00
parent 1938eb8f07
commit 9b8e886289
78 changed files with 10516 additions and 77 deletions
+11
View File
@@ -0,0 +1,11 @@
package main
import (
"os"
inboxcli "ai-workflow-skill/packages/inbox-runtime/internal/cli/inbox"
)
func main() {
os.Exit(inboxcli.Execute(os.Args[1:], os.Stdout, os.Stderr))
}
+4
View File
@@ -1,3 +1,7 @@
module ai-workflow-skill/packages/inbox-runtime
go 1.26
require (
github.com/spf13/cobra v1.10.1
)
@@ -0,0 +1,78 @@
package inbox
import (
"strings"
"ai-workflow-skill/packages/coord-core/protocol"
"ai-workflow-skill/packages/coord-core/store"
"github.com/spf13/cobra"
)
type artifactOptions struct {
paths []string
kinds []string
metadataJSONs []string
}
func addArtifactFlags(cmd *cobra.Command, opts *artifactOptions) {
cmd.Flags().StringArrayVar(&opts.paths, "artifact", nil, "Artifact path to attach; may be repeated")
cmd.Flags().StringArrayVar(&opts.kinds, "artifact-kind", nil, "Artifact kind; one value applies to all, or match artifact count")
cmd.Flags().StringArrayVar(&opts.metadataJSONs, "artifact-metadata-json", nil, "Artifact metadata JSON; one value applies to all, or match artifact count")
}
func resolveArtifacts(opts artifactOptions) ([]store.ArtifactInput, error) {
if len(opts.paths) == 0 {
if len(opts.kinds) > 0 || len(opts.metadataJSONs) > 0 {
return nil, protocol.InvalidInput("artifact-kind and artifact-metadata-json require at least one artifact path", nil)
}
return nil, nil
}
kinds, err := expandArtifactValues(opts.kinds, len(opts.paths), "artifact-kind")
if err != nil {
return nil, err
}
metadataJSONs, err := expandArtifactValues(opts.metadataJSONs, len(opts.paths), "artifact-metadata-json")
if err != nil {
return nil, err
}
artifacts := make([]store.ArtifactInput, 0, len(opts.paths))
for i, path := range opts.paths {
if strings.TrimSpace(path) == "" {
return nil, protocol.InvalidInput("artifact path cannot be empty", nil)
}
artifact := store.ArtifactInput{
Path: path,
Kind: "file",
}
if len(kinds) > 0 {
artifact.Kind = kinds[i]
}
if len(metadataJSONs) > 0 {
artifact.MetadataJSON = metadataJSONs[i]
}
artifacts = append(artifacts, artifact)
}
return artifacts, nil
}
func expandArtifactValues(values []string, target int, flagName string) ([]string, error) {
switch len(values) {
case 0:
return nil, nil
case 1:
out := make([]string, target)
for i := range out {
out[i] = values[0]
}
return out, nil
case target:
return values, nil
default:
return nil, protocol.InvalidInput(flagName+" must be specified once or once per artifact", nil)
}
}
@@ -0,0 +1,22 @@
package inbox
import (
"os"
"ai-workflow-skill/packages/coord-core/protocol"
)
func resolveBodyValue(body, bodyFile string) (string, error) {
if body != "" && bodyFile != "" {
return "", protocol.InvalidInput("body and body-file are mutually exclusive", nil)
}
if bodyFile == "" {
return body, nil
}
content, err := os.ReadFile(bodyFile)
if err != nil {
return "", protocol.InvalidInput("failed to read body-file", err)
}
return string(content), nil
}
@@ -0,0 +1,83 @@
package inbox
import (
"fmt"
"ai-workflow-skill/packages/coord-core/protocol"
"ai-workflow-skill/packages/coord-core/store"
"github.com/spf13/cobra"
)
type cancelOptions struct {
agent string
threadID string
reason string
artifacts artifactOptions
}
func newCancelCmd(root *rootOptions) *cobra.Command {
opts := &cancelOptions{}
cmd := &cobra.Command{
Use: "cancel",
Short: "Cancel a thread",
RunE: func(cmd *cobra.Command, args []string) error {
ctx := cmd.Context()
agent := opts.agent
if agent == "" {
agent = root.agent
}
if agent == "" {
return protocol.InvalidInput("agent is required", nil)
}
artifacts, err := resolveArtifacts(opts.artifacts)
if err != nil {
return err
}
sqlDB, err := openInboxDB(ctx, root.dbPath)
if err != nil {
return err
}
defer sqlDB.Close()
s := store.NewInboxStore(sqlDB)
thread, message, err := s.CancelThread(ctx, store.CancelInput{
ThreadID: opts.threadID,
Agent: agent,
Reason: opts.reason,
Artifacts: artifacts,
})
if err != nil {
return err
}
resp := protocol.Success{
OK: true,
Command: "cancel",
Data: map[string]any{
"thread": thread,
"message": message,
},
}
if root.json {
return protocol.WriteJSON(cmd.OutOrStdout(), resp)
}
_, err = fmt.Fprintf(cmd.OutOrStdout(), "cancelled thread %s\n", thread.ThreadID)
return err
},
}
cmd.Flags().StringVar(&opts.agent, "agent", "", "Acting agent")
cmd.Flags().StringVar(&opts.threadID, "thread", "", "Thread ID")
cmd.Flags().StringVar(&opts.reason, "reason", "", "Cancellation reason")
addArtifactFlags(cmd, &opts.artifacts)
_ = cmd.MarkFlagRequired("thread")
return cmd
}
@@ -0,0 +1,148 @@
package inbox
import (
"os"
"path/filepath"
"testing"
)
func TestCancelMarksThreadCancelled(t *testing.T) {
t.Parallel()
dbPath := filepath.Join(t.TempDir(), "coord.db")
runInboxCommand(t, "--db", dbPath, "--json", "init")
sendOut := runInboxCommand(
t,
"--db", dbPath,
"--json",
"send",
"--from", "leader",
"--to", "worker-a",
"--subject", "Implement cancellation",
"--summary", "Initial request",
)
var sendResp map[string]any
mustDecodeJSON(t, sendOut, &sendResp)
threadID := nestedString(t, sendResp, "data", "thread", "thread_id")
cancelOut := runInboxCommand(
t,
"--db", dbPath,
"--json",
"cancel",
"--agent", "leader",
"--thread", threadID,
"--reason", "Task superseded by a larger refactor",
)
var cancelResp map[string]any
mustDecodeJSON(t, cancelOut, &cancelResp)
if status := nestedString(t, cancelResp, "data", "thread", "status"); status != "cancelled" {
t.Fatalf("expected cancelled thread, got %q", status)
}
if kind := nestedString(t, cancelResp, "data", "message", "kind"); kind != "control" {
t.Fatalf("expected control message, got %q", kind)
}
}
func TestCancelPersistsReasonAndArtifact(t *testing.T) {
t.Parallel()
tempDir := t.TempDir()
dbPath := filepath.Join(tempDir, "coord.db")
cancelPath := filepath.Join(tempDir, "cancel.md")
if err := os.WriteFile(cancelPath, []byte("Cancelled by product decision"), 0o644); err != nil {
t.Fatalf("write cancel artifact: %v", err)
}
runInboxCommand(t, "--db", dbPath, "--json", "init")
sendOut := runInboxCommand(
t,
"--db", dbPath,
"--json",
"send",
"--from", "leader",
"--to", "worker-a",
"--subject", "Implement cancellation",
"--summary", "Initial request",
)
var sendResp map[string]any
mustDecodeJSON(t, sendOut, &sendResp)
threadID := nestedString(t, sendResp, "data", "thread", "thread_id")
runInboxCommand(
t,
"--db", dbPath,
"--json",
"cancel",
"--agent", "leader",
"--thread", threadID,
"--reason", "Task superseded by a larger refactor",
"--artifact", cancelPath,
"--artifact-kind", "brief",
)
showOut := runInboxCommand(
t,
"--db", dbPath,
"--json",
"show",
"--thread", threadID,
)
var showResp map[string]any
mustDecodeJSON(t, showOut, &showResp)
messages, ok := nestedValue(t, showResp, "data", "messages").([]any)
if !ok || len(messages) == 0 {
t.Fatalf("expected non-empty message history, got %#v", nestedValue(t, showResp, "data", "messages"))
}
lastMessage, ok := messages[len(messages)-1].(map[string]any)
if !ok {
t.Fatalf("expected message object, got %#v", messages[len(messages)-1])
}
if got := lastMessage["summary"]; got != "Task superseded by a larger refactor" {
t.Fatalf("expected cancel summary, got %#v", got)
}
if got := lastMessage["body"]; got != "Task superseded by a larger refactor" {
t.Fatalf("expected cancel body, got %#v", got)
}
artifacts, ok := lastMessage["artifacts"].([]any)
if !ok || len(artifacts) != 1 {
t.Fatalf("expected one cancel artifact, got %#v", lastMessage["artifacts"])
}
artifact, ok := artifacts[0].(map[string]any)
if !ok {
t.Fatalf("expected artifact object, got %#v", artifacts[0])
}
if got := artifact["path"]; got != cancelPath {
t.Fatalf("expected artifact path %q, got %#v", cancelPath, got)
}
if got := artifact["kind"]; got != "brief" {
t.Fatalf("expected artifact kind brief, got %#v", got)
}
}
func TestCancelRejectsWhenThreadMissing(t *testing.T) {
t.Parallel()
dbPath := filepath.Join(t.TempDir(), "coord.db")
runInboxCommand(t, "--db", dbPath, "--json", "init")
stdout, _, exitCode := executeInboxCommand(
"--db", dbPath,
"--agent", "leader",
"--json",
"cancel",
"--thread", "thr_missing",
)
if exitCode != 40 {
t.Fatalf("expected not-found exit code 40, got %d with %s", exitCode, stdout)
}
assertErrorJSON(t, stdout, "not_found")
}
@@ -0,0 +1,80 @@
package inbox
import (
"errors"
"fmt"
"ai-workflow-skill/packages/coord-core/protocol"
"ai-workflow-skill/packages/coord-core/store"
"github.com/spf13/cobra"
)
type claimOptions struct {
agent string
threadID string
leaseSeconds int
}
func newClaimCmd(root *rootOptions) *cobra.Command {
opts := &claimOptions{}
cmd := &cobra.Command{
Use: "claim",
Short: "Acquire a lease on a pending thread",
RunE: func(cmd *cobra.Command, args []string) error {
ctx := cmd.Context()
agent := opts.agent
if agent == "" {
agent = root.agent
}
if agent == "" {
return protocol.InvalidInput("agent is required", nil)
}
sqlDB, err := openInboxDB(ctx, root.dbPath)
if err != nil {
return err
}
defer sqlDB.Close()
s := store.NewInboxStore(sqlDB)
result, err := s.ClaimThread(ctx, store.ClaimInput{
ThreadID: opts.threadID,
Agent: agent,
LeaseSeconds: opts.leaseSeconds,
})
if err != nil {
if errors.Is(err, store.ErrLeaseConflict) {
return fmt.Errorf("lease conflict: %w", err)
}
return err
}
resp := protocol.Success{
OK: true,
Command: "claim",
Data: map[string]any{
"thread": result.Thread,
"message": result.Message,
},
}
if root.json {
return protocol.WriteJSON(cmd.OutOrStdout(), resp)
}
_, err = fmt.Fprintf(cmd.OutOrStdout(), "claimed thread %s\n", result.Thread.ThreadID)
return err
},
}
cmd.Flags().StringVar(&opts.agent, "agent", "", "Claiming agent")
cmd.Flags().StringVar(&opts.threadID, "thread", "", "Thread ID")
cmd.Flags().IntVar(&opts.leaseSeconds, "lease-seconds", 900, "Lease duration in seconds")
_ = cmd.MarkFlagRequired("thread")
return cmd
}
@@ -0,0 +1,112 @@
package inbox
import "testing"
func TestClaimAcquiresThreadLease(t *testing.T) {
t.Parallel()
dbPath := initCommandTestDB(t)
threadID := sendPendingThread(t, dbPath, "leader", "worker-a", "Race claim", "Claim this task")
claimOut := runInboxCommand(
t,
"--db", dbPath,
"--json",
"claim",
"--agent", "worker-a",
"--thread", threadID,
"--lease-seconds", "300",
)
var claimResp map[string]any
mustDecodeJSON(t, claimOut, &claimResp)
if got := nestedString(t, claimResp, "data", "thread", "status"); got != "claimed" {
t.Fatalf("expected claimed status, got %q", got)
}
if got := nestedString(t, claimResp, "data", "thread", "assigned_to"); got != "worker-a" {
t.Fatalf("expected assigned_to worker-a, got %q", got)
}
if got := nestedString(t, claimResp, "data", "message", "kind"); got != "event" {
t.Fatalf("expected event message kind, got %q", got)
}
if got := nestedString(t, claimResp, "data", "message", "summary"); got != "thread claimed" {
t.Fatalf("expected summary thread claimed, got %q", got)
}
}
func TestClaimRejectsWhenThreadMissing(t *testing.T) {
t.Parallel()
dbPath := initCommandTestDB(t)
stdout, _, exitCode := executeInboxCommand(
"--db", dbPath,
"--json",
"claim",
"--agent", "worker-z",
"--thread", "thr_missing",
)
if exitCode != 40 {
t.Fatalf("expected exit code 40, got %d", exitCode)
}
assertErrorJSON(t, stdout, "not_found")
}
func TestClaimRejectsWhenThreadAlreadyClaimed(t *testing.T) {
t.Parallel()
dbPath := initCommandTestDB(t)
threadID := sendPendingThread(t, dbPath, "leader", "worker-z", "Claimed task", "Already claimed")
runInboxCommand(
t,
"--db", dbPath,
"--json",
"claim",
"--agent", "worker-z",
"--thread", threadID,
)
stdout, _, exitCode := executeInboxCommand(
"--db", dbPath,
"--json",
"claim",
"--agent", "worker-y",
"--thread", threadID,
)
if exitCode != 20 {
t.Fatalf("expected exit code 20, got %d", exitCode)
}
assertErrorJSON(t, stdout, "lease_conflict")
}
func TestClaimRecordsRequestedLeaseDuration(t *testing.T) {
t.Parallel()
dbPath := initCommandTestDB(t)
threadID := sendPendingThread(t, dbPath, "leader", "worker-a", "Lease payload", "Verify lease payload")
claimOut := runInboxCommand(
t,
"--db", dbPath,
"--json",
"claim",
"--agent", "worker-a",
"--thread", threadID,
"--lease-seconds", "300",
)
var claimResp map[string]any
mustDecodeJSON(t, claimOut, &claimResp)
payload, ok := nestedValue(t, claimResp, "data", "message", "payload_json").(map[string]any)
if !ok {
t.Fatalf("expected payload_json object, got %#v", nestedValue(t, claimResp, "data", "message", "payload_json"))
}
leaseSeconds, ok := payload["lease_seconds"].(float64)
if !ok || int(leaseSeconds) != 300 {
t.Fatalf("expected lease_seconds 300, got %#v", payload["lease_seconds"])
}
leaseToken, _ := payload["lease_token"].(string)
if leaseToken == "" {
t.Fatalf("expected non-empty lease_token, got %#v", payload["lease_token"])
}
}
@@ -0,0 +1,22 @@
package inbox
import (
"context"
"database/sql"
"ai-workflow-skill/packages/coord-core/db"
)
func openInboxDB(ctx context.Context, dbPath string) (*sql.DB, error) {
sqlDB, err := db.Open(ctx, dbPath)
if err != nil {
return nil, err
}
if err := db.ApplyMigrations(ctx, sqlDB); err != nil {
_ = sqlDB.Close()
return nil, err
}
return sqlDB, nil
}
@@ -0,0 +1,106 @@
package inbox
import (
"fmt"
"ai-workflow-skill/packages/coord-core/protocol"
"ai-workflow-skill/packages/coord-core/store"
"github.com/spf13/cobra"
)
type completeOptions struct {
agent string
threadID string
summary string
body string
bodyFile string
payloadJSON string
artifacts artifactOptions
}
func newDoneCmd(root *rootOptions) *cobra.Command {
return newCompleteCmd(root, "done")
}
func newFailCmd(root *rootOptions) *cobra.Command {
return newCompleteCmd(root, "fail")
}
func newCompleteCmd(root *rootOptions, mode string) *cobra.Command {
opts := &completeOptions{}
cmd := &cobra.Command{
Use: mode,
Short: map[string]string{"done": "Mark a thread complete", "fail": "Mark a thread failed"}[mode],
RunE: func(cmd *cobra.Command, args []string) error {
ctx := cmd.Context()
agent := opts.agent
if agent == "" {
agent = root.agent
}
if agent == "" {
return protocol.InvalidInput("agent is required", nil)
}
body, err := resolveBodyValue(opts.body, opts.bodyFile)
if err != nil {
return err
}
artifacts, err := resolveArtifacts(opts.artifacts)
if err != nil {
return err
}
sqlDB, err := openInboxDB(ctx, root.dbPath)
if err != nil {
return err
}
defer sqlDB.Close()
s := store.NewInboxStore(sqlDB)
thread, message, err := s.CompleteThread(ctx, store.CompleteInput{
ThreadID: opts.threadID,
Agent: agent,
Summary: opts.summary,
Body: body,
PayloadJSON: opts.payloadJSON,
Failed: mode == "fail",
Artifacts: artifacts,
})
if err != nil {
return err
}
resp := protocol.Success{
OK: true,
Command: mode,
Data: map[string]any{
"thread": thread,
"message": message,
},
}
if root.json {
return protocol.WriteJSON(cmd.OutOrStdout(), resp)
}
_, err = fmt.Fprintf(cmd.OutOrStdout(), "%s thread %s\n", mode, thread.ThreadID)
return err
},
}
cmd.Flags().StringVar(&opts.agent, "agent", "", "Acting agent")
cmd.Flags().StringVar(&opts.threadID, "thread", "", "Thread ID")
cmd.Flags().StringVar(&opts.summary, "summary", "", "Short completion summary")
cmd.Flags().StringVar(&opts.body, "body", "", "Completion body")
cmd.Flags().StringVar(&opts.bodyFile, "body-file", "", "Read completion body from file")
cmd.Flags().StringVar(&opts.payloadJSON, "payload-json", "", "Structured payload JSON string")
addArtifactFlags(cmd, &opts.artifacts)
_ = cmd.MarkFlagRequired("thread")
_ = cmd.MarkFlagRequired("summary")
return cmd
}
@@ -0,0 +1,140 @@
package inbox
import (
"os"
"path/filepath"
"testing"
)
func TestDoneMarksThreadTerminal(t *testing.T) {
t.Parallel()
dbPath := filepath.Join(t.TempDir(), "coord.db")
threadID := seedClaimedThreadForInboxTests(t, dbPath, "leader", "worker-a", "worker-a")
doneOut := runInboxCommand(
t,
"--db", dbPath,
"--json",
"done",
"--agent", "worker-a",
"--thread", threadID,
"--summary", "Retry policy implemented",
"--body", "The HTTP client now retries the selected transient failures.",
)
var doneResp map[string]any
mustDecodeJSON(t, doneOut, &doneResp)
if status := nestedString(t, doneResp, "data", "thread", "status"); status != "done" {
t.Fatalf("expected done thread status, got %q", status)
}
if kind := nestedString(t, doneResp, "data", "message", "kind"); kind != "result" {
t.Fatalf("expected result message kind, got %q", kind)
}
}
func TestDonePersistsResultBodyAndArtifact(t *testing.T) {
t.Parallel()
tempDir := t.TempDir()
dbPath := filepath.Join(tempDir, "coord.db")
resultPath := filepath.Join(tempDir, "result.md")
body := "Result from body file."
if err := os.WriteFile(resultPath, []byte(body), 0o644); err != nil {
t.Fatalf("write result file: %v", err)
}
threadID := seedClaimedThreadForInboxTests(t, dbPath, "leader", "worker-a", "worker-a")
runInboxCommand(
t,
"--db", dbPath,
"--json",
"done",
"--agent", "worker-a",
"--thread", threadID,
"--summary", "Retry policy implemented",
"--body-file", resultPath,
"--artifact", resultPath,
"--artifact-kind", "report",
)
showOut := runInboxCommand(
t,
"--db", dbPath,
"--json",
"show",
"--thread", threadID,
)
var showResp map[string]any
mustDecodeJSON(t, showOut, &showResp)
lastMessage := lastThreadMessageFromShow(t, showResp)
if gotBody, _ := lastMessage["body"].(string); gotBody != body {
t.Fatalf("expected body %q, got %#v", body, lastMessage["body"])
}
artifacts, ok := lastMessage["artifacts"].([]any)
if !ok || len(artifacts) != 1 {
t.Fatalf("expected one artifact, got %#v", lastMessage["artifacts"])
}
artifact, ok := artifacts[0].(map[string]any)
if !ok {
t.Fatalf("expected artifact object, got %#v", artifacts[0])
}
if gotPath, _ := artifact["path"].(string); gotPath != resultPath {
t.Fatalf("expected artifact path %q, got %#v", resultPath, artifact["path"])
}
if gotKind, _ := artifact["kind"].(string); gotKind != "report" {
t.Fatalf("expected artifact kind report, got %#v", artifact["kind"])
}
}
func TestDoneRejectsNonOwner(t *testing.T) {
t.Parallel()
dbPath := filepath.Join(t.TempDir(), "coord.db")
threadID := seedClaimedThreadForInboxTests(t, dbPath, "leader", "worker-a", "worker-a")
stdout, _, exitCode := executeInboxCommand(
"--db", dbPath,
"--json",
"done",
"--agent", "worker-b",
"--thread", threadID,
"--summary", "Retry policy implemented",
)
if exitCode != 20 {
t.Fatalf("expected exit code 20, got %d with output %s", exitCode, stdout)
}
assertErrorJSON(t, stdout, "lease_conflict")
}
func TestDoneRejectsOnTerminalThread(t *testing.T) {
t.Parallel()
dbPath := filepath.Join(t.TempDir(), "coord.db")
threadID := seedClaimedThreadForInboxTests(t, dbPath, "leader", "worker-a", "worker-a")
runInboxCommand(
t,
"--db", dbPath,
"--json",
"done",
"--agent", "worker-a",
"--thread", threadID,
"--summary", "Retry policy implemented",
)
stdout, _, exitCode := executeInboxCommand(
"--db", dbPath,
"--json",
"done",
"--agent", "worker-a",
"--thread", threadID,
"--summary", "Retry policy implemented",
)
if exitCode != 30 {
t.Fatalf("expected exit code 30, got %d with output %s", exitCode, stdout)
}
assertErrorJSON(t, stdout, "invalid_state")
}
@@ -0,0 +1,113 @@
package inbox
import (
"errors"
"fmt"
"io"
"strings"
"ai-workflow-skill/packages/coord-core/protocol"
"ai-workflow-skill/packages/coord-core/store"
)
func Execute(args []string, stdout, stderr io.Writer) int {
cmd := NewRootCmd()
cmd.SetOut(stdout)
cmd.SetErr(stderr)
cmd.SetArgs(args)
if err := cmd.Execute(); err != nil {
jsonOutput := hasJSONFlag(args)
renderError(stdout, stderr, jsonOutput, err)
return exitCodeForError(err)
}
return 0
}
func exitCodeForError(err error) int {
var cliErr *protocol.CLIError
if errors.As(err, &cliErr) {
return cliErr.ExitCode
}
switch {
case isUsageError(err):
return 30
case errors.Is(err, store.ErrLeaseConflict):
return 20
case errors.Is(err, store.ErrThreadNotFound), errors.Is(err, store.ErrMessageNotFound):
return 40
case errors.Is(err, store.ErrInvalidInput), errors.Is(err, store.ErrInvalidState), errors.Is(err, store.ErrNoActiveLease):
return 30
default:
return 50
}
}
func errorCodeForError(err error) string {
var cliErr *protocol.CLIError
if errors.As(err, &cliErr) {
return cliErr.Code
}
switch {
case isUsageError(err):
return "invalid_input"
case errors.Is(err, store.ErrLeaseConflict):
return "lease_conflict"
case errors.Is(err, store.ErrThreadNotFound), errors.Is(err, store.ErrMessageNotFound):
return "not_found"
case errors.Is(err, store.ErrInvalidInput):
return "invalid_input"
case errors.Is(err, store.ErrInvalidState), errors.Is(err, store.ErrNoActiveLease):
return "invalid_state"
default:
return "internal_error"
}
}
func renderError(stdout, stderr io.Writer, jsonOutput bool, err error) {
message := errorMessage(err)
if jsonOutput {
_ = protocol.WriteJSON(stdout, protocol.Error{
OK: false,
Error: protocol.ErrorPayload{
Code: errorCodeForError(err),
Message: message,
},
})
return
}
_, _ = fmt.Fprintln(stderr, message)
}
func errorMessage(err error) string {
var cliErr *protocol.CLIError
if errors.As(err, &cliErr) {
return cliErr.Message
}
return err.Error()
}
func hasJSONFlag(args []string) bool {
for _, arg := range args {
if arg == "--json" {
return true
}
if strings.HasPrefix(arg, "--json=") {
return !strings.HasSuffix(arg, "=false")
}
}
return false
}
func isUsageError(err error) bool {
message := err.Error()
return strings.HasPrefix(message, "required flag(s)") ||
strings.HasPrefix(message, "unknown flag:") ||
strings.HasPrefix(message, "unknown command ") ||
strings.Contains(message, " accepts ") ||
strings.Contains(message, "invalid argument ")
}
@@ -0,0 +1,143 @@
package inbox
import (
"os"
"path/filepath"
"testing"
)
func TestFailMarksThreadFailed(t *testing.T) {
t.Parallel()
dbPath := filepath.Join(t.TempDir(), "coord.db")
threadID := seedClaimedThreadForInboxTests(t, dbPath, "leader", "worker-b", "worker-b")
failOut := runInboxCommand(
t,
"--db", dbPath,
"--json",
"fail",
"--agent", "worker-b",
"--thread", threadID,
"--summary", "Migration failed",
"--body", "The migration cannot proceed because the prior schema is inconsistent.",
)
var failResp map[string]any
mustDecodeJSON(t, failOut, &failResp)
if status := nestedString(t, failResp, "data", "thread", "status"); status != "failed" {
t.Fatalf("expected failed thread status, got %q", status)
}
if kind := nestedString(t, failResp, "data", "message", "kind"); kind != "result" {
t.Fatalf("expected result message kind, got %q", kind)
}
if toAgent := nestedString(t, failResp, "data", "message", "to_agent"); toAgent != "leader" {
t.Fatalf("expected message to_agent leader, got %q", toAgent)
}
}
func TestFailPersistsFailureBodyAndArtifact(t *testing.T) {
t.Parallel()
tempDir := t.TempDir()
dbPath := filepath.Join(tempDir, "coord.db")
failurePath := filepath.Join(tempDir, "failure.md")
body := "Failure details from file."
if err := os.WriteFile(failurePath, []byte(body), 0o644); err != nil {
t.Fatalf("write failure file: %v", err)
}
threadID := seedClaimedThreadForInboxTests(t, dbPath, "leader", "worker-b", "worker-b")
runInboxCommand(
t,
"--db", dbPath,
"--json",
"fail",
"--agent", "worker-b",
"--thread", threadID,
"--summary", "Migration failed",
"--body-file", failurePath,
"--artifact", failurePath,
"--artifact-kind", "report",
)
showOut := runInboxCommand(
t,
"--db", dbPath,
"--json",
"show",
"--thread", threadID,
)
var showResp map[string]any
mustDecodeJSON(t, showOut, &showResp)
lastMessage := lastThreadMessageFromShow(t, showResp)
if gotBody, _ := lastMessage["body"].(string); gotBody != body {
t.Fatalf("expected body %q, got %#v", body, lastMessage["body"])
}
artifacts, ok := lastMessage["artifacts"].([]any)
if !ok || len(artifacts) != 1 {
t.Fatalf("expected one artifact, got %#v", lastMessage["artifacts"])
}
artifact, ok := artifacts[0].(map[string]any)
if !ok {
t.Fatalf("expected artifact object, got %#v", artifacts[0])
}
if gotPath, _ := artifact["path"].(string); gotPath != failurePath {
t.Fatalf("expected artifact path %q, got %#v", failurePath, artifact["path"])
}
if gotKind, _ := artifact["kind"].(string); gotKind != "report" {
t.Fatalf("expected artifact kind report, got %#v", artifact["kind"])
}
}
func TestFailRejectsNonOwner(t *testing.T) {
t.Parallel()
dbPath := filepath.Join(t.TempDir(), "coord.db")
threadID := seedClaimedThreadForInboxTests(t, dbPath, "leader", "worker-b", "worker-b")
stdout, _, exitCode := executeInboxCommand(
"--db", dbPath,
"--json",
"fail",
"--agent", "worker-x",
"--thread", threadID,
"--summary", "Migration failed",
)
if exitCode != 20 {
t.Fatalf("expected exit code 20, got %d with output %s", exitCode, stdout)
}
assertErrorJSON(t, stdout, "lease_conflict")
}
func TestFailRejectsOnTerminalThread(t *testing.T) {
t.Parallel()
dbPath := filepath.Join(t.TempDir(), "coord.db")
threadID := seedClaimedThreadForInboxTests(t, dbPath, "leader", "worker-b", "worker-b")
runInboxCommand(
t,
"--db", dbPath,
"--json",
"fail",
"--agent", "worker-b",
"--thread", threadID,
"--summary", "Migration failed",
)
stdout, _, exitCode := executeInboxCommand(
"--db", dbPath,
"--json",
"fail",
"--agent", "worker-b",
"--thread", threadID,
"--summary", "Migration failed",
)
if exitCode != 30 {
t.Fatalf("expected exit code 30, got %d with output %s", exitCode, stdout)
}
assertErrorJSON(t, stdout, "invalid_state")
}
@@ -0,0 +1,97 @@
package inbox
import (
"fmt"
"strings"
"ai-workflow-skill/packages/coord-core/protocol"
"ai-workflow-skill/packages/coord-core/store"
"github.com/spf13/cobra"
)
type fetchOptions struct {
agent string
statuses string
limit int
unread bool
}
func newFetchCmd(root *rootOptions) *cobra.Command {
opts := &fetchOptions{}
cmd := &cobra.Command{
Use: "fetch",
Short: "List candidate threads for an agent",
RunE: func(cmd *cobra.Command, args []string) error {
ctx := cmd.Context()
agent := opts.agent
if agent == "" {
agent = root.agent
}
sqlDB, err := openInboxDB(ctx, root.dbPath)
if err != nil {
return err
}
defer sqlDB.Close()
s := store.NewInboxStore(sqlDB)
threads, err := s.FetchThreads(ctx, store.FetchInput{
Agent: agent,
Statuses: parseCSV(opts.statuses),
Limit: opts.limit,
Unread: opts.unread,
})
if err != nil {
return err
}
if len(threads) == 0 {
return protocol.NoMatchingWork("no matching work")
}
resp := protocol.Success{
OK: true,
Command: "fetch",
Data: map[string]any{
"threads": threads,
},
}
if root.json {
return protocol.WriteJSON(cmd.OutOrStdout(), resp)
}
for _, thread := range threads {
if _, err := fmt.Fprintf(cmd.OutOrStdout(), "%s\t%s\t%s\n", thread.ThreadID, thread.Status, thread.Subject); err != nil {
return err
}
}
return nil
},
}
cmd.Flags().StringVar(&opts.agent, "agent", "", "Assigned agent filter")
cmd.Flags().StringVar(&opts.statuses, "status", "pending", "Comma-separated status filter")
cmd.Flags().IntVar(&opts.limit, "limit", 20, "Maximum number of threads")
cmd.Flags().BoolVar(&opts.unread, "unread", false, "Only return threads whose latest message is unread by the agent")
return cmd
}
func parseCSV(value string) []string {
if strings.TrimSpace(value) == "" {
return nil
}
raw := strings.Split(value, ",")
out := make([]string, 0, len(raw))
for _, entry := range raw {
entry = strings.TrimSpace(entry)
if entry != "" {
out = append(out, entry)
}
}
return out
}
@@ -0,0 +1,187 @@
package inbox
import "testing"
func TestFetchReturnsPendingThreadForTargetAgent(t *testing.T) {
t.Parallel()
dbPath := initCommandTestDB(t)
sendPendingThread(t, dbPath, "leader", "worker-a", "Implement task", "Create API endpoint")
fetchOut := runInboxCommand(
t,
"--db", dbPath,
"--json",
"fetch",
"--agent", "worker-a",
"--status", "pending",
)
var fetchResp map[string]any
mustDecodeJSON(t, fetchOut, &fetchResp)
threads, ok := nestedValue(t, fetchResp, "data", "threads").([]any)
if !ok || len(threads) < 1 {
t.Fatalf("expected at least one fetched thread, got %#v", nestedValue(t, fetchResp, "data", "threads"))
}
thread, ok := threads[0].(map[string]any)
if !ok {
t.Fatalf("expected thread object, got %#v", threads[0])
}
if got, _ := thread["assigned_to"].(string); got != "worker-a" {
t.Fatalf("expected assigned_to worker-a, got %#v", thread["assigned_to"])
}
if got, _ := thread["status"].(string); got != "pending" {
t.Fatalf("expected pending status, got %#v", thread["status"])
}
}
func TestFetchRespectsStatusAndLimitFilters(t *testing.T) {
t.Parallel()
dbPath := initCommandTestDB(t)
sendPendingThread(t, dbPath, "leader", "worker-a", "Task A", "Pending task")
blockedThreadID := sendPendingThread(t, dbPath, "leader", "worker-a", "Task B", "Blocked task")
runInboxCommand(
t,
"--db", dbPath,
"--json",
"claim",
"--agent", "worker-a",
"--thread", blockedThreadID,
)
runInboxCommand(
t,
"--db", dbPath,
"--json",
"update",
"--agent", "worker-a",
"--thread", blockedThreadID,
"--status", "blocked",
"--summary", "Need decision",
"--payload-json", `{"question":"continue?"}`,
)
fetchOut := runInboxCommand(
t,
"--db", dbPath,
"--json",
"fetch",
"--agent", "worker-a",
"--status", "pending,blocked",
"--limit", "1",
)
var fetchResp map[string]any
mustDecodeJSON(t, fetchOut, &fetchResp)
threads, ok := nestedValue(t, fetchResp, "data", "threads").([]any)
if !ok {
t.Fatalf("expected threads array, got %#v", nestedValue(t, fetchResp, "data", "threads"))
}
if len(threads) > 1 {
t.Fatalf("expected at most one thread with limit=1, got %d", len(threads))
}
if len(threads) == 0 {
t.Fatalf("expected one thread with status filter, got empty result")
}
thread, ok := threads[0].(map[string]any)
if !ok {
t.Fatalf("expected thread object, got %#v", threads[0])
}
status, _ := thread["status"].(string)
if status != "pending" && status != "blocked" {
t.Fatalf("expected pending or blocked status, got %#v", thread["status"])
}
}
func TestFetchUnreadUsesReadCursor(t *testing.T) {
t.Parallel()
dbPath := initCommandTestDB(t)
threadID := sendPendingThread(t, dbPath, "leader", "worker-e", "Review navbar copy", "Check top nav wording")
firstFetchOut := runInboxCommand(
t,
"--db", dbPath,
"--json",
"fetch",
"--agent", "worker-e",
"--status", "pending",
"--unread",
)
var firstFetchResp map[string]any
mustDecodeJSON(t, firstFetchOut, &firstFetchResp)
firstThreads, ok := nestedValue(t, firstFetchResp, "data", "threads").([]any)
if !ok || len(firstThreads) != 1 {
t.Fatalf("expected one unread thread before mark-read, got %#v", nestedValue(t, firstFetchResp, "data", "threads"))
}
runInboxCommand(
t,
"--db", dbPath,
"--agent", "worker-e",
"--json",
"show",
"--thread", threadID,
"--mark-read",
)
stdout, _, exitCode := executeInboxCommand(
"--db", dbPath,
"--json",
"fetch",
"--agent", "worker-e",
"--status", "pending",
"--unread",
)
if exitCode != 10 {
t.Fatalf("expected unread fetch to return no_matching_work after mark-read, got exit=%d stdout=%s", exitCode, stdout)
}
assertErrorJSON(t, stdout, "no_matching_work")
runInboxCommand(
t,
"--db", dbPath,
"--json",
"send",
"--from", "leader",
"--to", "worker-e",
"--thread", threadID,
"--summary", "Use sentence case",
"--body", "Keep the nav labels in sentence case.",
)
thirdFetchOut := runInboxCommand(
t,
"--db", dbPath,
"--json",
"fetch",
"--agent", "worker-e",
"--status", "pending",
"--unread",
)
var thirdFetchResp map[string]any
mustDecodeJSON(t, thirdFetchOut, &thirdFetchResp)
thirdThreads, ok := nestedValue(t, thirdFetchResp, "data", "threads").([]any)
if !ok || len(thirdThreads) != 1 {
t.Fatalf("expected unread thread to reappear after new message, got %#v", nestedValue(t, thirdFetchResp, "data", "threads"))
}
}
func TestFetchReturnsNoMatchingWorkWhenEmpty(t *testing.T) {
t.Parallel()
dbPath := initCommandTestDB(t)
stdout, _, exitCode := executeInboxCommand(
"--db", dbPath,
"--json",
"fetch",
"--agent", "worker-z",
"--status", "pending",
)
if exitCode != 10 {
t.Fatalf("expected exit code 10, got %d", exitCode)
}
assertErrorJSON(t, stdout, "no_matching_work")
}
@@ -0,0 +1,41 @@
package inbox
import (
"fmt"
"ai-workflow-skill/packages/coord-core/protocol"
"github.com/spf13/cobra"
)
func newInitCmd(opts *rootOptions) *cobra.Command {
return &cobra.Command{
Use: "init",
Short: "Initialize the shared SQLite database schema",
RunE: func(cmd *cobra.Command, args []string) error {
ctx := cmd.Context()
sqlDB, err := openInboxDB(ctx, opts.dbPath)
if err != nil {
return err
}
defer sqlDB.Close()
resp := protocol.Success{
OK: true,
Command: "init",
Data: map[string]any{
"db_path": opts.dbPath,
"status": "initialized",
},
}
if opts.json {
return protocol.WriteJSON(cmd.OutOrStdout(), resp)
}
_, err = fmt.Fprintf(cmd.OutOrStdout(), "initialized database: %s\n", opts.dbPath)
return err
},
}
}
@@ -0,0 +1,83 @@
package inbox
import (
"path/filepath"
"testing"
)
func TestInitCreatesSchemaOnEmptyDB(t *testing.T) {
t.Parallel()
dbPath := filepath.Join(t.TempDir(), "coord.db")
initOut := runInboxCommand(t, "--db", dbPath, "--json", "init")
var initResp map[string]any
mustDecodeJSON(t, initOut, &initResp)
if ok, _ := initResp["ok"].(bool); !ok {
t.Fatalf("expected ok=true, got %#v", initResp)
}
if cmd, _ := initResp["command"].(string); cmd != "init" {
t.Fatalf("expected command init, got %#v", initResp["command"])
}
if got := nestedString(t, initResp, "data", "db_path"); got != dbPath {
t.Fatalf("expected db_path %q, got %q", dbPath, got)
}
if got := nestedString(t, initResp, "data", "status"); got != "initialized" {
t.Fatalf("expected initialized status, got %q", got)
}
}
func TestInitIsIdempotentOnExistingDB(t *testing.T) {
t.Parallel()
dbPath := filepath.Join(t.TempDir(), "coord.db")
firstOut := runInboxCommand(t, "--db", dbPath, "--json", "init")
secondOut := runInboxCommand(t, "--db", dbPath, "--json", "init")
var firstResp map[string]any
var secondResp map[string]any
mustDecodeJSON(t, firstOut, &firstResp)
mustDecodeJSON(t, secondOut, &secondResp)
if got := nestedString(t, firstResp, "data", "status"); got != "initialized" {
t.Fatalf("expected first init status initialized, got %q", got)
}
if got := nestedString(t, secondResp, "data", "status"); got != "initialized" {
t.Fatalf("expected second init status initialized, got %q", got)
}
if got := nestedString(t, firstResp, "data", "db_path"); got != dbPath {
t.Fatalf("expected first db_path %q, got %q", dbPath, got)
}
if got := nestedString(t, secondResp, "data", "db_path"); got != dbPath {
t.Fatalf("expected second db_path %q, got %q", dbPath, got)
}
}
func initCommandTestDB(t *testing.T) string {
t.Helper()
dbPath := filepath.Join(t.TempDir(), "coord.db")
runInboxCommand(t, "--db", dbPath, "--json", "init")
return dbPath
}
func sendPendingThread(t *testing.T, dbPath, from, to, subject, summary string) string {
t.Helper()
sendOut := runInboxCommand(
t,
"--db", dbPath,
"--json",
"send",
"--from", from,
"--to", to,
"--subject", subject,
"--summary", summary,
)
var sendResp map[string]any
mustDecodeJSON(t, sendOut, &sendResp)
return nestedString(t, sendResp, "data", "thread", "thread_id")
}
@@ -0,0 +1,734 @@
package inbox
import (
"os"
"path/filepath"
"testing"
"time"
)
func TestInboxLifecycle(t *testing.T) {
t.Parallel()
dbPath := filepath.Join(t.TempDir(), "coord.db")
initOut := runInboxCommand(t, "--db", dbPath, "--json", "init")
var initResp map[string]any
mustDecodeJSON(t, initOut, &initResp)
if initResp["ok"] != true {
t.Fatalf("expected init ok=true, got %#v", initResp)
}
sendOut := runInboxCommand(
t,
"--db", dbPath,
"--json",
"send",
"--from", "leader",
"--to", "worker-a",
"--subject", "Implement feature X",
"--summary", "Add retry policy",
"--body", "Implement retry handling for the HTTP client.",
"--run", "run_blog_001",
"--task", "T1",
)
var sendResp map[string]any
mustDecodeJSON(t, sendOut, &sendResp)
threadID := nestedString(t, sendResp, "data", "thread", "thread_id")
threadStatus := nestedString(t, sendResp, "data", "thread", "status")
if threadStatus != "pending" {
t.Fatalf("expected pending thread, got %q", threadStatus)
}
fetchOut := runInboxCommand(
t,
"--db", dbPath,
"--json",
"fetch",
"--agent", "worker-a",
"--status", "pending",
)
var fetchResp map[string]any
mustDecodeJSON(t, fetchOut, &fetchResp)
threadsValue := nestedValue(t, fetchResp, "data", "threads")
threads, ok := threadsValue.([]any)
if !ok || len(threads) != 1 {
t.Fatalf("expected one fetched thread, got %#v", threadsValue)
}
claimOut := runInboxCommand(
t,
"--db", dbPath,
"--json",
"claim",
"--agent", "worker-a",
"--thread", threadID,
"--lease-seconds", "300",
)
var claimResp map[string]any
mustDecodeJSON(t, claimOut, &claimResp)
claimedStatus := nestedString(t, claimResp, "data", "thread", "status")
if claimedStatus != "claimed" {
t.Fatalf("expected claimed thread, got %q", claimedStatus)
}
updateOut := runInboxCommand(
t,
"--db", dbPath,
"--json",
"update",
"--agent", "worker-a",
"--thread", threadID,
"--status", "in_progress",
"--summary", "Implementation started",
"--body", "Scanning current HTTP client usage.",
)
var updateResp map[string]any
mustDecodeJSON(t, updateOut, &updateResp)
updatedStatus := nestedString(t, updateResp, "data", "thread", "status")
if updatedStatus != "in_progress" {
t.Fatalf("expected in_progress thread, got %q", updatedStatus)
}
blockedOut := runInboxCommand(
t,
"--db", dbPath,
"--json",
"update",
"--agent", "worker-a",
"--thread", threadID,
"--status", "blocked",
"--summary", "Need timeout decision",
"--payload-json", `{"question":"Should retries apply to read timeouts?"}`,
)
var blockedResp map[string]any
mustDecodeJSON(t, blockedOut, &blockedResp)
blockedStatus := nestedString(t, blockedResp, "data", "thread", "status")
if blockedStatus != "blocked" {
t.Fatalf("expected blocked thread, got %q", blockedStatus)
}
replyOut := runInboxCommand(
t,
"--db", dbPath,
"--json",
"reply",
"--from", "leader",
"--to", "worker-a",
"--thread", threadID,
"--summary", "Retry read timeouts",
"--body", "Yes, include read timeouts in the retry policy.",
)
var replyResp map[string]any
mustDecodeJSON(t, replyOut, &replyResp)
replyKind := nestedString(t, replyResp, "data", "message", "kind")
if replyKind != "answer" {
t.Fatalf("expected answer reply, got %q", replyKind)
}
doneOut := runInboxCommand(
t,
"--db", dbPath,
"--json",
"done",
"--agent", "worker-a",
"--thread", threadID,
"--summary", "Retry policy implemented",
"--body", "The HTTP client now retries the selected transient failures.",
)
var doneResp map[string]any
mustDecodeJSON(t, doneOut, &doneResp)
doneStatus := nestedString(t, doneResp, "data", "thread", "status")
if doneStatus != "done" {
t.Fatalf("expected done thread, got %q", doneStatus)
}
showOut := runInboxCommand(
t,
"--db", dbPath,
"--json",
"show",
"--thread", threadID,
)
var showResp map[string]any
mustDecodeJSON(t, showOut, &showResp)
showStatus := nestedString(t, showResp, "data", "thread", "status")
if showStatus != "done" {
t.Fatalf("expected show status done, got %q", showStatus)
}
messagesValue := nestedValue(t, showResp, "data", "messages")
messages, ok := messagesValue.([]any)
if !ok || len(messages) != 6 {
t.Fatalf("expected six messages in thread history, got %#v", messagesValue)
}
}
func TestInboxFailLifecycle(t *testing.T) {
t.Parallel()
dbPath := filepath.Join(t.TempDir(), "coord.db")
runInboxCommand(t, "--db", dbPath, "--json", "init")
sendOut := runInboxCommand(
t,
"--db", dbPath,
"--json",
"send",
"--from", "leader",
"--to", "worker-b",
"--subject", "Investigate failing migration",
"--summary", "Check migration failure",
"--run", "run_blog_002",
"--task", "T2",
)
var sendResp map[string]any
mustDecodeJSON(t, sendOut, &sendResp)
threadID := nestedString(t, sendResp, "data", "thread", "thread_id")
runInboxCommand(
t,
"--db", dbPath,
"--json",
"claim",
"--agent", "worker-b",
"--thread", threadID,
)
failOut := runInboxCommand(
t,
"--db", dbPath,
"--json",
"fail",
"--agent", "worker-b",
"--thread", threadID,
"--summary", "Migration failed",
"--body", "The migration cannot proceed because the prior schema is inconsistent.",
)
var failResp map[string]any
mustDecodeJSON(t, failOut, &failResp)
failStatus := nestedString(t, failResp, "data", "thread", "status")
if failStatus != "failed" {
t.Fatalf("expected failed thread, got %q", failStatus)
}
showOut := runInboxCommand(
t,
"--db", dbPath,
"--json",
"show",
"--thread", threadID,
)
var showResp map[string]any
mustDecodeJSON(t, showOut, &showResp)
showStatus := nestedString(t, showResp, "data", "thread", "status")
if showStatus != "failed" {
t.Fatalf("expected show status failed, got %q", showStatus)
}
}
func TestInboxRenewWaitReplyAndCancel(t *testing.T) {
t.Parallel()
dbPath := filepath.Join(t.TempDir(), "coord.db")
runInboxCommand(t, "--db", dbPath, "--json", "init")
sendOut := runInboxCommand(
t,
"--db", dbPath,
"--json",
"send",
"--from", "leader",
"--to", "worker-c",
"--subject", "Investigate auth edge case",
"--summary", "Check auth redirect behavior",
"--run", "run_blog_003",
"--task", "T3",
)
var sendResp map[string]any
mustDecodeJSON(t, sendOut, &sendResp)
threadID := nestedString(t, sendResp, "data", "thread", "thread_id")
runInboxCommand(
t,
"--db", dbPath,
"--json",
"claim",
"--agent", "worker-c",
"--thread", threadID,
"--lease-seconds", "300",
)
renewOut := runInboxCommand(
t,
"--db", dbPath,
"--json",
"renew",
"--agent", "worker-c",
"--thread", threadID,
"--lease-seconds", "600",
)
var renewResp map[string]any
mustDecodeJSON(t, renewOut, &renewResp)
if got := nestedString(t, renewResp, "data", "message", "summary"); got != "lease renewed" {
t.Fatalf("expected lease renewed summary, got %q", got)
}
blockedOut := runInboxCommand(
t,
"--db", dbPath,
"--json",
"update",
"--agent", "worker-c",
"--thread", threadID,
"--status", "blocked",
"--summary", "Need policy decision",
"--body", "Should guest users be redirected to login or shown a 403 page?",
)
var blockedResp map[string]any
mustDecodeJSON(t, blockedOut, &blockedResp)
blockedMessageID := nestedString(t, blockedResp, "data", "message", "message_id")
type commandResult struct {
stdout string
stderr string
exit int
}
waitCh := make(chan commandResult, 1)
go func() {
stdout, stderr, exitCode := executeInboxCommand(
"--db", dbPath,
"--agent", "worker-c",
"--json",
"wait-reply",
"--thread", threadID,
"--after-message", blockedMessageID,
"--timeout-seconds", "2",
)
waitCh <- commandResult{stdout: stdout, stderr: stderr, exit: exitCode}
}()
time.Sleep(200 * time.Millisecond)
runInboxCommand(
t,
"--db", dbPath,
"--json",
"reply",
"--from", "leader",
"--to", "worker-c",
"--thread", threadID,
"--summary", "Redirect to login",
"--body", "Redirect guests to login for the MVP.",
)
var waitResult commandResult
select {
case waitResult = <-waitCh:
case <-time.After(3 * time.Second):
t.Fatal("wait-reply command did not return")
}
if waitResult.exit != 0 {
t.Fatalf("wait-reply failed with exit=%d\nstderr:\n%s\nstdout:\n%s", waitResult.exit, waitResult.stderr, waitResult.stdout)
}
var waitResp map[string]any
mustDecodeJSON(t, waitResult.stdout, &waitResp)
if woke, ok := nestedValue(t, waitResp, "data", "woke").(bool); !ok || !woke {
t.Fatalf("expected wait-reply to wake, got %#v", nestedValue(t, waitResp, "data", "woke"))
}
if kind := nestedString(t, waitResp, "data", "message", "kind"); kind != "answer" {
t.Fatalf("expected answer wake message, got %q", kind)
}
stdout, _, exitCode := executeInboxCommand(
"--db", dbPath,
"--agent", "worker-c",
"--json",
"fetch",
"--status", "blocked",
"--unread",
)
if exitCode != 10 {
t.Fatalf("expected blocked unread list to be cleared after wait-reply, got exit %d with %s", exitCode, stdout)
}
cancelOut := runInboxCommand(
t,
"--db", dbPath,
"--json",
"cancel",
"--agent", "leader",
"--thread", threadID,
"--reason", "Task superseded by a larger refactor",
)
var cancelResp map[string]any
mustDecodeJSON(t, cancelOut, &cancelResp)
if status := nestedString(t, cancelResp, "data", "thread", "status"); status != "cancelled" {
t.Fatalf("expected cancelled thread, got %q", status)
}
}
func TestInboxWatchListUnreadAndAppend(t *testing.T) {
t.Parallel()
tempDir := t.TempDir()
dbPath := filepath.Join(tempDir, "coord.db")
bodyPath := filepath.Join(tempDir, "task.md")
if err := os.WriteFile(bodyPath, []byte("Implement the initial admin post editor."), 0o644); err != nil {
t.Fatalf("write body file: %v", err)
}
runInboxCommand(t, "--db", dbPath, "--json", "init")
type commandResult struct {
stdout string
stderr string
exit int
}
watchCh := make(chan commandResult, 1)
go func() {
stdout, stderr, exitCode := executeInboxCommand(
"--db", dbPath,
"--json",
"watch",
"--agent", "worker-d",
"--status", "pending",
"--timeout-seconds", "2",
)
watchCh <- commandResult{stdout: stdout, stderr: stderr, exit: exitCode}
}()
time.Sleep(200 * time.Millisecond)
sendOut := runInboxCommand(
t,
"--db", dbPath,
"--json",
"send",
"--from", "leader",
"--to", "worker-d",
"--subject", "Build admin editor",
"--summary", "Create the first editor screen",
"--body-file", bodyPath,
"--artifact", bodyPath,
"--artifact-kind", "brief",
"--artifact-metadata-json", `{"label":"task-brief"}`,
"--run", "run_blog_004",
"--task", "T4",
)
var sendResp map[string]any
mustDecodeJSON(t, sendOut, &sendResp)
threadID := nestedString(t, sendResp, "data", "thread", "thread_id")
var watchResult commandResult
select {
case watchResult = <-watchCh:
case <-time.After(3 * time.Second):
t.Fatal("watch command did not return")
}
if watchResult.exit != 0 {
t.Fatalf("watch failed with exit=%d\nstderr:\n%s\nstdout:\n%s", watchResult.exit, watchResult.stderr, watchResult.stdout)
}
var watchResp map[string]any
mustDecodeJSON(t, watchResult.stdout, &watchResp)
if woke, ok := nestedValue(t, watchResp, "data", "woke").(bool); !ok || !woke {
t.Fatalf("expected watch to wake, got %#v", nestedValue(t, watchResp, "data", "woke"))
}
if watchedThreadID := nestedString(t, watchResp, "data", "thread", "thread_id"); watchedThreadID != threadID {
t.Fatalf("expected watch on thread %s, got %s", threadID, watchedThreadID)
}
fetchOut := runInboxCommand(
t,
"--db", dbPath,
"--json",
"fetch",
"--agent", "worker-d",
"--status", "pending",
"--unread",
)
var fetchResp map[string]any
mustDecodeJSON(t, fetchOut, &fetchResp)
fetchedThreads, ok := nestedValue(t, fetchResp, "data", "threads").([]any)
if !ok || len(fetchedThreads) != 1 {
t.Fatalf("expected one unread pending thread, got %#v", nestedValue(t, fetchResp, "data", "threads"))
}
listOut := runInboxCommand(
t,
"--db", dbPath,
"--json",
"list",
"--assigned-to", "worker-d",
"--status", "pending",
)
var listResp map[string]any
mustDecodeJSON(t, listOut, &listResp)
listedThreads, ok := nestedValue(t, listResp, "data", "threads").([]any)
if !ok || len(listedThreads) != 1 {
t.Fatalf("expected one listed thread, got %#v", nestedValue(t, listResp, "data", "threads"))
}
runInboxCommand(
t,
"--db", dbPath,
"--json",
"send",
"--from", "leader",
"--to", "worker-d",
"--thread", threadID,
"--summary", "Use a markdown editor",
"--body", "Prefer a textarea-based markdown editor for v1.",
)
showOut := runInboxCommand(
t,
"--db", dbPath,
"--json",
"show",
"--thread", threadID,
)
var showResp map[string]any
mustDecodeJSON(t, showOut, &showResp)
messages, ok := nestedValue(t, showResp, "data", "messages").([]any)
if !ok || len(messages) != 2 {
t.Fatalf("expected two messages after append, got %#v", nestedValue(t, showResp, "data", "messages"))
}
firstMessage, ok := messages[0].(map[string]any)
if !ok {
t.Fatalf("expected first message object, got %#v", messages[0])
}
if firstMessage["body"] != "Implement the initial admin post editor." {
t.Fatalf("expected body-file content in first message, got %#v", firstMessage["body"])
}
artifacts, ok := firstMessage["artifacts"].([]any)
if !ok || len(artifacts) != 1 {
t.Fatalf("expected one artifact on first message, got %#v", firstMessage["artifacts"])
}
firstArtifact, ok := artifacts[0].(map[string]any)
if !ok {
t.Fatalf("expected artifact object, got %#v", artifacts[0])
}
if firstArtifact["path"] != bodyPath {
t.Fatalf("expected artifact path %q, got %#v", bodyPath, firstArtifact["path"])
}
if firstArtifact["kind"] != "brief" {
t.Fatalf("expected artifact kind brief, got %#v", firstArtifact["kind"])
}
}
func TestInboxUnreadReadCursor(t *testing.T) {
t.Parallel()
dbPath := filepath.Join(t.TempDir(), "coord.db")
runInboxCommand(t, "--db", dbPath, "--json", "init")
sendOut := runInboxCommand(
t,
"--db", dbPath,
"--json",
"send",
"--from", "leader",
"--to", "worker-e",
"--subject", "Review navbar copy",
"--summary", "Check top nav wording",
)
var sendResp map[string]any
mustDecodeJSON(t, sendOut, &sendResp)
threadID := nestedString(t, sendResp, "data", "thread", "thread_id")
fetchOut := runInboxCommand(
t,
"--db", dbPath,
"--json",
"fetch",
"--agent", "worker-e",
"--status", "pending",
"--unread",
)
var fetchResp map[string]any
mustDecodeJSON(t, fetchOut, &fetchResp)
threads, ok := nestedValue(t, fetchResp, "data", "threads").([]any)
if !ok || len(threads) != 1 {
t.Fatalf("expected one unread pending thread, got %#v", nestedValue(t, fetchResp, "data", "threads"))
}
runInboxCommand(
t,
"--db", dbPath,
"--agent", "worker-e",
"--json",
"show",
"--thread", threadID,
"--mark-read",
)
stdout, _, exitCode := executeInboxCommand(
"--db", dbPath,
"--json",
"fetch",
"--agent", "worker-e",
"--status", "pending",
"--unread",
)
if exitCode != 10 {
t.Fatalf("expected unread fetch to clear after mark-read, got exit %d with %s", exitCode, stdout)
}
runInboxCommand(
t,
"--db", dbPath,
"--json",
"send",
"--from", "leader",
"--to", "worker-e",
"--thread", threadID,
"--summary", "Use sentence case",
"--body", "Keep the nav labels in sentence case.",
)
fetchOut = runInboxCommand(
t,
"--db", dbPath,
"--json",
"fetch",
"--agent", "worker-e",
"--status", "pending",
"--unread",
)
mustDecodeJSON(t, fetchOut, &fetchResp)
threads, ok = nestedValue(t, fetchResp, "data", "threads").([]any)
if !ok || len(threads) != 1 {
t.Fatalf("expected unread thread to reappear after new message, got %#v", nestedValue(t, fetchResp, "data", "threads"))
}
}
func TestInboxJSONErrorsAndExitCodes(t *testing.T) {
t.Parallel()
dbPath := filepath.Join(t.TempDir(), "coord.db")
if _, _, exitCode := executeInboxCommand("--db", dbPath, "--json", "init"); exitCode != 0 {
t.Fatalf("expected init exit code 0, got %d", exitCode)
}
stdout, _, exitCode := executeInboxCommand(
"--db", dbPath,
"--json",
"fetch",
"--agent", "worker-z",
"--status", "pending",
)
if exitCode != 10 {
t.Fatalf("expected fetch no-match exit code 10, got %d", exitCode)
}
assertErrorJSON(t, stdout, "no_matching_work")
stdout, _, exitCode = executeInboxCommand(
"--db", dbPath,
"--json",
"claim",
"--agent", "worker-z",
"--thread", "thr_missing",
)
if exitCode != 40 {
t.Fatalf("expected claim missing-thread exit code 40, got %d", exitCode)
}
assertErrorJSON(t, stdout, "not_found")
sendOut := runInboxCommand(
t,
"--db", dbPath,
"--json",
"send",
"--from", "leader",
"--to", "worker-z",
"--subject", "Review cache settings",
"--summary", "Check cache config",
)
var sendResp map[string]any
mustDecodeJSON(t, sendOut, &sendResp)
threadID := nestedString(t, sendResp, "data", "thread", "thread_id")
runInboxCommand(
t,
"--db", dbPath,
"--json",
"claim",
"--agent", "worker-z",
"--thread", threadID,
)
stdout, _, exitCode = executeInboxCommand(
"--db", dbPath,
"--json",
"claim",
"--agent", "worker-y",
"--thread", threadID,
)
if exitCode != 20 {
t.Fatalf("expected lease conflict exit code 20, got %d", exitCode)
}
assertErrorJSON(t, stdout, "lease_conflict")
stdout, _, exitCode = executeInboxCommand(
"--db", dbPath,
"--json",
"send",
"--from", "leader",
"--to", "worker-z",
"--subject", "Invalid payload json",
"--payload-json", "not-json",
)
if exitCode != 30 {
t.Fatalf("expected invalid input exit code 30, got %d", exitCode)
}
assertErrorJSON(t, stdout, "invalid_input")
stdout, _, exitCode = executeInboxCommand(
"--db", dbPath,
"--json",
"send",
"--from", "leader",
"--to", "worker-z",
"--subject", "Invalid artifact json",
"--artifact", "/tmp/report.md",
"--artifact-metadata-json", "not-json",
)
if exitCode != 30 {
t.Fatalf("expected invalid artifact metadata exit code 30, got %d", exitCode)
}
assertErrorJSON(t, stdout, "invalid_input")
}
@@ -0,0 +1,83 @@
package inbox
import (
"fmt"
"ai-workflow-skill/packages/coord-core/protocol"
"ai-workflow-skill/packages/coord-core/store"
"github.com/spf13/cobra"
)
type listOptions struct {
agent string
statuses string
createdBy string
assignedTo string
limit int
}
func newListCmd(root *rootOptions) *cobra.Command {
opts := &listOptions{}
cmd := &cobra.Command{
Use: "list",
Short: "List threads with filters",
RunE: func(cmd *cobra.Command, args []string) error {
ctx := cmd.Context()
agent := opts.agent
if agent == "" {
agent = root.agent
}
sqlDB, err := openInboxDB(ctx, root.dbPath)
if err != nil {
return err
}
defer sqlDB.Close()
s := store.NewInboxStore(sqlDB)
threads, err := s.ListThreads(ctx, store.ListInput{
Agent: agent,
Statuses: parseCSV(opts.statuses),
CreatedBy: opts.createdBy,
AssignedTo: opts.assignedTo,
Limit: opts.limit,
})
if err != nil {
return err
}
if len(threads) == 0 {
return protocol.NoMatchingWork("no matching work")
}
resp := protocol.Success{
OK: true,
Command: "list",
Data: map[string]any{
"threads": threads,
},
}
if root.json {
return protocol.WriteJSON(cmd.OutOrStdout(), resp)
}
for _, thread := range threads {
if _, err := fmt.Fprintf(cmd.OutOrStdout(), "%s\t%s\t%s\t%s\n", thread.ThreadID, thread.Status, thread.AssignedTo, thread.Subject); err != nil {
return err
}
}
return nil
},
}
cmd.Flags().StringVar(&opts.agent, "agent", "", "Assigned agent filter shortcut")
cmd.Flags().StringVar(&opts.statuses, "status", "", "Comma-separated status filter")
cmd.Flags().StringVar(&opts.createdBy, "created-by", "", "Created-by filter")
cmd.Flags().StringVar(&opts.assignedTo, "assigned-to", "", "Assigned-to filter")
cmd.Flags().IntVar(&opts.limit, "limit", 20, "Maximum number of threads")
return cmd
}
@@ -0,0 +1,183 @@
package inbox
import (
"path/filepath"
"testing"
"time"
)
func TestListFiltersByStatus(t *testing.T) {
t.Parallel()
dbPath := filepath.Join(t.TempDir(), "coord.db")
runInboxCommand(t, "--db", dbPath, "--json", "init")
createThreadForList(t, dbPath, "leader", "worker-d", "Task pending", "Pending task")
blockedThreadID := createThreadForList(t, dbPath, "leader", "worker-d", "Task blocked", "Blocked task")
runInboxCommand(t, "--db", dbPath, "--json", "claim", "--agent", "worker-d", "--thread", blockedThreadID)
runInboxCommand(
t,
"--db", dbPath,
"--json",
"update",
"--agent", "worker-d",
"--thread", blockedThreadID,
"--status", "blocked",
"--summary", "Need policy decision",
)
listOut := runInboxCommand(
t,
"--db", dbPath,
"--json",
"list",
"--agent", "worker-d",
"--status", "pending,blocked",
)
var listResp map[string]any
mustDecodeJSON(t, listOut, &listResp)
threads, ok := nestedValue(t, listResp, "data", "threads").([]any)
if !ok || len(threads) < 2 {
t.Fatalf("expected at least two matching threads, got %#v", nestedValue(t, listResp, "data", "threads"))
}
for _, raw := range threads {
thread, ok := raw.(map[string]any)
if !ok {
t.Fatalf("expected thread object, got %#v", raw)
}
status, _ := thread["status"].(string)
if status != "pending" && status != "blocked" {
t.Fatalf("expected status pending or blocked, got %#v", status)
}
}
}
func TestListFiltersByCreatedBy(t *testing.T) {
t.Parallel()
dbPath := filepath.Join(t.TempDir(), "coord.db")
runInboxCommand(t, "--db", dbPath, "--json", "init")
createThreadForList(t, dbPath, "leader", "worker-d", "Leader task", "From leader")
createThreadForList(t, dbPath, "planner", "worker-d", "Planner task", "From planner")
listOut := runInboxCommand(
t,
"--db", dbPath,
"--json",
"list",
"--created-by", "leader",
"--assigned-to", "worker-d",
)
var listResp map[string]any
mustDecodeJSON(t, listOut, &listResp)
threads, ok := nestedValue(t, listResp, "data", "threads").([]any)
if !ok || len(threads) == 0 {
t.Fatalf("expected matching leader-created threads, got %#v", nestedValue(t, listResp, "data", "threads"))
}
for _, raw := range threads {
thread, ok := raw.(map[string]any)
if !ok {
t.Fatalf("expected thread object, got %#v", raw)
}
if got := thread["created_by"]; got != "leader" {
t.Fatalf("expected created_by leader, got %#v", got)
}
}
}
func TestListFiltersByAssignedTo(t *testing.T) {
t.Parallel()
dbPath := filepath.Join(t.TempDir(), "coord.db")
runInboxCommand(t, "--db", dbPath, "--json", "init")
createThreadForList(t, dbPath, "leader", "worker-d", "Worker D task", "For worker-d")
createThreadForList(t, dbPath, "leader", "worker-e", "Worker E task", "For worker-e")
listOut := runInboxCommand(
t,
"--db", dbPath,
"--json",
"list",
"--assigned-to", "worker-d",
"--status", "pending",
)
var listResp map[string]any
mustDecodeJSON(t, listOut, &listResp)
threads, ok := nestedValue(t, listResp, "data", "threads").([]any)
if !ok || len(threads) == 0 {
t.Fatalf("expected assigned-to match, got %#v", nestedValue(t, listResp, "data", "threads"))
}
for _, raw := range threads {
thread, ok := raw.(map[string]any)
if !ok {
t.Fatalf("expected thread object, got %#v", raw)
}
if got := thread["assigned_to"]; got != "worker-d" {
t.Fatalf("expected assigned_to worker-d, got %#v", got)
}
if got := thread["status"]; got != "pending" {
t.Fatalf("expected pending status, got %#v", got)
}
}
}
func TestListRespectsLimit(t *testing.T) {
t.Parallel()
dbPath := filepath.Join(t.TempDir(), "coord.db")
runInboxCommand(t, "--db", dbPath, "--json", "init")
createThreadForList(t, dbPath, "leader", "worker-d", "Task 1", "Earlier task")
time.Sleep(20 * time.Millisecond)
createThreadForList(t, dbPath, "leader", "worker-d", "Task 2", "Latest task")
listOut := runInboxCommand(
t,
"--db", dbPath,
"--json",
"list",
"--assigned-to", "worker-d",
"--limit", "1",
)
var listResp map[string]any
mustDecodeJSON(t, listOut, &listResp)
threads, ok := nestedValue(t, listResp, "data", "threads").([]any)
if !ok {
t.Fatalf("expected threads array, got %#v", nestedValue(t, listResp, "data", "threads"))
}
if len(threads) != 1 {
t.Fatalf("expected exactly one row for limit=1, got %d", len(threads))
}
thread, ok := threads[0].(map[string]any)
if !ok {
t.Fatalf("expected thread object, got %#v", threads[0])
}
if got := thread["subject"]; got != "Task 2" {
t.Fatalf("expected latest thread subject Task 2, got %#v", got)
}
}
func createThreadForList(t *testing.T, dbPath, from, to, subject, summary string) string {
t.Helper()
sendOut := runInboxCommand(
t,
"--db", dbPath,
"--json",
"send",
"--from", from,
"--to", to,
"--subject", subject,
"--summary", summary,
)
var sendResp map[string]any
mustDecodeJSON(t, sendOut, &sendResp)
return nestedString(t, sendResp, "data", "thread", "thread_id")
}
@@ -0,0 +1,76 @@
package inbox
import (
"fmt"
"ai-workflow-skill/packages/coord-core/protocol"
"ai-workflow-skill/packages/coord-core/store"
"github.com/spf13/cobra"
)
type renewOptions struct {
agent string
threadID string
leaseSeconds int
}
func newRenewCmd(root *rootOptions) *cobra.Command {
opts := &renewOptions{}
cmd := &cobra.Command{
Use: "renew",
Short: "Extend an existing lease",
RunE: func(cmd *cobra.Command, args []string) error {
ctx := cmd.Context()
agent := opts.agent
if agent == "" {
agent = root.agent
}
if agent == "" {
return protocol.InvalidInput("agent is required", nil)
}
sqlDB, err := openInboxDB(ctx, root.dbPath)
if err != nil {
return err
}
defer sqlDB.Close()
s := store.NewInboxStore(sqlDB)
result, err := s.RenewLease(ctx, store.RenewInput{
ThreadID: opts.threadID,
Agent: agent,
LeaseSeconds: opts.leaseSeconds,
})
if err != nil {
return err
}
resp := protocol.Success{
OK: true,
Command: "renew",
Data: map[string]any{
"thread": result.Thread,
"message": result.Message,
},
}
if root.json {
return protocol.WriteJSON(cmd.OutOrStdout(), resp)
}
_, err = fmt.Fprintf(cmd.OutOrStdout(), "renewed lease on thread %s\n", result.Thread.ThreadID)
return err
},
}
cmd.Flags().StringVar(&opts.agent, "agent", "", "Lease owner")
cmd.Flags().StringVar(&opts.threadID, "thread", "", "Thread ID")
cmd.Flags().IntVar(&opts.leaseSeconds, "lease-seconds", 900, "Lease duration in seconds")
_ = cmd.MarkFlagRequired("thread")
return cmd
}
@@ -0,0 +1,103 @@
package inbox
import "testing"
func TestRenewExtendsActiveLease(t *testing.T) {
t.Parallel()
dbPath := initCommandTestDB(t)
threadID := sendPendingThread(t, dbPath, "leader", "worker-c", "Renew lease", "Need renew coverage")
runInboxCommand(
t,
"--db", dbPath,
"--json",
"claim",
"--agent", "worker-c",
"--thread", threadID,
"--lease-seconds", "300",
)
renewOut := runInboxCommand(
t,
"--db", dbPath,
"--json",
"renew",
"--agent", "worker-c",
"--thread", threadID,
"--lease-seconds", "600",
)
var renewResp map[string]any
mustDecodeJSON(t, renewOut, &renewResp)
if got := nestedString(t, renewResp, "data", "thread", "status"); got != "claimed" {
t.Fatalf("expected status to stay claimed, got %q", got)
}
if got := nestedString(t, renewResp, "data", "message", "kind"); got != "event" {
t.Fatalf("expected event message kind, got %q", got)
}
if got := nestedString(t, renewResp, "data", "message", "summary"); got != "lease renewed" {
t.Fatalf("expected lease renewed summary, got %q", got)
}
payload, ok := nestedValue(t, renewResp, "data", "message", "payload_json").(map[string]any)
if !ok {
t.Fatalf("expected payload_json object, got %#v", nestedValue(t, renewResp, "data", "message", "payload_json"))
}
leaseSeconds, ok := payload["lease_seconds"].(float64)
if !ok || int(leaseSeconds) != 600 {
t.Fatalf("expected lease_seconds 600, got %#v", payload["lease_seconds"])
}
leaseToken, _ := payload["lease_token"].(string)
if leaseToken == "" {
t.Fatalf("expected non-empty lease_token, got %#v", payload["lease_token"])
}
}
func TestRenewRejectsNonOwner(t *testing.T) {
t.Parallel()
dbPath := initCommandTestDB(t)
threadID := sendPendingThread(t, dbPath, "leader", "worker-c", "Renew non-owner", "Reject non-owner renew")
runInboxCommand(
t,
"--db", dbPath,
"--json",
"claim",
"--agent", "worker-c",
"--thread", threadID,
)
stdout, _, exitCode := executeInboxCommand(
"--db", dbPath,
"--json",
"renew",
"--agent", "worker-x",
"--thread", threadID,
"--lease-seconds", "600",
)
if exitCode != 20 {
t.Fatalf("expected exit code 20, got %d", exitCode)
}
assertErrorJSON(t, stdout, "lease_conflict")
}
func TestRenewRejectsWithoutActiveLease(t *testing.T) {
t.Parallel()
dbPath := initCommandTestDB(t)
threadID := sendPendingThread(t, dbPath, "leader", "worker-c", "Renew without lease", "Should fail without active lease")
stdout, _, exitCode := executeInboxCommand(
"--db", dbPath,
"--json",
"renew",
"--agent", "worker-c",
"--thread", threadID,
"--lease-seconds", "600",
)
if exitCode != 30 {
t.Fatalf("expected exit code 30, got %d", exitCode)
}
assertErrorJSON(t, stdout, "invalid_state")
}
@@ -0,0 +1,104 @@
package inbox
import (
"fmt"
"ai-workflow-skill/packages/coord-core/protocol"
"ai-workflow-skill/packages/coord-core/store"
"github.com/spf13/cobra"
)
type replyOptions struct {
from string
to string
threadID string
kind string
summary string
body string
bodyFile string
payloadJSON string
artifacts artifactOptions
}
func newReplyCmd(root *rootOptions) *cobra.Command {
opts := &replyOptions{}
cmd := &cobra.Command{
Use: "reply",
Short: "Reply inside an existing thread",
RunE: func(cmd *cobra.Command, args []string) error {
ctx := cmd.Context()
from := opts.from
if from == "" {
from = root.agent
}
if from == "" {
return protocol.InvalidInput("from agent is required", nil)
}
body, err := resolveBodyValue(opts.body, opts.bodyFile)
if err != nil {
return err
}
artifacts, err := resolveArtifacts(opts.artifacts)
if err != nil {
return err
}
sqlDB, err := openInboxDB(ctx, root.dbPath)
if err != nil {
return err
}
defer sqlDB.Close()
s := store.NewInboxStore(sqlDB)
thread, message, err := s.ReplyToThread(ctx, store.ReplyInput{
ThreadID: opts.threadID,
FromAgent: from,
ToAgent: opts.to,
Kind: opts.kind,
Summary: opts.summary,
Body: body,
PayloadJSON: opts.payloadJSON,
Artifacts: artifacts,
})
if err != nil {
return err
}
resp := protocol.Success{
OK: true,
Command: "reply",
Data: map[string]any{
"thread": thread,
"message": message,
},
}
if root.json {
return protocol.WriteJSON(cmd.OutOrStdout(), resp)
}
_, err = fmt.Fprintf(cmd.OutOrStdout(), "replied on thread %s\n", thread.ThreadID)
return err
},
}
cmd.Flags().StringVar(&opts.from, "from", "", "Replying agent")
cmd.Flags().StringVar(&opts.to, "to", "", "Receiving agent")
cmd.Flags().StringVar(&opts.threadID, "thread", "", "Thread ID")
cmd.Flags().StringVar(&opts.kind, "kind", "answer", "Reply kind")
cmd.Flags().StringVar(&opts.summary, "summary", "", "Short reply summary")
cmd.Flags().StringVar(&opts.body, "body", "", "Reply body")
cmd.Flags().StringVar(&opts.bodyFile, "body-file", "", "Read reply body from file")
cmd.Flags().StringVar(&opts.payloadJSON, "payload-json", "", "Structured payload JSON string")
addArtifactFlags(cmd, &opts.artifacts)
_ = cmd.MarkFlagRequired("thread")
_ = cmd.MarkFlagRequired("to")
_ = cmd.MarkFlagRequired("summary")
return cmd
}
@@ -0,0 +1,138 @@
package inbox
import (
"os"
"path/filepath"
"testing"
)
func TestReplyAddsAnswerMessage(t *testing.T) {
t.Parallel()
dbPath := filepath.Join(t.TempDir(), "coord.db")
threadID := seedThreadForInboxTests(t, dbPath, "leader", "worker-a")
replyOut := runInboxCommand(
t,
"--db", dbPath,
"--json",
"reply",
"--from", "leader",
"--to", "worker-a",
"--thread", threadID,
"--summary", "Retry read timeouts",
"--body", "Yes, include read timeouts in the retry policy.",
)
var replyResp map[string]any
mustDecodeJSON(t, replyOut, &replyResp)
if kind := nestedString(t, replyResp, "data", "message", "kind"); kind != "answer" {
t.Fatalf("expected answer message kind, got %q", kind)
}
if gotThreadID := nestedString(t, replyResp, "data", "thread", "thread_id"); gotThreadID != threadID {
t.Fatalf("expected thread_id %q, got %q", threadID, gotThreadID)
}
if status := nestedString(t, replyResp, "data", "thread", "status"); status != "pending" {
t.Fatalf("expected thread status pending, got %q", status)
}
}
func TestReplySupportsControlKind(t *testing.T) {
t.Parallel()
dbPath := filepath.Join(t.TempDir(), "coord.db")
threadID := seedThreadForInboxTests(t, dbPath, "leader", "worker-a")
replyOut := runInboxCommand(
t,
"--db", dbPath,
"--json",
"reply",
"--from", "leader",
"--to", "worker-a",
"--thread", threadID,
"--kind", "control",
"--summary", "Pause rollout",
"--body", "Pause rollout until QA confirms the fix.",
)
var replyResp map[string]any
mustDecodeJSON(t, replyOut, &replyResp)
if kind := nestedString(t, replyResp, "data", "message", "kind"); kind != "control" {
t.Fatalf("expected control message kind, got %q", kind)
}
}
func TestReplyAttachesArtifact(t *testing.T) {
t.Parallel()
tempDir := t.TempDir()
dbPath := filepath.Join(tempDir, "coord.db")
decisionPath := filepath.Join(tempDir, "decision.md")
if err := os.WriteFile(decisionPath, []byte("Decision note."), 0o644); err != nil {
t.Fatalf("write decision file: %v", err)
}
threadID := seedThreadForInboxTests(t, dbPath, "leader", "worker-a")
replyOut := runInboxCommand(
t,
"--db", dbPath,
"--json",
"reply",
"--from", "leader",
"--to", "worker-a",
"--thread", threadID,
"--summary", "Retry read timeouts",
"--artifact", decisionPath,
"--artifact-kind", "brief",
"--artifact-metadata-json", `{"label":"decision"}`,
)
var replyResp map[string]any
mustDecodeJSON(t, replyOut, &replyResp)
artifactsValue := nestedValue(t, replyResp, "data", "message", "artifacts")
artifacts, ok := artifactsValue.([]any)
if !ok || len(artifacts) != 1 {
t.Fatalf("expected one artifact, got %#v", artifactsValue)
}
artifact, ok := artifacts[0].(map[string]any)
if !ok {
t.Fatalf("expected artifact object, got %#v", artifacts[0])
}
if gotPath, _ := artifact["path"].(string); gotPath != decisionPath {
t.Fatalf("expected artifact path %q, got %#v", decisionPath, artifact["path"])
}
if gotKind, _ := artifact["kind"].(string); gotKind != "brief" {
t.Fatalf("expected artifact kind brief, got %#v", artifact["kind"])
}
metadata, ok := artifact["metadata_json"].(map[string]any)
if !ok {
t.Fatalf("expected metadata_json object, got %#v", artifact["metadata_json"])
}
if gotLabel := metadata["label"]; gotLabel != "decision" {
t.Fatalf("expected metadata label decision, got %#v", gotLabel)
}
}
func TestReplyRejectsInvalidPayloadJSON(t *testing.T) {
t.Parallel()
dbPath := filepath.Join(t.TempDir(), "coord.db")
threadID := seedThreadForInboxTests(t, dbPath, "leader", "worker-a")
stdout, _, exitCode := executeInboxCommand(
"--db", dbPath,
"--json",
"reply",
"--from", "leader",
"--to", "worker-a",
"--thread", threadID,
"--summary", "Retry read timeouts",
"--payload-json", "not-json",
)
if exitCode != 30 {
t.Fatalf("expected exit code 30, got %d with output %s", exitCode, stdout)
}
assertErrorJSON(t, stdout, "invalid_input")
}
@@ -0,0 +1,43 @@
package inbox
import (
"github.com/spf13/cobra"
)
type rootOptions struct {
dbPath string
json bool
agent string
}
func NewRootCmd() *cobra.Command {
opts := &rootOptions{}
cmd := &cobra.Command{
Use: "inbox",
Short: "Worker-facing durable coordination bus",
SilenceErrors: true,
SilenceUsage: true,
}
cmd.PersistentFlags().StringVar(&opts.dbPath, "db", ".agents/coord.db", "SQLite database path")
cmd.PersistentFlags().BoolVar(&opts.json, "json", false, "Emit machine-readable JSON")
cmd.PersistentFlags().StringVar(&opts.agent, "agent", "", "Agent identity")
cmd.AddCommand(newInitCmd(opts))
cmd.AddCommand(newSendCmd(opts))
cmd.AddCommand(newFetchCmd(opts))
cmd.AddCommand(newClaimCmd(opts))
cmd.AddCommand(newRenewCmd(opts))
cmd.AddCommand(newUpdateCmd(opts))
cmd.AddCommand(newReplyCmd(opts))
cmd.AddCommand(newDoneCmd(opts))
cmd.AddCommand(newFailCmd(opts))
cmd.AddCommand(newCancelCmd(opts))
cmd.AddCommand(newListCmd(opts))
cmd.AddCommand(newWatchCmd(opts))
cmd.AddCommand(newWaitReplyCmd(opts))
cmd.AddCommand(newShowCmd(opts))
return cmd
}
@@ -0,0 +1,117 @@
package inbox
import (
"fmt"
"ai-workflow-skill/packages/coord-core/protocol"
"ai-workflow-skill/packages/coord-core/store"
"github.com/spf13/cobra"
)
type sendOptions struct {
from string
to string
threadID string
runID string
taskID string
subject string
kind string
summary string
body string
bodyFile string
payloadJSON string
priority string
artifacts artifactOptions
}
func newSendCmd(root *rootOptions) *cobra.Command {
opts := &sendOptions{}
cmd := &cobra.Command{
Use: "send",
Short: "Create a thread with an initial directed message",
RunE: func(cmd *cobra.Command, args []string) error {
ctx := cmd.Context()
from := opts.from
if from == "" {
from = root.agent
}
if from == "" {
return protocol.InvalidInput("from agent is required", nil)
}
if opts.threadID == "" && opts.subject == "" {
return protocol.InvalidInput("subject is required when creating a new thread", nil)
}
body, err := resolveBodyValue(opts.body, opts.bodyFile)
if err != nil {
return err
}
artifacts, err := resolveArtifacts(opts.artifacts)
if err != nil {
return err
}
sqlDB, err := openInboxDB(ctx, root.dbPath)
if err != nil {
return err
}
defer sqlDB.Close()
s := store.NewInboxStore(sqlDB)
thread, message, err := s.Send(ctx, store.SendInput{
ThreadID: opts.threadID,
RunID: opts.runID,
TaskID: opts.taskID,
Subject: opts.subject,
FromAgent: from,
ToAgent: opts.to,
Kind: opts.kind,
Summary: opts.summary,
Body: body,
PayloadJSON: opts.payloadJSON,
Priority: opts.priority,
Artifacts: artifacts,
})
if err != nil {
return err
}
resp := protocol.Success{
OK: true,
Command: "send",
Data: map[string]any{
"thread": thread,
"message": message,
},
}
if root.json {
return protocol.WriteJSON(cmd.OutOrStdout(), resp)
}
_, err = fmt.Fprintf(cmd.OutOrStdout(), "created thread %s\n", thread.ThreadID)
return err
},
}
cmd.Flags().StringVar(&opts.from, "from", "", "Sending agent")
cmd.Flags().StringVar(&opts.to, "to", "", "Receiving agent")
cmd.Flags().StringVar(&opts.threadID, "thread", "", "Optional thread ID override")
cmd.Flags().StringVar(&opts.runID, "run", "", "Optional run ID override")
cmd.Flags().StringVar(&opts.taskID, "task", "", "Optional task ID override")
cmd.Flags().StringVar(&opts.subject, "subject", "", "Thread subject")
cmd.Flags().StringVar(&opts.kind, "kind", "task", "Initial message kind")
cmd.Flags().StringVar(&opts.summary, "summary", "", "Short message summary")
cmd.Flags().StringVar(&opts.body, "body", "", "Message body")
cmd.Flags().StringVar(&opts.bodyFile, "body-file", "", "Read message body from file")
cmd.Flags().StringVar(&opts.payloadJSON, "payload-json", "", "Structured payload JSON string")
cmd.Flags().StringVar(&opts.priority, "priority", "normal", "Thread priority")
addArtifactFlags(cmd, &opts.artifacts)
_ = cmd.MarkFlagRequired("to")
return cmd
}
@@ -0,0 +1,230 @@
package inbox
import (
"os"
"path/filepath"
"testing"
)
func TestSendCreatesNewThread(t *testing.T) {
t.Parallel()
dbPath := initCommandTestDB(t)
sendOut := runInboxCommand(
t,
"--db", dbPath,
"--json",
"send",
"--from", "leader",
"--to", "worker-a",
"--subject", "Implement feature X",
"--summary", "Add retry policy",
"--body", "Implement retry handling for the HTTP client.",
"--run", "run_blog_001",
"--task", "T1",
)
var sendResp map[string]any
mustDecodeJSON(t, sendOut, &sendResp)
if got := nestedString(t, sendResp, "data", "thread", "thread_id"); got == "" {
t.Fatalf("expected thread_id, got empty")
}
if got := nestedString(t, sendResp, "data", "thread", "status"); got != "pending" {
t.Fatalf("expected pending status, got %q", got)
}
if got := nestedString(t, sendResp, "data", "thread", "created_by"); got != "leader" {
t.Fatalf("expected created_by leader, got %q", got)
}
if got := nestedString(t, sendResp, "data", "thread", "assigned_to"); got != "worker-a" {
t.Fatalf("expected assigned_to worker-a, got %q", got)
}
if got := nestedString(t, sendResp, "data", "message", "kind"); got != "task" {
t.Fatalf("expected message kind task, got %q", got)
}
}
func TestSendAppendsMessageToExistingThread(t *testing.T) {
t.Parallel()
dbPath := initCommandTestDB(t)
threadID := sendPendingThread(t, dbPath, "leader", "worker-d", "Build editor", "Create editor v1")
appendOut := runInboxCommand(
t,
"--db", dbPath,
"--json",
"send",
"--from", "leader",
"--to", "worker-d",
"--thread", threadID,
"--summary", "Use a markdown editor",
"--body", "Prefer a textarea-based markdown editor for v1.",
)
var appendResp map[string]any
mustDecodeJSON(t, appendOut, &appendResp)
if got := nestedString(t, appendResp, "data", "thread", "thread_id"); got != threadID {
t.Fatalf("expected same thread_id %q, got %q", threadID, got)
}
if got := nestedString(t, appendResp, "data", "thread", "status"); got != "pending" {
t.Fatalf("expected thread status to stay pending, got %q", got)
}
showOut := runInboxCommand(
t,
"--db", dbPath,
"--json",
"show",
"--thread", threadID,
)
var showResp map[string]any
mustDecodeJSON(t, showOut, &showResp)
messages, ok := nestedValue(t, showResp, "data", "messages").([]any)
if !ok || len(messages) != 2 {
t.Fatalf("expected two messages after append, got %#v", nestedValue(t, showResp, "data", "messages"))
}
}
func TestSendReadsBodyFromBodyFile(t *testing.T) {
t.Parallel()
tempDir := t.TempDir()
dbPath := filepath.Join(tempDir, "coord.db")
bodyPath := filepath.Join(tempDir, "task.md")
bodyContent := "Create the first editor screen.\nUse markdown syntax."
if err := os.WriteFile(bodyPath, []byte(bodyContent), 0o644); err != nil {
t.Fatalf("write body file: %v", err)
}
runInboxCommand(t, "--db", dbPath, "--json", "init")
sendOut := runInboxCommand(
t,
"--db", dbPath,
"--json",
"send",
"--from", "leader",
"--to", "worker-d",
"--subject", "Build admin editor",
"--summary", "Create the first editor screen",
"--body-file", bodyPath,
)
var sendResp map[string]any
mustDecodeJSON(t, sendOut, &sendResp)
threadID := nestedString(t, sendResp, "data", "thread", "thread_id")
showOut := runInboxCommand(
t,
"--db", dbPath,
"--json",
"show",
"--thread", threadID,
)
var showResp map[string]any
mustDecodeJSON(t, showOut, &showResp)
messages, ok := nestedValue(t, showResp, "data", "messages").([]any)
if !ok || len(messages) != 1 {
t.Fatalf("expected one message, got %#v", nestedValue(t, showResp, "data", "messages"))
}
message, ok := messages[0].(map[string]any)
if !ok {
t.Fatalf("expected message object, got %#v", messages[0])
}
if got, _ := message["body"].(string); got != bodyContent {
t.Fatalf("expected body %q, got %#v", bodyContent, message["body"])
}
}
func TestSendAttachesArtifactWithMetadata(t *testing.T) {
t.Parallel()
tempDir := t.TempDir()
dbPath := filepath.Join(tempDir, "coord.db")
artifactPath := filepath.Join(tempDir, "task.md")
if err := os.WriteFile(artifactPath, []byte("task brief"), 0o644); err != nil {
t.Fatalf("write artifact file: %v", err)
}
runInboxCommand(t, "--db", dbPath, "--json", "init")
sendOut := runInboxCommand(
t,
"--db", dbPath,
"--json",
"send",
"--from", "leader",
"--to", "worker-d",
"--subject", "Build admin editor",
"--summary", "Create the first editor screen",
"--artifact", artifactPath,
"--artifact-kind", "brief",
"--artifact-metadata-json", `{"label":"task-brief"}`,
)
var sendResp map[string]any
mustDecodeJSON(t, sendOut, &sendResp)
artifacts, ok := nestedValue(t, sendResp, "data", "message", "artifacts").([]any)
if !ok || len(artifacts) != 1 {
t.Fatalf("expected one artifact, got %#v", nestedValue(t, sendResp, "data", "message", "artifacts"))
}
artifact, ok := artifacts[0].(map[string]any)
if !ok {
t.Fatalf("expected artifact object, got %#v", artifacts[0])
}
if got, _ := artifact["path"].(string); got != artifactPath {
t.Fatalf("expected artifact path %q, got %#v", artifactPath, artifact["path"])
}
if got, _ := artifact["kind"].(string); got != "brief" {
t.Fatalf("expected artifact kind brief, got %#v", artifact["kind"])
}
metadata, ok := artifact["metadata_json"].(map[string]any)
if !ok {
t.Fatalf("expected metadata_json object, got %#v", artifact["metadata_json"])
}
if got, _ := metadata["label"].(string); got != "task-brief" {
t.Fatalf("expected metadata_json.label task-brief, got %#v", metadata["label"])
}
}
func TestSendRejectsInvalidPayloadJSON(t *testing.T) {
t.Parallel()
dbPath := initCommandTestDB(t)
stdout, _, exitCode := executeInboxCommand(
"--db", dbPath,
"--json",
"send",
"--from", "leader",
"--to", "worker-z",
"--subject", "Invalid payload json",
"--payload-json", "not-json",
)
if exitCode != 30 {
t.Fatalf("expected exit code 30, got %d", exitCode)
}
assertErrorJSON(t, stdout, "invalid_input")
}
func TestSendRejectsInvalidArtifactMetadataJSON(t *testing.T) {
t.Parallel()
dbPath := initCommandTestDB(t)
stdout, _, exitCode := executeInboxCommand(
"--db", dbPath,
"--json",
"send",
"--from", "leader",
"--to", "worker-z",
"--subject", "Invalid artifact json",
"--artifact", "/tmp/report.md",
"--artifact-metadata-json", "not-json",
)
if exitCode != 30 {
t.Fatalf("expected exit code 30, got %d", exitCode)
}
assertErrorJSON(t, stdout, "invalid_input")
}
@@ -0,0 +1,78 @@
package inbox
import (
"fmt"
"ai-workflow-skill/packages/coord-core/protocol"
"ai-workflow-skill/packages/coord-core/store"
"github.com/spf13/cobra"
)
type showOptions struct {
threadID string
markRead bool
}
func newShowCmd(root *rootOptions) *cobra.Command {
opts := &showOptions{}
cmd := &cobra.Command{
Use: "show",
Short: "Show one thread with message history",
RunE: func(cmd *cobra.Command, args []string) error {
ctx := cmd.Context()
sqlDB, err := openInboxDB(ctx, root.dbPath)
if err != nil {
return err
}
defer sqlDB.Close()
s := store.NewInboxStore(sqlDB)
agent := root.agent
if opts.markRead && agent == "" {
return protocol.InvalidInput("agent is required when using --mark-read", nil)
}
detail, err := s.GetThreadForAgent(ctx, opts.threadID, agent, opts.markRead)
if err != nil {
return err
}
resp := protocol.Success{
OK: true,
Command: "show",
Data: map[string]any{
"thread": detail.Thread,
"messages": detail.Messages,
},
}
if root.json {
return protocol.WriteJSON(cmd.OutOrStdout(), resp)
}
if _, err := fmt.Fprintf(cmd.OutOrStdout(), "%s\t%s\t%s\n", detail.Thread.ThreadID, detail.Thread.Status, detail.Thread.Subject); err != nil {
return err
}
for _, message := range detail.Messages {
if _, err := fmt.Fprintf(cmd.OutOrStdout(), "- %s\t%s\t%s\n", message.MessageID, message.Kind, message.Summary); err != nil {
return err
}
for _, artifact := range message.Artifacts {
if _, err := fmt.Fprintf(cmd.OutOrStdout(), " artifact\t%s\t%s\n", artifact.Kind, artifact.Path); err != nil {
return err
}
}
}
return nil
},
}
cmd.Flags().StringVar(&opts.threadID, "thread", "", "Thread ID")
cmd.Flags().BoolVar(&opts.markRead, "mark-read", false, "Advance the caller's read cursor to the latest message")
_ = cmd.MarkFlagRequired("thread")
return cmd
}
@@ -0,0 +1,196 @@
package inbox
import (
"os"
"path/filepath"
"testing"
)
func TestShowReturnsThreadAndMessageHistory(t *testing.T) {
t.Parallel()
dbPath := filepath.Join(t.TempDir(), "coord.db")
runInboxCommand(t, "--db", dbPath, "--json", "init")
sendOut := runInboxCommand(
t,
"--db", dbPath,
"--json",
"send",
"--from", "leader",
"--to", "worker-a",
"--subject", "Implement feature X",
"--summary", "Initial request",
)
var sendResp map[string]any
mustDecodeJSON(t, sendOut, &sendResp)
threadID := nestedString(t, sendResp, "data", "thread", "thread_id")
runInboxCommand(
t,
"--db", dbPath,
"--json",
"send",
"--from", "leader",
"--to", "worker-a",
"--thread", threadID,
"--summary", "Follow-up request",
"--body", "Please include request logging.",
)
showOut := runInboxCommand(t, "--db", dbPath, "--json", "show", "--thread", threadID)
var showResp map[string]any
mustDecodeJSON(t, showOut, &showResp)
if got := nestedString(t, showResp, "data", "thread", "thread_id"); got != threadID {
t.Fatalf("expected thread %q, got %q", threadID, got)
}
messages, ok := nestedValue(t, showResp, "data", "messages").([]any)
if !ok || len(messages) != 2 {
t.Fatalf("expected two ordered messages, got %#v", nestedValue(t, showResp, "data", "messages"))
}
first, ok := messages[0].(map[string]any)
if !ok {
t.Fatalf("expected first message object, got %#v", messages[0])
}
second, ok := messages[1].(map[string]any)
if !ok {
t.Fatalf("expected second message object, got %#v", messages[1])
}
if got := first["summary"]; got != "Initial request" {
t.Fatalf("expected first summary Initial request, got %#v", got)
}
if got := second["summary"]; got != "Follow-up request" {
t.Fatalf("expected second summary Follow-up request, got %#v", got)
}
}
func TestShowIncludesArtifactsPerMessage(t *testing.T) {
t.Parallel()
tempDir := t.TempDir()
dbPath := filepath.Join(tempDir, "coord.db")
artifactPath := filepath.Join(tempDir, "task.md")
if err := os.WriteFile(artifactPath, []byte("task brief"), 0o644); err != nil {
t.Fatalf("write artifact file: %v", err)
}
runInboxCommand(t, "--db", dbPath, "--json", "init")
sendOut := runInboxCommand(
t,
"--db", dbPath,
"--json",
"send",
"--from", "leader",
"--to", "worker-a",
"--subject", "Artifact task",
"--summary", "Attach brief",
"--artifact", artifactPath,
"--artifact-kind", "brief",
"--artifact-metadata-json", `{"label":"task-brief"}`,
)
var sendResp map[string]any
mustDecodeJSON(t, sendOut, &sendResp)
threadID := nestedString(t, sendResp, "data", "thread", "thread_id")
showOut := runInboxCommand(t, "--db", dbPath, "--json", "show", "--thread", threadID)
var showResp map[string]any
mustDecodeJSON(t, showOut, &showResp)
messages, ok := nestedValue(t, showResp, "data", "messages").([]any)
if !ok || len(messages) == 0 {
t.Fatalf("expected messages with artifacts, got %#v", nestedValue(t, showResp, "data", "messages"))
}
first, ok := messages[0].(map[string]any)
if !ok {
t.Fatalf("expected message object, got %#v", messages[0])
}
artifacts, ok := first["artifacts"].([]any)
if !ok || len(artifacts) != 1 {
t.Fatalf("expected one artifact, got %#v", first["artifacts"])
}
artifact, ok := artifacts[0].(map[string]any)
if !ok {
t.Fatalf("expected artifact object, got %#v", artifacts[0])
}
if got := artifact["path"]; got != artifactPath {
t.Fatalf("expected artifact path %q, got %#v", artifactPath, got)
}
if got := artifact["kind"]; got != "brief" {
t.Fatalf("expected artifact kind brief, got %#v", got)
}
}
func TestShowMarkReadAdvancesReadCursor(t *testing.T) {
t.Parallel()
dbPath := filepath.Join(t.TempDir(), "coord.db")
runInboxCommand(t, "--db", dbPath, "--json", "init")
sendOut := runInboxCommand(
t,
"--db", dbPath,
"--json",
"send",
"--from", "leader",
"--to", "worker-e",
"--subject", "Review nav copy",
"--summary", "Check wording",
)
var sendResp map[string]any
mustDecodeJSON(t, sendOut, &sendResp)
threadID := nestedString(t, sendResp, "data", "thread", "thread_id")
runInboxCommand(
t,
"--db", dbPath,
"--json",
"fetch",
"--agent", "worker-e",
"--status", "pending",
"--unread",
)
runInboxCommand(
t,
"--db", dbPath,
"--agent", "worker-e",
"--json",
"show",
"--thread", threadID,
"--mark-read",
)
stdout, _, exitCode := executeInboxCommand(
"--db", dbPath,
"--json",
"fetch",
"--agent", "worker-e",
"--status", "pending",
"--unread",
)
if exitCode != 10 {
t.Fatalf("expected unread fetch to be empty after mark-read, got exit=%d with %s", exitCode, stdout)
}
assertErrorJSON(t, stdout, "no_matching_work")
}
func TestShowRejectsWhenThreadMissing(t *testing.T) {
t.Parallel()
dbPath := filepath.Join(t.TempDir(), "coord.db")
runInboxCommand(t, "--db", dbPath, "--json", "init")
stdout, _, exitCode := executeInboxCommand(
"--db", dbPath,
"--json",
"show",
"--thread", "thr_missing",
)
if exitCode != 40 {
t.Fatalf("expected not-found exit code 40, got %d with %s", exitCode, stdout)
}
assertErrorJSON(t, stdout, "not_found")
}
@@ -0,0 +1,78 @@
package inbox
import (
"bytes"
"encoding/json"
"testing"
)
func runInboxCommand(t *testing.T, args ...string) string {
t.Helper()
stdout, stderr, exitCode := executeInboxCommand(args...)
if exitCode != 0 {
t.Fatalf("execute inbox command %v: exit=%d\nstderr:\n%s\nstdout:\n%s", args, exitCode, stderr, stdout)
}
return stdout
}
func executeInboxCommand(args ...string) (string, string, int) {
var stdout bytes.Buffer
var stderr bytes.Buffer
exitCode := Execute(args, &stdout, &stderr)
return stdout.String(), stderr.String(), exitCode
}
func mustDecodeJSON(t *testing.T, raw string, target any) {
t.Helper()
if err := json.Unmarshal([]byte(raw), target); err != nil {
t.Fatalf("decode json %q: %v", raw, err)
}
}
func nestedString(t *testing.T, value map[string]any, keys ...string) string {
t.Helper()
current := nestedValue(t, value, keys...)
str, ok := current.(string)
if !ok {
t.Fatalf("expected string at %v, got %#v", keys, current)
}
return str
}
func nestedValue(t *testing.T, value map[string]any, keys ...string) any {
t.Helper()
var current any = value
for _, key := range keys {
obj, ok := current.(map[string]any)
if !ok {
t.Fatalf("expected object at %q in %v, got %#v", key, keys, current)
}
current, ok = obj[key]
if !ok {
t.Fatalf("missing key %q in %v", key, keys)
}
}
return current
}
func assertErrorJSON(t *testing.T, raw string, expectedCode string) {
t.Helper()
var payload map[string]any
mustDecodeJSON(t, raw, &payload)
if ok, _ := payload["ok"].(bool); ok {
t.Fatalf("expected ok=false error payload, got %#v", payload)
}
errorValue, ok := payload["error"].(map[string]any)
if !ok {
t.Fatalf("expected error object, got %#v", payload["error"])
}
if code, _ := errorValue["code"].(string); code != expectedCode {
t.Fatalf("expected error code %q, got %#v", expectedCode, errorValue["code"])
}
}
@@ -0,0 +1,101 @@
package inbox
import (
"fmt"
"ai-workflow-skill/packages/coord-core/protocol"
"ai-workflow-skill/packages/coord-core/store"
"github.com/spf13/cobra"
)
type updateOptions struct {
agent string
threadID string
status string
summary string
body string
bodyFile string
payloadJSON string
artifacts artifactOptions
}
func newUpdateCmd(root *rootOptions) *cobra.Command {
opts := &updateOptions{}
cmd := &cobra.Command{
Use: "update",
Short: "Append a progress or blocked update to a thread",
RunE: func(cmd *cobra.Command, args []string) error {
ctx := cmd.Context()
agent := opts.agent
if agent == "" {
agent = root.agent
}
if agent == "" {
return protocol.InvalidInput("agent is required", nil)
}
body, err := resolveBodyValue(opts.body, opts.bodyFile)
if err != nil {
return err
}
artifacts, err := resolveArtifacts(opts.artifacts)
if err != nil {
return err
}
sqlDB, err := openInboxDB(ctx, root.dbPath)
if err != nil {
return err
}
defer sqlDB.Close()
s := store.NewInboxStore(sqlDB)
thread, message, err := s.UpdateThreadStatus(ctx, store.UpdateInput{
ThreadID: opts.threadID,
Agent: agent,
Status: opts.status,
Summary: opts.summary,
Body: body,
PayloadJSON: opts.payloadJSON,
Artifacts: artifacts,
})
if err != nil {
return err
}
resp := protocol.Success{
OK: true,
Command: "update",
Data: map[string]any{
"thread": thread,
"message": message,
},
}
if root.json {
return protocol.WriteJSON(cmd.OutOrStdout(), resp)
}
_, err = fmt.Fprintf(cmd.OutOrStdout(), "updated thread %s to %s\n", thread.ThreadID, thread.Status)
return err
},
}
cmd.Flags().StringVar(&opts.agent, "agent", "", "Updating agent")
cmd.Flags().StringVar(&opts.threadID, "thread", "", "Thread ID")
cmd.Flags().StringVar(&opts.status, "status", "", "New status: in_progress or blocked")
cmd.Flags().StringVar(&opts.summary, "summary", "", "Short update summary")
cmd.Flags().StringVar(&opts.body, "body", "", "Update body")
cmd.Flags().StringVar(&opts.bodyFile, "body-file", "", "Read update body from file")
cmd.Flags().StringVar(&opts.payloadJSON, "payload-json", "", "Structured payload JSON string")
addArtifactFlags(cmd, &opts.artifacts)
_ = cmd.MarkFlagRequired("thread")
_ = cmd.MarkFlagRequired("status")
_ = cmd.MarkFlagRequired("summary")
return cmd
}
@@ -0,0 +1,226 @@
package inbox
import (
"os"
"path/filepath"
"testing"
)
func seedThreadForInboxTests(t *testing.T, dbPath, from, to string) string {
t.Helper()
runInboxCommand(t, "--db", dbPath, "--json", "init")
sendOut := runInboxCommand(
t,
"--db", dbPath,
"--json",
"send",
"--from", from,
"--to", to,
"--subject", "Implement feature X",
"--summary", "Add retry policy",
)
var sendResp map[string]any
mustDecodeJSON(t, sendOut, &sendResp)
return nestedString(t, sendResp, "data", "thread", "thread_id")
}
func seedClaimedThreadForInboxTests(t *testing.T, dbPath, from, to, claimer string) string {
t.Helper()
threadID := seedThreadForInboxTests(t, dbPath, from, to)
runInboxCommand(
t,
"--db", dbPath,
"--json",
"claim",
"--agent", claimer,
"--thread", threadID,
"--lease-seconds", "300",
)
return threadID
}
func lastThreadMessageFromShow(t *testing.T, showResp map[string]any) map[string]any {
t.Helper()
messagesValue := nestedValue(t, showResp, "data", "messages")
messages, ok := messagesValue.([]any)
if !ok || len(messages) == 0 {
t.Fatalf("expected non-empty messages, got %#v", messagesValue)
}
lastMessage, ok := messages[len(messages)-1].(map[string]any)
if !ok {
t.Fatalf("expected message object, got %#v", messages[len(messages)-1])
}
return lastMessage
}
func TestUpdateMovesThreadToInProgress(t *testing.T) {
t.Parallel()
dbPath := filepath.Join(t.TempDir(), "coord.db")
threadID := seedClaimedThreadForInboxTests(t, dbPath, "leader", "worker-a", "worker-a")
updateOut := runInboxCommand(
t,
"--db", dbPath,
"--json",
"update",
"--agent", "worker-a",
"--thread", threadID,
"--status", "in_progress",
"--summary", "Implementation started",
"--body", "Scanning current HTTP client usage.",
)
var updateResp map[string]any
mustDecodeJSON(t, updateOut, &updateResp)
if status := nestedString(t, updateResp, "data", "thread", "status"); status != "in_progress" {
t.Fatalf("expected in_progress thread status, got %q", status)
}
if kind := nestedString(t, updateResp, "data", "message", "kind"); kind != "progress" {
t.Fatalf("expected progress message kind, got %q", kind)
}
if toAgent := nestedString(t, updateResp, "data", "message", "to_agent"); toAgent != "leader" {
t.Fatalf("expected message to_agent leader, got %q", toAgent)
}
}
func TestUpdateMovesThreadToBlockedWithPayload(t *testing.T) {
t.Parallel()
dbPath := filepath.Join(t.TempDir(), "coord.db")
threadID := seedClaimedThreadForInboxTests(t, dbPath, "leader", "worker-a", "worker-a")
updateOut := runInboxCommand(
t,
"--db", dbPath,
"--json",
"update",
"--agent", "worker-a",
"--thread", threadID,
"--status", "blocked",
"--summary", "Need timeout decision",
"--payload-json", `{"question":"Should retries apply to read timeouts?"}`,
)
var updateResp map[string]any
mustDecodeJSON(t, updateOut, &updateResp)
if status := nestedString(t, updateResp, "data", "thread", "status"); status != "blocked" {
t.Fatalf("expected blocked thread status, got %q", status)
}
if kind := nestedString(t, updateResp, "data", "message", "kind"); kind != "question" {
t.Fatalf("expected question message kind, got %q", kind)
}
payload, ok := nestedValue(t, updateResp, "data", "message", "payload_json").(map[string]any)
if !ok {
t.Fatalf("expected payload_json object, got %#v", nestedValue(t, updateResp, "data", "message", "payload_json"))
}
if got := payload["question"]; got != "Should retries apply to read timeouts?" {
t.Fatalf("expected payload question, got %#v", got)
}
}
func TestUpdateAcceptsBodyFileAndArtifact(t *testing.T) {
t.Parallel()
tempDir := t.TempDir()
dbPath := filepath.Join(tempDir, "coord.db")
progressPath := filepath.Join(tempDir, "progress.md")
body := "Progress update from file."
if err := os.WriteFile(progressPath, []byte(body), 0o644); err != nil {
t.Fatalf("write progress file: %v", err)
}
threadID := seedClaimedThreadForInboxTests(t, dbPath, "leader", "worker-a", "worker-a")
runInboxCommand(
t,
"--db", dbPath,
"--json",
"update",
"--agent", "worker-a",
"--thread", threadID,
"--status", "in_progress",
"--summary", "Implementation started",
"--body-file", progressPath,
"--artifact", progressPath,
"--artifact-kind", "note",
)
showOut := runInboxCommand(
t,
"--db", dbPath,
"--json",
"show",
"--thread", threadID,
)
var showResp map[string]any
mustDecodeJSON(t, showOut, &showResp)
lastMessage := lastThreadMessageFromShow(t, showResp)
if gotBody, _ := lastMessage["body"].(string); gotBody != body {
t.Fatalf("expected body %q, got %#v", body, lastMessage["body"])
}
artifacts, ok := lastMessage["artifacts"].([]any)
if !ok || len(artifacts) != 1 {
t.Fatalf("expected one artifact, got %#v", lastMessage["artifacts"])
}
artifact, ok := artifacts[0].(map[string]any)
if !ok {
t.Fatalf("expected artifact object, got %#v", artifacts[0])
}
if gotPath, _ := artifact["path"].(string); gotPath != progressPath {
t.Fatalf("expected artifact path %q, got %#v", progressPath, artifact["path"])
}
if gotKind, _ := artifact["kind"].(string); gotKind != "note" {
t.Fatalf("expected artifact kind note, got %#v", artifact["kind"])
}
}
func TestUpdateRejectsInvalidPayloadJSON(t *testing.T) {
t.Parallel()
dbPath := filepath.Join(t.TempDir(), "coord.db")
threadID := seedClaimedThreadForInboxTests(t, dbPath, "leader", "worker-a", "worker-a")
stdout, _, exitCode := executeInboxCommand(
"--db", dbPath,
"--json",
"update",
"--agent", "worker-a",
"--thread", threadID,
"--status", "blocked",
"--summary", "Need timeout decision",
"--payload-json", "not-json",
)
if exitCode != 30 {
t.Fatalf("expected exit code 30, got %d with output %s", exitCode, stdout)
}
assertErrorJSON(t, stdout, "invalid_input")
}
func TestUpdateRejectsNonOwner(t *testing.T) {
t.Parallel()
dbPath := filepath.Join(t.TempDir(), "coord.db")
threadID := seedClaimedThreadForInboxTests(t, dbPath, "leader", "worker-a", "worker-a")
stdout, _, exitCode := executeInboxCommand(
"--db", dbPath,
"--json",
"update",
"--agent", "worker-b",
"--thread", threadID,
"--status", "in_progress",
"--summary", "Implementation started",
)
if exitCode != 20 {
t.Fatalf("expected exit code 20, got %d with output %s", exitCode, stdout)
}
assertErrorJSON(t, stdout, "lease_conflict")
}
@@ -0,0 +1,85 @@
package inbox
import (
"fmt"
"time"
"ai-workflow-skill/packages/coord-core/protocol"
"ai-workflow-skill/packages/coord-core/store"
"github.com/spf13/cobra"
)
type waitReplyOptions struct {
threadID string
afterMessageID string
afterEventID int64
kinds string
timeoutSeconds int
}
func newWaitReplyCmd(root *rootOptions) *cobra.Command {
opts := &waitReplyOptions{}
cmd := &cobra.Command{
Use: "wait-reply",
Short: "Block until a reply-like message appears in a thread",
RunE: func(cmd *cobra.Command, args []string) error {
ctx := cmd.Context()
sqlDB, err := openInboxDB(ctx, root.dbPath)
if err != nil {
return err
}
defer sqlDB.Close()
s := store.NewInboxStore(sqlDB)
agent := root.agent
result, err := s.WaitReply(ctx, store.WaitReplyInput{
ThreadID: opts.threadID,
AfterMessageID: opts.afterMessageID,
AfterEventID: opts.afterEventID,
Kinds: parseCSV(opts.kinds),
Agent: agent,
Timeout: time.Duration(opts.timeoutSeconds) * time.Second,
})
if err != nil {
return err
}
if !result.Woke {
return protocol.NoMatchingWork("no matching reply before timeout")
}
data := map[string]any{
"woke": result.Woke,
"next_event_id": result.NextEventID,
}
if result.Message != nil {
data["message"] = result.Message
}
resp := protocol.Success{
OK: true,
Command: "wait-reply",
Data: data,
}
if root.json {
return protocol.WriteJSON(cmd.OutOrStdout(), resp)
}
_, err = fmt.Fprintf(cmd.OutOrStdout(), "reply received on thread %s at event %d\n", result.Message.ThreadID, result.NextEventID)
return err
},
}
cmd.Flags().StringVar(&opts.threadID, "thread", "", "Thread ID")
cmd.Flags().StringVar(&opts.afterMessageID, "after-message", "", "Resume after a known message ID")
cmd.Flags().Int64Var(&opts.afterEventID, "after-event", 0, "Resume after a known event ID")
cmd.Flags().StringVar(&opts.kinds, "kinds", "answer,control,result", "Comma-separated message kinds to wake on")
cmd.Flags().IntVar(&opts.timeoutSeconds, "timeout-seconds", 0, "Maximum time to wait; 0 waits forever")
_ = cmd.MarkFlagRequired("thread")
return cmd
}
@@ -0,0 +1,221 @@
package inbox
import (
"path/filepath"
"strconv"
"testing"
"time"
)
type waitReplyCommandResult struct {
stdout string
stderr string
exit int
}
func TestWaitReplyWakesOnAnswerAfterMessage(t *testing.T) {
t.Parallel()
dbPath := filepath.Join(t.TempDir(), "coord.db")
threadID, blockedMessageID := seedBlockedThreadForWaitReply(t, dbPath)
waitCh := make(chan waitReplyCommandResult, 1)
go func() {
stdout, stderr, exitCode := executeInboxCommand(
"--db", dbPath,
"--agent", "worker-c",
"--json",
"wait-reply",
"--thread", threadID,
"--after-message", blockedMessageID,
"--timeout-seconds", "2",
)
waitCh <- waitReplyCommandResult{stdout: stdout, stderr: stderr, exit: exitCode}
}()
time.Sleep(200 * time.Millisecond)
runInboxCommand(
t,
"--db", dbPath,
"--json",
"reply",
"--from", "leader",
"--to", "worker-c",
"--thread", threadID,
"--summary", "Redirect to login",
"--body", "Redirect guests to login for the MVP.",
)
var waitResult waitReplyCommandResult
select {
case waitResult = <-waitCh:
case <-time.After(3 * time.Second):
t.Fatal("wait-reply command did not return")
}
if waitResult.exit != 0 {
t.Fatalf("wait-reply failed with exit=%d\nstderr:\n%s\nstdout:\n%s", waitResult.exit, waitResult.stderr, waitResult.stdout)
}
var waitResp map[string]any
mustDecodeJSON(t, waitResult.stdout, &waitResp)
if woke, ok := nestedValue(t, waitResp, "data", "woke").(bool); !ok || !woke {
t.Fatalf("expected wait-reply wake, got %#v", nestedValue(t, waitResp, "data", "woke"))
}
if kind := nestedString(t, waitResp, "data", "message", "kind"); kind != "answer" {
t.Fatalf("expected answer wake message, got %q", kind)
}
}
func TestWaitReplyCanStartFromAfterEvent(t *testing.T) {
t.Parallel()
dbPath := filepath.Join(t.TempDir(), "coord.db")
threadID, blockedMessageID := seedBlockedThreadForWaitReply(t, dbPath)
runInboxCommand(
t,
"--db", dbPath,
"--json",
"reply",
"--from", "leader",
"--to", "worker-c",
"--thread", threadID,
"--summary", "First answer",
"--body", "First reply payload.",
)
firstWaitOut := runInboxCommand(
t,
"--db", dbPath,
"--agent", "worker-c",
"--json",
"wait-reply",
"--thread", threadID,
"--after-message", blockedMessageID,
"--timeout-seconds", "2",
)
var firstWaitResp map[string]any
mustDecodeJSON(t, firstWaitOut, &firstWaitResp)
firstEventIDFloat, ok := nestedValue(t, firstWaitResp, "data", "next_event_id").(float64)
if !ok {
t.Fatalf("expected numeric next_event_id, got %#v", nestedValue(t, firstWaitResp, "data", "next_event_id"))
}
firstEventID := int64(firstEventIDFloat)
waitCh := make(chan waitReplyCommandResult, 1)
go func() {
stdout, stderr, exitCode := executeInboxCommand(
"--db", dbPath,
"--agent", "worker-c",
"--json",
"wait-reply",
"--thread", threadID,
"--after-event", strconv.FormatInt(firstEventID, 10),
"--timeout-seconds", "2",
)
waitCh <- waitReplyCommandResult{stdout: stdout, stderr: stderr, exit: exitCode}
}()
time.Sleep(200 * time.Millisecond)
runInboxCommand(
t,
"--db", dbPath,
"--json",
"reply",
"--from", "leader",
"--to", "worker-c",
"--thread", threadID,
"--summary", "Second answer",
"--body", "Second reply payload.",
)
var waitResult waitReplyCommandResult
select {
case waitResult = <-waitCh:
case <-time.After(3 * time.Second):
t.Fatal("wait-reply after-event command did not return")
}
if waitResult.exit != 0 {
t.Fatalf("wait-reply after-event failed with exit=%d\nstderr:\n%s\nstdout:\n%s", waitResult.exit, waitResult.stderr, waitResult.stdout)
}
var waitResp map[string]any
mustDecodeJSON(t, waitResult.stdout, &waitResp)
if got := nestedString(t, waitResp, "data", "message", "summary"); got != "Second answer" {
t.Fatalf("expected second answer wake message, got %q", got)
}
secondEventIDFloat, ok := nestedValue(t, waitResp, "data", "next_event_id").(float64)
if !ok {
t.Fatalf("expected numeric next_event_id, got %#v", nestedValue(t, waitResp, "data", "next_event_id"))
}
if int64(secondEventIDFloat) <= firstEventID {
t.Fatalf("expected second event id > first event id, got %d <= %d", int64(secondEventIDFloat), firstEventID)
}
}
func TestWaitReplyTimesOutWhenNoReply(t *testing.T) {
t.Parallel()
dbPath := filepath.Join(t.TempDir(), "coord.db")
threadID, _ := seedBlockedThreadForWaitReply(t, dbPath)
stdout, _, exitCode := executeInboxCommand(
"--db", dbPath,
"--agent", "worker-c",
"--json",
"wait-reply",
"--thread", threadID,
"--timeout-seconds", "1",
)
if exitCode != 10 {
t.Fatalf("expected wait-reply timeout exit code 10, got %d with %s", exitCode, stdout)
}
assertErrorJSON(t, stdout, "no_matching_work")
}
func seedBlockedThreadForWaitReply(t *testing.T, dbPath string) (threadID string, blockedMessageID string) {
t.Helper()
runInboxCommand(t, "--db", dbPath, "--json", "init")
sendOut := runInboxCommand(
t,
"--db", dbPath,
"--json",
"send",
"--from", "leader",
"--to", "worker-c",
"--subject", "Investigate auth edge case",
"--summary", "Check auth redirect behavior",
)
var sendResp map[string]any
mustDecodeJSON(t, sendOut, &sendResp)
threadID = nestedString(t, sendResp, "data", "thread", "thread_id")
runInboxCommand(
t,
"--db", dbPath,
"--json",
"claim",
"--agent", "worker-c",
"--thread", threadID,
)
blockedOut := runInboxCommand(
t,
"--db", dbPath,
"--json",
"update",
"--agent", "worker-c",
"--thread", threadID,
"--status", "blocked",
"--summary", "Need policy decision",
)
var blockedResp map[string]any
mustDecodeJSON(t, blockedOut, &blockedResp)
blockedMessageID = nestedString(t, blockedResp, "data", "message", "message_id")
return threadID, blockedMessageID
}
@@ -0,0 +1,90 @@
package inbox
import (
"fmt"
"time"
"ai-workflow-skill/packages/coord-core/protocol"
"ai-workflow-skill/packages/coord-core/store"
"github.com/spf13/cobra"
)
type watchOptions struct {
agent string
statuses string
timeoutSeconds int
afterEventID int64
}
func newWatchCmd(root *rootOptions) *cobra.Command {
opts := &watchOptions{}
cmd := &cobra.Command{
Use: "watch",
Short: "Block until new matching activity appears",
RunE: func(cmd *cobra.Command, args []string) error {
ctx := cmd.Context()
agent := opts.agent
if agent == "" {
agent = root.agent
}
sqlDB, err := openInboxDB(ctx, root.dbPath)
if err != nil {
return err
}
defer sqlDB.Close()
s := store.NewInboxStore(sqlDB)
result, err := s.WatchThreads(ctx, store.WatchInput{
Agent: agent,
Statuses: parseCSV(opts.statuses),
AfterEventID: opts.afterEventID,
StartFromNow: !cmd.Flags().Changed("after-event"),
Timeout: time.Duration(opts.timeoutSeconds) * time.Second,
})
if err != nil {
return err
}
if !result.Woke {
return protocol.NoMatchingWork("no matching work before watch timeout")
}
data := map[string]any{
"woke": result.Woke,
"next_event_id": result.NextEventID,
}
if result.Thread != nil {
data["thread"] = result.Thread
}
if result.Message != nil {
data["message"] = result.Message
}
if result.Event != nil {
data["event"] = result.Event
}
resp := protocol.Success{
OK: true,
Command: "watch",
Data: data,
}
if root.json {
return protocol.WriteJSON(cmd.OutOrStdout(), resp)
}
_, err = fmt.Fprintf(cmd.OutOrStdout(), "watch woke on thread %s at event %d\n", result.Thread.ThreadID, result.NextEventID)
return err
},
}
cmd.Flags().StringVar(&opts.agent, "agent", "", "Assigned agent filter")
cmd.Flags().StringVar(&opts.statuses, "status", "pending,blocked,done,failed", "Comma-separated status filter")
cmd.Flags().IntVar(&opts.timeoutSeconds, "timeout-seconds", 0, "Maximum time to wait; 0 waits forever")
cmd.Flags().Int64Var(&opts.afterEventID, "after-event", 0, "Resume after a known event ID")
return cmd
}
@@ -0,0 +1,171 @@
package inbox
import (
"path/filepath"
"testing"
"time"
)
type watchCommandResult struct {
stdout string
stderr string
exit int
}
func TestWatchWakesOnMatchingThread(t *testing.T) {
t.Parallel()
dbPath := filepath.Join(t.TempDir(), "coord.db")
runInboxCommand(t, "--db", dbPath, "--json", "init")
watchCh := make(chan watchCommandResult, 1)
go func() {
stdout, stderr, exitCode := executeInboxCommand(
"--db", dbPath,
"--json",
"watch",
"--agent", "worker-d",
"--status", "pending",
"--timeout-seconds", "2",
)
watchCh <- watchCommandResult{stdout: stdout, stderr: stderr, exit: exitCode}
}()
time.Sleep(200 * time.Millisecond)
sendOut := runInboxCommand(
t,
"--db", dbPath,
"--json",
"send",
"--from", "leader",
"--to", "worker-d",
"--subject", "Build admin editor",
"--summary", "Create the first editor screen",
)
var sendResp map[string]any
mustDecodeJSON(t, sendOut, &sendResp)
threadID := nestedString(t, sendResp, "data", "thread", "thread_id")
var watchResult watchCommandResult
select {
case watchResult = <-watchCh:
case <-time.After(3 * time.Second):
t.Fatal("watch command did not return")
}
if watchResult.exit != 0 {
t.Fatalf("watch failed with exit=%d\nstderr:\n%s\nstdout:\n%s", watchResult.exit, watchResult.stderr, watchResult.stdout)
}
var watchResp map[string]any
mustDecodeJSON(t, watchResult.stdout, &watchResp)
if woke, ok := nestedValue(t, watchResp, "data", "woke").(bool); !ok || !woke {
t.Fatalf("expected watch to wake, got %#v", nestedValue(t, watchResp, "data", "woke"))
}
if got := nestedString(t, watchResp, "data", "thread", "thread_id"); got != threadID {
t.Fatalf("expected woken thread %q, got %q", threadID, got)
}
nextEventID, ok := nestedValue(t, watchResp, "data", "next_event_id").(float64)
if !ok {
t.Fatalf("expected numeric next_event_id, got %#v", nestedValue(t, watchResp, "data", "next_event_id"))
}
eventID, ok := nestedValue(t, watchResp, "data", "event", "event_id").(float64)
if !ok {
t.Fatalf("expected numeric event_id, got %#v", nestedValue(t, watchResp, "data", "event", "event_id"))
}
if nextEventID != eventID {
t.Fatalf("expected next_event_id == event.event_id, got %v vs %v", nextEventID, eventID)
}
}
func TestWatchRespectsStatusFilter(t *testing.T) {
t.Parallel()
dbPath := filepath.Join(t.TempDir(), "coord.db")
runInboxCommand(t, "--db", dbPath, "--json", "init")
sendOut := runInboxCommand(
t,
"--db", dbPath,
"--json",
"send",
"--from", "leader",
"--to", "worker-c",
"--subject", "Investigate policy edge case",
"--summary", "Initial request",
)
var sendResp map[string]any
mustDecodeJSON(t, sendOut, &sendResp)
threadID := nestedString(t, sendResp, "data", "thread", "thread_id")
watchCh := make(chan watchCommandResult, 1)
go func() {
stdout, stderr, exitCode := executeInboxCommand(
"--db", dbPath,
"--json",
"watch",
"--agent", "worker-c",
"--status", "blocked",
"--timeout-seconds", "2",
)
watchCh <- watchCommandResult{stdout: stdout, stderr: stderr, exit: exitCode}
}()
time.Sleep(200 * time.Millisecond)
runInboxCommand(
t,
"--db", dbPath,
"--json",
"claim",
"--agent", "worker-c",
"--thread", threadID,
)
runInboxCommand(
t,
"--db", dbPath,
"--json",
"update",
"--agent", "worker-c",
"--thread", threadID,
"--status", "blocked",
"--summary", "Need policy decision",
)
var watchResult watchCommandResult
select {
case watchResult = <-watchCh:
case <-time.After(3 * time.Second):
t.Fatal("watch command did not return")
}
if watchResult.exit != 0 {
t.Fatalf("watch failed with exit=%d\nstderr:\n%s\nstdout:\n%s", watchResult.exit, watchResult.stderr, watchResult.stdout)
}
var watchResp map[string]any
mustDecodeJSON(t, watchResult.stdout, &watchResp)
if status := nestedString(t, watchResp, "data", "thread", "status"); status != "blocked" {
t.Fatalf("expected blocked status wake, got %q", status)
}
}
func TestWatchTimesOutWithNoActivity(t *testing.T) {
t.Parallel()
dbPath := filepath.Join(t.TempDir(), "coord.db")
runInboxCommand(t, "--db", dbPath, "--json", "init")
stdout, _, exitCode := executeInboxCommand(
"--db", dbPath,
"--json",
"watch",
"--agent", "worker-d",
"--status", "pending",
"--timeout-seconds", "1",
)
if exitCode != 10 {
t.Fatalf("expected watch timeout exit code 10, got %d with %s", exitCode, stdout)
}
assertErrorJSON(t, stdout, "no_matching_work")
}