diff --git a/internal/cli/inbox/claim.go b/internal/cli/inbox/claim.go new file mode 100644 index 0000000..e512354 --- /dev/null +++ b/internal/cli/inbox/claim.go @@ -0,0 +1,81 @@ +package inbox + +import ( + "errors" + "fmt" + + "ai-workflow-skill/internal/db" + "ai-workflow-skill/internal/protocol" + "ai-workflow-skill/internal/store" + + "github.com/spf13/cobra" +) + +type claimOptions struct { + agent string + threadID string + leaseSeconds int +} + +func newClaimCmd(root *rootOptions) *cobra.Command { + opts := &claimOptions{} + + cmd := &cobra.Command{ + Use: "claim", + Short: "Acquire a lease on a pending thread", + RunE: func(cmd *cobra.Command, args []string) error { + ctx := cmd.Context() + + agent := opts.agent + if agent == "" { + agent = root.agent + } + if agent == "" { + return fmt.Errorf("agent is required") + } + + sqlDB, err := db.Open(ctx, root.dbPath) + if err != nil { + return err + } + defer sqlDB.Close() + + s := store.NewInboxStore(sqlDB) + result, err := s.ClaimThread(ctx, store.ClaimInput{ + ThreadID: opts.threadID, + Agent: agent, + LeaseSeconds: opts.leaseSeconds, + }) + if err != nil { + if errors.Is(err, store.ErrLeaseConflict) { + return fmt.Errorf("lease conflict: %w", err) + } + return err + } + + resp := protocol.Success{ + OK: true, + Command: "claim", + Data: map[string]any{ + "thread": result.Thread, + "message": result.Message, + }, + } + + if root.json { + return protocol.WriteJSON(cmd.OutOrStdout(), resp) + } + + _, err = fmt.Fprintf(cmd.OutOrStdout(), "claimed thread %s\n", result.Thread.ThreadID) + return err + }, + } + + cmd.Flags().StringVar(&opts.agent, "agent", "", "Claiming agent") + cmd.Flags().StringVar(&opts.threadID, "thread", "", "Thread ID") + cmd.Flags().IntVar(&opts.leaseSeconds, "lease-seconds", 900, "Lease duration in seconds") + + _ = cmd.MarkFlagRequired("thread") + + return cmd +} diff --git a/internal/cli/inbox/fetch.go b/internal/cli/inbox/fetch.go new file mode 100644 index 0000000..c5dc157 --- /dev/null +++ b/internal/cli/inbox/fetch.go @@ -0,0 +1,92 @@ +package inbox + +import ( + "fmt" + "strings" + + "ai-workflow-skill/internal/db" + "ai-workflow-skill/internal/protocol" + "ai-workflow-skill/internal/store" + + "github.com/spf13/cobra" +) + +type fetchOptions struct { + agent string + statuses string + limit int +} + +func newFetchCmd(root *rootOptions) *cobra.Command { + opts := &fetchOptions{} + + cmd := &cobra.Command{ + Use: "fetch", + Short: "List candidate threads for an agent", + RunE: func(cmd *cobra.Command, args []string) error { + ctx := cmd.Context() + + agent := opts.agent + if agent == "" { + agent = root.agent + } + + sqlDB, err := db.Open(ctx, root.dbPath) + if err != nil { + return err + } + defer sqlDB.Close() + + s := store.NewInboxStore(sqlDB) + threads, err := s.FetchThreads(ctx, store.FetchInput{ + Agent: agent, + Statuses: parseCSV(opts.statuses), + Limit: opts.limit, + }) + if err != nil { + return err + } + + resp := protocol.Success{ + OK: true, + Command: "fetch", + Data: map[string]any{ + "threads": threads, + }, + } + + if root.json { + return protocol.WriteJSON(cmd.OutOrStdout(), resp) + } + + for _, thread := range threads { + if _, err := fmt.Fprintf(cmd.OutOrStdout(), "%s\t%s\t%s\n", thread.ThreadID, thread.Status, thread.Subject); err != nil { + return err + } + } + return nil + }, + } + + cmd.Flags().StringVar(&opts.agent, "agent", "", "Assigned agent filter") + cmd.Flags().StringVar(&opts.statuses, "status", "pending", "Comma-separated status filter") + cmd.Flags().IntVar(&opts.limit, "limit", 20, "Maximum number of threads") + + return cmd +} + +func parseCSV(value string) []string { + if strings.TrimSpace(value) == "" { + return nil + } + + raw := strings.Split(value, ",") + out := make([]string, 0, len(raw)) + for _, entry := range raw { + entry = strings.TrimSpace(entry) + if entry != "" { + out = append(out, entry) + } + } + return out +} diff --git a/internal/cli/inbox/integration_test.go b/internal/cli/inbox/integration_test.go new file mode 100644 index 0000000..8f4eff4 --- /dev/null +++ b/internal/cli/inbox/integration_test.go @@ -0,0 +1,150 @@ +package inbox + +import ( + "bytes" + "encoding/json" + "path/filepath" + "testing" +) + +func TestInboxLifecycle(t *testing.T) { + t.Parallel() + + dbPath := filepath.Join(t.TempDir(), "coord.db") + + initOut := runInboxCommand(t, "--db", dbPath, "--json", "init") + var initResp map[string]any + mustDecodeJSON(t, initOut, &initResp) + if initResp["ok"] != true { + t.Fatalf("expected init ok=true, got %#v", initResp) + } + + sendOut := runInboxCommand( + t, + "--db", dbPath, + "--json", + "send", + "--from", "leader", + "--to", "worker-a", + "--subject", "Implement feature X", + "--summary", "Add retry policy", + "--body", "Implement retry handling for the HTTP client.", + "--run", "run_blog_001", + "--task", "T1", + ) + + var sendResp map[string]any + mustDecodeJSON(t, sendOut, &sendResp) + threadID := nestedString(t, sendResp, "data", "thread", "thread_id") + threadStatus := nestedString(t, sendResp, "data", "thread", "status") + if threadStatus != "pending" { + t.Fatalf("expected pending thread, got %q", threadStatus) + } + + fetchOut := runInboxCommand( + t, + "--db", dbPath, + "--json", + "fetch", + "--agent", "worker-a", + "--status", "pending", + ) + + var fetchResp map[string]any + mustDecodeJSON(t, fetchOut, &fetchResp) + threadsValue := nestedValue(t, fetchResp, "data", "threads") + threads, ok := threadsValue.([]any) + if !ok || len(threads) != 1 { + t.Fatalf("expected one fetched thread, got %#v", threadsValue) + } + + claimOut := runInboxCommand( + t, + "--db", dbPath, + "--json", + "claim", + "--agent", "worker-a", + "--thread", threadID, + "--lease-seconds", "300", + ) + + var claimResp map[string]any + mustDecodeJSON(t, claimOut, &claimResp) + claimedStatus := nestedString(t, claimResp, "data", "thread", "status") + if claimedStatus != "claimed" { + t.Fatalf("expected claimed thread, got %q", claimedStatus) + } + + showOut := runInboxCommand( + t, + "--db", dbPath, + "--json", + "show", + "--thread", threadID, + ) + + var showResp map[string]any + mustDecodeJSON(t, showOut, &showResp) + showStatus := nestedString(t, showResp, "data", "thread", "status") + if showStatus != "claimed" { + t.Fatalf("expected show status claimed, got %q", showStatus) + } + messagesValue := nestedValue(t, showResp, "data", "messages") + messages, ok := messagesValue.([]any) + if !ok || len(messages) != 2 { + t.Fatalf("expected two messages in thread history, got %#v", messagesValue) + } +} + +func runInboxCommand(t *testing.T, args ...string) string { + t.Helper() + + cmd := NewRootCmd() + var stdout bytes.Buffer + var stderr bytes.Buffer + cmd.SetOut(&stdout) + cmd.SetErr(&stderr) + cmd.SetArgs(args) + + if err := cmd.Execute(); err != nil { + t.Fatalf("execute inbox command %v: %v\nstderr:\n%s", args, err, stderr.String()) + } + + return stdout.String() +} + +func mustDecodeJSON(t *testing.T, raw string, target any) { + t.Helper() + + if err := json.Unmarshal([]byte(raw), target); err != nil { + t.Fatalf("decode json %q: %v", raw, err) + } +} + +func nestedString(t *testing.T, value map[string]any, keys ...string) string { + t.Helper() + + current := nestedValue(t, value, keys...) + str, ok := current.(string) + if !ok { + t.Fatalf("expected string at %v, got %#v", keys, current) + } + return str +} + +func nestedValue(t *testing.T, value map[string]any, keys ...string) any { + t.Helper() + + var current any = value + for _, key := range keys { + obj, ok := current.(map[string]any) + if !ok { + t.Fatalf("expected object at %q in %v, got %#v", key, keys, current) + } + current, ok = obj[key] + if !ok { + t.Fatalf("missing key %q in %v", key, keys) + } + } + return current +} diff --git a/internal/cli/inbox/root.go b/internal/cli/inbox/root.go index a04de5a..1cb2918 100644 --- a/internal/cli/inbox/root.go +++ b/internal/cli/inbox/root.go @@ -23,6 +23,10 @@ func NewRootCmd() *cobra.Command { cmd.PersistentFlags().StringVar(&opts.agent, "agent", "", "Agent identity") cmd.AddCommand(newInitCmd(opts)) + cmd.AddCommand(newSendCmd(opts)) + cmd.AddCommand(newFetchCmd(opts)) + cmd.AddCommand(newClaimCmd(opts)) + cmd.AddCommand(newShowCmd(opts)) return cmd } diff --git a/internal/cli/inbox/send.go b/internal/cli/inbox/send.go new file mode 100644 index 0000000..1371b5d --- /dev/null +++ b/internal/cli/inbox/send.go @@ -0,0 +1,95 @@ +package inbox + +import ( + "fmt" + + "ai-workflow-skill/internal/db" + "ai-workflow-skill/internal/protocol" + "ai-workflow-skill/internal/store" + + "github.com/spf13/cobra" +) + +type sendOptions struct { + from string + to string + threadID string + runID string + taskID string + subject string + kind string + summary string + body string + payloadJSON string + priority string +} + +func newSendCmd(root *rootOptions) *cobra.Command { + opts := &sendOptions{} + + cmd := &cobra.Command{ + Use: "send", + Short: "Create a thread with an initial directed message", + RunE: func(cmd *cobra.Command, args []string) error { + ctx := cmd.Context() + + sqlDB, err := db.Open(ctx, root.dbPath) + if err != nil { + return err + } + defer sqlDB.Close() + + s := store.NewInboxStore(sqlDB) + thread, message, err := s.Send(ctx, store.SendInput{ + ThreadID: opts.threadID, + RunID: opts.runID, + TaskID: opts.taskID, + Subject: opts.subject, + FromAgent: opts.from, + ToAgent: opts.to, + Kind: opts.kind, + Summary: opts.summary, + Body: opts.body, + PayloadJSON: opts.payloadJSON, + Priority: opts.priority, + }) + if err != nil { + return err + } + + resp := protocol.Success{ + OK: true, + Command: "send", + Data: map[string]any{ + "thread": thread, + "message": message, + }, + } + + if root.json { + return protocol.WriteJSON(cmd.OutOrStdout(), resp) + } + + _, err = fmt.Fprintf(cmd.OutOrStdout(), "created thread %s\n", thread.ThreadID) + return err + }, + } + + cmd.Flags().StringVar(&opts.from, "from", "", "Sending agent") + cmd.Flags().StringVar(&opts.to, "to", "", "Receiving agent") + cmd.Flags().StringVar(&opts.threadID, "thread", "", "Optional thread ID override") + cmd.Flags().StringVar(&opts.runID, "run", "", "Optional run ID override") + cmd.Flags().StringVar(&opts.taskID, "task", "", "Optional task ID override") + cmd.Flags().StringVar(&opts.subject, "subject", "", "Thread subject") + cmd.Flags().StringVar(&opts.kind, "kind", "task", "Initial message kind") + cmd.Flags().StringVar(&opts.summary, "summary", "", "Short message summary") + cmd.Flags().StringVar(&opts.body, "body", "", "Message body") + cmd.Flags().StringVar(&opts.payloadJSON, "payload-json", "", "Structured payload JSON string") + cmd.Flags().StringVar(&opts.priority, "priority", "normal", "Thread priority") + + _ = cmd.MarkFlagRequired("from") + _ = cmd.MarkFlagRequired("to") + _ = cmd.MarkFlagRequired("subject") + + return cmd +} diff --git a/internal/cli/inbox/show.go b/internal/cli/inbox/show.go new file mode 100644 index 0000000..952a981 --- /dev/null +++ b/internal/cli/inbox/show.go @@ -0,0 +1,67 @@ +package inbox + +import ( + "fmt" + + "ai-workflow-skill/internal/db" + "ai-workflow-skill/internal/protocol" + "ai-workflow-skill/internal/store" + + "github.com/spf13/cobra" +) + +type showOptions struct { + threadID string +} + +func newShowCmd(root *rootOptions) *cobra.Command { + opts := &showOptions{} + + cmd := &cobra.Command{ + Use: "show", + Short: "Show one thread with message history", + RunE: func(cmd *cobra.Command, args []string) error { + ctx := cmd.Context() + + sqlDB, err := db.Open(ctx, root.dbPath) + if err != nil { + return err + } + defer sqlDB.Close() + + s := store.NewInboxStore(sqlDB) + detail, err := s.GetThread(ctx, opts.threadID) + if err != nil { + return err + } + + resp := protocol.Success{ + OK: true, + Command: "show", + Data: map[string]any{ + "thread": detail.Thread, + "messages": detail.Messages, + }, + } + + if root.json { + return protocol.WriteJSON(cmd.OutOrStdout(), resp) + } + + if _, err := fmt.Fprintf(cmd.OutOrStdout(), "%s\t%s\t%s\n", detail.Thread.ThreadID, detail.Thread.Status, detail.Thread.Subject); err != nil { + return err + } + for _, message := range detail.Messages { + if _, err := fmt.Fprintf(cmd.OutOrStdout(), "- %s\t%s\t%s\n", message.MessageID, message.Kind, message.Summary); err != nil { + return err + } + } + return nil + }, + } + + cmd.Flags().StringVar(&opts.threadID, "thread", "", "Thread ID") + _ = cmd.MarkFlagRequired("thread") + + return cmd +} diff --git a/internal/store/inbox.go b/internal/store/inbox.go new file mode 100644 index 0000000..ffcae45 --- /dev/null +++ b/internal/store/inbox.go @@ -0,0 +1,596 @@ +package store + +import ( + "context" + "database/sql" + "encoding/json" + "errors" + "fmt" + "strings" + "time" + + "github.com/google/uuid" +) + +var ErrLeaseConflict = errors.New("thread already claimed by another worker") + +type InboxStore struct { + db *sql.DB +} + +type Thread struct { + ThreadID string `json:"thread_id"` + RunID string `json:"run_id"` + TaskID string `json:"task_id"` + Subject string `json:"subject"` + CreatedBy string `json:"created_by"` + AssignedTo string `json:"assigned_to"` + Status string `json:"status"` + Priority string `json:"priority"` + LatestMessageID string `json:"latest_message_id,omitempty"` + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` +} + +type Message struct { + MessageID string `json:"message_id"` + ThreadID string `json:"thread_id"` + FromAgent string `json:"from_agent"` + ToAgent string `json:"to_agent"` + Kind string `json:"kind"` + Summary string `json:"summary"` + Body string `json:"body"` + PayloadJSON json.RawMessage `json:"payload_json"` + CreatedAt time.Time `json:"created_at"` +} + +type ThreadDetail struct { + Thread Thread `json:"thread"` + Messages []Message `json:"messages"` +} + +type SendInput struct { + ThreadID string + RunID string + TaskID string + Subject string + FromAgent string + ToAgent string + Kind string + Summary string + Body string + PayloadJSON string + Priority string +} + +type FetchInput struct { + Agent string + Statuses []string + Limit int +} + +type ClaimInput struct { + ThreadID string + Agent string + LeaseSeconds int +} + +type ClaimResult struct { + Thread Thread `json:"thread"` + Message Message `json:"message"` +} + +func NewInboxStore(db *sql.DB) *InboxStore { + return &InboxStore{db: db} +} + +func (s *InboxStore) Send(ctx context.Context, input SendInput) (Thread, Message, error) { + now := nowUTC() + + threadID := defaultID(input.ThreadID, "thr") + runID := defaultID(input.RunID, "run") + taskID := defaultID(input.TaskID, "task") + kind := defaultString(input.Kind, "task") + priority := defaultString(input.Priority, "normal") + summary := defaultString(input.Summary, input.Subject) + payload := normalizeJSON(input.PayloadJSON) + messageID := newID("msg") + + tx, err := s.db.BeginTx(ctx, nil) + if err != nil { + return Thread{}, Message{}, fmt.Errorf("begin send transaction: %w", err) + } + defer tx.Rollback() + + thread := Thread{ + ThreadID: threadID, + RunID: runID, + TaskID: taskID, + Subject: input.Subject, + CreatedBy: input.FromAgent, + AssignedTo: input.ToAgent, + Status: "pending", + Priority: priority, + LatestMessageID: messageID, + CreatedAt: now, + UpdatedAt: now, + } + + if _, err := tx.ExecContext( + ctx, + `INSERT INTO threads ( + thread_id, run_id, task_id, subject, created_by, assigned_to, status, + priority, latest_message_id, created_at, updated_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, + thread.ThreadID, + thread.RunID, + thread.TaskID, + thread.Subject, + thread.CreatedBy, + thread.AssignedTo, + thread.Status, + thread.Priority, + thread.LatestMessageID, + formatTime(thread.CreatedAt), + formatTime(thread.UpdatedAt), + ); err != nil { + return Thread{}, Message{}, fmt.Errorf("insert thread: %w", err) + } + + message := Message{ + MessageID: messageID, + ThreadID: threadID, + FromAgent: input.FromAgent, + ToAgent: input.ToAgent, + Kind: kind, + Summary: summary, + Body: input.Body, + PayloadJSON: json.RawMessage(payload), + CreatedAt: now, + } + + if _, err := tx.ExecContext( + ctx, + `INSERT INTO messages ( + message_id, thread_id, from_agent, to_agent, kind, summary, body, + payload_json, created_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`, + message.MessageID, + message.ThreadID, + message.FromAgent, + message.ToAgent, + message.Kind, + message.Summary, + message.Body, + string(message.PayloadJSON), + formatTime(message.CreatedAt), + ); err != nil { + return Thread{}, Message{}, fmt.Errorf("insert message: %w", err) + } + + if err := insertEvent(ctx, tx, eventInput{ + RunID: thread.RunID, + TaskID: thread.TaskID, + ThreadID: thread.ThreadID, + Source: "inbox", + EventType: "thread_created", + MessageID: message.MessageID, + Summary: summary, + PayloadJSON: payload, + CreatedAt: now, + }); err != nil { + return Thread{}, Message{}, err + } + + if err := tx.Commit(); err != nil { + return Thread{}, Message{}, fmt.Errorf("commit send transaction: %w", err) + } + + return thread, message, nil +} + +func (s *InboxStore) FetchThreads(ctx context.Context, input FetchInput) ([]Thread, error) { + statuses := input.Statuses + if len(statuses) == 0 { + statuses = []string{"pending"} + } + + limit := input.Limit + if limit <= 0 { + limit = 20 + } + + var args []any + var conditions []string + + if input.Agent != "" { + conditions = append(conditions, "assigned_to = ?") + args = append(args, input.Agent) + } + + conditions = append(conditions, "status IN ("+placeholders(len(statuses))+")") + for _, status := range statuses { + args = append(args, status) + } + args = append(args, limit) + + query := `SELECT + thread_id, run_id, task_id, subject, created_by, assigned_to, status, + priority, latest_message_id, created_at, updated_at + FROM threads` + if len(conditions) > 0 { + query += " WHERE " + strings.Join(conditions, " AND ") + } + query += " ORDER BY updated_at DESC LIMIT ?" + + rows, err := s.db.QueryContext(ctx, query, args...) + if err != nil { + return nil, fmt.Errorf("fetch threads: %w", err) + } + defer rows.Close() + + var threads []Thread + for rows.Next() { + thread, err := scanThread(rows) + if err != nil { + return nil, err + } + threads = append(threads, thread) + } + + if err := rows.Err(); err != nil { + return nil, fmt.Errorf("iterate threads: %w", err) + } + + return threads, nil +} + +func (s *InboxStore) ClaimThread(ctx context.Context, input ClaimInput) (ClaimResult, error) { + if input.LeaseSeconds <= 0 { + input.LeaseSeconds = 900 + } + + now := nowUTC() + expiresAt := now.Add(time.Duration(input.LeaseSeconds) * time.Second) + leaseToken := newID("lease") + messageID := newID("msg") + + tx, err := s.db.BeginTx(ctx, nil) + if err != nil { + return ClaimResult{}, fmt.Errorf("begin claim transaction: %w", err) + } + defer tx.Rollback() + + thread, err := selectThreadForUpdate(ctx, tx, input.ThreadID) + if err != nil { + return ClaimResult{}, err + } + + if thread.Status != "pending" { + return ClaimResult{}, fmt.Errorf("thread %s is not pending", input.ThreadID) + } + + var activeLease string + err = tx.QueryRowContext( + ctx, + `SELECT agent_id FROM leases + WHERE thread_id = ? + AND released_at IS NULL + AND expires_at > ?`, + input.ThreadID, + formatTime(now), + ).Scan(&activeLease) + if err != nil && !errors.Is(err, sql.ErrNoRows) { + return ClaimResult{}, fmt.Errorf("check active lease: %w", err) + } + if activeLease != "" { + return ClaimResult{}, ErrLeaseConflict + } + + if _, err := tx.ExecContext( + ctx, + `INSERT INTO leases ( + thread_id, agent_id, lease_token, claimed_at, expires_at, released_at + ) VALUES (?, ?, ?, ?, ?, NULL) + ON CONFLICT(thread_id) DO UPDATE SET + agent_id = excluded.agent_id, + lease_token = excluded.lease_token, + claimed_at = excluded.claimed_at, + expires_at = excluded.expires_at, + released_at = NULL`, + input.ThreadID, + input.Agent, + leaseToken, + formatTime(now), + formatTime(expiresAt), + ); err != nil { + return ClaimResult{}, fmt.Errorf("upsert lease: %w", err) + } + + if _, err := tx.ExecContext( + ctx, + `UPDATE threads + SET status = ?, assigned_to = ?, latest_message_id = ?, updated_at = ? + WHERE thread_id = ?`, + "claimed", + input.Agent, + messageID, + formatTime(now), + input.ThreadID, + ); err != nil { + return ClaimResult{}, fmt.Errorf("update thread claim status: %w", err) + } + + message := Message{ + MessageID: messageID, + ThreadID: input.ThreadID, + FromAgent: input.Agent, + ToAgent: input.Agent, + Kind: "event", + Summary: "thread claimed", + Body: "", + PayloadJSON: json.RawMessage(fmt.Sprintf(`{"lease_seconds":%d,"lease_token":"%s"}`, input.LeaseSeconds, leaseToken)), + CreatedAt: now, + } + + if _, err := tx.ExecContext( + ctx, + `INSERT INTO messages ( + message_id, thread_id, from_agent, to_agent, kind, summary, body, + payload_json, created_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`, + message.MessageID, + message.ThreadID, + message.FromAgent, + message.ToAgent, + message.Kind, + message.Summary, + message.Body, + string(message.PayloadJSON), + formatTime(message.CreatedAt), + ); err != nil { + return ClaimResult{}, fmt.Errorf("insert claim event message: %w", err) + } + + if err := insertEvent(ctx, tx, eventInput{ + RunID: thread.RunID, + TaskID: thread.TaskID, + ThreadID: thread.ThreadID, + Source: "inbox", + EventType: "thread_claimed", + MessageID: message.MessageID, + Summary: message.Summary, + PayloadJSON: string(message.PayloadJSON), + CreatedAt: now, + }); err != nil { + return ClaimResult{}, err + } + + if err := tx.Commit(); err != nil { + return ClaimResult{}, fmt.Errorf("commit claim transaction: %w", err) + } + + thread.Status = "claimed" + thread.AssignedTo = input.Agent + thread.LatestMessageID = messageID + thread.UpdatedAt = now + + return ClaimResult{ + Thread: thread, + Message: message, + }, nil +} + +func (s *InboxStore) GetThread(ctx context.Context, threadID string) (ThreadDetail, error) { + thread, err := selectThread(ctx, s.db, threadID) + if err != nil { + return ThreadDetail{}, err + } + + rows, err := s.db.QueryContext( + ctx, + `SELECT + message_id, thread_id, from_agent, to_agent, kind, summary, body, + payload_json, created_at + FROM messages + WHERE thread_id = ? + ORDER BY created_at ASC`, + threadID, + ) + if err != nil { + return ThreadDetail{}, fmt.Errorf("query thread messages: %w", err) + } + defer rows.Close() + + var messages []Message + for rows.Next() { + message, err := scanMessage(rows) + if err != nil { + return ThreadDetail{}, err + } + messages = append(messages, message) + } + + if err := rows.Err(); err != nil { + return ThreadDetail{}, fmt.Errorf("iterate thread messages: %w", err) + } + + return ThreadDetail{ + Thread: thread, + Messages: messages, + }, nil +} + +type threadScanner interface { + Scan(dest ...any) error +} + +func scanThread(scanner threadScanner) (Thread, error) { + var ( + thread Thread + createdAt, updatedAt string + latestMessageID sql.NullString + ) + + if err := scanner.Scan( + &thread.ThreadID, + &thread.RunID, + &thread.TaskID, + &thread.Subject, + &thread.CreatedBy, + &thread.AssignedTo, + &thread.Status, + &thread.Priority, + &latestMessageID, + &createdAt, + &updatedAt, + ); err != nil { + return Thread{}, fmt.Errorf("scan thread: %w", err) + } + + thread.CreatedAt = parseTime(createdAt) + thread.UpdatedAt = parseTime(updatedAt) + if latestMessageID.Valid { + thread.LatestMessageID = latestMessageID.String + } + + return thread, nil +} + +func scanMessage(scanner threadScanner) (Message, error) { + var ( + message Message + payload, createdAt string + ) + + if err := scanner.Scan( + &message.MessageID, + &message.ThreadID, + &message.FromAgent, + &message.ToAgent, + &message.Kind, + &message.Summary, + &message.Body, + &payload, + &createdAt, + ); err != nil { + return Message{}, fmt.Errorf("scan message: %w", err) + } + + message.PayloadJSON = json.RawMessage(payload) + message.CreatedAt = parseTime(createdAt) + return message, nil +} + +func selectThread(ctx context.Context, db queryRower, threadID string) (Thread, error) { + row := db.QueryRowContext( + ctx, + `SELECT + thread_id, run_id, task_id, subject, created_by, assigned_to, status, + priority, latest_message_id, created_at, updated_at + FROM threads + WHERE thread_id = ?`, + threadID, + ) + + thread, err := scanThread(row) + if errors.Is(err, sql.ErrNoRows) { + return Thread{}, fmt.Errorf("thread %s not found", threadID) + } + return thread, err +} + +func selectThreadForUpdate(ctx context.Context, tx *sql.Tx, threadID string) (Thread, error) { + return selectThread(ctx, tx, threadID) +} + +type queryRower interface { + QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row +} + +type eventInput struct { + RunID string + TaskID string + ThreadID string + Source string + EventType string + MessageID string + Summary string + PayloadJSON string + CreatedAt time.Time +} + +func insertEvent(ctx context.Context, tx *sql.Tx, input eventInput) error { + _, err := tx.ExecContext( + ctx, + `INSERT INTO events ( + run_id, task_id, thread_id, source, event_type, message_id, summary, + payload_json, created_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`, + input.RunID, + input.TaskID, + input.ThreadID, + input.Source, + input.EventType, + input.MessageID, + input.Summary, + normalizeJSON(input.PayloadJSON), + formatTime(input.CreatedAt), + ) + if err != nil { + return fmt.Errorf("insert event: %w", err) + } + return nil +} + +func defaultID(value, prefix string) string { + if value != "" { + return value + } + return newID(prefix) +} + +func newID(prefix string) string { + return prefix + "_" + strings.ReplaceAll(uuid.NewString(), "-", "") +} + +func defaultString(value, fallback string) string { + if value != "" { + return value + } + return fallback +} + +func normalizeJSON(value string) string { + if strings.TrimSpace(value) == "" { + return "{}" + } + return value +} + +func placeholders(n int) string { + if n <= 0 { + return "" + } + parts := make([]string, n) + for i := range parts { + parts[i] = "?" + } + return strings.Join(parts, ",") +} + +func nowUTC() time.Time { + return time.Now().UTC() +} + +func formatTime(t time.Time) string { + return t.UTC().Format(time.RFC3339Nano) +} + +func parseTime(value string) time.Time { + parsed, err := time.Parse(time.RFC3339Nano, value) + if err != nil { + return time.Time{} + } + return parsed +}