package inbox import ( "fmt" "time" "ai-workflow-skill/packages/coord-core/protocol" "ai-workflow-skill/packages/coord-core/store" "github.com/spf13/cobra" ) type waitReplyOptions struct { threadID string afterMessageID string afterEventID int64 kinds string timeoutSeconds int } func newWaitReplyCmd(root *rootOptions) *cobra.Command { opts := &waitReplyOptions{} cmd := &cobra.Command{ Use: "wait-reply", Short: "Block until a reply-like message appears in a thread", Long: helpLong( "Use wait-reply after a worker has marked a thread blocked and needs a leader response.", "This is the worker-side blocking primitive; prefer it over ad hoc sleep loops.", "Resume with --after-event or --after-message when you already know the last event or message you processed.", "wait-reply watches one thread, unlike watch which can observe broader inbox activity.", ), Example: ` inbox --db .agents/coord.db wait-reply --thread thr_123 inbox --db .agents/coord.db wait-reply --thread thr_123 --after-event 42 --timeout-seconds 900 inbox --db .agents/coord.db wait-reply --thread thr_123 --kinds answer,result`, RunE: func(cmd *cobra.Command, args []string) error { ctx := cmd.Context() sqlDB, err := openInboxDB(ctx, root.dbPath) if err != nil { return err } defer sqlDB.Close() s := store.NewInboxStore(sqlDB) agent := root.agent result, err := s.WaitReply(ctx, store.WaitReplyInput{ ThreadID: opts.threadID, AfterMessageID: opts.afterMessageID, AfterEventID: opts.afterEventID, Kinds: parseCSV(opts.kinds), Agent: agent, Timeout: time.Duration(opts.timeoutSeconds) * time.Second, }) if err != nil { return err } if !result.Woke { return protocol.NoMatchingWork("no matching reply before timeout") } data := map[string]any{ "woke": result.Woke, "next_event_id": result.NextEventID, } if result.Message != nil { data["message"] = result.Message } resp := protocol.Success{ OK: true, Command: "wait-reply", Data: data, } if root.json { return protocol.WriteJSON(cmd.OutOrStdout(), resp) } _, err = fmt.Fprintf(cmd.OutOrStdout(), "reply received on thread %s at event %d\n", result.Message.ThreadID, result.NextEventID) return err }, } cmd.Flags().StringVar(&opts.threadID, "thread", "", "Thread ID") cmd.Flags().StringVar(&opts.afterMessageID, "after-message", "", "Resume after a known message ID") cmd.Flags().Int64Var(&opts.afterEventID, "after-event", 0, "Resume after a known event ID") cmd.Flags().StringVar(&opts.kinds, "kinds", "answer,control,result", "Comma-separated message kinds to wake on") cmd.Flags().IntVar(&opts.timeoutSeconds, "timeout-seconds", 0, "Maximum time to wait; 0 waits forever") _ = cmd.MarkFlagRequired("thread") return cmd }