Complete inbox CLI implementation

This commit is contained in:
2026-03-19 03:15:17 +08:00
parent 11bee52ff4
commit c3314cd9cf
15 changed files with 1524 additions and 43 deletions
+730 -18
View File
@@ -13,6 +13,8 @@ import (
)
var ErrLeaseConflict = errors.New("thread already claimed by another worker")
var ErrThreadNotFound = errors.New("thread not found")
var ErrNoActiveLease = errors.New("no active lease")
type InboxStore struct {
db *sql.DB
@@ -49,6 +51,19 @@ type ThreadDetail struct {
Messages []Message `json:"messages"`
}
type Event struct {
EventID int64 `json:"event_id"`
RunID string `json:"run_id"`
TaskID string `json:"task_id"`
ThreadID string `json:"thread_id,omitempty"`
Source string `json:"source"`
EventType string `json:"event_type"`
MessageID string `json:"message_id,omitempty"`
Summary string `json:"summary"`
PayloadJSON json.RawMessage `json:"payload_json"`
CreatedAt time.Time `json:"created_at"`
}
type SendInput struct {
ThreadID string
RunID string
@@ -67,6 +82,7 @@ type FetchInput struct {
Agent string
Statuses []string
Limit int
Unread bool
}
type ClaimInput struct {
@@ -75,6 +91,12 @@ type ClaimInput struct {
LeaseSeconds int
}
type RenewInput struct {
ThreadID string
Agent string
LeaseSeconds int
}
type ClaimResult struct {
Thread Thread `json:"thread"`
Message Message `json:"message"`
@@ -108,11 +130,70 @@ type CompleteInput struct {
Failed bool
}
type CancelInput struct {
ThreadID string
Agent string
Reason string
}
type ListInput struct {
Agent string
Statuses []string
CreatedBy string
AssignedTo string
Limit int
Unread bool
}
type WatchInput struct {
Agent string
Statuses []string
AfterEventID int64
StartFromNow bool
Timeout time.Duration
}
type WatchResult struct {
Woke bool `json:"woke"`
NextEventID int64 `json:"next_event_id"`
Thread *Thread `json:"thread,omitempty"`
Message *Message `json:"message,omitempty"`
Event *Event `json:"event,omitempty"`
}
type WaitReplyInput struct {
ThreadID string
AfterMessageID string
AfterEventID int64
Kinds []string
Timeout time.Duration
}
type WaitReplyResult struct {
Woke bool `json:"woke"`
NextEventID int64 `json:"next_event_id"`
Message *Message `json:"message,omitempty"`
}
func NewInboxStore(db *sql.DB) *InboxStore {
return &InboxStore{db: db}
}
func (s *InboxStore) Send(ctx context.Context, input SendInput) (Thread, Message, error) {
if input.ThreadID != "" {
thread, err := selectThread(ctx, s.db, input.ThreadID)
if err == nil {
return s.appendThreadMessage(ctx, thread, input)
}
if !errors.Is(err, ErrThreadNotFound) {
return Thread{}, Message{}, err
}
}
return s.createThread(ctx, input)
}
func (s *InboxStore) createThread(ctx context.Context, input SendInput) (Thread, Message, error) {
now := nowUTC()
threadID := defaultID(input.ThreadID, "thr")
@@ -217,43 +298,146 @@ func (s *InboxStore) Send(ctx context.Context, input SendInput) (Thread, Message
return thread, message, nil
}
func (s *InboxStore) appendThreadMessage(ctx context.Context, existing Thread, input SendInput) (Thread, Message, error) {
now := nowUTC()
messageID := newID("msg")
payload := normalizeJSON(input.PayloadJSON)
tx, err := s.db.BeginTx(ctx, nil)
if err != nil {
return Thread{}, Message{}, fmt.Errorf("begin append transaction: %w", err)
}
defer tx.Rollback()
thread, err := selectThreadForUpdate(ctx, tx, existing.ThreadID)
if err != nil {
return Thread{}, Message{}, err
}
if isTerminalStatus(thread.Status) {
return Thread{}, Message{}, fmt.Errorf("thread %s is already terminal", thread.ThreadID)
}
assignedTo := thread.AssignedTo
if input.ToAgent != "" {
assignedTo = input.ToAgent
}
message := Message{
MessageID: messageID,
ThreadID: thread.ThreadID,
FromAgent: input.FromAgent,
ToAgent: defaultString(input.ToAgent, thread.AssignedTo),
Kind: defaultString(input.Kind, "task"),
Summary: defaultString(input.Summary, thread.Subject),
Body: input.Body,
PayloadJSON: json.RawMessage(payload),
CreatedAt: now,
}
if err := insertMessage(ctx, tx, message); err != nil {
return Thread{}, Message{}, err
}
if err := updateThreadState(ctx, tx, thread.ThreadID, thread.Status, assignedTo, message.MessageID, now); err != nil {
return Thread{}, Message{}, err
}
if err := insertEvent(ctx, tx, eventInput{
RunID: thread.RunID,
TaskID: thread.TaskID,
ThreadID: thread.ThreadID,
Source: "inbox",
EventType: "thread_message_sent",
MessageID: message.MessageID,
Summary: message.Summary,
PayloadJSON: payload,
CreatedAt: now,
}); err != nil {
return Thread{}, Message{}, err
}
if err := tx.Commit(); err != nil {
return Thread{}, Message{}, fmt.Errorf("commit append transaction: %w", err)
}
thread.AssignedTo = assignedTo
thread.LatestMessageID = message.MessageID
thread.UpdatedAt = now
return thread, message, nil
}
func (s *InboxStore) FetchThreads(ctx context.Context, input FetchInput) ([]Thread, error) {
statuses := input.Statuses
if len(statuses) == 0 {
statuses = []string{"pending"}
}
return s.ListThreads(ctx, ListInput{
Agent: input.Agent,
Statuses: statuses,
Limit: input.Limit,
Unread: input.Unread,
})
}
func (s *InboxStore) ListThreads(ctx context.Context, input ListInput) ([]Thread, error) {
limit := input.Limit
if limit <= 0 {
limit = 20
}
var args []any
var conditions []string
var (
args []any
conditions []string
joins []string
)
if input.Agent != "" {
conditions = append(conditions, "assigned_to = ?")
assignedTo := input.AssignedTo
if assignedTo == "" {
assignedTo = input.Agent
}
if assignedTo != "" {
conditions = append(conditions, "t.assigned_to = ?")
args = append(args, assignedTo)
}
if input.CreatedBy != "" {
conditions = append(conditions, "t.created_by = ?")
args = append(args, input.CreatedBy)
}
if len(input.Statuses) > 0 {
conditions = append(conditions, "t.status IN ("+placeholders(len(input.Statuses))+")")
for _, status := range input.Statuses {
args = append(args, status)
}
}
if input.Unread {
if input.Agent == "" {
return nil, fmt.Errorf("agent is required when filtering unread threads")
}
joins = append(joins, "JOIN messages lm ON lm.message_id = t.latest_message_id")
conditions = append(conditions, "lm.to_agent = ?")
args = append(args, input.Agent)
conditions = append(conditions, "lm.from_agent <> ?")
args = append(args, input.Agent)
}
conditions = append(conditions, "status IN ("+placeholders(len(statuses))+")")
for _, status := range statuses {
args = append(args, status)
}
args = append(args, limit)
query := `SELECT
thread_id, run_id, task_id, subject, created_by, assigned_to, status,
priority, latest_message_id, created_at, updated_at
FROM threads`
t.thread_id, t.run_id, t.task_id, t.subject, t.created_by, t.assigned_to, t.status,
t.priority, t.latest_message_id, t.created_at, t.updated_at
FROM threads t`
if len(joins) > 0 {
query += " " + strings.Join(joins, " ")
}
if len(conditions) > 0 {
query += " WHERE " + strings.Join(conditions, " AND ")
}
query += " ORDER BY updated_at DESC LIMIT ?"
query += " ORDER BY t.updated_at DESC LIMIT ?"
args = append(args, limit)
rows, err := s.db.QueryContext(ctx, query, args...)
if err != nil {
return nil, fmt.Errorf("fetch threads: %w", err)
return nil, fmt.Errorf("list threads: %w", err)
}
defer rows.Close()
@@ -409,6 +593,92 @@ func (s *InboxStore) ClaimThread(ctx context.Context, input ClaimInput) (ClaimRe
}, nil
}
func (s *InboxStore) RenewLease(ctx context.Context, input RenewInput) (ClaimResult, error) {
if input.LeaseSeconds <= 0 {
input.LeaseSeconds = 900
}
now := nowUTC()
expiresAt := now.Add(time.Duration(input.LeaseSeconds) * time.Second)
leaseToken := newID("lease")
messageID := newID("msg")
tx, err := s.db.BeginTx(ctx, nil)
if err != nil {
return ClaimResult{}, fmt.Errorf("begin renew transaction: %w", err)
}
defer tx.Rollback()
thread, err := selectThreadForUpdate(ctx, tx, input.ThreadID)
if err != nil {
return ClaimResult{}, err
}
if isTerminalStatus(thread.Status) {
return ClaimResult{}, fmt.Errorf("thread %s is already terminal", input.ThreadID)
}
if _, err := requireActiveLease(ctx, tx, input.ThreadID, input.Agent, now); err != nil {
return ClaimResult{}, err
}
if _, err := tx.ExecContext(
ctx,
`UPDATE leases
SET lease_token = ?, expires_at = ?, released_at = NULL
WHERE thread_id = ?`,
leaseToken,
formatTime(expiresAt),
input.ThreadID,
); err != nil {
return ClaimResult{}, fmt.Errorf("renew lease: %w", err)
}
message := Message{
MessageID: messageID,
ThreadID: input.ThreadID,
FromAgent: input.Agent,
ToAgent: input.Agent,
Kind: "event",
Summary: "lease renewed",
Body: "",
PayloadJSON: json.RawMessage(fmt.Sprintf(`{"lease_seconds":%d,"lease_token":"%s"}`, input.LeaseSeconds, leaseToken)),
CreatedAt: now,
}
if err := insertMessage(ctx, tx, message); err != nil {
return ClaimResult{}, err
}
if err := updateThreadState(ctx, tx, thread.ThreadID, thread.Status, thread.AssignedTo, message.MessageID, now); err != nil {
return ClaimResult{}, err
}
if err := insertEvent(ctx, tx, eventInput{
RunID: thread.RunID,
TaskID: thread.TaskID,
ThreadID: thread.ThreadID,
Source: "inbox",
EventType: "thread_renewed",
MessageID: message.MessageID,
Summary: message.Summary,
PayloadJSON: string(message.PayloadJSON),
CreatedAt: now,
}); err != nil {
return ClaimResult{}, err
}
if err := tx.Commit(); err != nil {
return ClaimResult{}, fmt.Errorf("commit renew transaction: %w", err)
}
thread.LatestMessageID = message.MessageID
thread.UpdatedAt = now
return ClaimResult{
Thread: thread,
Message: message,
}, nil
}
func (s *InboxStore) UpdateThreadStatus(ctx context.Context, input UpdateInput) (Thread, Message, error) {
now := nowUTC()
messageID := newID("msg")
@@ -427,10 +697,12 @@ func (s *InboxStore) UpdateThreadStatus(ctx context.Context, input UpdateInput)
if err != nil {
return Thread{}, Message{}, err
}
if thread.Status == "done" || thread.Status == "failed" || thread.Status == "cancelled" {
if isTerminalStatus(thread.Status) {
return Thread{}, Message{}, fmt.Errorf("thread %s is already terminal", input.ThreadID)
}
if _, err := requireActiveLease(ctx, tx, input.ThreadID, input.Agent, now); err != nil {
return Thread{}, Message{}, err
}
kind := "progress"
if input.Status == "blocked" {
@@ -495,6 +767,9 @@ func (s *InboxStore) ReplyToThread(ctx context.Context, input ReplyInput) (Threa
if err != nil {
return Thread{}, Message{}, err
}
if isTerminalStatus(thread.Status) {
return Thread{}, Message{}, fmt.Errorf("thread %s is already terminal", input.ThreadID)
}
message := Message{
MessageID: messageID,
@@ -561,6 +836,12 @@ func (s *InboxStore) CompleteThread(ctx context.Context, input CompleteInput) (T
if err != nil {
return Thread{}, Message{}, err
}
if isTerminalStatus(thread.Status) {
return Thread{}, Message{}, fmt.Errorf("thread %s is already terminal", input.ThreadID)
}
if _, err := requireActiveLease(ctx, tx, input.ThreadID, input.Agent, now); err != nil {
return Thread{}, Message{}, err
}
message := Message{
MessageID: messageID,
@@ -618,6 +899,81 @@ func (s *InboxStore) CompleteThread(ctx context.Context, input CompleteInput) (T
return thread, message, nil
}
func (s *InboxStore) CancelThread(ctx context.Context, input CancelInput) (Thread, Message, error) {
now := nowUTC()
messageID := newID("msg")
tx, err := s.db.BeginTx(ctx, nil)
if err != nil {
return Thread{}, Message{}, fmt.Errorf("begin cancel transaction: %w", err)
}
defer tx.Rollback()
thread, err := selectThreadForUpdate(ctx, tx, input.ThreadID)
if err != nil {
return Thread{}, Message{}, err
}
if isTerminalStatus(thread.Status) {
return Thread{}, Message{}, fmt.Errorf("thread %s is already terminal", input.ThreadID)
}
summary := defaultString(input.Reason, "thread cancelled")
message := Message{
MessageID: messageID,
ThreadID: thread.ThreadID,
FromAgent: input.Agent,
ToAgent: thread.AssignedTo,
Kind: "control",
Summary: summary,
Body: input.Reason,
PayloadJSON: json.RawMessage(`{}`),
CreatedAt: now,
}
if err := insertMessage(ctx, tx, message); err != nil {
return Thread{}, Message{}, err
}
if err := updateThreadState(ctx, tx, thread.ThreadID, "cancelled", thread.AssignedTo, message.MessageID, now); err != nil {
return Thread{}, Message{}, err
}
if _, err := tx.ExecContext(
ctx,
`UPDATE leases
SET released_at = ?
WHERE thread_id = ?
AND released_at IS NULL`,
formatTime(now),
thread.ThreadID,
); err != nil {
return Thread{}, Message{}, fmt.Errorf("release lease on cancel: %w", err)
}
if err := insertEvent(ctx, tx, eventInput{
RunID: thread.RunID,
TaskID: thread.TaskID,
ThreadID: thread.ThreadID,
Source: "inbox",
EventType: "thread_cancelled",
MessageID: message.MessageID,
Summary: message.Summary,
PayloadJSON: string(message.PayloadJSON),
CreatedAt: now,
}); err != nil {
return Thread{}, Message{}, err
}
if err := tx.Commit(); err != nil {
return Thread{}, Message{}, fmt.Errorf("commit cancel transaction: %w", err)
}
thread.Status = "cancelled"
thread.LatestMessageID = message.MessageID
thread.UpdatedAt = now
return thread, message, nil
}
func (s *InboxStore) GetThread(ctx context.Context, threadID string) (ThreadDetail, error) {
thread, err := selectThread(ctx, s.db, threadID)
if err != nil {
@@ -658,6 +1014,107 @@ func (s *InboxStore) GetThread(ctx context.Context, threadID string) (ThreadDeta
}, nil
}
func (s *InboxStore) WatchThreads(ctx context.Context, input WatchInput) (WatchResult, error) {
cursor := input.AfterEventID
if input.StartFromNow && cursor == 0 {
current, err := s.currentMaxEventID(ctx)
if err != nil {
return WatchResult{}, err
}
cursor = current
}
waitCtx := ctx
cancel := func() {}
if input.Timeout > 0 {
waitCtx, cancel = context.WithTimeout(ctx, input.Timeout)
}
defer cancel()
for {
thread, message, event, found, err := s.findWatchEventAfter(waitCtx, input, cursor)
if err != nil {
if isDeadlineExceeded(waitCtx) {
return WatchResult{Woke: false, NextEventID: cursor}, nil
}
return WatchResult{}, err
}
if found {
return WatchResult{
Woke: true,
NextEventID: event.EventID,
Thread: &thread,
Message: &message,
Event: &event,
}, nil
}
ok, err := waitForNextPoll(waitCtx, 200*time.Millisecond)
if err != nil {
if errors.Is(err, context.DeadlineExceeded) {
return WatchResult{Woke: false, NextEventID: cursor}, nil
}
return WatchResult{}, err
}
if !ok {
return WatchResult{Woke: false, NextEventID: cursor}, nil
}
}
}
func (s *InboxStore) WaitReply(ctx context.Context, input WaitReplyInput) (WaitReplyResult, error) {
cursor := input.AfterEventID
if input.AfterMessageID != "" {
eventID, err := s.lookupEventIDForMessage(ctx, input.ThreadID, input.AfterMessageID)
if err != nil {
return WaitReplyResult{}, err
}
if eventID > cursor {
cursor = eventID
}
}
kinds := input.Kinds
if len(kinds) == 0 {
kinds = []string{"answer", "control", "result"}
}
waitCtx := ctx
cancel := func() {}
if input.Timeout > 0 {
waitCtx, cancel = context.WithTimeout(ctx, input.Timeout)
}
defer cancel()
for {
message, eventID, found, err := s.findReplyAfter(waitCtx, input.ThreadID, cursor, kinds)
if err != nil {
if isDeadlineExceeded(waitCtx) {
return WaitReplyResult{Woke: false, NextEventID: cursor}, nil
}
return WaitReplyResult{}, err
}
if found {
return WaitReplyResult{
Woke: true,
NextEventID: eventID,
Message: &message,
}, nil
}
ok, err := waitForNextPoll(waitCtx, 200*time.Millisecond)
if err != nil {
if errors.Is(err, context.DeadlineExceeded) {
return WaitReplyResult{Woke: false, NextEventID: cursor}, nil
}
return WaitReplyResult{}, err
}
if !ok {
return WaitReplyResult{Woke: false, NextEventID: cursor}, nil
}
}
}
type threadScanner interface {
Scan(dest ...any) error
}
@@ -719,6 +1176,36 @@ func scanMessage(scanner threadScanner) (Message, error) {
return message, nil
}
func scanEvent(scanner threadScanner) (Event, error) {
var (
event Event
messageID sql.NullString
payload, createdAt string
)
if err := scanner.Scan(
&event.EventID,
&event.RunID,
&event.TaskID,
&event.ThreadID,
&event.Source,
&event.EventType,
&messageID,
&event.Summary,
&payload,
&createdAt,
); err != nil {
return Event{}, fmt.Errorf("scan event: %w", err)
}
if messageID.Valid {
event.MessageID = messageID.String
}
event.PayloadJSON = json.RawMessage(payload)
event.CreatedAt = parseTime(createdAt)
return event, nil
}
func selectThread(ctx context.Context, db queryRower, threadID string) (Thread, error) {
row := db.QueryRowContext(
ctx,
@@ -732,7 +1219,7 @@ func selectThread(ctx context.Context, db queryRower, threadID string) (Thread,
thread, err := scanThread(row)
if errors.Is(err, sql.ErrNoRows) {
return Thread{}, fmt.Errorf("thread %s not found", threadID)
return Thread{}, fmt.Errorf("%w: %s", ErrThreadNotFound, threadID)
}
return thread, err
}
@@ -821,6 +1308,231 @@ func updateThreadState(ctx context.Context, tx *sql.Tx, threadID, status, assign
return nil
}
func requireActiveLease(ctx context.Context, tx *sql.Tx, threadID, agent string, now time.Time) (string, error) {
var (
activeAgent string
leaseToken string
expiresAt string
releasedAt sql.NullString
)
err := tx.QueryRowContext(
ctx,
`SELECT agent_id, lease_token, expires_at, released_at
FROM leases
WHERE thread_id = ?`,
threadID,
).Scan(&activeAgent, &leaseToken, &expiresAt, &releasedAt)
if errors.Is(err, sql.ErrNoRows) {
return "", ErrNoActiveLease
}
if err != nil {
return "", fmt.Errorf("read lease: %w", err)
}
if releasedAt.Valid || !parseTime(expiresAt).After(now) {
return "", ErrNoActiveLease
}
if activeAgent != agent {
return "", ErrLeaseConflict
}
return leaseToken, nil
}
func (s *InboxStore) lookupEventIDForMessage(ctx context.Context, threadID, messageID string) (int64, error) {
var eventID int64
err := s.db.QueryRowContext(
ctx,
`SELECT event_id
FROM events
WHERE thread_id = ?
AND message_id = ?
ORDER BY event_id DESC
LIMIT 1`,
threadID,
messageID,
).Scan(&eventID)
if errors.Is(err, sql.ErrNoRows) {
return 0, fmt.Errorf("message %s not found in thread %s", messageID, threadID)
}
if err != nil {
return 0, fmt.Errorf("lookup message event: %w", err)
}
return eventID, nil
}
func (s *InboxStore) currentMaxEventID(ctx context.Context) (int64, error) {
var maxEventID int64
if err := s.db.QueryRowContext(ctx, `SELECT COALESCE(MAX(event_id), 0) FROM events`).Scan(&maxEventID); err != nil {
return 0, fmt.Errorf("query max event id: %w", err)
}
return maxEventID, nil
}
func (s *InboxStore) findReplyAfter(ctx context.Context, threadID string, afterEventID int64, kinds []string) (Message, int64, bool, error) {
args := []any{threadID, afterEventID}
query := `SELECT
e.event_id,
m.message_id, m.thread_id, m.from_agent, m.to_agent, m.kind, m.summary, m.body, m.payload_json, m.created_at
FROM events e
JOIN messages m ON m.message_id = e.message_id
WHERE e.thread_id = ?
AND e.event_id > ?`
if len(kinds) > 0 {
query += " AND m.kind IN (" + placeholders(len(kinds)) + ")"
for _, kind := range kinds {
args = append(args, kind)
}
}
query += " ORDER BY e.event_id ASC LIMIT 1"
row := s.db.QueryRowContext(ctx, query, args...)
var (
eventID int64
message Message
payload string
created string
)
err := row.Scan(
&eventID,
&message.MessageID,
&message.ThreadID,
&message.FromAgent,
&message.ToAgent,
&message.Kind,
&message.Summary,
&message.Body,
&payload,
&created,
)
if errors.Is(err, sql.ErrNoRows) {
return Message{}, 0, false, nil
}
if err != nil {
return Message{}, 0, false, fmt.Errorf("query reply after event %d: %w", afterEventID, err)
}
message.PayloadJSON = json.RawMessage(payload)
message.CreatedAt = parseTime(created)
return message, eventID, true, nil
}
func (s *InboxStore) findWatchEventAfter(ctx context.Context, input WatchInput, afterEventID int64) (Thread, Message, Event, bool, error) {
args := []any{afterEventID}
query := `SELECT
t.thread_id, t.run_id, t.task_id, t.subject, t.created_by, t.assigned_to, t.status,
t.priority, t.latest_message_id, t.created_at, t.updated_at,
e.event_id, e.run_id, e.task_id, e.thread_id, e.source, e.event_type, e.message_id, e.summary, e.payload_json, e.created_at,
m.message_id, m.thread_id, m.from_agent, m.to_agent, m.kind, m.summary, m.body, m.payload_json, m.created_at
FROM events e
JOIN threads t ON t.thread_id = e.thread_id
JOIN messages m ON m.message_id = e.message_id
WHERE e.event_id > ?`
if input.Agent != "" {
query += " AND t.assigned_to = ?"
args = append(args, input.Agent)
}
if len(input.Statuses) > 0 {
query += " AND t.status IN (" + placeholders(len(input.Statuses)) + ")"
for _, status := range input.Statuses {
args = append(args, status)
}
}
query += " ORDER BY e.event_id ASC LIMIT 1"
row := s.db.QueryRowContext(ctx, query, args...)
var (
thread Thread
threadCreatedAt string
threadUpdatedAt string
threadLatestMessage sql.NullString
event Event
eventMessageID sql.NullString
eventPayload string
eventCreatedAt string
message Message
messagePayload string
messageCreatedAt string
)
err := row.Scan(
&thread.ThreadID,
&thread.RunID,
&thread.TaskID,
&thread.Subject,
&thread.CreatedBy,
&thread.AssignedTo,
&thread.Status,
&thread.Priority,
&threadLatestMessage,
&threadCreatedAt,
&threadUpdatedAt,
&event.EventID,
&event.RunID,
&event.TaskID,
&event.ThreadID,
&event.Source,
&event.EventType,
&eventMessageID,
&event.Summary,
&eventPayload,
&eventCreatedAt,
&message.MessageID,
&message.ThreadID,
&message.FromAgent,
&message.ToAgent,
&message.Kind,
&message.Summary,
&message.Body,
&messagePayload,
&messageCreatedAt,
)
if errors.Is(err, sql.ErrNoRows) {
return Thread{}, Message{}, Event{}, false, nil
}
if err != nil {
return Thread{}, Message{}, Event{}, false, fmt.Errorf("query watch event after %d: %w", afterEventID, err)
}
if threadLatestMessage.Valid {
thread.LatestMessageID = threadLatestMessage.String
}
thread.CreatedAt = parseTime(threadCreatedAt)
thread.UpdatedAt = parseTime(threadUpdatedAt)
if eventMessageID.Valid {
event.MessageID = eventMessageID.String
}
event.PayloadJSON = json.RawMessage(eventPayload)
event.CreatedAt = parseTime(eventCreatedAt)
message.PayloadJSON = json.RawMessage(messagePayload)
message.CreatedAt = parseTime(messageCreatedAt)
return thread, message, event, true, nil
}
func waitForNextPoll(ctx context.Context, interval time.Duration) (bool, error) {
timer := time.NewTimer(interval)
defer timer.Stop()
select {
case <-ctx.Done():
return false, ctx.Err()
case <-timer.C:
return true, nil
}
}
func isTerminalStatus(status string) bool {
return status == "done" || status == "failed" || status == "cancelled"
}
func isDeadlineExceeded(ctx context.Context) bool {
return ctx.Err() != nil && errors.Is(ctx.Err(), context.DeadlineExceeded)
}
func defaultID(value, prefix string) string {
if value != "" {
return value