Files

1435 lines
45 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package leaderloop
import (
"bytes"
"context"
"database/sql"
"encoding/json"
"fmt"
"os"
"os/exec"
"path/filepath"
"strings"
"time"
"inbox/internal/app/runtimecodex"
"inbox/internal/app/runtimeconfig"
"inbox/internal/app/workspaceruntime"
"inbox/internal/base/slug"
"inbox/internal/base/timeutil"
"inbox/internal/domain/lane"
"inbox/internal/domain/message"
"inbox/internal/domain/task"
"inbox/internal/domain/taskgraph"
"inbox/internal/domain/topic"
"inbox/internal/domain/workflow"
"inbox/internal/domain/workspace"
)
// Loop timing defaults applied by NewService.
const (
	// defaultPollInterval is the delay between ProcessOnce calls in Run.
	defaultPollInterval time.Duration = 2 * time.Second
	// defaultLease is subtracted from "now" in ProcessOnce to build the
	// staleBefore cutoff passed to ClaimNextDelivery.
	defaultLease time.Duration = 2 * time.Minute
)
// Repository is the persistence surface the leader loop depends on. Methods
// are grouped by the entity they touch.
type Repository interface {
	// Workspaces and leader deliveries.
	ListWorkspaces(ctx context.Context, projectID string) ([]workspace.Workspace, error)
	ClaimNextDelivery(ctx context.Context, workspaceID string, roleNames []string, staleBefore string) (message.DeliveryClaim, error)
	ArchiveDelivery(ctx context.Context, messageID, roleName string) error

	// Topics and their messages.
	GetTopic(ctx context.Context, topicID string) (topic.Record, error)
	UpdateTopic(ctx context.Context, value topic.Record) (topic.Record, error)
	ListMessagesByTopic(ctx context.Context, topicID string) ([]message.Record, error)
	CreateMessage(ctx context.Context, value message.Record) (message.Record, error)

	// Workflow runs and their logs.
	CreateWorkflowRun(ctx context.Context, value workflow.Run) (workflow.Run, error)
	UpdateWorkflowRun(ctx context.Context, value workflow.Run) (workflow.Run, error)
	AppendWorkflowRunLog(ctx context.Context, value workflow.RunLog) (workflow.RunLog, error)

	// Execution lanes and tasks.
	ListLanesByTopic(ctx context.Context, topicID string) ([]lane.Record, error)
	CreateLane(ctx context.Context, value lane.Record) (lane.Record, error)
	ListTasksByTopic(ctx context.Context, topicID string) ([]task.Record, error)
	CreateTask(ctx context.Context, value task.Record, dependencies []task.Dependency) (task.Record, error)

	// Task-graph versions.
	CreateTaskGraphVersion(ctx context.Context, value taskgraph.Record) (taskgraph.Record, error)
	GetLatestTaskGraphVersionByTopic(ctx context.Context, topicID string) (taskgraph.Record, error)
	UpdateTaskGraphVersion(ctx context.Context, value taskgraph.Record) (taskgraph.Record, error)
}
type (
	// RuntimeResolver resolves the effective runtime configuration of a role
	// inside a workspace; processClaim uses it for the "leader" role.
	RuntimeResolver interface {
		ResolveRole(ctx context.Context, workspaceID, roleName string) (runtimeconfig.ResolvedRole, error)
	}

	// WorkspaceRuntime exposes the workspace and lane runtimes. processClaim
	// uses Ensure to obtain the workspace (its RootPath is handed to the
	// runner), and plan_and_start outputs call EnsureLane for each lane that
	// should be started.
	WorkspaceRuntime interface {
		Ensure(ctx context.Context, workspaceID string) (workspace.Workspace, workspaceruntime.Runtime, error)
		EnsureLane(ctx context.Context, laneID string) (lane.Record, error)
	}

	// CodexRunner executes the leader prompt against schemaJSON in
	// workspaceRoot and reports the outcome.
	CodexRunner interface {
		Run(ctx context.Context, workspaceRoot string, resolved runtimeconfig.ResolvedRole, prompt, schemaJSON string) (RunResult, error)
	}
)
type (
	// RunResult is what a CodexRunner reports for one leader invocation.
	RunResult struct {
		ExitCode int
		Status   workflow.RunStatus
		// ResultJSON is the leader's raw output, decoded by parseLeaderOutput
		// when Status is succeeded.
		ResultJSON   string
		ErrorMessage string
		// StdoutLines and StderrLines are stored as workflow run logs
		// (blank lines are skipped).
		StdoutLines []string
		StderrLines []string
	}

	// Service is the leader loop: it claims "leader" deliveries, runs the
	// leader, and applies its output to the topic.
	Service struct {
		repo        Repository
		resolver    RuntimeResolver
		runtime     WorkspaceRuntime
		runner      CodexRunner
		projectRoot string
		clock       timeutil.Clock
		// pollInterval is the delay between polls in Run.
		pollInterval time.Duration
		// lease determines the staleBefore cutoff used when claiming.
		lease time.Duration
	}

	// Result describes what one processed delivery produced. Run, Reply,
	// Lanes and Tasks stay zero when the delivery was archived without running
	// the leader.
	Result struct {
		Claim message.DeliveryClaim
		Run   workflow.Run
		Reply *message.Record
		Lanes []lane.Record
		Tasks []task.Record
	}
)
// NewService builds a leader-loop Service.
//
// A nil clock is replaced by timeutil.SystemClock{}, and a nil runner by a
// hostCodexRunner rooted at the trimmed projectRoot. The poll interval and
// lease start at defaultPollInterval and defaultLease.
func NewService(repo Repository, resolver RuntimeResolver, runtime WorkspaceRuntime, runner CodexRunner, clock timeutil.Clock, projectRoot string) *Service {
	root := strings.TrimSpace(projectRoot)
	if runner == nil {
		runner = hostCodexRunner{projectRoot: root}
	}
	if clock == nil {
		clock = timeutil.SystemClock{}
	}
	svc := &Service{
		repo:         repo,
		resolver:     resolver,
		runtime:      runtime,
		runner:       runner,
		projectRoot:  root,
		clock:        clock,
		pollInterval: defaultPollInterval,
		lease:        defaultLease,
	}
	return svc
}
// Run polls for leader deliveries until ctx is done. Each tick calls
// ProcessOnce, which handles at most one claimed delivery. Run returns
// ctx.Err() once the context is cancelled or its deadline passes.
//
// ProcessOnce errors are deliberately dropped so that a single bad delivery
// cannot stop the daemon. Leader failures are persisted into workflow runs and
// reply messages instead.
//
// A non-positive pollInterval (for example on a Service not built with
// NewService) falls back to defaultPollInterval, because time.NewTicker panics
// on non-positive durations.
func (s *Service) Run(ctx context.Context) error {
	interval := s.pollInterval
	if interval <= 0 {
		interval = defaultPollInterval
	}
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		if err := ctx.Err(); err != nil {
			return err
		}
		// Best-effort: see the doc comment for why the error is ignored.
		_, _ = s.ProcessOnce(ctx)
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
		}
	}
}
// ProcessOnce scans workspaces in the order ListWorkspaces returns them and
// processes the first leader delivery it manages to claim.
//
// It returns (nil, nil) when no workspace yields a claim. A claim error for a
// workspace is skipped, and the scan moves on to the next workspace.
func (s *Service) ProcessOnce(ctx context.Context) (*Result, error) {
	workspaces, err := s.repo.ListWorkspaces(ctx, "")
	if err != nil {
		return nil, err
	}
	cutoff := timeutil.FormatRFC3339(s.clock.Now().Add(-s.lease))
	for idx := range workspaces {
		claim, claimErr := s.repo.ClaimNextDelivery(ctx, workspaces[idx].ID, []string{"leader"}, cutoff)
		if claimErr != nil {
			// NOTE(review): every claim error is treated as "nothing claimable here".
			continue
		}
		result, procErr := s.processClaim(ctx, claim)
		if procErr != nil {
			return nil, procErr
		}
		return result, nil
	}
	return nil, nil
}
// processClaim handles one claimed leader delivery end to end.
//
// Deliveries are archived without running the leader in two cases: the topic
// is cancelled, or shouldSkipLeaderPlanning says so. Otherwise processClaim:
//   - records a workflow run and invokes the leader through the CodexRunner;
//   - stores stdout/stderr as run logs;
//   - applies a successful output (see applyLeaderRunOutput);
//   - posts a failure notice when the run failed without producing a reply;
//   - finalizes the run and archives the delivery.
//
// Leader-side failures are recorded on the run, not returned. The returned
// error covers repository/runtime failures that prevent the delivery from
// being processed or finalized.
func (s *Service) processClaim(ctx context.Context, claim message.DeliveryClaim) (*Result, error) {
	ws, _, err := s.runtime.Ensure(ctx, claim.Message.WorkspaceID)
	if err != nil {
		return nil, err
	}
	topicRecord, err := s.repo.GetTopic(ctx, claim.Message.TopicID)
	if err != nil {
		return nil, err
	}
	if strings.TrimSpace(topicRecord.Status) == "cancelled" {
		if err := s.repo.ArchiveDelivery(ctx, claim.Message.ID, claim.RecipientRoleName); err != nil {
			return nil, err
		}
		return &Result{Claim: claim}, nil
	}
	messages, err := s.repo.ListMessagesByTopic(ctx, topicRecord.ID)
	if err != nil {
		return nil, err
	}
	existingLanes, err := s.repo.ListLanesByTopic(ctx, topicRecord.ID)
	if err != nil {
		return nil, err
	}
	existingTasks, err := s.repo.ListTasksByTopic(ctx, topicRecord.ID)
	if err != nil {
		return nil, err
	}
	// A user message on a topic with no lanes or tasks yet asks for the
	// initial plan, which is frozen as a draft awaiting user confirmation.
	shouldFreezeInitialPlan := len(existingLanes) == 0 &&
		len(existingTasks) == 0 &&
		claim.Message.FromRoleName == "user"
	if shouldSkipLeaderPlanning(claim.Message, topicRecord, existingLanes, existingTasks) {
		if err := s.repo.ArchiveDelivery(ctx, claim.Message.ID, claim.RecipientRoleName); err != nil {
			return nil, err
		}
		return &Result{Claim: claim}, nil
	}
	resolved, err := s.resolver.ResolveRole(ctx, claim.Message.WorkspaceID, "leader")
	if err != nil {
		return nil, err
	}
	snapshot, err := resolved.SnapshotJSON()
	if err != nil {
		return nil, err
	}
	run, err := s.repo.CreateWorkflowRun(ctx, workflow.Run{
		WorkspaceID:        claim.Message.WorkspaceID,
		TopicID:            claim.Message.TopicID,
		RoleName:           "leader",
		Stage:              workflow.StagePlan,
		Mode:               "message",
		Status:             workflow.RunStatusRunning,
		RequestMessageID:   claim.Message.ID,
		ConfigSnapshotJSON: snapshot,
		CommandJSON:        `{"mode":"leader"}`,
	})
	if err != nil {
		return nil, err
	}
	prompt := buildLeaderPrompt(resolved, topicRecord, claim.Message, messages, existingLanes, existingTasks, shouldFreezeInitialPlan)
	runResult, runErr := s.runner.Run(ctx, ws.RootPath, resolved, prompt, leaderOutputSchema(shouldFreezeInitialPlan))
	if runErr != nil {
		runResult.Status = workflow.RunStatusFailed
		runResult.ErrorMessage = strings.TrimSpace(runErr.Error())
	}
	s.appendLeaderRunLogs(ctx, run, runResult)

	var (
		reply        *message.Record
		createdLanes []lane.Record
		createdTasks []task.Record
		graphFrozen  bool
	)
	if runResult.Status == workflow.RunStatusSucceeded {
		reply, createdLanes, createdTasks, graphFrozen = s.applyLeaderRunOutput(
			ctx, ws, claim.Message, &run, &runResult, existingLanes, existingTasks, shouldFreezeInitialPlan,
		)
	}

	// Make sure the requester hears about a failure when no reply was posted.
	// Best-effort: if this message cannot be created, the run still records
	// the error.
	if reply == nil && runResult.Status == workflow.RunStatusFailed {
		failureBody := strings.TrimSpace(runResult.ErrorMessage)
		if failureBody == "" {
			failureBody = "Leader execution failed without an explicit error message."
		}
		item, msgErr := s.repo.CreateMessage(ctx, message.Record{
			WorkspaceID:      claim.Message.WorkspaceID,
			TopicID:          claim.Message.TopicID,
			FromRoleName:     "leader",
			ToExpr:           claim.Message.FromRoleName,
			Type:             message.TypeSummary,
			Stage:            claim.Message.Stage,
			ReplyToMessageID: claim.Message.ID,
			BodyMarkdown:     "Leader execution failed.\n\n" + failureBody,
			CreatedAt:        timeutil.FormatRFC3339(s.clock.Now()),
		})
		if msgErr == nil {
			reply = &item
		}
	}

	run.Status = runResult.Status
	run.ExitCode = runResult.ExitCode
	run.ErrorMessage = strings.TrimSpace(runResult.ErrorMessage)
	run.CompletedAt = timeutil.FormatRFC3339(s.clock.Now())
	// graphFrozen is only true for an initial plan that was frozen as a draft.
	if graphFrozen && run.Status == workflow.RunStatusSucceeded {
		run.CommandJSON = mustEncodeLeaderCommandWithGraphStatus(run.CommandJSON, "draft")
	}
	if reply != nil {
		run.ReplyMessageID = reply.ID
	}
	run, err = s.repo.UpdateWorkflowRun(ctx, run)
	if err != nil {
		return nil, err
	}
	if err := s.repo.ArchiveDelivery(ctx, claim.Message.ID, claim.RecipientRoleName); err != nil {
		return nil, err
	}
	return &Result{
		Claim: claim,
		Run:   run,
		Reply: reply,
		Lanes: createdLanes,
		Tasks: createdTasks,
	}, nil
}

// appendLeaderRunLogs stores the non-blank stdout and stderr lines of a
// leader invocation as workflow run logs. Appends are best-effort: failures
// are ignored so they never mask the run's own outcome.
func (s *Service) appendLeaderRunLogs(ctx context.Context, run workflow.Run, result RunResult) {
	for _, line := range result.StdoutLines {
		if strings.TrimSpace(line) == "" {
			continue
		}
		_, _ = s.repo.AppendWorkflowRunLog(ctx, workflow.RunLog{
			RunID:   run.ID,
			Stream:  workflow.LogStreamStdout,
			Content: line,
		})
	}
	for _, line := range result.StderrLines {
		if strings.TrimSpace(line) == "" {
			continue
		}
		_, _ = s.repo.AppendWorkflowRunLog(ctx, workflow.RunLog{
			RunID:   run.ID,
			Stream:  workflow.LogStreamStderr,
			Content: line,
		})
	}
}

// applyLeaderRunOutput parses a successful leader run's ResultJSON and applies
// it to the topic.
//
// Problems are not returned. Instead they downgrade runResult in place: to
// Failed, or to Cancelled with exit code 130 when the topic was stopped in the
// meantime. Once the output validates, run.CommandJSON is replaced with the
// encoded output.
//
// It returns the leader reply and the lanes/tasks created (possibly partial
// when applyLeaderOutput fails midway), plus whether the initial graph was
// frozen as a draft awaiting confirmation.
func (s *Service) applyLeaderRunOutput(
	ctx context.Context,
	ws workspace.Workspace,
	source message.Record,
	run *workflow.Run,
	runResult *RunResult,
	existingLanes []lane.Record,
	existingTasks []task.Record,
	shouldFreezeInitialPlan bool,
) (*message.Record, []lane.Record, []task.Record, bool) {
	fail := func(err error) {
		runResult.Status = workflow.RunStatusFailed
		runResult.ErrorMessage = err.Error()
	}
	output, err := parseLeaderOutput(runResult.ResultJSON, shouldFreezeInitialPlan)
	if err != nil {
		fail(err)
		return nil, nil, nil, false
	}
	freezeInitialGraph := shouldFreezeInitialPlan && len(output.Tasks) > 0
	// Re-read the topic: the leader run can take a while, and the topic may
	// have been stopped or otherwise updated in the meantime.
	latestTopic, err := s.repo.GetTopic(ctx, source.TopicID)
	if err != nil {
		fail(err)
		return nil, nil, nil, false
	}
	if strings.TrimSpace(latestTopic.Status) == "cancelled" {
		runResult.Status = workflow.RunStatusCancelled
		runResult.ExitCode = 130
		runResult.ErrorMessage = "Topic was stopped before leader output could be applied."
		return nil, nil, nil, false
	}
	if err := validateLeaderOutputForTopic(output, existingLanes, existingTasks); err != nil {
		fail(err)
		return nil, nil, nil, false
	}
	run.CommandJSON = mustEncodeLeaderCommand(output)
	reply, createdLanes, createdTasks, err := s.applyLeaderOutput(ctx, ws, source, output)
	if err != nil {
		fail(err)
		return reply, createdLanes, createdTasks, false
	}
	if err := s.recordTaskGraphVersion(ctx, source.TopicID, source.FromRoleName, output, freezeInitialGraph); err != nil {
		fail(err)
		return reply, createdLanes, createdTasks, false
	}
	switch {
	case freezeInitialGraph:
		// Update the freshly read record rather than the one loaded before the
		// leader ran, so concurrent topic edits are not overwritten.
		latestTopic.Status = "awaiting_confirmation"
		if _, err := s.repo.UpdateTopic(ctx, latestTopic); err != nil {
			fail(err)
			return reply, createdLanes, createdTasks, false
		}
		return reply, createdLanes, createdTasks, true
	case output.ExecutionMode == "plan_and_start":
		if err := s.autoStartLeaderLanes(ctx, source.TopicID, createdLanes); err != nil {
			fail(err)
		}
	}
	return reply, createdLanes, createdTasks, false
}

// autoStartLeaderLanes starts every lane resolveAutoStartLanes selects for
// the topic. It replaces the matching entries of createdLanes in place with the
// started records, and stops at the first EnsureLane failure.
func (s *Service) autoStartLeaderLanes(ctx context.Context, topicID string, createdLanes []lane.Record) error {
	candidates, err := s.resolveAutoStartLanes(ctx, topicID, map[string]struct{}{})
	if err != nil {
		return err
	}
	for idx := range candidates {
		started, err := s.runtime.EnsureLane(ctx, candidates[idx].ID)
		if err != nil {
			return err
		}
		for createdIdx := range createdLanes {
			if createdLanes[createdIdx].ID == started.ID {
				createdLanes[createdIdx] = started
			}
		}
	}
	return nil
}
// shouldSkipLeaderPlanning reports whether a delivery can be archived without
// running the leader. That is the case for worker messages that arrive while
// the topic is awaiting confirmation, cancelled, or completed. The lanes and
// tasks arguments are currently unused.
func shouldSkipLeaderPlanning(msg message.Record, record topic.Record, lanes []lane.Record, tasks []task.Record) bool {
	fromWorker := msg.FromRoleName == "worker"
	status := strings.TrimSpace(record.Status)
	settled := status == "awaiting_confirmation" || status == "cancelled" || status == "completed"
	return fromWorker && settled
}
// hasRunningTask reports whether at least one task in items is running.
func hasRunningTask(items []task.Record) bool {
	running := false
	for i := 0; i < len(items) && !running; i++ {
		running = items[i].Status == task.StatusRunning
	}
	return running
}
// hasRunningLane reports whether at least one lane in items is running.
func hasRunningLane(items []lane.Record) bool {
	for idx := range items {
		if items[idx].Status != lane.StatusRunning {
			continue
		}
		return true
	}
	return false
}
// hasFailedOrBlockedGraph reports whether any lane or task is blocked or
// failed. Lanes are checked first.
func hasFailedOrBlockedGraph(lanes []lane.Record, tasks []task.Record) bool {
	for idx := range lanes {
		switch lanes[idx].Status {
		case lane.StatusBlocked, lane.StatusFailed:
			return true
		}
	}
	for idx := range tasks {
		switch tasks[idx].Status {
		case task.StatusFailed, task.StatusBlocked:
			return true
		}
	}
	return false
}
// allTasksTerminal reports whether items is non-empty and every task has
// succeeded, failed, or been cancelled. An empty list is not terminal.
func allTasksTerminal(items []task.Record) bool {
	if len(items) == 0 {
		return false
	}
	for idx := range items {
		status := items[idx].Status
		if status != task.StatusSucceeded && status != task.StatusFailed && status != task.StatusCancelled {
			return false
		}
	}
	return true
}
type (
	// leaderOutput is the structured JSON the leader returns.
	// parseLeaderOutput decodes it with unknown fields disallowed.
	leaderOutput struct {
		PlanSummaryMarkdown string `json:"plan_summary_markdown"`
		// PlanMode must be "initial" or "patch" (validateLeaderOutputForTopic).
		PlanMode string `json:"plan_mode"`
		// ReplanReason is required when PlanMode is "patch".
		ReplanReason string `json:"replan_reason"`
		// ExecutionMode is "clarify", "plan_only" or "plan_and_start" per the
		// output schema; "plan_and_start" triggers lane auto-start.
		ExecutionMode string           `json:"execution_mode"`
		LeaderReply   leaderReplySpec  `json:"leader_reply"`
		Tasks         []leaderTaskSpec `json:"tasks"`
	}

	// leaderReplySpec is the reply the leader wants posted back to the sender.
	leaderReplySpec struct {
		Markdown string `json:"markdown"`
		// Type is a message type; blank defaults to a summary message.
		Type string `json:"type"`
	}

	// leaderTaskSpec is one node of the leader's task graph. Key identifies
	// the task within the output, and DependsOn lists keys from the same
	// output or IDs of existing topic tasks.
	leaderTaskSpec struct {
		Key                string `json:"key"`
		Title              string `json:"title"`
		BodyMarkdown       string `json:"body_markdown"`
		AcceptanceMarkdown string `json:"acceptance_markdown"`
		// Kind is one of "execution", "gate", "verification", "milestone"
		// per the output schema.
		Kind         string   `json:"kind"`
		Deliverables []string `json:"deliverables"`
		Priority     int      `json:"priority"`
		TaskOrder    int      `json:"task_order"`
		DependsOn    []string `json:"depends_on"`
	}
)
// parseLeaderOutput decodes the leader's raw JSON output and validates it with
// validateLeaderOutput.
//
// Decoding is strict: unknown fields are rejected, and so is any
// non-whitespace text after the JSON object. The prompt tells the leader not
// to add extra text, and json.Decoder.Decode would otherwise ignore it
// silently. Empty or all-whitespace input is an error.
func parseLeaderOutput(raw string, initialFreeze bool) (leaderOutput, error) {
	var output leaderOutput
	if strings.TrimSpace(raw) == "" {
		return output, fmt.Errorf("leader returned empty output")
	}
	decoder := json.NewDecoder(bytes.NewReader([]byte(raw)))
	decoder.DisallowUnknownFields()
	if err := decoder.Decode(&output); err != nil {
		return output, fmt.Errorf("decode leader output: %w", err)
	}
	// Decode stops after the first JSON value. InputOffset is the byte offset
	// just past it, so anything left over is trailing data.
	if rest := strings.TrimSpace(raw[int(decoder.InputOffset()):]); rest != "" {
		return output, fmt.Errorf("decode leader output: unexpected trailing data after JSON value")
	}
	if err := validateLeaderOutput(output, initialFreeze); err != nil {
		return output, err
	}
	return output, nil
}
// validateLeaderOutputForTopic checks plan_mode against the topic's current
// graph, then delegates to validateLeaderDependencyKeys:
//   - "initial" is only allowed while the topic has no lanes or tasks;
//   - "patch" is only allowed once it has some, and needs a replan_reason;
//   - any other mode is rejected.
func validateLeaderOutputForTopic(output leaderOutput, existingLanes []lane.Record, existingTasks []task.Record) error {
	graphExists := len(existingLanes) != 0 || len(existingTasks) != 0
	mode := strings.TrimSpace(output.PlanMode)
	if mode != "initial" && mode != "patch" {
		return fmt.Errorf("leader output has invalid plan_mode %q", output.PlanMode)
	}
	if mode == "initial" && graphExists {
		return fmt.Errorf("leader must use plan_mode=patch when topic already has lanes or tasks")
	}
	if mode == "patch" {
		if !graphExists {
			return fmt.Errorf("leader cannot use plan_mode=patch on an empty topic graph")
		}
		if strings.TrimSpace(output.ReplanReason) == "" {
			return fmt.Errorf("leader patch mode requires replan_reason")
		}
	}
	return validateLeaderDependencyKeys(output, existingTasks)
}
// validateLeaderDependencyKeys checks the task keys and dependency references
// of a leader output. Keys and references are compared after trimming
// whitespace. It rejects:
//   - a depends_on entry that names neither a task key from the same output
//     nor the ID of an existing topic task;
//   - a non-empty task key that appears more than once;
//   - a task that depends on itself.
//
// Duplicate keys and self-edges corrupt the key-indexed graph that
// applyLeaderOutput builds, so they are rejected here, before any reply, lane
// or task is created.
func validateLeaderDependencyKeys(output leaderOutput, existingTasks []task.Record) error {
	knownKeys := make(map[string]struct{}, len(output.Tasks)+len(existingTasks))
	for _, item := range existingTasks {
		knownKeys[item.ID] = struct{}{}
	}
	seenKeys := make(map[string]struct{}, len(output.Tasks))
	for _, item := range output.Tasks {
		key := strings.TrimSpace(item.Key)
		if key != "" {
			if _, dup := seenKeys[key]; dup {
				return fmt.Errorf("leader output has duplicate task key %q", item.Key)
			}
			seenKeys[key] = struct{}{}
		}
		knownKeys[key] = struct{}{}
	}
	for _, item := range output.Tasks {
		key := strings.TrimSpace(item.Key)
		for _, dep := range item.DependsOn {
			ref := strings.TrimSpace(dep)
			if _, ok := knownKeys[ref]; !ok {
				return fmt.Errorf("leader task %q references unknown dependency key %q", item.Key, dep)
			}
			if key != "" && ref == key {
				return fmt.Errorf("leader task %q depends on itself", item.Key)
			}
		}
	}
	return nil
}
// applyLeaderOutput materializes a validated leader output for source's topic.
// In order, it:
//  1. posts the leader reply when there is any text (leader_reply.markdown,
//     falling back to plan_summary_markdown; a blank type becomes summary);
//  2. orders the output tasks so dependencies come first (topoSortLeaderTasks)
//     and normalizes each spec;
//  3. derives a lane seed per task, then creates or reuses the lane for each
//     seed;
//  4. creates each task;
//  5. wires task dependencies, resolving keys to the new task IDs or to
//     existing topic task IDs.
//
// The writes are not transactional. On error it returns whatever reply, lanes
// and tasks were already created, together with the error.
func (s *Service) applyLeaderOutput(ctx context.Context, ws workspace.Workspace, source message.Record, output leaderOutput) (*message.Record, []lane.Record, []task.Record, error) {
var reply *message.Record
// Prefer the explicit leader_reply text and fall back to the plan summary;
// no reply message is posted when both are blank.
replyBody := strings.TrimSpace(output.LeaderReply.Markdown)
if replyBody == "" {
replyBody = strings.TrimSpace(output.PlanSummaryMarkdown)
}
if replyBody != "" {
replyType := message.Type(strings.TrimSpace(output.LeaderReply.Type))
if replyType == "" {
replyType = message.TypeSummary
}
item, err := s.repo.CreateMessage(ctx, message.Record{
WorkspaceID: source.WorkspaceID,
TopicID: source.TopicID,
FromRoleName: "leader",
ToExpr: source.FromRoleName,
Type: replyType,
Stage: source.Stage,
ReplyToMessageID: source.ID,
BodyMarkdown: replyBody,
CreatedAt: timeutil.FormatRFC3339(s.clock.Now()),
})
if err != nil {
return nil, nil, nil, err
}
reply = &item
}
// Reload the topic's current lanes and tasks from the repository.
existingLanes, err := s.repo.ListLanesByTopic(ctx, source.TopicID)
if err != nil {
return reply, nil, nil, err
}
existingTasks, err := s.repo.ListTasksByTopic(ctx, source.TopicID)
if err != nil {
return reply, nil, nil, err
}
// Seed the slug set and lane index with existing lanes so that newly derived
// lanes get slugs that are unique within the topic.
usedLaneSlugs := map[string]struct{}{}
laneByID := map[string]lane.Record{}
for _, item := range existingLanes {
usedLaneSlugs[item.Slug] = struct{}{}
laneByID[item.ID] = item
}
createdLanes := make([]lane.Record, 0)
// Reduce dependencies once to order the tasks, normalize the ordered specs,
// then recompute the reduced dependencies from the normalized specs.
// normalizeLeaderTaskSpec is defined elsewhere; presumably it may adjust keys
// or deps, hence the recomputation (confirm).
reducedDeps := reduceLeaderTaskDependencies(output.Tasks)
orderedTasks, err := topoSortLeaderTasks(output.Tasks, reducedDeps)
if err != nil {
return reply, nil, nil, err
}
for idx := range orderedTasks {
orderedTasks[idx] = normalizeLeaderTaskSpec(orderedTasks[idx], idx)
}
reducedDeps = reduceLeaderTaskDependencies(orderedTasks)
childCounts := countLeaderTaskChildren(reducedDeps)
laneSeeds := deriveLeaderTaskLaneSeeds(orderedTasks, reducedDeps, childCounts)
taskSpecByKey := make(map[string]leaderTaskSpec, len(orderedTasks))
for _, item := range orderedTasks {
taskSpecByKey[strings.TrimSpace(item.Key)] = item
}
// Existing tasks can be referenced as dependencies by their task ID.
taskKeyToID := map[string]string{}
for _, existing := range existingTasks {
taskKeyToID[existing.ID] = existing.ID
}
seedToLaneID := map[string]string{}
createdTasks := make([]task.Record, 0, len(orderedTasks))
// When the batch contains any gate task, only gate tasks may start as
// ready; every other new task starts as draft.
hasGateTasks := false
for _, spec := range orderedTasks {
if strings.TrimSpace(spec.Kind) == "gate" {
hasGateTasks = true
break
}
}
// First pass: place each task in its derived lane and create it without
// dependencies, since dependency targets may not have IDs yet.
for _, spec := range orderedTasks {
seed := laneSeeds[strings.TrimSpace(spec.Key)]
if seed == "" {
return reply, createdLanes, createdTasks, fmt.Errorf("missing derived lane seed for task %q", spec.Key)
}
// The lane is named and described after the seed task's spec.
laneSpec := laneSpecForSeed(taskSpecByKey, seed, spec)
laneRecord, laneCreated, err := s.ensureDerivedLane(ctx, ws, source, seed, laneSpec, usedLaneSlugs, seedToLaneID, laneByID)
if err != nil {
return reply, createdLanes, createdTasks, err
}
if laneCreated != nil {
createdLanes = append(createdLanes, *laneCreated)
laneByID[laneCreated.ID] = *laneCreated
}
// Tasks with no (reduced) dependencies start ready; the rest start draft.
status := task.StatusDraft
if len(reducedDeps[strings.TrimSpace(spec.Key)]) == 0 {
status = task.StatusReady
}
if hasGateTasks && strings.TrimSpace(spec.Kind) != "gate" {
status = task.StatusDraft
}
item, err := s.repo.CreateTask(ctx, task.Record{
WorkspaceID: source.WorkspaceID,
TopicID: source.TopicID,
LaneID: laneRecord.ID,
Title: spec.Title,
BodyMarkdown: spec.BodyMarkdown,
AcceptanceMarkdown: strings.TrimSpace(spec.AcceptanceMarkdown),
Kind: task.Kind(strings.TrimSpace(spec.Kind)),
Deliverables: append([]string(nil), spec.Deliverables...),
Status: status,
Priority: spec.Priority,
TaskOrder: spec.TaskOrder,
CreatedByRoleName: "leader",
}, nil)
if err != nil {
return reply, createdLanes, createdTasks, err
}
key := strings.TrimSpace(spec.Key)
// A task with an empty key is addressable by its new ID.
if key == "" {
key = item.ID
}
taskKeyToID[key] = item.ID
createdTasks = append(createdTasks, item)
}
// Second pass: wire dependencies now that every new task has an ID.
// createdTasks was appended in orderedTasks order, so idx lines up.
for idx, spec := range orderedTasks {
depRefs := reducedDeps[strings.TrimSpace(spec.Key)]
if len(depRefs) == 0 {
continue
}
deps := make([]task.Dependency, 0, len(depRefs))
for _, key := range depRefs {
depID, ok := taskKeyToID[strings.TrimSpace(key)]
if !ok {
return reply, createdLanes, createdTasks, fmt.Errorf("leader output references unknown task dependency key %q", key)
}
deps = append(deps, task.Dependency{DependsOnTaskID: depID})
}
updated, err := s.setTaskDependencies(ctx, createdTasks[idx], deps)
if err != nil {
return reply, createdLanes, createdTasks, err
}
createdTasks[idx] = updated
}
return reply, createdLanes, createdTasks, nil
}
// ensureDerivedLane returns the lane for a lane seed, creating it on first use.
//
// If seedToLaneID already maps seed to a lane present in laneByID, that lane
// is returned with a nil "created" pointer. Otherwise a new lane is created:
//   - it is named after spec.Title (or "Execution lane");
//   - its slug is derived from that name and made unique against usedLaneSlugs
//     by appending -2, -3, ...;
//   - it is recorded in seedToLaneID and returned both as the lane and as the
//     created pointer.
//
// The chosen slug is reserved in usedLaneSlugs before CreateLane is called.
func (s *Service) ensureDerivedLane(
	ctx context.Context,
	ws workspace.Workspace,
	source message.Record,
	seed string,
	spec leaderTaskSpec,
	usedLaneSlugs map[string]struct{},
	seedToLaneID map[string]string,
	laneByID map[string]lane.Record,
) (lane.Record, *lane.Record, error) {
	if knownID, seeded := seedToLaneID[seed]; seeded {
		if known, present := laneByID[knownID]; present {
			return known, nil, nil
		}
	}

	name := strings.TrimSpace(spec.Title)
	if name == "" {
		name = "Execution lane"
	}
	stem := slug.Normalize(name)
	if stem == "" {
		stem = "lane"
	}
	candidate := stem
	_, taken := usedLaneSlugs[candidate]
	for suffix := 2; taken; suffix++ {
		candidate = fmt.Sprintf("%s-%d", stem, suffix)
		_, taken = usedLaneSlugs[candidate]
	}
	usedLaneSlugs[candidate] = struct{}{}

	created, err := s.repo.CreateLane(ctx, lane.Record{
		WorkspaceID:           source.WorkspaceID,
		TopicID:               source.TopicID,
		Name:                  name,
		Slug:                  candidate,
		Purpose:               strings.TrimSpace(spec.AcceptanceMarkdown),
		Status:                lane.StatusReady,
		BranchName:            lane.DefaultBranchName(ws.Slug, source.TopicID, candidate),
		WorktreePath:          lane.DefaultWorktreePath(ws.RootPath, ws.Slug, source.TopicID, candidate),
		ContainerName:         lane.DefaultContainerName(ws.Slug, source.TopicID, candidate),
		CreatedByRoleName:     "leader",
		ResultSummaryMarkdown: strings.TrimSpace(spec.BodyMarkdown),
	})
	if err != nil {
		return lane.Record{}, nil, err
	}
	seedToLaneID[seed] = created.ID
	return created, &created, nil
}
// deriveLeaderTaskLaneSeeds maps each task (by trimmed key) to a "lane seed":
// the key of the task whose lane it will run in. applyLeaderOutput creates one
// lane per distinct seed (via ensureDerivedLane) and names it after the seed
// task's spec.
//
// Pass 1 walks tasks in the given order. That order must list a task's
// dependencies before the task itself (applyLeaderOutput passes topologically
// sorted tasks). For each task:
//   - with no dependencies, it seeds its own lane;
//   - with exactly one dependency, it joins that parent's lane when the parent
//     has at most one child in childCounts, so linear chains share a lane;
//   - otherwise (fan-in, or a parent that fans out), it seeds its own lane.
//
// Pass 2 moves gate/milestone tasks onto a code task's lane, trying in order:
// the first dependency's seed, the first seed found among descendant code
// tasks, then the seed of the first code task in list order. seedByTask is
// updated in place during this pass, so later tasks see earlier reassignments.
func deriveLeaderTaskLaneSeeds(tasks []leaderTaskSpec, depsByKey map[string][]string, childCounts map[string]int) map[string]string {
// Build the reverse edges (dependency -> dependents) and a key -> spec index.
childrenByKey := make(map[string][]string, len(tasks))
specByKey := make(map[string]leaderTaskSpec, len(tasks))
for _, spec := range tasks {
key := strings.TrimSpace(spec.Key)
specByKey[key] = spec
for _, dep := range depsByKey[key] {
childrenByKey[dep] = append(childrenByKey[dep], key)
}
}
seedByTask := make(map[string]string, len(tasks))
for _, spec := range tasks {
key := strings.TrimSpace(spec.Key)
deps := depsByKey[key]
switch len(deps) {
case 0:
seedByTask[key] = key
case 1:
parent := deps[0]
if childCounts[parent] <= 1 {
if parentSeed := seedByTask[parent]; parentSeed != "" {
seedByTask[key] = parentSeed
// break leaves the switch, skipping the self-seed fallback below.
break
}
}
seedByTask[key] = key
default:
seedByTask[key] = key
}
}
// Pass 2: re-home gate/milestone tasks onto a code task's seed.
for _, spec := range tasks {
key := strings.TrimSpace(spec.Key)
if !isNonCodeLeaderTask(spec.Kind) {
continue
}
if sharedSeed := firstDependencySeed(seedByTask, depsByKey[key]); sharedSeed != "" {
seedByTask[key] = sharedSeed
continue
}
if sharedSeed := firstDescendantCodeSeed(seedByTask, specByKey, childrenByKey, key, map[string]struct{}{}); sharedSeed != "" {
seedByTask[key] = sharedSeed
continue
}
if sharedSeed := firstCodeTaskSeedInOrder(tasks, seedByTask); sharedSeed != "" {
seedByTask[key] = sharedSeed
}
}
return seedByTask
}
// laneSpecForSeed returns the spec of the seed task (looked up by trimmed
// seed), or fallback when the seed is not one of specByKey's keys.
func laneSpecForSeed(specByKey map[string]leaderTaskSpec, seed string, fallback leaderTaskSpec) leaderTaskSpec {
	spec, found := specByKey[strings.TrimSpace(seed)]
	if !found {
		spec = fallback
	}
	return spec
}
// isNonCodeLeaderTask reports whether a task kind (whitespace-trimmed,
// case-sensitive) is "gate" or "milestone". Such tasks are co-located with
// code tasks when lanes are derived.
func isNonCodeLeaderTask(kind string) bool {
	normalized := strings.TrimSpace(kind)
	return normalized == "gate" || normalized == "milestone"
}
// firstDependencySeed returns the first non-blank (trimmed) seed assigned to
// one of deps, in deps order, or "" when none has a seed.
func firstDependencySeed(seedByTask map[string]string, deps []string) string {
	for i := 0; i < len(deps); i++ {
		seed := strings.TrimSpace(seedByTask[strings.TrimSpace(deps[i])])
		if len(seed) > 0 {
			return seed
		}
	}
	return ""
}
// firstDescendantCodeSeed searches depth-first through key's dependents
// (childrenByKey) for a non-gate/non-milestone task with a non-blank seed, and
// returns that seed.
//
// At each level, a child's own seed is checked before descending into that
// child. seen guards against revisiting keys, and is mutated. It returns ""
// for a blank key or when nothing qualifies.
func firstDescendantCodeSeed(
	seedByTask map[string]string,
	specByKey map[string]leaderTaskSpec,
	childrenByKey map[string][]string,
	key string,
	seen map[string]struct{},
) string {
	key = strings.TrimSpace(key)
	if _, visited := seen[key]; key == "" || visited {
		return ""
	}
	seen[key] = struct{}{}
	for _, rawChild := range childrenByKey[key] {
		child := strings.TrimSpace(rawChild)
		if child == "" {
			continue
		}
		if childSpec, known := specByKey[child]; known && !isNonCodeLeaderTask(childSpec.Kind) {
			if own := strings.TrimSpace(seedByTask[child]); own != "" {
				return own
			}
		}
		if nested := firstDescendantCodeSeed(seedByTask, specByKey, childrenByKey, child, seen); nested != "" {
			return nested
		}
	}
	return ""
}
// firstCodeTaskSeedInOrder returns the non-blank (trimmed) seed of the first
// task in tasks that is not a gate/milestone, or "" when there is none.
func firstCodeTaskSeedInOrder(tasks []leaderTaskSpec, seedByTask map[string]string) string {
	for idx := range tasks {
		if isNonCodeLeaderTask(tasks[idx].Kind) {
			continue
		}
		seed := strings.TrimSpace(seedByTask[strings.TrimSpace(tasks[idx].Key)])
		if seed == "" {
			continue
		}
		return seed
	}
	return ""
}
// reduceLeaderTaskDependencies normalizes each task's depends_on list
// (trimmed, blanks and duplicates dropped) and removes transitively redundant
// edges. The result is keyed by trimmed task key; tasks with an empty key are
// skipped.
//
// For a task with deps [d, o], d is dropped when o (a task in this batch)
// itself depends, directly or transitively, on d. References to keys outside
// the batch (e.g. existing task IDs) are kept, but are never used to prove
// another edge redundant.
//
// Cyclic input is tolerated. The descendant walk caches a key's result set
// before recursing, so a cycle stops at the partially built set instead of
// recursing forever, which would crash the process with a stack overflow.
// Cycles are reported afterwards by topoSortLeaderTasks. For acyclic input the
// result is unchanged by this guard.
func reduceLeaderTaskDependencies(tasks []leaderTaskSpec) map[string][]string {
	depsByKey := make(map[string][]string, len(tasks))
	taskSet := make(map[string]struct{}, len(tasks))
	children := map[string][]string{}
	for _, spec := range tasks {
		key := strings.TrimSpace(spec.Key)
		if key == "" {
			continue
		}
		taskSet[key] = struct{}{}
		normalized := normalizeTaskRefs(spec.DependsOn)
		depsByKey[key] = normalized
		for _, dep := range normalized {
			children[dep] = append(children[dep], key)
		}
	}

	// descendants returns every key that depends on key directly or
	// transitively. The set is registered in reachable before the recursion,
	// so re-entering an in-progress key (a cycle) returns immediately.
	reachable := map[string]map[string]struct{}{}
	var descendants func(string) map[string]struct{}
	descendants = func(key string) map[string]struct{} {
		if cached, ok := reachable[key]; ok {
			return cached
		}
		out := map[string]struct{}{}
		reachable[key] = out
		for _, child := range children[key] {
			out[child] = struct{}{}
			for grandChild := range descendants(child) {
				out[grandChild] = struct{}{}
			}
		}
		return out
	}

	for _, spec := range tasks {
		key := strings.TrimSpace(spec.Key)
		normalized := depsByKey[key]
		if len(normalized) < 2 {
			continue
		}
		reduced := make([]string, 0, len(normalized))
		for _, dep := range normalized {
			below := descendants(dep)
			redundant := false
			for _, other := range normalized {
				if other == dep {
					continue
				}
				if _, inBatch := taskSet[other]; !inBatch {
					continue
				}
				if _, ok := below[other]; ok {
					redundant = true
					break
				}
			}
			if !redundant {
				reduced = append(reduced, dep)
			}
		}
		depsByKey[key] = reduced
	}
	return depsByKey
}
// topoSortLeaderTasks orders tasks so every task follows the dependencies
// listed for it in depsByKey (Kahn's algorithm; ties keep input order).
// Dependencies that are not keys in tasks (e.g. existing task IDs) are ignored
// for ordering.
//
// It fails when:
//   - two tasks share the same trimmed key, because the key-indexed bookkeeping
//     would otherwise silently drop one task and emit another twice;
//   - the graph contains a cycle.
func topoSortLeaderTasks(tasks []leaderTaskSpec, depsByKey map[string][]string) ([]leaderTaskSpec, error) {
	byKey := make(map[string]leaderTaskSpec, len(tasks))
	inDegree := make(map[string]int, len(tasks))
	children := make(map[string][]string, len(tasks))
	for _, spec := range tasks {
		key := strings.TrimSpace(spec.Key)
		if _, dup := byKey[key]; dup {
			return nil, fmt.Errorf("leader task graph contains duplicate task key %q", key)
		}
		byKey[key] = spec
		inDegree[key] = 0
	}
	for _, spec := range tasks {
		key := strings.TrimSpace(spec.Key)
		for _, dep := range depsByKey[key] {
			if _, ok := byKey[dep]; !ok {
				continue
			}
			inDegree[key]++
			children[dep] = append(children[dep], key)
		}
	}
	queue := make([]string, 0, len(tasks))
	for _, spec := range tasks {
		key := strings.TrimSpace(spec.Key)
		if inDegree[key] == 0 {
			queue = append(queue, key)
		}
	}
	ordered := make([]leaderTaskSpec, 0, len(tasks))
	for head := 0; head < len(queue); head++ {
		key := queue[head]
		ordered = append(ordered, byKey[key])
		for _, child := range children[key] {
			inDegree[child]--
			if inDegree[child] == 0 {
				queue = append(queue, child)
			}
		}
	}
	if len(ordered) != len(tasks) {
		return nil, fmt.Errorf("leader task graph contains a cycle or unresolved dependency ordering")
	}
	return ordered, nil
}
// countLeaderTaskChildren counts, for every referenced dependency key, how
// many tasks in depsByKey list it as a dependency. The result is never nil.
func countLeaderTaskChildren(depsByKey map[string][]string) map[string]int {
	counts := make(map[string]int)
	for key := range depsByKey {
		for _, parent := range depsByKey[key] {
			counts[parent] += 1
		}
	}
	return counts
}
// normalizeTaskRefs trims each reference, drops blanks and duplicates (first
// occurrence wins), and returns a non-nil slice in input order.
func normalizeTaskRefs(values []string) []string {
	refs := make([]string, 0, len(values))
	seen := make(map[string]struct{}, len(values))
	for _, raw := range values {
		ref := strings.TrimSpace(raw)
		if _, dup := seen[ref]; ref == "" || dup {
			continue
		}
		seen[ref] = struct{}{}
		refs = append(refs, ref)
	}
	return refs
}
// resolveAutoStartLanes lists the topic lanes that may be started
// automatically. A lane qualifies when it is not excluded, its status is
// ready, and it has at least one ready task.
//
// Nothing qualifies (nil, nil) while any gate task in the topic is still
// unresolved.
func (s *Service) resolveAutoStartLanes(ctx context.Context, topicID string, exclude map[string]struct{}) ([]lane.Record, error) {
	topicTasks, err := s.repo.ListTasksByTopic(ctx, topicID)
	if err != nil {
		return nil, err
	}
	if hasUnresolvedGateTasks(topicTasks) {
		return nil, nil
	}
	topicLanes, err := s.repo.ListLanesByTopic(ctx, topicID)
	if err != nil {
		return nil, err
	}
	laneTasks := make(map[string][]task.Record)
	for _, item := range topicTasks {
		laneTasks[item.LaneID] = append(laneTasks[item.LaneID], item)
	}
	selected := make([]lane.Record, 0)
	for _, candidate := range topicLanes {
		_, excluded := exclude[candidate.ID]
		if excluded || candidate.Status != lane.StatusReady || !hasReadyTask(laneTasks[candidate.ID]) {
			continue
		}
		selected = append(selected, candidate)
	}
	return selected, nil
}
// hasUnresolvedGateTasks reports whether any gate task has neither succeeded
// nor been cancelled.
func hasUnresolvedGateTasks(items []task.Record) bool {
	for _, item := range items {
		resolved := item.Status == task.StatusSucceeded || item.Status == task.StatusCancelled
		if item.Kind == task.KindGate && !resolved {
			return true
		}
	}
	return false
}
// hasReadyTask reports whether at least one task in items is ready.
func hasReadyTask(items []task.Record) bool {
	for idx := range items {
		if items[idx].Status != task.StatusReady {
			continue
		}
		return true
	}
	return false
}
// setTaskDependencies replaces item's dependencies with deps.
//
// It first stamps item.ID as TaskID on every entry, mutating the caller's
// slice. The repository must additionally implement
// UpdateTaskWithDependencies; otherwise an error is returned and item is
// returned unchanged.
func (s *Service) setTaskDependencies(ctx context.Context, item task.Record, deps []task.Dependency) (task.Record, error) {
	for i := range deps {
		deps[i].TaskID = item.ID
	}
	type dependencyUpdater interface {
		UpdateTaskWithDependencies(ctx context.Context, value task.Record, dependencies *[]task.Dependency) (task.Record, error)
	}
	updater, supported := s.repo.(dependencyUpdater)
	if !supported {
		return item, fmt.Errorf("leaderloop repository cannot update task dependencies")
	}
	return updater.UpdateTaskWithDependencies(ctx, item, &deps)
}
// defaultLeaderSystemPrompt is the leader role prompt that buildLeaderPrompt
// passes as the default to runtimeconfig.RenderInstructions (how it combines
// with the resolved role configuration is decided there).
//
// English translation of the prompt:
//   "You are the leader. You run on the host machine and are the sole
//   orchestrator of the current topic. You communicate with the user, clarify
//   goals, maintain the task graph, start execution, and summarize the final
//   result; lanes are derived automatically by the system from the task graph.
//   Rules:
//   - Understand the user's goal before splitting tasks; do not skip
//     clarification.
//   - Every task must state its goal, dependencies, and acceptance criteria.
//   - Worker questions and summaries are orchestration input only; do not
//     treat them as proof of completion.
//   - Workers escalate blockers only to you; you decide whether to involve
//     the user.
//   - You may create, update, start, and stop lanes and tasks, but do not do
//     the implementation inside the container in place of a worker."
const defaultLeaderSystemPrompt = `你是 leader。
你运行在宿主机上,是当前主题唯一的编排者。你负责与用户沟通、澄清目标、维护 task graph、启动执行,并汇总最终结果。lane 由系统从任务图自动派生。
规则:
- 先理解用户目标,再拆分任务;不要直接跳过澄清。
- 所有 task 都必须明确目标、依赖、验收标准。
- worker 的问题和总结都只作为编排输入,不要把它们直接当作完成证明。
- worker 只能向你升级阻塞;是否打扰用户由你决定。
- 你可以创建、更新、启动、停止 lane 与 task,但不要代替 worker 在容器里完成实现。`
// buildLeaderPrompt assembles the markdown prompt sent to the leader. It
// contains, in order:
//   - the rendered role instructions;
//   - the planning contract, whose wording depends on initialFreeze (the
//     first, confirm-before-execute plan for an empty topic);
//   - the topic slug and status, and the incoming message;
//   - existing lanes and tasks;
//   - when the topic has more than one message, up to the 8 most recent
//     messages.
func buildLeaderPrompt(resolved runtimeconfig.ResolvedRole, record topic.Record, msg message.Record, messages []message.Record, lanes []lane.Record, tasks []task.Record, initialFreeze bool) string {
	var b strings.Builder

	b.WriteString("## Role\n")
	b.WriteString(runtimeconfig.RenderInstructions(resolved, defaultLeaderSystemPrompt))

	// The two contract lines whose wording depends on the planning phase.
	outputLine := "- 一旦需要调整任务图,只输出结构化 JSONplan_mode、execution_mode、plan_summary_markdown、leader_reply、tasks。系统会自动从任务图派生 lanes。\n"
	modeLine := "- execution_mode 只能是 clarify / plan_only / plan_and_start。\n"
	if initialFreeze {
		outputLine = "- 当前是初始建图阶段。只输出结构化 JSONplan_mode、execution_mode、plan_summary_markdown、leader_reply、tasks。\n"
		modeLine = "- execution_mode 只能是 clarify / plan_only。初始建图阶段禁止 plan_and_start。\n"
	}

	b.WriteString("\n\n## Planning Contract\n")
	b.WriteString("规则:\n")
	b.WriteString("- 不要直接编写实现代码。\n")
	b.WriteString("- 如果信息不足,优先回复用户澄清,不要盲目创建 lane/task。\n")
	b.WriteString(outputLine)
	b.WriteString("- worker 的总结只作为编排输入,不要把它当作最终完成证明。\n")
	b.WriteString("- 输出必须严格符合 JSON schema,不要添加额外文本。\n")
	b.WriteString("- 每个 task 至少要写清 key、title、body_markdown、acceptance_markdown、kind、depends_on;非 milestone task 还要给出 deliverables。\n")
	b.WriteString("- priority、task_order 只在你需要覆盖系统默认排序时再填写。\n")
	b.WriteString("- kind=milestone 表示汇聚节点,不会派发给 worker;它只在依赖都成功后自动完成。\n")
	b.WriteString("- 空图首次规划必须使用 plan_mode=initial;已有 lane/task 时必须使用 plan_mode=patch。\n")
	b.WriteString("- plan_mode=patch 时必须填写 replan_reason,简要说明为什么要调整现有任务图。\n")
	b.WriteString(modeLine)
	b.WriteString("- 当 execution_mode=clarify 时,不要创建 tasks。\n")
	b.WriteString("- gate task 用于执行前置检查;系统会自动把 graph 派生成 execution lanes,并保证 gate 优先。\n")
	if initialFreeze {
		b.WriteString("- 当前 topic 还没有执行图。请先给出可确认的初始 DAG;如果信息不足就用 clarify,否则使用 execution_mode=plan_only。系统会在用户确认后再启动执行。\n")
	}

	b.WriteString("\n## Topic\n")
	fmt.Fprintf(&b, "- slug: %s\n- status: %s\n", record.Slug, record.Status)

	b.WriteString("\n## Incoming Message\n")
	fmt.Fprintf(&b, "- from: %s\n- type: %s\n- stage: %s\n%s\n", msg.FromRoleName, msg.Type, msg.Stage, msg.BodyMarkdown)

	if len(lanes) != 0 {
		b.WriteString("\n## Existing Execution Lanes\n")
		for _, item := range lanes {
			fmt.Fprintf(&b, "- %s (%s): lane_id=%s\n", item.Name, item.Status, item.ID)
		}
	}

	if len(tasks) != 0 {
		b.WriteString("\n## Existing Tasks\n")
		for _, item := range tasks {
			fmt.Fprintf(&b, "### %s [%s]\n", item.Title, item.Status)
			fmt.Fprintf(&b, "- task_id: %s\n- lane_id: %s\n- kind: %s\n", item.ID, item.LaneID, item.Kind)
			if strings.TrimSpace(item.AcceptanceMarkdown) != "" {
				fmt.Fprintf(&b, "- acceptance: %s\n", item.AcceptanceMarkdown)
			}
			if len(item.Deliverables) != 0 {
				fmt.Fprintf(&b, "- deliverables: %s\n", strings.Join(item.Deliverables, ", "))
			}
			if strings.TrimSpace(item.BlockingReasonMarkdown) != "" {
				fmt.Fprintf(&b, "- blocking_reason: %s\n", item.BlockingReasonMarkdown)
			}
			b.WriteString(item.BodyMarkdown)
			b.WriteString("\n")
		}
	}

	// Only include history when there is more than the incoming message.
	if len(messages) > 1 {
		b.WriteString("\n## Recent Messages\n")
		recent := messages
		if len(recent) > 8 {
			recent = recent[len(recent)-8:]
		}
		for _, item := range recent {
			fmt.Fprintf(&b, "### %s -> %s (%s)\n%s\n", item.FromRoleName, item.ToExpr, item.Stage, item.BodyMarkdown)
		}
	}

	return b.String()
}
// leaderOutputSchema returns the JSON Schema describing a leader planning
// response (presumably the document hostCodexRunner.Run writes to the
// --output-schema file).
//
// Both variants share one document and differ only in the allowed enums.
// With initialFreeze (the topic has no execution graph yet), plan_mode is
// limited to "initial" and execution_mode to "clarify"/"plan_only".
// Otherwise plan_mode may also be "patch" and execution_mode may also be
// "plan_and_start".
func leaderOutputSchema(initialFreeze bool) string {
	// Single source of truth for the schema. The two %s verbs receive the
	// plan_mode and execution_mode enum arrays. The document contains no other
	// '%' characters, so fmt.Sprintf cannot misinterpret it.
	const schemaTemplate = `{
  "type":"object",
  "additionalProperties":false,
  "required":["plan_summary_markdown","plan_mode","replan_reason","execution_mode","leader_reply","tasks"],
  "properties":{
    "plan_summary_markdown":{"type":"string"},
    "plan_mode":{"type":"string","enum":%s},
    "replan_reason":{"type":"string"},
    "execution_mode":{"type":"string","enum":%s},
    "leader_reply":{
      "type":"object",
      "additionalProperties":false,
      "required":["markdown","type"],
      "properties":{
        "markdown":{"type":"string"},
        "type":{"type":"string","enum":["chat","question","summary","decision"]}
      }
    },
    "tasks":{
      "type":"array",
      "items":{
        "type":"object",
        "additionalProperties":false,
        "required":["key","title","body_markdown","acceptance_markdown","kind","deliverables","priority","task_order","depends_on"],
        "properties":{
          "key":{"type":"string"},
          "title":{"type":"string"},
          "body_markdown":{"type":"string"},
          "acceptance_markdown":{"type":"string"},
          "kind":{"type":"string","enum":["execution","gate","verification","milestone"]},
          "deliverables":{"type":"array","items":{"type":"string"}},
          "priority":{"type":"integer"},
          "task_order":{"type":"integer"},
          "depends_on":{"type":"array","items":{"type":"string"}}
        }
      }
    }
  }
}`
	planModes := `["initial","patch"]`
	executionModes := `["clarify","plan_only","plan_and_start"]`
	if initialFreeze {
		planModes = `["initial"]`
		executionModes = `["clarify","plan_only"]`
	}
	return fmt.Sprintf(schemaTemplate, planModes, executionModes)
}
// validateLeaderOutput checks a decoded leader response against the contract
// that leaderOutputSchema(initialFreeze) advertises. It also enforces rules
// the schema cannot express: non-blank text fields, unique task keys and
// declared deliverables.
//
// initialFreeze must match the flag given to leaderOutputSchema. When it is
// true, only plan_mode "initial" and execution_mode "clarify"/"plan_only" are
// accepted. In clarify mode no tasks may be created.
//
// It returns the first violation found, or nil when the output is usable.
func validateLeaderOutput(output leaderOutput, initialFreeze bool) error {
	planMode := strings.TrimSpace(output.PlanMode)
	switch planMode {
	case "initial":
	case "patch":
		// The initial-freeze schema only offers "initial". Enforce the same
		// rule here so that the validator and the schema cannot disagree.
		if initialFreeze {
			return fmt.Errorf("leader initial planning cannot use plan_mode %q", output.PlanMode)
		}
	default:
		return fmt.Errorf("leader output has invalid plan_mode %q", output.PlanMode)
	}

	executionMode := strings.TrimSpace(output.ExecutionMode)
	switch executionMode {
	case "clarify", "plan_only":
	case "plan_and_start":
		if initialFreeze {
			return fmt.Errorf("leader initial planning cannot use execution_mode %q", output.ExecutionMode)
		}
	default:
		return fmt.Errorf("leader output has invalid execution_mode %q", output.ExecutionMode)
	}

	switch strings.TrimSpace(output.LeaderReply.Type) {
	case "chat", "question", "summary", "decision":
	default:
		return fmt.Errorf("leader output has invalid leader_reply.type %q", output.LeaderReply.Type)
	}
	if strings.TrimSpace(output.LeaderReply.Markdown) == "" && strings.TrimSpace(output.PlanSummaryMarkdown) == "" {
		return fmt.Errorf("leader output must include leader_reply.markdown or plan_summary_markdown")
	}

	// Compare the trimmed value. The switch above already accepted a padded
	// "clarify", so that value must also be held to the no-tasks rule.
	if executionMode == "clarify" {
		if len(output.Tasks) > 0 {
			return fmt.Errorf("leader clarify mode cannot create tasks")
		}
		return nil
	}

	seenKeys := make(map[string]struct{}, len(output.Tasks))
	for idx, raw := range output.Tasks {
		// normalizeLeaderTaskSpec trims key, title, body, acceptance and kind,
		// so the checks below can compare those fields directly.
		item := normalizeLeaderTaskSpec(raw, idx)
		if item.Key == "" {
			return fmt.Errorf("leader task key is required")
		}
		if _, exists := seenKeys[item.Key]; exists {
			return fmt.Errorf("leader task key %q must be unique", item.Key)
		}
		seenKeys[item.Key] = struct{}{}
		if item.Title == "" {
			return fmt.Errorf("leader task %q title is required", item.Key)
		}
		if item.BodyMarkdown == "" {
			return fmt.Errorf("leader task %q body_markdown is required", item.Key)
		}
		if item.AcceptanceMarkdown == "" {
			return fmt.Errorf("leader task %q acceptance_markdown is required", item.Key)
		}
		// Validate the kind first, so that an unknown kind is reported as such
		// rather than as a missing-deliverables error.
		switch item.Kind {
		case "execution", "gate", "verification", "milestone":
		default:
			return fmt.Errorf("leader task %q has invalid kind %q", item.Key, item.Kind)
		}
		// Every kind except milestone must name at least one deliverable.
		if item.Kind != "milestone" && len(item.Deliverables) == 0 {
			return fmt.Errorf("leader task %q must declare deliverables", item.Key)
		}
	}
	return nil
}
// mustEncodeLeaderCommand serialises the leader's planning output as a command
// payload of the form {"mode":"leader","planning":<output>}.
//
// It never fails. If output cannot be marshalled, it returns the minimal
// payload {"mode":"leader"} instead, so callers always receive valid JSON.
func mustEncodeLeaderCommand(output leaderOutput) string {
	payload := map[string]any{
		"mode":     "leader",
		"planning": output,
	}
	encoded, marshalErr := json.Marshal(payload)
	if marshalErr == nil {
		return string(encoded)
	}
	return `{"mode":"leader"}`
}
// normalizeLeaderTaskSpec returns a cleaned copy of spec:
//   - Key, Title, BodyMarkdown, AcceptanceMarkdown and Kind are
//     whitespace-trimmed.
//   - Deliverables is copied into a fresh slice, so the result does not share
//     the input's backing array.
//   - DependsOn is passed through normalizeTaskRefs.
//   - A non-positive TaskOrder is replaced with the task's 1-based position
//     (index+1).
//
// All other fields are carried over unchanged.
func normalizeLeaderTaskSpec(spec leaderTaskSpec, index int) leaderTaskSpec {
	normalized := spec
	textFields := []*string{
		&normalized.Key,
		&normalized.Title,
		&normalized.BodyMarkdown,
		&normalized.AcceptanceMarkdown,
		&normalized.Kind,
	}
	for _, field := range textFields {
		*field = strings.TrimSpace(*field)
	}
	normalized.Deliverables = append([]string(nil), spec.Deliverables...)
	normalized.DependsOn = normalizeTaskRefs(spec.DependsOn)
	if normalized.TaskOrder > 0 {
		return normalized
	}
	normalized.TaskOrder = index + 1
	return normalized
}
// mustEncodeLeaderCommandWithGraphStatus returns commandJSON with a top-level
// "graph_status" field set to graphStatus, replacing any existing value.
//
// Existing fields are carried over as raw JSON. This lets large integers and
// other values round-trip exactly instead of being decoded into float64.
// A literal JSON null is treated as an empty object. Any other input that is
// not a JSON object, or any encoding failure, returns commandJSON unchanged.
// Keys in the result are sorted, as json.Marshal does for maps.
func mustEncodeLeaderCommandWithGraphStatus(commandJSON, graphStatus string) string {
	var payload map[string]json.RawMessage
	if err := json.Unmarshal([]byte(commandJSON), &payload); err != nil {
		return commandJSON
	}
	if payload == nil {
		// json.Unmarshal turns a JSON null into a nil map, and writing to a
		// nil map would panic.
		payload = make(map[string]json.RawMessage, 1)
	}
	status, err := json.Marshal(graphStatus)
	if err != nil {
		return commandJSON
	}
	payload["graph_status"] = status
	data, err := json.Marshal(payload)
	if err != nil {
		return commandJSON
	}
	return string(data)
}
// recordTaskGraphVersion stores the leader's task plan as a new task-graph
// version for topicID. It does nothing when output contains no tasks.
//
// The new version number is one past the topic's latest version, or 1 when
// the lookup returns sql.ErrNoRows. If the latest version is still active or
// draft, it is first marked superseded and referenced through
// SupersedesGraphVersionID.
//
// With freezeInitial, the new version is stored as a draft with an empty
// ConfirmedAt. Otherwise it is stored as active, with ConfirmedAt taken from
// the service clock. The full output is persisted as PlanJSON.
//
// NOTE(review): the supersede update and the create are separate repository
// calls. If the create fails, the previous version stays superseded and no
// new version replaces it; consider running both in one transaction.
// NOTE(review): the lookup error is compared with == rather than errors.Is,
// so a wrapped sql.ErrNoRows would be returned as a failure.
func (s *Service) recordTaskGraphVersion(ctx context.Context, topicID, createdBy string, output leaderOutput, freezeInitial bool) error {
	if len(output.Tasks) == 0 {
		return nil
	}

	nextVersion := 1
	supersedesID := ""
	previous, lookupErr := s.repo.GetLatestTaskGraphVersionByTopic(ctx, topicID)
	switch {
	case lookupErr == nil:
		nextVersion = previous.Version + 1
		if previous.Status == taskgraph.StatusActive || previous.Status == taskgraph.StatusDraft {
			supersedesID = previous.ID
		}
	case lookupErr != sql.ErrNoRows:
		return lookupErr
	}

	// previous is only read here when the lookup succeeded (supersedesID is
	// set only in that branch).
	if supersedesID != "" {
		previous.Status = taskgraph.StatusSuperseded
		if _, err := s.repo.UpdateTaskGraphVersion(ctx, previous); err != nil {
			return err
		}
	}

	newStatus := taskgraph.StatusActive
	if freezeInitial {
		// Initial graphs stay as drafts until they are confirmed.
		newStatus = taskgraph.StatusDraft
	}
	planJSON, err := json.Marshal(output)
	if err != nil {
		return err
	}
	if _, err := s.repo.CreateTaskGraphVersion(ctx, taskgraph.Record{
		TopicID:                  topicID,
		Version:                  nextVersion,
		Status:                   newStatus,
		PlanJSON:                 string(planJSON),
		PlanSummaryMarkdown:      strings.TrimSpace(output.PlanSummaryMarkdown),
		CreatedByRoleName:        firstNonEmpty(strings.TrimSpace(createdBy), "leader"),
		SupersedesGraphVersionID: supersedesID,
		ConfirmedAt:              confirmedAtValue(newStatus, s.clock.Now()),
	}); err != nil {
		return err
	}
	return nil
}
// confirmedAtValue returns the value to store in a task graph's ConfirmedAt.
// For an active graph this is now, formatted by timeutil.FormatRFC3339. For
// every other status (for example a draft that is not yet confirmed) it is
// the empty string.
func confirmedAtValue(status taskgraph.Status, now time.Time) string {
	if status == taskgraph.StatusActive {
		return timeutil.FormatRFC3339(now)
	}
	return ""
}
// firstNonEmpty returns the first of values that contains a non-whitespace
// character, or "" if none does. The chosen value is returned as-is, without
// trimming.
func firstNonEmpty(values ...string) string {
	for i := range values {
		if strings.TrimSpace(values[i]) == "" {
			continue
		}
		return values[i]
	}
	return ""
}
// hostCodexRunner runs the leader role by calling the `codex` CLI directly on
// the host. projectRoot is used by leaderCommandEnv to locate the leader
// runtime directories that become HOME and CODEX_HOME for the child process.
type hostCodexRunner struct {
	projectRoot string
}

// Run executes `codex exec` in workspaceRoot with a workspace-write sandbox.
// The prompt is sent on stdin, the final answer is constrained by schemaJSON
// (via --output-schema), and the answer is read back from the -o result file.
// Both files live in a temporary directory that is removed on return.
//
// A non-nil error is returned only when the temporary directory or the schema
// file cannot be created. Problems with the codex run itself are reported in
// the returned RunResult with Status workflow.RunStatusFailed and an
// ErrorMessage. These are: non-zero exit, failure to start, context
// cancellation, an unreadable result file, or non-JSON output.
// ExitCode is the process exit code, or -1 when no exit code is available
// (for example when the process could not be started).
func (r hostCodexRunner) Run(ctx context.Context, workspaceRoot string, resolved runtimeconfig.ResolvedRole, prompt, schemaJSON string) (RunResult, error) {
	tempDir, err := os.MkdirTemp("", "leaderloop-*")
	if err != nil {
		return RunResult{}, err
	}
	defer os.RemoveAll(tempDir)

	resultFile := filepath.Join(tempDir, "result.json")
	schemaFile := filepath.Join(tempDir, "schema.json")
	if err := os.WriteFile(schemaFile, []byte(schemaJSON), 0o644); err != nil {
		return RunResult{}, err
	}

	cmd := exec.CommandContext(ctx, "codex",
		"exec",
		"--skip-git-repo-check",
		"-C", workspaceRoot,
		"-s", "workspace-write",
		"-o", resultFile,
		"--output-schema", schemaFile,
		"-", // read the prompt from stdin
	)
	cmd.Dir = workspaceRoot
	cmd.Stdin = strings.NewReader(prompt)
	cmd.Env = leaderCommandEnv(r.projectRoot, resolved)
	var stdout, stderr strings.Builder
	cmd.Stdout = &stdout
	cmd.Stderr = &stderr

	runResult := RunResult{
		Status:   workflow.RunStatusSucceeded,
		ExitCode: 0,
	}
	if runErr := cmd.Run(); runErr != nil {
		runResult.Status = workflow.RunStatusFailed
		runResult.ErrorMessage = runErr.Error()
		// Use -1 unless the process actually exited. A start failure (for
		// example codex not on PATH) must not be reported as exit code 0.
		runResult.ExitCode = -1
		if exitErr, ok := runErr.(*exec.ExitError); ok {
			runResult.ExitCode = exitErr.ExitCode()
		}
		// On cancellation, CommandContext kills the child. That typically
		// surfaces only as the child's exit error, so append the context error
		// to make the cause visible.
		if ctxErr := ctx.Err(); ctxErr != nil {
			runResult.ErrorMessage += ": " + ctxErr.Error()
		}
	}
	runResult.StdoutLines = splitNonEmptyLines(stdout.String())
	runResult.StderrLines = splitNonEmptyLines(stderr.String())

	resultBytes, readErr := os.ReadFile(resultFile)
	if readErr == nil {
		runResult.ResultJSON = strings.TrimSpace(string(resultBytes))
	}
	if runResult.Status == workflow.RunStatusSucceeded {
		switch {
		case readErr != nil:
			// A missing result file is a different failure from malformed
			// output, so report it separately.
			runResult.Status = workflow.RunStatusFailed
			runResult.ErrorMessage = "leader result file could not be read: " + readErr.Error()
		case !json.Valid([]byte(runResult.ResultJSON)):
			runResult.Status = workflow.RunStatusFailed
			runResult.ErrorMessage = "leader returned invalid json output"
		}
	}
	return runResult, nil
}
// leaderCommandEnv builds the environment for the leader's codex process. It
// starts from the current process environment and adds HOME and CODEX_HOME,
// pointing at the role-specific directories from leaderRuntimePaths.
//
// Before returning, it calls runtimecodex.WriteResolvedRoleHome with the
// workspace's host leader runtime root. Presumably this writes the resolved
// role's home/config there; TODO confirm.
//
// NOTE(review): the result and error of WriteResolvedRoleHome are discarded,
// so a failed write goes unnoticed and the command still gets this
// environment. The []string-only signature gives no way to report it;
// consider surfacing the error.
func leaderCommandEnv(projectRoot string, resolved runtimeconfig.ResolvedRole) []string {
	roleHome, codexDir := leaderRuntimePaths(projectRoot, resolved)
	runtimeRoot := runtimecodex.HostLeaderRuntimeRoot(projectRoot, resolved.WorkspaceID)
	_, _ = runtimecodex.WriteResolvedRoleHome(runtimeRoot, resolved) // best effort, see NOTE
	// os.Environ returns a fresh copy, so appending to it is safe. The
	// overrides go last, so os/exec uses them over any inherited HOME or
	// CODEX_HOME.
	return append(os.Environ(), "HOME="+roleHome, "CODEX_HOME="+codexDir)
}
// leaderRuntimePaths returns the host HOME directory and CODEX_HOME directory
// for the resolved role's leader runtime. Both are keyed by the project root,
// the workspace ID and the role name.
func leaderRuntimePaths(projectRoot string, resolved runtimeconfig.ResolvedRole) (string, string) {
	workspaceID, roleName := resolved.WorkspaceID, resolved.Role.Name
	return runtimecodex.HostLeaderHomeDir(projectRoot, workspaceID, roleName),
		runtimecodex.HostLeaderCodexDir(projectRoot, workspaceID, roleName)
}
// splitNonEmptyLines splits value on '\n', trims surrounding whitespace from
// each line (which also drops a trailing '\r'), and keeps only the non-empty
// results in order. The returned slice is never nil, even when nothing is
// kept.
func splitNonEmptyLines(value string) []string {
	// Capacity is the number of newline-separated segments, the most that can
	// be kept.
	kept := make([]string, 0, strings.Count(value, "\n")+1)
	rest := value
	for {
		segment, tail, more := strings.Cut(rest, "\n")
		if trimmed := strings.TrimSpace(segment); trimmed != "" {
			kept = append(kept, trimmed)
		}
		if !more {
			return kept
		}
		rest = tail
	}
}