From 19279305707423c162723e8f528f8afaba765587 Mon Sep 17 00:00:00 2001 From: kurihada Date: Thu, 19 Mar 2026 03:43:10 +0800 Subject: [PATCH] Implement inbox read cursors for unread threads --- docs/implementation-roadmap.md | 2 +- docs/inbox-cli.md | 17 ++++ internal/cli/inbox/cancel.go | 3 +- internal/cli/inbox/claim.go | 3 +- internal/cli/inbox/db.go | 22 ++++++ internal/cli/inbox/done.go | 3 +- internal/cli/inbox/fetch.go | 3 +- internal/cli/inbox/init.go | 7 +- internal/cli/inbox/integration_test.go | 103 +++++++++++++++++++++++++ internal/cli/inbox/list.go | 3 +- internal/cli/inbox/renew.go | 3 +- internal/cli/inbox/reply.go | 3 +- internal/cli/inbox/send.go | 3 +- internal/cli/inbox/show.go | 12 ++- internal/cli/inbox/update.go | 3 +- internal/cli/inbox/wait_reply.go | 5 +- internal/cli/inbox/watch.go | 3 +- internal/db/schema/005_inbox_reads.sql | 12 +++ internal/store/inbox.go | 68 ++++++++++++++-- 19 files changed, 240 insertions(+), 38 deletions(-) create mode 100644 internal/cli/inbox/db.go create mode 100644 internal/db/schema/005_inbox_reads.sql diff --git a/docs/implementation-roadmap.md b/docs/implementation-roadmap.md index 55fa495..e8f92f0 100644 --- a/docs/implementation-roadmap.md +++ b/docs/implementation-roadmap.md @@ -18,7 +18,7 @@ As of now: - `inbox` and `orch` both compile - shared SQLite schema initialization exists - `inbox` is implemented end-to-end, including send/fetch/claim/renew/update/reply/done/fail/cancel/list/show/watch/wait-reply -- `inbox` supports blocking waits, lease renewal, unread fetches, `--body-file`, artifact attachments, and structured JSON errors with stable exit codes +- `inbox` supports blocking waits, lease renewal, unread fetches backed by per-agent read cursors, `--body-file`, artifact attachments, and structured JSON errors with stable exit codes - integration tests cover the main inbox lifecycle, wait/watch flows, artifact persistence, and JSON error contracts - `orch` currently exists as a command skeleton only - no scheduler workflows have been implemented yet diff --git a/docs/inbox-cli.md b/docs/inbox-cli.md index c87063d..f4c5adf 100644 --- a/docs/inbox-cli.md +++ b/docs/inbox-cli.md @@ -39,6 +39,7 @@ Those decisions belong to `orch`. - `lease`: an exclusive worker claim for a thread - `artifact`: a path or file reference attached to a message - `event`: a monotonic record used to wake blocking waiters +- `read cursor`: the last thread message an agent has explicitly consumed ## Required Fields @@ -161,6 +162,8 @@ Suggested flags: - `--limit N` - `--unread` +`--unread` should use a per-agent thread read cursor, not a latest-message heuristic alone. + ### `inbox claim` Acquire a lease on a thread. @@ -280,6 +283,7 @@ Suggested flags: - `--thread THREAD_ID` - `--json` +- `--mark-read` `show` should include per-message artifact references when present. @@ -420,6 +424,16 @@ CREATE TABLE IF NOT EXISTS artifacts ( FOREIGN KEY(message_id) REFERENCES messages(message_id) ); +CREATE TABLE IF NOT EXISTS thread_reads ( + thread_id TEXT NOT NULL, + agent_id TEXT NOT NULL, + last_read_message_id TEXT NOT NULL, + last_read_at TEXT NOT NULL, + PRIMARY KEY(thread_id, agent_id), + FOREIGN KEY(thread_id) REFERENCES threads(thread_id), + FOREIGN KEY(last_read_message_id) REFERENCES messages(message_id) +); + CREATE TABLE IF NOT EXISTS events ( event_id INTEGER PRIMARY KEY AUTOINCREMENT, run_id TEXT NOT NULL, @@ -441,6 +455,9 @@ CREATE INDEX IF NOT EXISTS idx_messages_thread_created CREATE INDEX IF NOT EXISTS idx_events_thread_event ON events(thread_id, event_id); + +CREATE INDEX IF NOT EXISTS idx_thread_reads_agent + ON thread_reads(agent_id, last_read_at); ``` ## Concurrency Notes diff --git a/internal/cli/inbox/cancel.go b/internal/cli/inbox/cancel.go index 9b6ddef..0e3ad82 100644 --- a/internal/cli/inbox/cancel.go +++ b/internal/cli/inbox/cancel.go @@ -3,7 +3,6 @@ package inbox import ( "fmt" - "ai-workflow-skill/internal/db" "ai-workflow-skill/internal/protocol" "ai-workflow-skill/internal/store" @@ -38,7 +37,7 @@ func newCancelCmd(root *rootOptions) *cobra.Command { return err } - sqlDB, err := db.Open(ctx, root.dbPath) + sqlDB, err := openInboxDB(ctx, root.dbPath) if err != nil { return err } diff --git a/internal/cli/inbox/claim.go b/internal/cli/inbox/claim.go index c882bfb..c43c87c 100644 --- a/internal/cli/inbox/claim.go +++ b/internal/cli/inbox/claim.go @@ -4,7 +4,6 @@ import ( "errors" "fmt" - "ai-workflow-skill/internal/db" "ai-workflow-skill/internal/protocol" "ai-workflow-skill/internal/store" @@ -34,7 +33,7 @@ func newClaimCmd(root *rootOptions) *cobra.Command { return protocol.InvalidInput("agent is required", nil) } - sqlDB, err := db.Open(ctx, root.dbPath) + sqlDB, err := openInboxDB(ctx, root.dbPath) if err != nil { return err } diff --git a/internal/cli/inbox/db.go b/internal/cli/inbox/db.go new file mode 100644 index 0000000..a26d111 --- /dev/null +++ b/internal/cli/inbox/db.go @@ -0,0 +1,22 @@ +package inbox + +import ( + "context" + "database/sql" + + "ai-workflow-skill/internal/db" +) + +func openInboxDB(ctx context.Context, dbPath string) (*sql.DB, error) { + sqlDB, err := db.Open(ctx, dbPath) + if err != nil { + return nil, err + } + + if err := db.ApplyMigrations(ctx, sqlDB); err != nil { + _ = sqlDB.Close() + return nil, err + } + + return sqlDB, nil +} diff --git a/internal/cli/inbox/done.go b/internal/cli/inbox/done.go index b98a4b4..093aa5c 100644 --- a/internal/cli/inbox/done.go +++ b/internal/cli/inbox/done.go @@ -3,7 +3,6 @@ package inbox import ( "fmt" - "ai-workflow-skill/internal/db" "ai-workflow-skill/internal/protocol" "ai-workflow-skill/internal/store" @@ -54,7 +53,7 @@ func newCompleteCmd(root *rootOptions, mode string) *cobra.Command { return err } - sqlDB, err := db.Open(ctx, root.dbPath) + sqlDB, err := openInboxDB(ctx, root.dbPath) if err != nil { return err } diff --git a/internal/cli/inbox/fetch.go b/internal/cli/inbox/fetch.go index 98e12dc..98f5cd1 100644 --- a/internal/cli/inbox/fetch.go +++ b/internal/cli/inbox/fetch.go @@ -4,7 +4,6 @@ import ( "fmt" "strings" - "ai-workflow-skill/internal/db" "ai-workflow-skill/internal/protocol" "ai-workflow-skill/internal/store" @@ -32,7 +31,7 @@ func newFetchCmd(root *rootOptions) *cobra.Command { agent = root.agent } - sqlDB, err := db.Open(ctx, root.dbPath) + sqlDB, err := openInboxDB(ctx, root.dbPath) if err != nil { return err } diff --git a/internal/cli/inbox/init.go b/internal/cli/inbox/init.go index 08519fa..da5ad71 100644 --- a/internal/cli/inbox/init.go +++ b/internal/cli/inbox/init.go @@ -3,7 +3,6 @@ package inbox import ( "fmt" - "ai-workflow-skill/internal/db" "ai-workflow-skill/internal/protocol" "github.com/spf13/cobra" @@ -16,16 +15,12 @@ func newInitCmd(opts *rootOptions) *cobra.Command { RunE: func(cmd *cobra.Command, args []string) error { ctx := cmd.Context() - sqlDB, err := db.Open(ctx, opts.dbPath) + sqlDB, err := openInboxDB(ctx, opts.dbPath) if err != nil { return err } defer sqlDB.Close() - if err := db.ApplyMigrations(ctx, sqlDB); err != nil { - return err - } - resp := protocol.Success{ OK: true, Command: "init", diff --git a/internal/cli/inbox/integration_test.go b/internal/cli/inbox/integration_test.go index 2ce6827..59eddc6 100644 --- a/internal/cli/inbox/integration_test.go +++ b/internal/cli/inbox/integration_test.go @@ -316,6 +316,7 @@ func TestInboxRenewWaitReplyAndCancel(t *testing.T) { go func() { stdout, stderr, exitCode := executeInboxCommand( "--db", dbPath, + "--agent", "worker-c", "--json", "wait-reply", "--thread", threadID, @@ -359,6 +360,18 @@ func TestInboxRenewWaitReplyAndCancel(t *testing.T) { t.Fatalf("expected answer wake message, got %q", kind) } + stdout, _, exitCode := executeInboxCommand( + "--db", dbPath, + "--agent", "worker-c", + "--json", + "fetch", + "--status", "blocked", + "--unread", + ) + if exitCode != 10 { + t.Fatalf("expected blocked unread list to be cleared after wait-reply, got exit %d with %s", exitCode, stdout) + } + cancelOut := runInboxCommand( t, "--db", dbPath, @@ -533,6 +546,96 @@ func TestInboxWatchListUnreadAndAppend(t *testing.T) { } } +func TestInboxUnreadReadCursor(t *testing.T) { + t.Parallel() + + dbPath := filepath.Join(t.TempDir(), "coord.db") + + runInboxCommand(t, "--db", dbPath, "--json", "init") + + sendOut := runInboxCommand( + t, + "--db", dbPath, + "--json", + "send", + "--from", "leader", + "--to", "worker-e", + "--subject", "Review navbar copy", + "--summary", "Check top nav wording", + ) + + var sendResp map[string]any + mustDecodeJSON(t, sendOut, &sendResp) + threadID := nestedString(t, sendResp, "data", "thread", "thread_id") + + fetchOut := runInboxCommand( + t, + "--db", dbPath, + "--json", + "fetch", + "--agent", "worker-e", + "--status", "pending", + "--unread", + ) + + var fetchResp map[string]any + mustDecodeJSON(t, fetchOut, &fetchResp) + threads, ok := nestedValue(t, fetchResp, "data", "threads").([]any) + if !ok || len(threads) != 1 { + t.Fatalf("expected one unread pending thread, got %#v", nestedValue(t, fetchResp, "data", "threads")) + } + + runInboxCommand( + t, + "--db", dbPath, + "--agent", "worker-e", + "--json", + "show", + "--thread", threadID, + "--mark-read", + ) + + stdout, _, exitCode := executeInboxCommand( + "--db", dbPath, + "--json", + "fetch", + "--agent", "worker-e", + "--status", "pending", + "--unread", + ) + if exitCode != 10 { + t.Fatalf("expected unread fetch to clear after mark-read, got exit %d with %s", exitCode, stdout) + } + + runInboxCommand( + t, + "--db", dbPath, + "--json", + "send", + "--from", "leader", + "--to", "worker-e", + "--thread", threadID, + "--summary", "Use sentence case", + "--body", "Keep the nav labels in sentence case.", + ) + + fetchOut = runInboxCommand( + t, + "--db", dbPath, + "--json", + "fetch", + "--agent", "worker-e", + "--status", "pending", + "--unread", + ) + + mustDecodeJSON(t, fetchOut, &fetchResp) + threads, ok = nestedValue(t, fetchResp, "data", "threads").([]any) + if !ok || len(threads) != 1 { + t.Fatalf("expected unread thread to reappear after new message, got %#v", nestedValue(t, fetchResp, "data", "threads")) + } +} + func TestInboxJSONErrorsAndExitCodes(t *testing.T) { t.Parallel() diff --git a/internal/cli/inbox/list.go b/internal/cli/inbox/list.go index ffb3ebf..081c833 100644 --- a/internal/cli/inbox/list.go +++ b/internal/cli/inbox/list.go @@ -3,7 +3,6 @@ package inbox import ( "fmt" - "ai-workflow-skill/internal/db" "ai-workflow-skill/internal/protocol" "ai-workflow-skill/internal/store" @@ -32,7 +31,7 @@ func newListCmd(root *rootOptions) *cobra.Command { agent = root.agent } - sqlDB, err := db.Open(ctx, root.dbPath) + sqlDB, err := openInboxDB(ctx, root.dbPath) if err != nil { return err } diff --git a/internal/cli/inbox/renew.go b/internal/cli/inbox/renew.go index 29dca60..c0f9f69 100644 --- a/internal/cli/inbox/renew.go +++ b/internal/cli/inbox/renew.go @@ -3,7 +3,6 @@ package inbox import ( "fmt" - "ai-workflow-skill/internal/db" "ai-workflow-skill/internal/protocol" "ai-workflow-skill/internal/store" @@ -33,7 +32,7 @@ func newRenewCmd(root *rootOptions) *cobra.Command { return protocol.InvalidInput("agent is required", nil) } - sqlDB, err := db.Open(ctx, root.dbPath) + sqlDB, err := openInboxDB(ctx, root.dbPath) if err != nil { return err } diff --git a/internal/cli/inbox/reply.go b/internal/cli/inbox/reply.go index e420648..f8b10c5 100644 --- a/internal/cli/inbox/reply.go +++ b/internal/cli/inbox/reply.go @@ -3,7 +3,6 @@ package inbox import ( "fmt" - "ai-workflow-skill/internal/db" "ai-workflow-skill/internal/protocol" "ai-workflow-skill/internal/store" @@ -48,7 +47,7 @@ func newReplyCmd(root *rootOptions) *cobra.Command { return err } - sqlDB, err := db.Open(ctx, root.dbPath) + sqlDB, err := openInboxDB(ctx, root.dbPath) if err != nil { return err } diff --git a/internal/cli/inbox/send.go b/internal/cli/inbox/send.go index 01e07a3..e079039 100644 --- a/internal/cli/inbox/send.go +++ b/internal/cli/inbox/send.go @@ -3,7 +3,6 @@ package inbox import ( "fmt" - "ai-workflow-skill/internal/db" "ai-workflow-skill/internal/protocol" "ai-workflow-skill/internal/store" @@ -55,7 +54,7 @@ func newSendCmd(root *rootOptions) *cobra.Command { return err } - sqlDB, err := db.Open(ctx, root.dbPath) + sqlDB, err := openInboxDB(ctx, root.dbPath) if err != nil { return err } diff --git a/internal/cli/inbox/show.go b/internal/cli/inbox/show.go index f3ace1d..6171cff 100644 --- a/internal/cli/inbox/show.go +++ b/internal/cli/inbox/show.go @@ -3,7 +3,6 @@ package inbox import ( "fmt" - "ai-workflow-skill/internal/db" "ai-workflow-skill/internal/protocol" "ai-workflow-skill/internal/store" @@ -12,6 +11,7 @@ import ( type showOptions struct { threadID string + markRead bool } func newShowCmd(root *rootOptions) *cobra.Command { @@ -23,14 +23,19 @@ func newShowCmd(root *rootOptions) *cobra.Command { RunE: func(cmd *cobra.Command, args []string) error { ctx := cmd.Context() - sqlDB, err := db.Open(ctx, root.dbPath) + sqlDB, err := openInboxDB(ctx, root.dbPath) if err != nil { return err } defer sqlDB.Close() s := store.NewInboxStore(sqlDB) - detail, err := s.GetThread(ctx, opts.threadID) + agent := root.agent + if opts.markRead && agent == "" { + return protocol.InvalidInput("agent is required when using --mark-read", nil) + } + + detail, err := s.GetThreadForAgent(ctx, opts.threadID, agent, opts.markRead) if err != nil { return err } @@ -66,6 +71,7 @@ func newShowCmd(root *rootOptions) *cobra.Command { } cmd.Flags().StringVar(&opts.threadID, "thread", "", "Thread ID") + cmd.Flags().BoolVar(&opts.markRead, "mark-read", false, "Advance the caller's read cursor to the latest message") _ = cmd.MarkFlagRequired("thread") return cmd diff --git a/internal/cli/inbox/update.go b/internal/cli/inbox/update.go index 54b3315..0e88fbd 100644 --- a/internal/cli/inbox/update.go +++ b/internal/cli/inbox/update.go @@ -3,7 +3,6 @@ package inbox import ( "fmt" - "ai-workflow-skill/internal/db" "ai-workflow-skill/internal/protocol" "ai-workflow-skill/internal/store" @@ -47,7 +46,7 @@ func newUpdateCmd(root *rootOptions) *cobra.Command { return err } - sqlDB, err := db.Open(ctx, root.dbPath) + sqlDB, err := openInboxDB(ctx, root.dbPath) if err != nil { return err } diff --git a/internal/cli/inbox/wait_reply.go b/internal/cli/inbox/wait_reply.go index 5a564f7..33fc432 100644 --- a/internal/cli/inbox/wait_reply.go +++ b/internal/cli/inbox/wait_reply.go @@ -4,7 +4,6 @@ import ( "fmt" "time" - "ai-workflow-skill/internal/db" "ai-workflow-skill/internal/protocol" "ai-workflow-skill/internal/store" @@ -28,18 +27,20 @@ func newWaitReplyCmd(root *rootOptions) *cobra.Command { RunE: func(cmd *cobra.Command, args []string) error { ctx := cmd.Context() - sqlDB, err := db.Open(ctx, root.dbPath) + sqlDB, err := openInboxDB(ctx, root.dbPath) if err != nil { return err } defer sqlDB.Close() s := store.NewInboxStore(sqlDB) + agent := root.agent result, err := s.WaitReply(ctx, store.WaitReplyInput{ ThreadID: opts.threadID, AfterMessageID: opts.afterMessageID, AfterEventID: opts.afterEventID, Kinds: parseCSV(opts.kinds), + Agent: agent, Timeout: time.Duration(opts.timeoutSeconds) * time.Second, }) if err != nil { diff --git a/internal/cli/inbox/watch.go b/internal/cli/inbox/watch.go index 8d89749..a48cb0b 100644 --- a/internal/cli/inbox/watch.go +++ b/internal/cli/inbox/watch.go @@ -4,7 +4,6 @@ import ( "fmt" "time" - "ai-workflow-skill/internal/db" "ai-workflow-skill/internal/protocol" "ai-workflow-skill/internal/store" @@ -32,7 +31,7 @@ func newWatchCmd(root *rootOptions) *cobra.Command { agent = root.agent } - sqlDB, err := db.Open(ctx, root.dbPath) + sqlDB, err := openInboxDB(ctx, root.dbPath) if err != nil { return err } diff --git a/internal/db/schema/005_inbox_reads.sql b/internal/db/schema/005_inbox_reads.sql new file mode 100644 index 0000000..d830226 --- /dev/null +++ b/internal/db/schema/005_inbox_reads.sql @@ -0,0 +1,12 @@ +CREATE TABLE IF NOT EXISTS thread_reads ( + thread_id TEXT NOT NULL, + agent_id TEXT NOT NULL, + last_read_message_id TEXT NOT NULL, + last_read_at TEXT NOT NULL, + PRIMARY KEY(thread_id, agent_id), + FOREIGN KEY(thread_id) REFERENCES threads(thread_id), + FOREIGN KEY(last_read_message_id) REFERENCES messages(message_id) +); + +CREATE INDEX IF NOT EXISTS idx_thread_reads_agent + ON thread_reads(agent_id, last_read_at); diff --git a/internal/store/inbox.go b/internal/store/inbox.go index 5120cd6..727cae8 100644 --- a/internal/store/inbox.go +++ b/internal/store/inbox.go @@ -191,6 +191,7 @@ type WaitReplyInput struct { AfterMessageID string AfterEventID int64 Kinds []string + Agent string Timeout time.Duration } @@ -412,7 +413,8 @@ func (s *InboxStore) ListThreads(ctx context.Context, input ListInput) ([]Thread } var ( - args []any + joinArgs []any + whereArgs []any conditions []string joins []string ) @@ -424,16 +426,16 @@ func (s *InboxStore) ListThreads(ctx context.Context, input ListInput) ([]Thread if assignedTo != "" { conditions = append(conditions, "t.assigned_to = ?") - args = append(args, assignedTo) + whereArgs = append(whereArgs, assignedTo) } if input.CreatedBy != "" { conditions = append(conditions, "t.created_by = ?") - args = append(args, input.CreatedBy) + whereArgs = append(whereArgs, input.CreatedBy) } if len(input.Statuses) > 0 { conditions = append(conditions, "t.status IN ("+placeholders(len(input.Statuses))+")") for _, status := range input.Statuses { - args = append(args, status) + whereArgs = append(whereArgs, status) } } if input.Unread { @@ -441,10 +443,13 @@ func (s *InboxStore) ListThreads(ctx context.Context, input ListInput) ([]Thread return nil, fmt.Errorf("%w: agent is required when filtering unread threads", ErrInvalidInput) } joins = append(joins, "JOIN messages lm ON lm.message_id = t.latest_message_id") + joins = append(joins, "LEFT JOIN thread_reads tr ON tr.thread_id = t.thread_id AND tr.agent_id = ?") + joinArgs = append(joinArgs, input.Agent) conditions = append(conditions, "lm.to_agent = ?") - args = append(args, input.Agent) + whereArgs = append(whereArgs, input.Agent) conditions = append(conditions, "lm.from_agent <> ?") - args = append(args, input.Agent) + whereArgs = append(whereArgs, input.Agent) + conditions = append(conditions, "(tr.last_read_message_id IS NULL OR tr.last_read_message_id <> t.latest_message_id)") } query := `SELECT @@ -458,6 +463,7 @@ func (s *InboxStore) ListThreads(ctx context.Context, input ListInput) ([]Thread query += " WHERE " + strings.Join(conditions, " AND ") } query += " ORDER BY t.updated_at DESC LIMIT ?" + args := append(joinArgs, whereArgs...) args = append(args, limit) rows, err := s.db.QueryContext(ctx, query, args...) @@ -1078,6 +1084,10 @@ func (s *InboxStore) CancelThread(ctx context.Context, input CancelInput) (Threa } func (s *InboxStore) GetThread(ctx context.Context, threadID string) (ThreadDetail, error) { + return s.GetThreadForAgent(ctx, threadID, "", false) +} + +func (s *InboxStore) GetThreadForAgent(ctx context.Context, threadID, agent string, markRead bool) (ThreadDetail, error) { thread, err := selectThread(ctx, s.db, threadID) if err != nil { return ThreadDetail{}, err @@ -1117,6 +1127,12 @@ func (s *InboxStore) GetThread(ctx context.Context, threadID string) (ThreadDeta } attachArtifacts(messages, artifactsByMessageID) + if markRead { + if err := markThreadRead(ctx, s.db, thread.ThreadID, agent, thread.LatestMessageID, nowUTC()); err != nil { + return ThreadDetail{}, err + } + } + return ThreadDetail{ Thread: thread, Messages: messages, @@ -1204,6 +1220,11 @@ func (s *InboxStore) WaitReply(ctx context.Context, input WaitReplyInput) (WaitR return WaitReplyResult{}, err } if found { + if shouldMarkMessageRead(message, input.Agent) { + if err := markThreadRead(waitCtx, s.db, input.ThreadID, input.Agent, message.MessageID, nowUTC()); err != nil { + return WaitReplyResult{}, err + } + } return WaitReplyResult{ Woke: true, NextEventID: eventID, @@ -1363,6 +1384,10 @@ type queryRower interface { QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row } +type execContexter interface { + ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error) +} + type eventInput struct { RunID string TaskID string @@ -1482,6 +1507,30 @@ func updateThreadState(ctx context.Context, tx *sql.Tx, threadID, status, assign return nil } +func markThreadRead(ctx context.Context, execer execContexter, threadID, agent, messageID string, readAt time.Time) error { + if agent == "" || messageID == "" { + return nil + } + + _, err := execer.ExecContext( + ctx, + `INSERT INTO thread_reads ( + thread_id, agent_id, last_read_message_id, last_read_at + ) VALUES (?, ?, ?, ?) + ON CONFLICT(thread_id, agent_id) DO UPDATE SET + last_read_message_id = excluded.last_read_message_id, + last_read_at = excluded.last_read_at`, + threadID, + agent, + messageID, + formatTime(readAt), + ) + if err != nil { + return fmt.Errorf("mark thread read: %w", err) + } + return nil +} + func loadArtifactsForMessageIDs(ctx context.Context, db *sql.DB, messageIDs []string) (map[string][]Artifact, error) { result := make(map[string][]Artifact) if len(messageIDs) == 0 { @@ -1809,6 +1858,13 @@ func isSQLiteBusyError(err error) bool { strings.Contains(message, "database table is locked") } +func shouldMarkMessageRead(message Message, agent string) bool { + if agent == "" { + return false + } + return message.ToAgent == agent && message.FromAgent != agent +} + func defaultID(value, prefix string) string { if value != "" { return value