From f315d2330d672319ee6f771ae014654e44e3f7b1 Mon Sep 17 00:00:00 2001 From: kurihada Date: Thu, 19 Mar 2026 03:25:06 +0800 Subject: [PATCH] Finalize inbox artifacts and error protocol --- cmd/inbox/main.go | 4 +- docs/implementation-roadmap.md | 4 +- docs/inbox-cli.md | 20 +++ internal/cli/inbox/artifact.go | 78 ++++++++ internal/cli/inbox/body.go | 7 +- internal/cli/inbox/cancel.go | 21 ++- internal/cli/inbox/claim.go | 2 +- internal/cli/inbox/done.go | 9 +- internal/cli/inbox/execute.go | 113 ++++++++++++ internal/cli/inbox/fetch.go | 3 + internal/cli/inbox/integration_test.go | 156 +++++++++++++--- internal/cli/inbox/list.go | 3 + internal/cli/inbox/renew.go | 2 +- internal/cli/inbox/reply.go | 9 +- internal/cli/inbox/root.go | 6 +- internal/cli/inbox/send.go | 11 +- internal/cli/inbox/show.go | 5 + internal/cli/inbox/update.go | 9 +- internal/cli/inbox/wait_reply.go | 7 +- internal/cli/inbox/watch.go | 7 +- internal/protocol/cli_error.go | 33 ++++ internal/store/inbox.go | 236 +++++++++++++++++++++---- 22 files changed, 659 insertions(+), 86 deletions(-) create mode 100644 internal/cli/inbox/artifact.go create mode 100644 internal/cli/inbox/execute.go create mode 100644 internal/protocol/cli_error.go diff --git a/cmd/inbox/main.go b/cmd/inbox/main.go index 0679c6b..3c45b94 100644 --- a/cmd/inbox/main.go +++ b/cmd/inbox/main.go @@ -7,7 +7,5 @@ import ( ) func main() { - if err := inboxcli.NewRootCmd().Execute(); err != nil { - os.Exit(1) - } + os.Exit(inboxcli.Execute(os.Args[1:], os.Stdout, os.Stderr)) } diff --git a/docs/implementation-roadmap.md b/docs/implementation-roadmap.md index f9c317f..55fa495 100644 --- a/docs/implementation-roadmap.md +++ b/docs/implementation-roadmap.md @@ -18,8 +18,8 @@ As of now: - `inbox` and `orch` both compile - shared SQLite schema initialization exists - `inbox` is implemented end-to-end, including send/fetch/claim/renew/update/reply/done/fail/cancel/list/show/watch/wait-reply -- `inbox` supports blocking waits, lease renewal, unread fetches, and `--body-file` -- integration tests cover the main inbox lifecycle plus wait/watch flows +- `inbox` supports blocking waits, lease renewal, unread fetches, `--body-file`, artifact attachments, and structured JSON errors with stable exit codes +- integration tests cover the main inbox lifecycle, wait/watch flows, artifact persistence, and JSON error contracts - `orch` currently exists as a command skeleton only - no scheduler workflows have been implemented yet diff --git a/docs/inbox-cli.md b/docs/inbox-cli.md index 93e500e..c87063d 100644 --- a/docs/inbox-cli.md +++ b/docs/inbox-cli.md @@ -146,6 +146,9 @@ Suggested flags: - `--body-file PATH` - `--payload-json STRING` - `--priority low|normal|high` +- `--artifact PATH` +- `--artifact-kind KIND` +- `--artifact-metadata-json STRING` ### `inbox fetch` @@ -191,6 +194,9 @@ Suggested flags: - `--body TEXT` - `--body-file PATH` - `--payload-json STRING` +- `--artifact PATH` +- `--artifact-kind KIND` +- `--artifact-metadata-json STRING` ### `inbox reply` @@ -206,6 +212,9 @@ Suggested flags: - `--body TEXT` - `--body-file PATH` - `--payload-json STRING` +- `--artifact PATH` +- `--artifact-kind KIND` +- `--artifact-metadata-json STRING` ### `inbox done` @@ -219,6 +228,9 @@ Suggested flags: - `--body TEXT` - `--body-file PATH` - `--payload-json STRING` +- `--artifact PATH` +- `--artifact-kind KIND` +- `--artifact-metadata-json STRING` ### `inbox fail` @@ -231,6 +243,9 @@ Suggested flags: - `--summary TEXT` - `--body TEXT` - `--payload-json STRING` +- `--artifact PATH` +- `--artifact-kind KIND` +- `--artifact-metadata-json STRING` ### `inbox cancel` @@ -241,6 +256,9 @@ Suggested flags: - `--agent AGENT` - `--thread THREAD_ID` - `--reason TEXT` +- `--artifact PATH` +- `--artifact-kind KIND` +- `--artifact-metadata-json STRING` ### `inbox list` @@ -263,6 +281,8 @@ Suggested flags: - `--thread THREAD_ID` - `--json` +`show` should include per-message artifact references when present. + ### `inbox watch` Block until new matching activity appears. diff --git a/internal/cli/inbox/artifact.go b/internal/cli/inbox/artifact.go new file mode 100644 index 0000000..5c886df --- /dev/null +++ b/internal/cli/inbox/artifact.go @@ -0,0 +1,78 @@ +package inbox + +import ( + "strings" + + "ai-workflow-skill/internal/protocol" + "ai-workflow-skill/internal/store" + + "github.com/spf13/cobra" +) + +type artifactOptions struct { + paths []string + kinds []string + metadataJSONs []string +} + +func addArtifactFlags(cmd *cobra.Command, opts *artifactOptions) { + cmd.Flags().StringArrayVar(&opts.paths, "artifact", nil, "Artifact path to attach; may be repeated") + cmd.Flags().StringArrayVar(&opts.kinds, "artifact-kind", nil, "Artifact kind; one value applies to all, or match artifact count") + cmd.Flags().StringArrayVar(&opts.metadataJSONs, "artifact-metadata-json", nil, "Artifact metadata JSON; one value applies to all, or match artifact count") +} + +func resolveArtifacts(opts artifactOptions) ([]store.ArtifactInput, error) { + if len(opts.paths) == 0 { + if len(opts.kinds) > 0 || len(opts.metadataJSONs) > 0 { + return nil, protocol.InvalidInput("artifact-kind and artifact-metadata-json require at least one artifact path", nil) + } + return nil, nil + } + + kinds, err := expandArtifactValues(opts.kinds, len(opts.paths), "artifact-kind") + if err != nil { + return nil, err + } + metadataJSONs, err := expandArtifactValues(opts.metadataJSONs, len(opts.paths), "artifact-metadata-json") + if err != nil { + return nil, err + } + + artifacts := make([]store.ArtifactInput, 0, len(opts.paths)) + for i, path := range opts.paths { + if strings.TrimSpace(path) == "" { + return nil, protocol.InvalidInput("artifact path cannot be empty", nil) + } + + artifact := store.ArtifactInput{ + Path: path, + Kind: "file", + } + if len(kinds) > 0 { + artifact.Kind = kinds[i] + } + if len(metadataJSONs) > 0 { + artifact.MetadataJSON = metadataJSONs[i] + } + artifacts = append(artifacts, artifact) + } + + return artifacts, nil +} + +func expandArtifactValues(values []string, target int, flagName string) ([]string, error) { + switch len(values) { + case 0: + return nil, nil + case 1: + out := make([]string, target) + for i := range out { + out[i] = values[0] + } + return out, nil + case target: + return values, nil + default: + return nil, protocol.InvalidInput(flagName+" must be specified once or once per artifact", nil) + } +} diff --git a/internal/cli/inbox/body.go b/internal/cli/inbox/body.go index f646688..d3bb85d 100644 --- a/internal/cli/inbox/body.go +++ b/internal/cli/inbox/body.go @@ -1,13 +1,14 @@ package inbox import ( - "fmt" "os" + + "ai-workflow-skill/internal/protocol" ) func resolveBodyValue(body, bodyFile string) (string, error) { if body != "" && bodyFile != "" { - return "", fmt.Errorf("body and body-file are mutually exclusive") + return "", protocol.InvalidInput("body and body-file are mutually exclusive", nil) } if bodyFile == "" { return body, nil @@ -15,7 +16,7 @@ func resolveBodyValue(body, bodyFile string) (string, error) { content, err := os.ReadFile(bodyFile) if err != nil { - return "", fmt.Errorf("read body file %q: %w", bodyFile, err) + return "", protocol.InvalidInput("failed to read body-file", err) } return string(content), nil } diff --git a/internal/cli/inbox/cancel.go b/internal/cli/inbox/cancel.go index b4bc7a9..9b6ddef 100644 --- a/internal/cli/inbox/cancel.go +++ b/internal/cli/inbox/cancel.go @@ -11,9 +11,10 @@ import ( ) type cancelOptions struct { - agent string - threadID string - reason string + agent string + threadID string + reason string + artifacts artifactOptions } func newCancelCmd(root *rootOptions) *cobra.Command { @@ -30,7 +31,11 @@ func newCancelCmd(root *rootOptions) *cobra.Command { agent = root.agent } if agent == "" { - return fmt.Errorf("agent is required") + return protocol.InvalidInput("agent is required", nil) + } + artifacts, err := resolveArtifacts(opts.artifacts) + if err != nil { + return err } sqlDB, err := db.Open(ctx, root.dbPath) @@ -41,9 +46,10 @@ func newCancelCmd(root *rootOptions) *cobra.Command { s := store.NewInboxStore(sqlDB) thread, message, err := s.CancelThread(ctx, store.CancelInput{ - ThreadID: opts.threadID, - Agent: agent, - Reason: opts.reason, + ThreadID: opts.threadID, + Agent: agent, + Reason: opts.reason, + Artifacts: artifacts, }) if err != nil { return err @@ -70,6 +76,7 @@ func newCancelCmd(root *rootOptions) *cobra.Command { cmd.Flags().StringVar(&opts.agent, "agent", "", "Acting agent") cmd.Flags().StringVar(&opts.threadID, "thread", "", "Thread ID") cmd.Flags().StringVar(&opts.reason, "reason", "", "Cancellation reason") + addArtifactFlags(cmd, &opts.artifacts) _ = cmd.MarkFlagRequired("thread") diff --git a/internal/cli/inbox/claim.go b/internal/cli/inbox/claim.go index e512354..c882bfb 100644 --- a/internal/cli/inbox/claim.go +++ b/internal/cli/inbox/claim.go @@ -31,7 +31,7 @@ func newClaimCmd(root *rootOptions) *cobra.Command { agent = root.agent } if agent == "" { - return fmt.Errorf("agent is required") + return protocol.InvalidInput("agent is required", nil) } sqlDB, err := db.Open(ctx, root.dbPath) diff --git a/internal/cli/inbox/done.go b/internal/cli/inbox/done.go index 6f25f9c..b98a4b4 100644 --- a/internal/cli/inbox/done.go +++ b/internal/cli/inbox/done.go @@ -17,6 +17,7 @@ type completeOptions struct { body string bodyFile string payloadJSON string + artifacts artifactOptions } func newDoneCmd(root *rootOptions) *cobra.Command { @@ -41,13 +42,17 @@ func newCompleteCmd(root *rootOptions, mode string) *cobra.Command { agent = root.agent } if agent == "" { - return fmt.Errorf("agent is required") + return protocol.InvalidInput("agent is required", nil) } body, err := resolveBodyValue(opts.body, opts.bodyFile) if err != nil { return err } + artifacts, err := resolveArtifacts(opts.artifacts) + if err != nil { + return err + } sqlDB, err := db.Open(ctx, root.dbPath) if err != nil { @@ -63,6 +68,7 @@ func newCompleteCmd(root *rootOptions, mode string) *cobra.Command { Body: body, PayloadJSON: opts.payloadJSON, Failed: mode == "fail", + Artifacts: artifacts, }) if err != nil { return err @@ -92,6 +98,7 @@ func newCompleteCmd(root *rootOptions, mode string) *cobra.Command { cmd.Flags().StringVar(&opts.body, "body", "", "Completion body") cmd.Flags().StringVar(&opts.bodyFile, "body-file", "", "Read completion body from file") cmd.Flags().StringVar(&opts.payloadJSON, "payload-json", "", "Structured payload JSON string") + addArtifactFlags(cmd, &opts.artifacts) _ = cmd.MarkFlagRequired("thread") _ = cmd.MarkFlagRequired("summary") diff --git a/internal/cli/inbox/execute.go b/internal/cli/inbox/execute.go new file mode 100644 index 0000000..88374a9 --- /dev/null +++ b/internal/cli/inbox/execute.go @@ -0,0 +1,113 @@ +package inbox + +import ( + "errors" + "fmt" + "io" + "strings" + + "ai-workflow-skill/internal/protocol" + "ai-workflow-skill/internal/store" +) + +func Execute(args []string, stdout, stderr io.Writer) int { + cmd := NewRootCmd() + cmd.SetOut(stdout) + cmd.SetErr(stderr) + cmd.SetArgs(args) + + if err := cmd.Execute(); err != nil { + jsonOutput := hasJSONFlag(args) + renderError(stdout, stderr, jsonOutput, err) + return exitCodeForError(err) + } + + return 0 +} + +func exitCodeForError(err error) int { + var cliErr *protocol.CLIError + if errors.As(err, &cliErr) { + return cliErr.ExitCode + } + + switch { + case isUsageError(err): + return 30 + case errors.Is(err, store.ErrLeaseConflict): + return 20 + case errors.Is(err, store.ErrThreadNotFound), errors.Is(err, store.ErrMessageNotFound): + return 40 + case errors.Is(err, store.ErrInvalidInput), errors.Is(err, store.ErrInvalidState), errors.Is(err, store.ErrNoActiveLease): + return 30 + default: + return 50 + } +} + +func errorCodeForError(err error) string { + var cliErr *protocol.CLIError + if errors.As(err, &cliErr) { + return cliErr.Code + } + + switch { + case isUsageError(err): + return "invalid_input" + case errors.Is(err, store.ErrLeaseConflict): + return "lease_conflict" + case errors.Is(err, store.ErrThreadNotFound), errors.Is(err, store.ErrMessageNotFound): + return "not_found" + case errors.Is(err, store.ErrInvalidInput): + return "invalid_input" + case errors.Is(err, store.ErrInvalidState), errors.Is(err, store.ErrNoActiveLease): + return "invalid_state" + default: + return "internal_error" + } +} + +func renderError(stdout, stderr io.Writer, jsonOutput bool, err error) { + message := errorMessage(err) + if jsonOutput { + _ = protocol.WriteJSON(stdout, protocol.Error{ + OK: false, + Error: protocol.ErrorPayload{ + Code: errorCodeForError(err), + Message: message, + }, + }) + return + } + + _, _ = fmt.Fprintln(stderr, message) +} + +func errorMessage(err error) string { + var cliErr *protocol.CLIError + if errors.As(err, &cliErr) { + return cliErr.Message + } + return err.Error() +} + +func hasJSONFlag(args []string) bool { + for _, arg := range args { + if arg == "--json" { + return true + } + if strings.HasPrefix(arg, "--json=") { + return !strings.HasSuffix(arg, "=false") + } + } + return false +} + +func isUsageError(err error) bool { + message := err.Error() + return strings.HasPrefix(message, "required flag(s)") || + strings.HasPrefix(message, "unknown flag:") || + strings.HasPrefix(message, "unknown command ") || + strings.Contains(message, " accepts ") || + strings.Contains(message, "invalid argument ") +} diff --git a/internal/cli/inbox/fetch.go b/internal/cli/inbox/fetch.go index 3a75430..98e12dc 100644 --- a/internal/cli/inbox/fetch.go +++ b/internal/cli/inbox/fetch.go @@ -48,6 +48,9 @@ func newFetchCmd(root *rootOptions) *cobra.Command { if err != nil { return err } + if len(threads) == 0 { + return protocol.NoMatchingWork("no matching work") + } resp := protocol.Success{ OK: true, diff --git a/internal/cli/inbox/integration_test.go b/internal/cli/inbox/integration_test.go index 552ebc5..b916fb2 100644 --- a/internal/cli/inbox/integration_test.go +++ b/internal/cli/inbox/integration_test.go @@ -309,12 +309,12 @@ func TestInboxRenewWaitReplyAndCancel(t *testing.T) { type commandResult struct { stdout string stderr string - err error + exit int } waitCh := make(chan commandResult, 1) go func() { - stdout, stderr, err := executeInboxCommand( + stdout, stderr, exitCode := executeInboxCommand( "--db", dbPath, "--json", "wait-reply", @@ -322,7 +322,7 @@ func TestInboxRenewWaitReplyAndCancel(t *testing.T) { "--after-message", blockedMessageID, "--timeout-seconds", "2", ) - waitCh <- commandResult{stdout: stdout, stderr: stderr, err: err} + waitCh <- commandResult{stdout: stdout, stderr: stderr, exit: exitCode} }() time.Sleep(200 * time.Millisecond) @@ -346,8 +346,8 @@ func TestInboxRenewWaitReplyAndCancel(t *testing.T) { t.Fatal("wait-reply command did not return") } - if waitResult.err != nil { - t.Fatalf("wait-reply failed: %v\nstderr:\n%s", waitResult.err, waitResult.stderr) + if waitResult.exit != 0 { + t.Fatalf("wait-reply failed with exit=%d\nstderr:\n%s\nstdout:\n%s", waitResult.exit, waitResult.stderr, waitResult.stdout) } var waitResp map[string]any @@ -392,12 +392,12 @@ func TestInboxWatchListUnreadAndAppend(t *testing.T) { type commandResult struct { stdout string stderr string - err error + exit int } watchCh := make(chan commandResult, 1) go func() { - stdout, stderr, err := executeInboxCommand( + stdout, stderr, exitCode := executeInboxCommand( "--db", dbPath, "--json", "watch", @@ -405,7 +405,7 @@ func TestInboxWatchListUnreadAndAppend(t *testing.T) { "--status", "pending", "--timeout-seconds", "2", ) - watchCh <- commandResult{stdout: stdout, stderr: stderr, err: err} + watchCh <- commandResult{stdout: stdout, stderr: stderr, exit: exitCode} }() time.Sleep(200 * time.Millisecond) @@ -420,6 +420,9 @@ func TestInboxWatchListUnreadAndAppend(t *testing.T) { "--subject", "Build admin editor", "--summary", "Create the first editor screen", "--body-file", bodyPath, + "--artifact", bodyPath, + "--artifact-kind", "brief", + "--artifact-metadata-json", `{"label":"task-brief"}`, "--run", "run_blog_004", "--task", "T4", ) @@ -435,8 +438,8 @@ func TestInboxWatchListUnreadAndAppend(t *testing.T) { t.Fatal("watch command did not return") } - if watchResult.err != nil { - t.Fatalf("watch failed: %v\nstderr:\n%s", watchResult.err, watchResult.stderr) + if watchResult.exit != 0 { + t.Fatalf("watch failed with exit=%d\nstderr:\n%s\nstdout:\n%s", watchResult.exit, watchResult.stderr, watchResult.stdout) } var watchResp map[string]any @@ -514,29 +517,123 @@ func TestInboxWatchListUnreadAndAppend(t *testing.T) { if firstMessage["body"] != "Implement the initial admin post editor." { t.Fatalf("expected body-file content in first message, got %#v", firstMessage["body"]) } + artifacts, ok := firstMessage["artifacts"].([]any) + if !ok || len(artifacts) != 1 { + t.Fatalf("expected one artifact on first message, got %#v", firstMessage["artifacts"]) + } + firstArtifact, ok := artifacts[0].(map[string]any) + if !ok { + t.Fatalf("expected artifact object, got %#v", artifacts[0]) + } + if firstArtifact["path"] != bodyPath { + t.Fatalf("expected artifact path %q, got %#v", bodyPath, firstArtifact["path"]) + } + if firstArtifact["kind"] != "brief" { + t.Fatalf("expected artifact kind brief, got %#v", firstArtifact["kind"]) + } +} + +func TestInboxJSONErrorsAndExitCodes(t *testing.T) { + t.Parallel() + + dbPath := filepath.Join(t.TempDir(), "coord.db") + + if _, _, exitCode := executeInboxCommand("--db", dbPath, "--json", "init"); exitCode != 0 { + t.Fatalf("expected init exit code 0, got %d", exitCode) + } + + stdout, _, exitCode := executeInboxCommand( + "--db", dbPath, + "--json", + "fetch", + "--agent", "worker-z", + "--status", "pending", + ) + if exitCode != 10 { + t.Fatalf("expected fetch no-match exit code 10, got %d", exitCode) + } + assertErrorJSON(t, stdout, "no_matching_work") + + stdout, _, exitCode = executeInboxCommand( + "--db", dbPath, + "--json", + "claim", + "--agent", "worker-z", + "--thread", "thr_missing", + ) + if exitCode != 40 { + t.Fatalf("expected claim missing-thread exit code 40, got %d", exitCode) + } + assertErrorJSON(t, stdout, "not_found") + + sendOut := runInboxCommand( + t, + "--db", dbPath, + "--json", + "send", + "--from", "leader", + "--to", "worker-z", + "--subject", "Review cache settings", + "--summary", "Check cache config", + ) + + var sendResp map[string]any + mustDecodeJSON(t, sendOut, &sendResp) + threadID := nestedString(t, sendResp, "data", "thread", "thread_id") + + runInboxCommand( + t, + "--db", dbPath, + "--json", + "claim", + "--agent", "worker-z", + "--thread", threadID, + ) + + stdout, _, exitCode = executeInboxCommand( + "--db", dbPath, + "--json", + "claim", + "--agent", "worker-y", + "--thread", threadID, + ) + if exitCode != 20 { + t.Fatalf("expected lease conflict exit code 20, got %d", exitCode) + } + assertErrorJSON(t, stdout, "lease_conflict") + + stdout, _, exitCode = executeInboxCommand( + "--db", dbPath, + "--json", + "send", + "--from", "leader", + "--to", "worker-z", + "--subject", "Invalid body flags", + "--body", "inline", + "--body-file", filepath.Join(t.TempDir(), "missing.md"), + ) + if exitCode != 30 { + t.Fatalf("expected invalid input exit code 30, got %d", exitCode) + } + assertErrorJSON(t, stdout, "invalid_input") } func runInboxCommand(t *testing.T, args ...string) string { t.Helper() - stdout, stderr, err := executeInboxCommand(args...) - if err != nil { - t.Fatalf("execute inbox command %v: %v\nstderr:\n%s", args, err, stderr) + stdout, stderr, exitCode := executeInboxCommand(args...) + if exitCode != 0 { + t.Fatalf("execute inbox command %v: exit=%d\nstderr:\n%s\nstdout:\n%s", args, exitCode, stderr, stdout) } return stdout } -func executeInboxCommand(args ...string) (string, string, error) { - cmd := NewRootCmd() +func executeInboxCommand(args ...string) (string, string, int) { var stdout bytes.Buffer var stderr bytes.Buffer - cmd.SetOut(&stdout) - cmd.SetErr(&stderr) - cmd.SetArgs(args) - - err := cmd.Execute() - return stdout.String(), stderr.String(), err + exitCode := Execute(args, &stdout, &stderr) + return stdout.String(), stderr.String(), exitCode } func mustDecodeJSON(t *testing.T, raw string, target any) { @@ -574,3 +671,20 @@ func nestedValue(t *testing.T, value map[string]any, keys ...string) any { } return current } + +func assertErrorJSON(t *testing.T, raw string, expectedCode string) { + t.Helper() + + var payload map[string]any + mustDecodeJSON(t, raw, &payload) + if ok, _ := payload["ok"].(bool); ok { + t.Fatalf("expected ok=false error payload, got %#v", payload) + } + errorValue, ok := payload["error"].(map[string]any) + if !ok { + t.Fatalf("expected error object, got %#v", payload["error"]) + } + if code, _ := errorValue["code"].(string); code != expectedCode { + t.Fatalf("expected error code %q, got %#v", expectedCode, errorValue["code"]) + } +} diff --git a/internal/cli/inbox/list.go b/internal/cli/inbox/list.go index 65c9bbe..ffb3ebf 100644 --- a/internal/cli/inbox/list.go +++ b/internal/cli/inbox/list.go @@ -49,6 +49,9 @@ func newListCmd(root *rootOptions) *cobra.Command { if err != nil { return err } + if len(threads) == 0 { + return protocol.NoMatchingWork("no matching work") + } resp := protocol.Success{ OK: true, diff --git a/internal/cli/inbox/renew.go b/internal/cli/inbox/renew.go index 5b26761..29dca60 100644 --- a/internal/cli/inbox/renew.go +++ b/internal/cli/inbox/renew.go @@ -30,7 +30,7 @@ func newRenewCmd(root *rootOptions) *cobra.Command { agent = root.agent } if agent == "" { - return fmt.Errorf("agent is required") + return protocol.InvalidInput("agent is required", nil) } sqlDB, err := db.Open(ctx, root.dbPath) diff --git a/internal/cli/inbox/reply.go b/internal/cli/inbox/reply.go index 3c835c8..e420648 100644 --- a/internal/cli/inbox/reply.go +++ b/internal/cli/inbox/reply.go @@ -19,6 +19,7 @@ type replyOptions struct { body string bodyFile string payloadJSON string + artifacts artifactOptions } func newReplyCmd(root *rootOptions) *cobra.Command { @@ -35,13 +36,17 @@ func newReplyCmd(root *rootOptions) *cobra.Command { from = root.agent } if from == "" { - return fmt.Errorf("from agent is required") + return protocol.InvalidInput("from agent is required", nil) } body, err := resolveBodyValue(opts.body, opts.bodyFile) if err != nil { return err } + artifacts, err := resolveArtifacts(opts.artifacts) + if err != nil { + return err + } sqlDB, err := db.Open(ctx, root.dbPath) if err != nil { @@ -58,6 +63,7 @@ func newReplyCmd(root *rootOptions) *cobra.Command { Summary: opts.summary, Body: body, PayloadJSON: opts.payloadJSON, + Artifacts: artifacts, }) if err != nil { return err @@ -89,6 +95,7 @@ func newReplyCmd(root *rootOptions) *cobra.Command { cmd.Flags().StringVar(&opts.body, "body", "", "Reply body") cmd.Flags().StringVar(&opts.bodyFile, "body-file", "", "Read reply body from file") cmd.Flags().StringVar(&opts.payloadJSON, "payload-json", "", "Structured payload JSON string") + addArtifactFlags(cmd, &opts.artifacts) _ = cmd.MarkFlagRequired("thread") _ = cmd.MarkFlagRequired("to") diff --git a/internal/cli/inbox/root.go b/internal/cli/inbox/root.go index f5a88c9..56045e6 100644 --- a/internal/cli/inbox/root.go +++ b/internal/cli/inbox/root.go @@ -14,8 +14,10 @@ func NewRootCmd() *cobra.Command { opts := &rootOptions{} cmd := &cobra.Command{ - Use: "inbox", - Short: "Worker-facing durable coordination bus", + Use: "inbox", + Short: "Worker-facing durable coordination bus", + SilenceErrors: true, + SilenceUsage: true, } cmd.PersistentFlags().StringVar(&opts.dbPath, "db", ".agents/coord.db", "SQLite database path") diff --git a/internal/cli/inbox/send.go b/internal/cli/inbox/send.go index 4168002..01e07a3 100644 --- a/internal/cli/inbox/send.go +++ b/internal/cli/inbox/send.go @@ -23,6 +23,7 @@ type sendOptions struct { bodyFile string payloadJSON string priority string + artifacts artifactOptions } func newSendCmd(root *rootOptions) *cobra.Command { @@ -39,16 +40,20 @@ func newSendCmd(root *rootOptions) *cobra.Command { from = root.agent } if from == "" { - return fmt.Errorf("from agent is required") + return protocol.InvalidInput("from agent is required", nil) } if opts.threadID == "" && opts.subject == "" { - return fmt.Errorf("subject is required when creating a new thread") + return protocol.InvalidInput("subject is required when creating a new thread", nil) } body, err := resolveBodyValue(opts.body, opts.bodyFile) if err != nil { return err } + artifacts, err := resolveArtifacts(opts.artifacts) + if err != nil { + return err + } sqlDB, err := db.Open(ctx, root.dbPath) if err != nil { @@ -69,6 +74,7 @@ func newSendCmd(root *rootOptions) *cobra.Command { Body: body, PayloadJSON: opts.payloadJSON, Priority: opts.priority, + Artifacts: artifacts, }) if err != nil { return err @@ -104,6 +110,7 @@ func newSendCmd(root *rootOptions) *cobra.Command { cmd.Flags().StringVar(&opts.bodyFile, "body-file", "", "Read message body from file") cmd.Flags().StringVar(&opts.payloadJSON, "payload-json", "", "Structured payload JSON string") cmd.Flags().StringVar(&opts.priority, "priority", "normal", "Thread priority") + addArtifactFlags(cmd, &opts.artifacts) _ = cmd.MarkFlagRequired("to") diff --git a/internal/cli/inbox/show.go b/internal/cli/inbox/show.go index 952a981..f3ace1d 100644 --- a/internal/cli/inbox/show.go +++ b/internal/cli/inbox/show.go @@ -55,6 +55,11 @@ func newShowCmd(root *rootOptions) *cobra.Command { if _, err := fmt.Fprintf(cmd.OutOrStdout(), "- %s\t%s\t%s\n", message.MessageID, message.Kind, message.Summary); err != nil { return err } + for _, artifact := range message.Artifacts { + if _, err := fmt.Fprintf(cmd.OutOrStdout(), " artifact\t%s\t%s\n", artifact.Kind, artifact.Path); err != nil { + return err + } + } } return nil }, diff --git a/internal/cli/inbox/update.go b/internal/cli/inbox/update.go index f1f6229..54b3315 100644 --- a/internal/cli/inbox/update.go +++ b/internal/cli/inbox/update.go @@ -18,6 +18,7 @@ type updateOptions struct { body string bodyFile string payloadJSON string + artifacts artifactOptions } func newUpdateCmd(root *rootOptions) *cobra.Command { @@ -34,13 +35,17 @@ func newUpdateCmd(root *rootOptions) *cobra.Command { agent = root.agent } if agent == "" { - return fmt.Errorf("agent is required") + return protocol.InvalidInput("agent is required", nil) } body, err := resolveBodyValue(opts.body, opts.bodyFile) if err != nil { return err } + artifacts, err := resolveArtifacts(opts.artifacts) + if err != nil { + return err + } sqlDB, err := db.Open(ctx, root.dbPath) if err != nil { @@ -56,6 +61,7 @@ func newUpdateCmd(root *rootOptions) *cobra.Command { Summary: opts.summary, Body: body, PayloadJSON: opts.payloadJSON, + Artifacts: artifacts, }) if err != nil { return err @@ -86,6 +92,7 @@ func newUpdateCmd(root *rootOptions) *cobra.Command { cmd.Flags().StringVar(&opts.body, "body", "", "Update body") cmd.Flags().StringVar(&opts.bodyFile, "body-file", "", "Read update body from file") cmd.Flags().StringVar(&opts.payloadJSON, "payload-json", "", "Structured payload JSON string") + addArtifactFlags(cmd, &opts.artifacts) _ = cmd.MarkFlagRequired("thread") _ = cmd.MarkFlagRequired("status") diff --git a/internal/cli/inbox/wait_reply.go b/internal/cli/inbox/wait_reply.go index 27f364d..5a564f7 100644 --- a/internal/cli/inbox/wait_reply.go +++ b/internal/cli/inbox/wait_reply.go @@ -45,6 +45,9 @@ func newWaitReplyCmd(root *rootOptions) *cobra.Command { if err != nil { return err } + if !result.Woke { + return protocol.NoMatchingWork("no matching reply before timeout") + } data := map[string]any{ "woke": result.Woke, @@ -63,10 +66,6 @@ func newWaitReplyCmd(root *rootOptions) *cobra.Command { if root.json { return protocol.WriteJSON(cmd.OutOrStdout(), resp) } - if !result.Woke { - _, err = fmt.Fprintln(cmd.OutOrStdout(), "wait-reply timed out") - return err - } _, err = fmt.Fprintf(cmd.OutOrStdout(), "reply received on thread %s at event %d\n", result.Message.ThreadID, result.NextEventID) return err diff --git a/internal/cli/inbox/watch.go b/internal/cli/inbox/watch.go index addd35a..8d89749 100644 --- a/internal/cli/inbox/watch.go +++ b/internal/cli/inbox/watch.go @@ -49,6 +49,9 @@ func newWatchCmd(root *rootOptions) *cobra.Command { if err != nil { return err } + if !result.Woke { + return protocol.NoMatchingWork("no matching work before watch timeout") + } data := map[string]any{ "woke": result.Woke, @@ -73,10 +76,6 @@ func newWatchCmd(root *rootOptions) *cobra.Command { if root.json { return protocol.WriteJSON(cmd.OutOrStdout(), resp) } - if !result.Woke { - _, err = fmt.Fprintln(cmd.OutOrStdout(), "watch timed out") - return err - } _, err = fmt.Fprintf(cmd.OutOrStdout(), "watch woke on thread %s at event %d\n", result.Thread.ThreadID, result.NextEventID) return err diff --git a/internal/protocol/cli_error.go b/internal/protocol/cli_error.go new file mode 100644 index 0000000..75fd04d --- /dev/null +++ b/internal/protocol/cli_error.go @@ -0,0 +1,33 @@ +package protocol + +type CLIError struct { + Code string + ExitCode int + Message string + Err error +} + +func (e *CLIError) Error() string { + return e.Message +} + +func (e *CLIError) Unwrap() error { + return e.Err +} + +func NewCLIError(code string, exitCode int, message string, err error) error { + return &CLIError{ + Code: code, + ExitCode: exitCode, + Message: message, + Err: err, + } +} + +func InvalidInput(message string, err error) error { + return NewCLIError("invalid_input", 30, message, err) +} + +func NoMatchingWork(message string) error { + return NewCLIError("no_matching_work", 10, message, nil) +} diff --git a/internal/store/inbox.go b/internal/store/inbox.go index ea93e28..23f364c 100644 --- a/internal/store/inbox.go +++ b/internal/store/inbox.go @@ -14,7 +14,10 @@ import ( var ErrLeaseConflict = errors.New("thread already claimed by another worker") var ErrThreadNotFound = errors.New("thread not found") +var ErrMessageNotFound = errors.New("message not found") var ErrNoActiveLease = errors.New("no active lease") +var ErrInvalidInput = errors.New("invalid input") +var ErrInvalidState = errors.New("invalid state") type InboxStore struct { db *sql.DB @@ -44,6 +47,22 @@ type Message struct { Body string `json:"body"` PayloadJSON json.RawMessage `json:"payload_json"` CreatedAt time.Time `json:"created_at"` + Artifacts []Artifact `json:"artifacts,omitempty"` +} + +type Artifact struct { + ArtifactID string `json:"artifact_id"` + MessageID string `json:"message_id"` + Path string `json:"path"` + Kind string `json:"kind"` + MetadataJSON json.RawMessage `json:"metadata_json"` + CreatedAt time.Time `json:"created_at"` +} + +type ArtifactInput struct { + Path string + Kind string + MetadataJSON string } type ThreadDetail struct { @@ -76,6 +95,7 @@ type SendInput struct { Body string PayloadJSON string Priority string + Artifacts []ArtifactInput } type FetchInput struct { @@ -109,6 +129,7 @@ type UpdateInput struct { Summary string Body string PayloadJSON string + Artifacts []ArtifactInput } type ReplyInput struct { @@ -119,6 +140,7 @@ type ReplyInput struct { Summary string Body string PayloadJSON string + Artifacts []ArtifactInput } type CompleteInput struct { @@ -128,12 +150,14 @@ type CompleteInput struct { Body string PayloadJSON string Failed bool + Artifacts []ArtifactInput } type CancelInput struct { - ThreadID string - Agent string - Reason string + ThreadID string + Agent string + Reason string + Artifacts []ArtifactInput } type ListInput struct { @@ -257,25 +281,14 @@ func (s *InboxStore) createThread(ctx context.Context, input SendInput) (Thread, PayloadJSON: json.RawMessage(payload), CreatedAt: now, } - - if _, err := tx.ExecContext( - ctx, - `INSERT INTO messages ( - message_id, thread_id, from_agent, to_agent, kind, summary, body, - payload_json, created_at - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`, - message.MessageID, - message.ThreadID, - message.FromAgent, - message.ToAgent, - message.Kind, - message.Summary, - message.Body, - string(message.PayloadJSON), - formatTime(message.CreatedAt), - ); err != nil { - return Thread{}, Message{}, fmt.Errorf("insert message: %w", err) + if err := insertMessage(ctx, tx, message); err != nil { + return Thread{}, Message{}, err } + artifacts, err := insertArtifacts(ctx, tx, message.MessageID, input.Artifacts, now) + if err != nil { + return Thread{}, Message{}, err + } + message.Artifacts = artifacts if err := insertEvent(ctx, tx, eventInput{ RunID: thread.RunID, @@ -314,7 +327,7 @@ func (s *InboxStore) appendThreadMessage(ctx context.Context, existing Thread, i return Thread{}, Message{}, err } if isTerminalStatus(thread.Status) { - return Thread{}, Message{}, fmt.Errorf("thread %s is already terminal", thread.ThreadID) + return Thread{}, Message{}, fmt.Errorf("%w: thread %s is already terminal", ErrInvalidState, thread.ThreadID) } assignedTo := thread.AssignedTo @@ -337,6 +350,11 @@ func (s *InboxStore) appendThreadMessage(ctx context.Context, existing Thread, i if err := insertMessage(ctx, tx, message); err != nil { return Thread{}, Message{}, err } + artifacts, err := insertArtifacts(ctx, tx, message.MessageID, input.Artifacts, now) + if err != nil { + return Thread{}, Message{}, err + } + message.Artifacts = artifacts if err := updateThreadState(ctx, tx, thread.ThreadID, thread.Status, assignedTo, message.MessageID, now); err != nil { return Thread{}, Message{}, err @@ -413,7 +431,7 @@ func (s *InboxStore) ListThreads(ctx context.Context, input ListInput) ([]Thread } if input.Unread { if input.Agent == "" { - return nil, fmt.Errorf("agent is required when filtering unread threads") + return nil, fmt.Errorf("%w: agent is required when filtering unread threads", ErrInvalidInput) } joins = append(joins, "JOIN messages lm ON lm.message_id = t.latest_message_id") conditions = append(conditions, "lm.to_agent = ?") @@ -477,9 +495,8 @@ func (s *InboxStore) ClaimThread(ctx context.Context, input ClaimInput) (ClaimRe if err != nil { return ClaimResult{}, err } - - if thread.Status != "pending" { - return ClaimResult{}, fmt.Errorf("thread %s is not pending", input.ThreadID) + if isTerminalStatus(thread.Status) { + return ClaimResult{}, fmt.Errorf("%w: thread %s is already terminal", ErrInvalidState, input.ThreadID) } var activeLease string @@ -498,6 +515,9 @@ func (s *InboxStore) ClaimThread(ctx context.Context, input ClaimInput) (ClaimRe if activeLease != "" { return ClaimResult{}, ErrLeaseConflict } + if thread.Status != "pending" { + return ClaimResult{}, fmt.Errorf("%w: thread %s is not pending", ErrInvalidState, input.ThreadID) + } if _, err := tx.ExecContext( ctx, @@ -614,7 +634,7 @@ func (s *InboxStore) RenewLease(ctx context.Context, input RenewInput) (ClaimRes return ClaimResult{}, err } if isTerminalStatus(thread.Status) { - return ClaimResult{}, fmt.Errorf("thread %s is already terminal", input.ThreadID) + return ClaimResult{}, fmt.Errorf("%w: thread %s is already terminal", ErrInvalidState, input.ThreadID) } if _, err := requireActiveLease(ctx, tx, input.ThreadID, input.Agent, now); err != nil { @@ -684,7 +704,7 @@ func (s *InboxStore) UpdateThreadStatus(ctx context.Context, input UpdateInput) messageID := newID("msg") if input.Status != "in_progress" && input.Status != "blocked" { - return Thread{}, Message{}, fmt.Errorf("unsupported update status %q", input.Status) + return Thread{}, Message{}, fmt.Errorf("%w: unsupported update status %q", ErrInvalidInput, input.Status) } tx, err := s.db.BeginTx(ctx, nil) @@ -698,7 +718,7 @@ func (s *InboxStore) UpdateThreadStatus(ctx context.Context, input UpdateInput) return Thread{}, Message{}, err } if isTerminalStatus(thread.Status) { - return Thread{}, Message{}, fmt.Errorf("thread %s is already terminal", input.ThreadID) + return Thread{}, Message{}, fmt.Errorf("%w: thread %s is already terminal", ErrInvalidState, input.ThreadID) } if _, err := requireActiveLease(ctx, tx, input.ThreadID, input.Agent, now); err != nil { return Thread{}, Message{}, err @@ -724,6 +744,11 @@ func (s *InboxStore) UpdateThreadStatus(ctx context.Context, input UpdateInput) if err := insertMessage(ctx, tx, message); err != nil { return Thread{}, Message{}, err } + artifacts, err := insertArtifacts(ctx, tx, message.MessageID, input.Artifacts, now) + if err != nil { + return Thread{}, Message{}, err + } + message.Artifacts = artifacts if err := updateThreadState(ctx, tx, thread.ThreadID, input.Status, thread.AssignedTo, message.MessageID, now); err != nil { return Thread{}, Message{}, err @@ -768,7 +793,7 @@ func (s *InboxStore) ReplyToThread(ctx context.Context, input ReplyInput) (Threa return Thread{}, Message{}, err } if isTerminalStatus(thread.Status) { - return Thread{}, Message{}, fmt.Errorf("thread %s is already terminal", input.ThreadID) + return Thread{}, Message{}, fmt.Errorf("%w: thread %s is already terminal", ErrInvalidState, input.ThreadID) } message := Message{ @@ -786,6 +811,11 @@ func (s *InboxStore) ReplyToThread(ctx context.Context, input ReplyInput) (Threa if err := insertMessage(ctx, tx, message); err != nil { return Thread{}, Message{}, err } + artifacts, err := insertArtifacts(ctx, tx, message.MessageID, input.Artifacts, now) + if err != nil { + return Thread{}, Message{}, err + } + message.Artifacts = artifacts if err := updateThreadState(ctx, tx, thread.ThreadID, thread.Status, thread.AssignedTo, message.MessageID, now); err != nil { return Thread{}, Message{}, err @@ -837,7 +867,7 @@ func (s *InboxStore) CompleteThread(ctx context.Context, input CompleteInput) (T return Thread{}, Message{}, err } if isTerminalStatus(thread.Status) { - return Thread{}, Message{}, fmt.Errorf("thread %s is already terminal", input.ThreadID) + return Thread{}, Message{}, fmt.Errorf("%w: thread %s is already terminal", ErrInvalidState, input.ThreadID) } if _, err := requireActiveLease(ctx, tx, input.ThreadID, input.Agent, now); err != nil { return Thread{}, Message{}, err @@ -858,6 +888,11 @@ func (s *InboxStore) CompleteThread(ctx context.Context, input CompleteInput) (T if err := insertMessage(ctx, tx, message); err != nil { return Thread{}, Message{}, err } + artifacts, err := insertArtifacts(ctx, tx, message.MessageID, input.Artifacts, now) + if err != nil { + return Thread{}, Message{}, err + } + message.Artifacts = artifacts if err := updateThreadState(ctx, tx, thread.ThreadID, nextStatus, thread.AssignedTo, message.MessageID, now); err != nil { return Thread{}, Message{}, err @@ -914,7 +949,7 @@ func (s *InboxStore) CancelThread(ctx context.Context, input CancelInput) (Threa return Thread{}, Message{}, err } if isTerminalStatus(thread.Status) { - return Thread{}, Message{}, fmt.Errorf("thread %s is already terminal", input.ThreadID) + return Thread{}, Message{}, fmt.Errorf("%w: thread %s is already terminal", ErrInvalidState, input.ThreadID) } summary := defaultString(input.Reason, "thread cancelled") @@ -933,6 +968,11 @@ func (s *InboxStore) CancelThread(ctx context.Context, input CancelInput) (Threa if err := insertMessage(ctx, tx, message); err != nil { return Thread{}, Message{}, err } + artifacts, err := insertArtifacts(ctx, tx, message.MessageID, input.Artifacts, now) + if err != nil { + return Thread{}, Message{}, err + } + message.Artifacts = artifacts if err := updateThreadState(ctx, tx, thread.ThreadID, "cancelled", thread.AssignedTo, message.MessageID, now); err != nil { return Thread{}, Message{}, err @@ -1008,6 +1048,12 @@ func (s *InboxStore) GetThread(ctx context.Context, threadID string) (ThreadDeta return ThreadDetail{}, fmt.Errorf("iterate thread messages: %w", err) } + artifactsByMessageID, err := loadArtifactsForMessageIDs(ctx, s.db, messageIDs(messages)) + if err != nil { + return ThreadDetail{}, err + } + attachArtifacts(messages, artifactsByMessageID) + return ThreadDetail{ Thread: thread, Messages: messages, @@ -1176,6 +1222,28 @@ func scanMessage(scanner threadScanner) (Message, error) { return message, nil } +func scanArtifact(scanner threadScanner) (Artifact, error) { + var ( + artifact Artifact + metadata, created string + ) + + if err := scanner.Scan( + &artifact.ArtifactID, + &artifact.MessageID, + &artifact.Path, + &artifact.Kind, + &metadata, + &created, + ); err != nil { + return Artifact{}, fmt.Errorf("scan artifact: %w", err) + } + + artifact.MetadataJSON = json.RawMessage(metadata) + artifact.CreatedAt = parseTime(created) + return artifact, nil +} + func scanEvent(scanner threadScanner) (Event, error) { var ( event Event @@ -1290,6 +1358,44 @@ func insertMessage(ctx context.Context, tx *sql.Tx, message Message) error { return nil } +func insertArtifacts(ctx context.Context, tx *sql.Tx, messageID string, inputs []ArtifactInput, createdAt time.Time) ([]Artifact, error) { + if len(inputs) == 0 { + return nil, nil + } + + artifacts := make([]Artifact, 0, len(inputs)) + for _, input := range inputs { + artifact := Artifact{ + ArtifactID: newID("art"), + MessageID: messageID, + Path: input.Path, + Kind: defaultString(input.Kind, "file"), + MetadataJSON: json.RawMessage(normalizeJSON(input.MetadataJSON)), + CreatedAt: createdAt, + } + + _, err := tx.ExecContext( + ctx, + `INSERT INTO artifacts ( + artifact_id, message_id, path, kind, metadata_json, created_at + ) VALUES (?, ?, ?, ?, ?, ?)`, + artifact.ArtifactID, + artifact.MessageID, + artifact.Path, + artifact.Kind, + string(artifact.MetadataJSON), + formatTime(artifact.CreatedAt), + ) + if err != nil { + return nil, fmt.Errorf("insert artifact: %w", err) + } + + artifacts = append(artifacts, artifact) + } + + return artifacts, nil +} + func updateThreadState(ctx context.Context, tx *sql.Tx, threadID, status, assignedTo, latestMessageID string, updatedAt time.Time) error { _, err := tx.ExecContext( ctx, @@ -1308,6 +1414,60 @@ func updateThreadState(ctx context.Context, tx *sql.Tx, threadID, status, assign return nil } +func loadArtifactsForMessageIDs(ctx context.Context, db *sql.DB, messageIDs []string) (map[string][]Artifact, error) { + result := make(map[string][]Artifact) + if len(messageIDs) == 0 { + return result, nil + } + + args := make([]any, 0, len(messageIDs)) + for _, messageID := range messageIDs { + args = append(args, messageID) + } + + rows, err := db.QueryContext( + ctx, + `SELECT + artifact_id, message_id, path, kind, metadata_json, created_at + FROM artifacts + WHERE message_id IN (`+placeholders(len(messageIDs))+`) + ORDER BY created_at ASC`, + args..., + ) + if err != nil { + return nil, fmt.Errorf("query artifacts: %w", err) + } + defer rows.Close() + + for rows.Next() { + artifact, err := scanArtifact(rows) + if err != nil { + return nil, err + } + result[artifact.MessageID] = append(result[artifact.MessageID], artifact) + } + + if err := rows.Err(); err != nil { + return nil, fmt.Errorf("iterate artifacts: %w", err) + } + + return result, nil +} + +func attachArtifacts(messages []Message, artifactsByMessageID map[string][]Artifact) { + for i := range messages { + messages[i].Artifacts = artifactsByMessageID[messages[i].MessageID] + } +} + +func messageIDs(messages []Message) []string { + ids := make([]string, 0, len(messages)) + for _, message := range messages { + ids = append(ids, message.MessageID) + } + return ids +} + func requireActiveLease(ctx context.Context, tx *sql.Tx, threadID, agent string, now time.Time) (string, error) { var ( activeAgent string @@ -1354,7 +1514,7 @@ func (s *InboxStore) lookupEventIDForMessage(ctx context.Context, threadID, mess messageID, ).Scan(&eventID) if errors.Is(err, sql.ErrNoRows) { - return 0, fmt.Errorf("message %s not found in thread %s", messageID, threadID) + return 0, fmt.Errorf("%w: message %s not found in thread %s", ErrMessageNotFound, messageID, threadID) } if err != nil { return 0, fmt.Errorf("lookup message event: %w", err) @@ -1416,6 +1576,11 @@ func (s *InboxStore) findReplyAfter(ctx context.Context, threadID string, afterE message.PayloadJSON = json.RawMessage(payload) message.CreatedAt = parseTime(created) + artifactsByMessageID, err := loadArtifactsForMessageIDs(ctx, s.db, []string{message.MessageID}) + if err != nil { + return Message{}, 0, false, err + } + message.Artifacts = artifactsByMessageID[message.MessageID] return message, eventID, true, nil } @@ -1510,6 +1675,11 @@ func (s *InboxStore) findWatchEventAfter(ctx context.Context, input WatchInput, event.CreatedAt = parseTime(eventCreatedAt) message.PayloadJSON = json.RawMessage(messagePayload) message.CreatedAt = parseTime(messageCreatedAt) + artifactsByMessageID, err := loadArtifactsForMessageIDs(ctx, s.db, []string{message.MessageID}) + if err != nil { + return Thread{}, Message{}, Event{}, false, err + } + message.Artifacts = artifactsByMessageID[message.MessageID] return thread, message, event, true, nil }