1435 lines
45 KiB
Go
1435 lines
45 KiB
Go
package leaderloop
|
||
|
||
import (
|
||
"bytes"
|
||
"context"
|
||
"database/sql"
|
||
"encoding/json"
|
||
"fmt"
|
||
"os"
|
||
"os/exec"
|
||
"path/filepath"
|
||
"strings"
|
||
"time"
|
||
|
||
"inbox/internal/app/runtimecodex"
|
||
"inbox/internal/app/runtimeconfig"
|
||
"inbox/internal/app/workspaceruntime"
|
||
"inbox/internal/base/slug"
|
||
"inbox/internal/base/timeutil"
|
||
"inbox/internal/domain/lane"
|
||
"inbox/internal/domain/message"
|
||
"inbox/internal/domain/task"
|
||
"inbox/internal/domain/taskgraph"
|
||
"inbox/internal/domain/topic"
|
||
"inbox/internal/domain/workflow"
|
||
"inbox/internal/domain/workspace"
|
||
)
|
||
|
||
const (
	// defaultPollInterval is the poll interval NewService gives every Service;
	// Run uses it as the ticker period between ProcessOnce calls.
	defaultPollInterval = 2 * time.Second
	// defaultLease is the lease NewService gives every Service; ProcessOnce
	// subtracts it from the current time to build the staleBefore cutoff that
	// is passed to ClaimNextDelivery.
	defaultLease = 2 * time.Minute
)
|
||
|
||
// Repository is the storage interface the leader loop calls. It lists
// workspaces, claims and archives message deliveries, and reads or writes
// topics, messages, workflow runs and their logs, lanes, tasks and task graph
// versions.
type Repository interface {
	// ListWorkspaces is called by ProcessOnce with an empty projectID.
	ListWorkspaces(ctx context.Context, projectID string) ([]workspace.Workspace, error)
	// ClaimNextDelivery is called once per workspace with the role names to
	// claim for and a formatted staleBefore cutoff. ProcessOnce treats any
	// error as "nothing to do for this workspace".
	ClaimNextDelivery(ctx context.Context, workspaceID string, roleNames []string, staleBefore string) (message.DeliveryClaim, error)
	GetTopic(ctx context.Context, topicID string) (topic.Record, error)
	UpdateTopic(ctx context.Context, value topic.Record) (topic.Record, error)
	ListMessagesByTopic(ctx context.Context, topicID string) ([]message.Record, error)
	CreateWorkflowRun(ctx context.Context, value workflow.Run) (workflow.Run, error)
	UpdateWorkflowRun(ctx context.Context, value workflow.Run) (workflow.Run, error)
	// AppendWorkflowRunLog stores one log line of a run; processClaim ignores
	// its result.
	AppendWorkflowRunLog(ctx context.Context, value workflow.RunLog) (workflow.RunLog, error)
	// ArchiveDelivery is called by processClaim once a claimed delivery has
	// been handled or skipped.
	ArchiveDelivery(ctx context.Context, messageID, roleName string) error
	CreateMessage(ctx context.Context, value message.Record) (message.Record, error)
	ListLanesByTopic(ctx context.Context, topicID string) ([]lane.Record, error)
	CreateLane(ctx context.Context, value lane.Record) (lane.Record, error)
	ListTasksByTopic(ctx context.Context, topicID string) ([]task.Record, error)
	// CreateTask is called by applyLeaderOutput with nil dependencies; the
	// dependencies are attached afterwards through setTaskDependencies.
	CreateTask(ctx context.Context, value task.Record, dependencies []task.Dependency) (task.Record, error)
	CreateTaskGraphVersion(ctx context.Context, value taskgraph.Record) (taskgraph.Record, error)
	GetLatestTaskGraphVersionByTopic(ctx context.Context, topicID string) (taskgraph.Record, error)
	UpdateTaskGraphVersion(ctx context.Context, value taskgraph.Record) (taskgraph.Record, error)
}
|
||
|
||
// RuntimeResolver resolves the runtime configuration of a role inside a
// workspace. processClaim calls it with the role name "leader".
type RuntimeResolver interface {
	ResolveRole(ctx context.Context, workspaceID, roleName string) (runtimeconfig.ResolvedRole, error)
}
|
||
|
||
// WorkspaceRuntime is the runtime surface the leader loop uses for workspaces
// and lanes.
type WorkspaceRuntime interface {
	// Ensure is called at the start of processClaim; only the returned
	// workspace is used there, the Runtime value is discarded.
	Ensure(ctx context.Context, workspaceID string) (workspace.Workspace, workspaceruntime.Runtime, error)
	// EnsureLane is called for each auto-start lane when the leader output
	// asks for "plan_and_start"; the returned record replaces the matching
	// created lane in the result.
	EnsureLane(ctx context.Context, laneID string) (lane.Record, error)
}
|
||
|
||
// CodexRunner executes one leader run. processClaim passes it the workspace
// root path, the resolved leader role, the rendered prompt and the JSON schema
// string for the expected output.
type CodexRunner interface {
	Run(ctx context.Context, workspaceRoot string, resolved runtimeconfig.ResolvedRole, prompt, schemaJSON string) (RunResult, error)
}
|
||
|
||
// RunResult is what a CodexRunner reports back for one run.
type RunResult struct {
	// ExitCode is copied onto the workflow run; processClaim sets it to 130
	// when the topic was cancelled while the run was in flight.
	ExitCode int
	// Status decides whether processClaim parses and applies ResultJSON
	// (succeeded) or posts a failure message (failed).
	Status workflow.RunStatus
	// ResultJSON is the raw leader output handed to parseLeaderOutput.
	ResultJSON string
	// ErrorMessage is stored, trimmed, on the workflow run and used as the
	// body of the failure message.
	ErrorMessage string
	// StdoutLines and StderrLines are appended to the run log line by line;
	// blank lines are skipped.
	StdoutLines []string
	StderrLines []string
}
|
||
|
||
// Service is the leader loop: it polls for deliveries addressed to the
// "leader" role, runs the leader through the CodexRunner and applies the
// resulting plan through the Repository.
type Service struct {
	repo     Repository
	resolver RuntimeResolver
	runtime  WorkspaceRuntime
	runner   CodexRunner
	// projectRoot is the whitespace-trimmed root passed to NewService.
	projectRoot string
	// clock supplies the current time for lease cutoffs and timestamps.
	clock timeutil.Clock
	// pollInterval is the ticker period used by Run.
	pollInterval time.Duration
	// lease is subtracted from the current time to compute the staleBefore
	// cutoff in ProcessOnce.
	lease time.Duration
}
|
||
|
||
// Result describes what processing one claimed delivery produced. When the
// delivery is archived without running the leader only Claim is set.
type Result struct {
	// Claim is the delivery that was processed.
	Claim message.DeliveryClaim
	// Run is the workflow run as returned by the final UpdateWorkflowRun.
	Run workflow.Run
	// Reply is the leader's reply or the failure notice, if one was created.
	Reply *message.Record
	// Lanes and Tasks are the records created while applying the leader
	// output.
	Lanes []lane.Record
	Tasks []task.Record
}
|
||
|
||
func NewService(repo Repository, resolver RuntimeResolver, runtime WorkspaceRuntime, runner CodexRunner, clock timeutil.Clock, projectRoot string) *Service {
|
||
if clock == nil {
|
||
clock = timeutil.SystemClock{}
|
||
}
|
||
if runner == nil {
|
||
runner = hostCodexRunner{projectRoot: strings.TrimSpace(projectRoot)}
|
||
}
|
||
return &Service{
|
||
repo: repo,
|
||
resolver: resolver,
|
||
runtime: runtime,
|
||
runner: runner,
|
||
projectRoot: strings.TrimSpace(projectRoot),
|
||
clock: clock,
|
||
pollInterval: defaultPollInterval,
|
||
lease: defaultLease,
|
||
}
|
||
}
|
||
|
||
func (s *Service) Run(ctx context.Context) error {
|
||
ticker := time.NewTicker(s.pollInterval)
|
||
defer ticker.Stop()
|
||
|
||
for {
|
||
select {
|
||
case <-ctx.Done():
|
||
return ctx.Err()
|
||
default:
|
||
}
|
||
|
||
if _, err := s.ProcessOnce(ctx); err != nil && err != context.Canceled {
|
||
// Keep the daemon alive; failures are persisted into workflow runs and reply messages.
|
||
}
|
||
|
||
select {
|
||
case <-ctx.Done():
|
||
return ctx.Err()
|
||
case <-ticker.C:
|
||
}
|
||
}
|
||
}
|
||
|
||
func (s *Service) ProcessOnce(ctx context.Context) (*Result, error) {
|
||
workspaces, err := s.repo.ListWorkspaces(ctx, "")
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
staleBefore := timeutil.FormatRFC3339(s.clock.Now().Add(-s.lease))
|
||
for _, ws := range workspaces {
|
||
claim, err := s.repo.ClaimNextDelivery(ctx, ws.ID, []string{"leader"}, staleBefore)
|
||
if err != nil {
|
||
continue
|
||
}
|
||
result, processErr := s.processClaim(ctx, claim)
|
||
if processErr != nil {
|
||
return nil, processErr
|
||
}
|
||
return result, nil
|
||
}
|
||
return nil, nil
|
||
}
|
||
|
||
func (s *Service) processClaim(ctx context.Context, claim message.DeliveryClaim) (*Result, error) {
|
||
ws, _, err := s.runtime.Ensure(ctx, claim.Message.WorkspaceID)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
topicRecord, err := s.repo.GetTopic(ctx, claim.Message.TopicID)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
if strings.TrimSpace(topicRecord.Status) == "cancelled" {
|
||
if err := s.repo.ArchiveDelivery(ctx, claim.Message.ID, claim.RecipientRoleName); err != nil {
|
||
return nil, err
|
||
}
|
||
return &Result{Claim: claim}, nil
|
||
}
|
||
messages, err := s.repo.ListMessagesByTopic(ctx, topicRecord.ID)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
existingLanes, err := s.repo.ListLanesByTopic(ctx, topicRecord.ID)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
existingTasks, err := s.repo.ListTasksByTopic(ctx, topicRecord.ID)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
shouldFreezeInitialPlan := len(existingLanes) == 0 &&
|
||
len(existingTasks) == 0 &&
|
||
claim.Message.FromRoleName == "user"
|
||
if shouldSkipLeaderPlanning(claim.Message, topicRecord, existingLanes, existingTasks) {
|
||
if err := s.repo.ArchiveDelivery(ctx, claim.Message.ID, claim.RecipientRoleName); err != nil {
|
||
return nil, err
|
||
}
|
||
return &Result{Claim: claim}, nil
|
||
}
|
||
resolved, err := s.resolver.ResolveRole(ctx, claim.Message.WorkspaceID, "leader")
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
snapshot, err := resolved.SnapshotJSON()
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
run, err := s.repo.CreateWorkflowRun(ctx, workflow.Run{
|
||
WorkspaceID: claim.Message.WorkspaceID,
|
||
TopicID: claim.Message.TopicID,
|
||
RoleName: "leader",
|
||
Stage: workflow.StagePlan,
|
||
Mode: "message",
|
||
Status: workflow.RunStatusRunning,
|
||
RequestMessageID: claim.Message.ID,
|
||
ConfigSnapshotJSON: snapshot,
|
||
CommandJSON: `{"mode":"leader"}`,
|
||
})
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
prompt := buildLeaderPrompt(resolved, topicRecord, claim.Message, messages, existingLanes, existingTasks, shouldFreezeInitialPlan)
|
||
runResult, runErr := s.runner.Run(ctx, ws.RootPath, resolved, prompt, leaderOutputSchema(shouldFreezeInitialPlan))
|
||
if runErr != nil {
|
||
runResult.Status = workflow.RunStatusFailed
|
||
runResult.ErrorMessage = strings.TrimSpace(runErr.Error())
|
||
}
|
||
for _, line := range runResult.StdoutLines {
|
||
if strings.TrimSpace(line) == "" {
|
||
continue
|
||
}
|
||
_, _ = s.repo.AppendWorkflowRunLog(ctx, workflow.RunLog{
|
||
RunID: run.ID,
|
||
Stream: workflow.LogStreamStdout,
|
||
Content: line,
|
||
})
|
||
}
|
||
for _, line := range runResult.StderrLines {
|
||
if strings.TrimSpace(line) == "" {
|
||
continue
|
||
}
|
||
_, _ = s.repo.AppendWorkflowRunLog(ctx, workflow.RunLog{
|
||
RunID: run.ID,
|
||
Stream: workflow.LogStreamStderr,
|
||
Content: line,
|
||
})
|
||
}
|
||
|
||
var reply *message.Record
|
||
var createdLanes []lane.Record
|
||
var createdTasks []task.Record
|
||
graphVersionStatus := taskgraph.StatusActive
|
||
|
||
if runResult.Status == workflow.RunStatusSucceeded {
|
||
output, err := parseLeaderOutput(runResult.ResultJSON, shouldFreezeInitialPlan)
|
||
if err != nil {
|
||
runResult.Status = workflow.RunStatusFailed
|
||
runResult.ErrorMessage = err.Error()
|
||
} else {
|
||
freezeInitialGraph := shouldFreezeInitialPlan && len(output.Tasks) > 0
|
||
latestTopic, latestTopicErr := s.repo.GetTopic(ctx, claim.Message.TopicID)
|
||
if latestTopicErr != nil {
|
||
runResult.Status = workflow.RunStatusFailed
|
||
runResult.ErrorMessage = latestTopicErr.Error()
|
||
} else if strings.TrimSpace(latestTopic.Status) == "cancelled" {
|
||
runResult.Status = workflow.RunStatusCancelled
|
||
runResult.ExitCode = 130
|
||
runResult.ErrorMessage = "Topic was stopped before leader output could be applied."
|
||
} else {
|
||
if err := validateLeaderOutputForTopic(output, existingLanes, existingTasks); err != nil {
|
||
runResult.Status = workflow.RunStatusFailed
|
||
runResult.ErrorMessage = err.Error()
|
||
} else {
|
||
run.CommandJSON = mustEncodeLeaderCommand(output)
|
||
reply, createdLanes, createdTasks, err = s.applyLeaderOutput(ctx, ws, claim.Message, output)
|
||
if err != nil {
|
||
runResult.Status = workflow.RunStatusFailed
|
||
runResult.ErrorMessage = err.Error()
|
||
} else {
|
||
if err := s.recordTaskGraphVersion(ctx, claim.Message.TopicID, claim.Message.FromRoleName, output, freezeInitialGraph); err != nil {
|
||
runResult.Status = workflow.RunStatusFailed
|
||
runResult.ErrorMessage = err.Error()
|
||
} else if freezeInitialGraph {
|
||
topicRecord.Status = "awaiting_confirmation"
|
||
if _, err := s.repo.UpdateTopic(ctx, topicRecord); err != nil {
|
||
runResult.Status = workflow.RunStatusFailed
|
||
runResult.ErrorMessage = err.Error()
|
||
} else {
|
||
graphVersionStatus = taskgraph.StatusDraft
|
||
}
|
||
} else if output.ExecutionMode == "plan_and_start" {
|
||
autoStartLanes, startErr := s.resolveAutoStartLanes(ctx, claim.Message.TopicID, map[string]struct{}{})
|
||
if startErr != nil {
|
||
runResult.Status = workflow.RunStatusFailed
|
||
runResult.ErrorMessage = startErr.Error()
|
||
} else {
|
||
for idx := range autoStartLanes {
|
||
started, ensureErr := s.runtime.EnsureLane(ctx, autoStartLanes[idx].ID)
|
||
if ensureErr != nil {
|
||
runResult.Status = workflow.RunStatusFailed
|
||
runResult.ErrorMessage = ensureErr.Error()
|
||
break
|
||
}
|
||
for createdIdx := range createdLanes {
|
||
if createdLanes[createdIdx].ID == started.ID {
|
||
createdLanes[createdIdx] = started
|
||
}
|
||
}
|
||
}
|
||
}
|
||
}
|
||
}
|
||
}
|
||
}
|
||
}
|
||
}
|
||
if reply == nil && runResult.Status == workflow.RunStatusFailed {
|
||
failureBody := strings.TrimSpace(runResult.ErrorMessage)
|
||
if failureBody == "" {
|
||
failureBody = "Leader execution failed without an explicit error message."
|
||
}
|
||
item, msgErr := s.repo.CreateMessage(ctx, message.Record{
|
||
WorkspaceID: claim.Message.WorkspaceID,
|
||
TopicID: claim.Message.TopicID,
|
||
FromRoleName: "leader",
|
||
ToExpr: claim.Message.FromRoleName,
|
||
Type: message.TypeSummary,
|
||
Stage: claim.Message.Stage,
|
||
ReplyToMessageID: claim.Message.ID,
|
||
BodyMarkdown: "Leader execution failed.\n\n" + failureBody,
|
||
CreatedAt: timeutil.FormatRFC3339(s.clock.Now()),
|
||
})
|
||
if msgErr == nil {
|
||
reply = &item
|
||
}
|
||
}
|
||
|
||
run.Status = runResult.Status
|
||
run.ExitCode = runResult.ExitCode
|
||
run.ErrorMessage = strings.TrimSpace(runResult.ErrorMessage)
|
||
run.CompletedAt = timeutil.FormatRFC3339(s.clock.Now())
|
||
if shouldFreezeInitialPlan && run.Status == workflow.RunStatusSucceeded && graphVersionStatus == taskgraph.StatusDraft {
|
||
run.CommandJSON = mustEncodeLeaderCommandWithGraphStatus(run.CommandJSON, "draft")
|
||
}
|
||
if reply != nil {
|
||
run.ReplyMessageID = reply.ID
|
||
}
|
||
run, err = s.repo.UpdateWorkflowRun(ctx, run)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
if err := s.repo.ArchiveDelivery(ctx, claim.Message.ID, claim.RecipientRoleName); err != nil {
|
||
return nil, err
|
||
}
|
||
return &Result{
|
||
Claim: claim,
|
||
Run: run,
|
||
Reply: reply,
|
||
Lanes: createdLanes,
|
||
Tasks: createdTasks,
|
||
}, nil
|
||
}
|
||
|
||
func shouldSkipLeaderPlanning(msg message.Record, record topic.Record, lanes []lane.Record, tasks []task.Record) bool {
|
||
if msg.FromRoleName != "worker" {
|
||
return false
|
||
}
|
||
switch strings.TrimSpace(record.Status) {
|
||
case "awaiting_confirmation", "cancelled", "completed":
|
||
return true
|
||
}
|
||
return false
|
||
}
|
||
|
||
func hasRunningTask(items []task.Record) bool {
|
||
for _, item := range items {
|
||
if item.Status == task.StatusRunning {
|
||
return true
|
||
}
|
||
}
|
||
return false
|
||
}
|
||
|
||
func hasRunningLane(items []lane.Record) bool {
|
||
for _, item := range items {
|
||
if item.Status == lane.StatusRunning {
|
||
return true
|
||
}
|
||
}
|
||
return false
|
||
}
|
||
|
||
func hasFailedOrBlockedGraph(lanes []lane.Record, tasks []task.Record) bool {
|
||
for _, item := range lanes {
|
||
if item.Status == lane.StatusBlocked || item.Status == lane.StatusFailed {
|
||
return true
|
||
}
|
||
}
|
||
for _, item := range tasks {
|
||
if item.Status == task.StatusFailed || item.Status == task.StatusBlocked {
|
||
return true
|
||
}
|
||
}
|
||
return false
|
||
}
|
||
|
||
func allTasksTerminal(items []task.Record) bool {
|
||
if len(items) == 0 {
|
||
return false
|
||
}
|
||
for _, item := range items {
|
||
switch item.Status {
|
||
case task.StatusSucceeded, task.StatusFailed, task.StatusCancelled:
|
||
default:
|
||
return false
|
||
}
|
||
}
|
||
return true
|
||
}
|
||
|
||
// leaderOutput is the JSON document a leader run returns. parseLeaderOutput
// decodes it with unknown fields disallowed.
type leaderOutput struct {
	// PlanSummaryMarkdown is used as the reply body when LeaderReply.Markdown
	// is blank.
	PlanSummaryMarkdown string `json:"plan_summary_markdown"`
	// PlanMode must be "initial" or "patch"; see validateLeaderOutputForTopic.
	PlanMode string `json:"plan_mode"`
	// ReplanReason must be non-blank when PlanMode is "patch".
	ReplanReason string `json:"replan_reason"`
	// ExecutionMode "plan_and_start" makes processClaim start the auto-start
	// lanes after the plan has been applied (unless the graph is frozen).
	ExecutionMode string `json:"execution_mode"`
	// LeaderReply is the message sent back to the sender of the request.
	LeaderReply leaderReplySpec `json:"leader_reply"`
	// Tasks are the tasks to create for the topic.
	Tasks []leaderTaskSpec `json:"tasks"`
}
|
||
|
||
// leaderReplySpec describes the reply message requested by the leader.
type leaderReplySpec struct {
	// Markdown is the reply body.
	Markdown string `json:"markdown"`
	// Type is converted to a message.Type; a blank value falls back to
	// message.TypeSummary in applyLeaderOutput.
	Type string `json:"type"`
}
|
||
|
||
// leaderTaskSpec is one task in the leader output.
type leaderTaskSpec struct {
	// Key identifies the task within the output; DependsOn entries of other
	// tasks refer to it.
	Key                string `json:"key"`
	Title              string `json:"title"`
	BodyMarkdown       string `json:"body_markdown"`
	AcceptanceMarkdown string `json:"acceptance_markdown"`
	// Kind is converted to a task.Kind; "gate" and "milestone" are treated as
	// non-code tasks by isNonCodeLeaderTask.
	Kind         string   `json:"kind"`
	Deliverables []string `json:"deliverables"`
	Priority     int      `json:"priority"`
	TaskOrder    int      `json:"task_order"`
	// DependsOn lists keys of tasks in the same output or IDs of tasks that
	// already exist on the topic.
	DependsOn []string `json:"depends_on"`
}
|
||
|
||
func parseLeaderOutput(raw string, initialFreeze bool) (leaderOutput, error) {
|
||
var output leaderOutput
|
||
if strings.TrimSpace(raw) == "" {
|
||
return output, fmt.Errorf("leader returned empty output")
|
||
}
|
||
decoder := json.NewDecoder(bytes.NewReader([]byte(raw)))
|
||
decoder.DisallowUnknownFields()
|
||
if err := decoder.Decode(&output); err != nil {
|
||
return output, fmt.Errorf("decode leader output: %w", err)
|
||
}
|
||
if err := validateLeaderOutput(output, initialFreeze); err != nil {
|
||
return output, err
|
||
}
|
||
return output, nil
|
||
}
|
||
|
||
func validateLeaderOutputForTopic(output leaderOutput, existingLanes []lane.Record, existingTasks []task.Record) error {
|
||
hasExistingGraph := len(existingLanes) > 0 || len(existingTasks) > 0
|
||
switch strings.TrimSpace(output.PlanMode) {
|
||
case "initial":
|
||
if hasExistingGraph {
|
||
return fmt.Errorf("leader must use plan_mode=patch when topic already has lanes or tasks")
|
||
}
|
||
case "patch":
|
||
if !hasExistingGraph {
|
||
return fmt.Errorf("leader cannot use plan_mode=patch on an empty topic graph")
|
||
}
|
||
if strings.TrimSpace(output.ReplanReason) == "" {
|
||
return fmt.Errorf("leader patch mode requires replan_reason")
|
||
}
|
||
default:
|
||
return fmt.Errorf("leader output has invalid plan_mode %q", output.PlanMode)
|
||
}
|
||
return validateLeaderDependencyKeys(output, existingTasks)
|
||
}
|
||
|
||
func validateLeaderDependencyKeys(output leaderOutput, existingTasks []task.Record) error {
|
||
taskKeys := make(map[string]struct{}, len(output.Tasks)+len(existingTasks))
|
||
for _, item := range existingTasks {
|
||
taskKeys[item.ID] = struct{}{}
|
||
}
|
||
for _, item := range output.Tasks {
|
||
taskKeys[strings.TrimSpace(item.Key)] = struct{}{}
|
||
}
|
||
for _, item := range output.Tasks {
|
||
for _, dep := range item.DependsOn {
|
||
if _, ok := taskKeys[strings.TrimSpace(dep)]; !ok {
|
||
return fmt.Errorf("leader task %q references unknown dependency key %q", item.Key, dep)
|
||
}
|
||
}
|
||
}
|
||
return nil
|
||
}
|
||
|
||
func (s *Service) applyLeaderOutput(ctx context.Context, ws workspace.Workspace, source message.Record, output leaderOutput) (*message.Record, []lane.Record, []task.Record, error) {
|
||
var reply *message.Record
|
||
replyBody := strings.TrimSpace(output.LeaderReply.Markdown)
|
||
if replyBody == "" {
|
||
replyBody = strings.TrimSpace(output.PlanSummaryMarkdown)
|
||
}
|
||
if replyBody != "" {
|
||
replyType := message.Type(strings.TrimSpace(output.LeaderReply.Type))
|
||
if replyType == "" {
|
||
replyType = message.TypeSummary
|
||
}
|
||
item, err := s.repo.CreateMessage(ctx, message.Record{
|
||
WorkspaceID: source.WorkspaceID,
|
||
TopicID: source.TopicID,
|
||
FromRoleName: "leader",
|
||
ToExpr: source.FromRoleName,
|
||
Type: replyType,
|
||
Stage: source.Stage,
|
||
ReplyToMessageID: source.ID,
|
||
BodyMarkdown: replyBody,
|
||
CreatedAt: timeutil.FormatRFC3339(s.clock.Now()),
|
||
})
|
||
if err != nil {
|
||
return nil, nil, nil, err
|
||
}
|
||
reply = &item
|
||
}
|
||
|
||
existingLanes, err := s.repo.ListLanesByTopic(ctx, source.TopicID)
|
||
if err != nil {
|
||
return reply, nil, nil, err
|
||
}
|
||
existingTasks, err := s.repo.ListTasksByTopic(ctx, source.TopicID)
|
||
if err != nil {
|
||
return reply, nil, nil, err
|
||
}
|
||
usedLaneSlugs := map[string]struct{}{}
|
||
laneByID := map[string]lane.Record{}
|
||
for _, item := range existingLanes {
|
||
usedLaneSlugs[item.Slug] = struct{}{}
|
||
laneByID[item.ID] = item
|
||
}
|
||
createdLanes := make([]lane.Record, 0)
|
||
reducedDeps := reduceLeaderTaskDependencies(output.Tasks)
|
||
orderedTasks, err := topoSortLeaderTasks(output.Tasks, reducedDeps)
|
||
if err != nil {
|
||
return reply, nil, nil, err
|
||
}
|
||
for idx := range orderedTasks {
|
||
orderedTasks[idx] = normalizeLeaderTaskSpec(orderedTasks[idx], idx)
|
||
}
|
||
reducedDeps = reduceLeaderTaskDependencies(orderedTasks)
|
||
childCounts := countLeaderTaskChildren(reducedDeps)
|
||
laneSeeds := deriveLeaderTaskLaneSeeds(orderedTasks, reducedDeps, childCounts)
|
||
taskSpecByKey := make(map[string]leaderTaskSpec, len(orderedTasks))
|
||
for _, item := range orderedTasks {
|
||
taskSpecByKey[strings.TrimSpace(item.Key)] = item
|
||
}
|
||
taskKeyToID := map[string]string{}
|
||
for _, existing := range existingTasks {
|
||
taskKeyToID[existing.ID] = existing.ID
|
||
}
|
||
seedToLaneID := map[string]string{}
|
||
createdTasks := make([]task.Record, 0, len(orderedTasks))
|
||
hasGateTasks := false
|
||
for _, spec := range orderedTasks {
|
||
if strings.TrimSpace(spec.Kind) == "gate" {
|
||
hasGateTasks = true
|
||
break
|
||
}
|
||
}
|
||
|
||
for _, spec := range orderedTasks {
|
||
seed := laneSeeds[strings.TrimSpace(spec.Key)]
|
||
if seed == "" {
|
||
return reply, createdLanes, createdTasks, fmt.Errorf("missing derived lane seed for task %q", spec.Key)
|
||
}
|
||
laneSpec := laneSpecForSeed(taskSpecByKey, seed, spec)
|
||
laneRecord, laneCreated, err := s.ensureDerivedLane(ctx, ws, source, seed, laneSpec, usedLaneSlugs, seedToLaneID, laneByID)
|
||
if err != nil {
|
||
return reply, createdLanes, createdTasks, err
|
||
}
|
||
if laneCreated != nil {
|
||
createdLanes = append(createdLanes, *laneCreated)
|
||
laneByID[laneCreated.ID] = *laneCreated
|
||
}
|
||
status := task.StatusDraft
|
||
if len(reducedDeps[strings.TrimSpace(spec.Key)]) == 0 {
|
||
status = task.StatusReady
|
||
}
|
||
if hasGateTasks && strings.TrimSpace(spec.Kind) != "gate" {
|
||
status = task.StatusDraft
|
||
}
|
||
item, err := s.repo.CreateTask(ctx, task.Record{
|
||
WorkspaceID: source.WorkspaceID,
|
||
TopicID: source.TopicID,
|
||
LaneID: laneRecord.ID,
|
||
Title: spec.Title,
|
||
BodyMarkdown: spec.BodyMarkdown,
|
||
AcceptanceMarkdown: strings.TrimSpace(spec.AcceptanceMarkdown),
|
||
Kind: task.Kind(strings.TrimSpace(spec.Kind)),
|
||
Deliverables: append([]string(nil), spec.Deliverables...),
|
||
Status: status,
|
||
Priority: spec.Priority,
|
||
TaskOrder: spec.TaskOrder,
|
||
CreatedByRoleName: "leader",
|
||
}, nil)
|
||
if err != nil {
|
||
return reply, createdLanes, createdTasks, err
|
||
}
|
||
key := strings.TrimSpace(spec.Key)
|
||
if key == "" {
|
||
key = item.ID
|
||
}
|
||
taskKeyToID[key] = item.ID
|
||
createdTasks = append(createdTasks, item)
|
||
}
|
||
|
||
for idx, spec := range orderedTasks {
|
||
depRefs := reducedDeps[strings.TrimSpace(spec.Key)]
|
||
if len(depRefs) == 0 {
|
||
continue
|
||
}
|
||
deps := make([]task.Dependency, 0, len(depRefs))
|
||
for _, key := range depRefs {
|
||
depID, ok := taskKeyToID[strings.TrimSpace(key)]
|
||
if !ok {
|
||
return reply, createdLanes, createdTasks, fmt.Errorf("leader output references unknown task dependency key %q", key)
|
||
}
|
||
deps = append(deps, task.Dependency{DependsOnTaskID: depID})
|
||
}
|
||
updated, err := s.setTaskDependencies(ctx, createdTasks[idx], deps)
|
||
if err != nil {
|
||
return reply, createdLanes, createdTasks, err
|
||
}
|
||
createdTasks[idx] = updated
|
||
}
|
||
|
||
return reply, createdLanes, createdTasks, nil
|
||
}
|
||
|
||
func (s *Service) ensureDerivedLane(
|
||
ctx context.Context,
|
||
ws workspace.Workspace,
|
||
source message.Record,
|
||
seed string,
|
||
spec leaderTaskSpec,
|
||
usedLaneSlugs map[string]struct{},
|
||
seedToLaneID map[string]string,
|
||
laneByID map[string]lane.Record,
|
||
) (lane.Record, *lane.Record, error) {
|
||
if laneID, ok := seedToLaneID[seed]; ok {
|
||
if laneRecord, exists := laneByID[laneID]; exists {
|
||
return laneRecord, nil, nil
|
||
}
|
||
}
|
||
|
||
baseName := strings.TrimSpace(spec.Title)
|
||
if baseName == "" {
|
||
baseName = "Execution lane"
|
||
}
|
||
baseSlug := slug.Normalize(baseName)
|
||
if baseSlug == "" {
|
||
baseSlug = "lane"
|
||
}
|
||
uniqueSlug := baseSlug
|
||
for idx := 2; ; idx++ {
|
||
if _, ok := usedLaneSlugs[uniqueSlug]; !ok {
|
||
break
|
||
}
|
||
uniqueSlug = fmt.Sprintf("%s-%d", baseSlug, idx)
|
||
}
|
||
usedLaneSlugs[uniqueSlug] = struct{}{}
|
||
|
||
item, err := s.repo.CreateLane(ctx, lane.Record{
|
||
WorkspaceID: source.WorkspaceID,
|
||
TopicID: source.TopicID,
|
||
Name: baseName,
|
||
Slug: uniqueSlug,
|
||
Purpose: strings.TrimSpace(spec.AcceptanceMarkdown),
|
||
Status: lane.StatusReady,
|
||
BranchName: lane.DefaultBranchName(ws.Slug, source.TopicID, uniqueSlug),
|
||
WorktreePath: lane.DefaultWorktreePath(ws.RootPath, ws.Slug, source.TopicID, uniqueSlug),
|
||
ContainerName: lane.DefaultContainerName(ws.Slug, source.TopicID, uniqueSlug),
|
||
CreatedByRoleName: "leader",
|
||
ResultSummaryMarkdown: strings.TrimSpace(spec.BodyMarkdown),
|
||
})
|
||
if err != nil {
|
||
return lane.Record{}, nil, err
|
||
}
|
||
seedToLaneID[seed] = item.ID
|
||
return item, &item, nil
|
||
}
|
||
|
||
func deriveLeaderTaskLaneSeeds(tasks []leaderTaskSpec, depsByKey map[string][]string, childCounts map[string]int) map[string]string {
|
||
childrenByKey := make(map[string][]string, len(tasks))
|
||
specByKey := make(map[string]leaderTaskSpec, len(tasks))
|
||
for _, spec := range tasks {
|
||
key := strings.TrimSpace(spec.Key)
|
||
specByKey[key] = spec
|
||
for _, dep := range depsByKey[key] {
|
||
childrenByKey[dep] = append(childrenByKey[dep], key)
|
||
}
|
||
}
|
||
seedByTask := make(map[string]string, len(tasks))
|
||
for _, spec := range tasks {
|
||
key := strings.TrimSpace(spec.Key)
|
||
deps := depsByKey[key]
|
||
switch len(deps) {
|
||
case 0:
|
||
seedByTask[key] = key
|
||
case 1:
|
||
parent := deps[0]
|
||
if childCounts[parent] <= 1 {
|
||
if parentSeed := seedByTask[parent]; parentSeed != "" {
|
||
seedByTask[key] = parentSeed
|
||
break
|
||
}
|
||
}
|
||
seedByTask[key] = key
|
||
default:
|
||
seedByTask[key] = key
|
||
}
|
||
}
|
||
for _, spec := range tasks {
|
||
key := strings.TrimSpace(spec.Key)
|
||
if !isNonCodeLeaderTask(spec.Kind) {
|
||
continue
|
||
}
|
||
if sharedSeed := firstDependencySeed(seedByTask, depsByKey[key]); sharedSeed != "" {
|
||
seedByTask[key] = sharedSeed
|
||
continue
|
||
}
|
||
if sharedSeed := firstDescendantCodeSeed(seedByTask, specByKey, childrenByKey, key, map[string]struct{}{}); sharedSeed != "" {
|
||
seedByTask[key] = sharedSeed
|
||
continue
|
||
}
|
||
if sharedSeed := firstCodeTaskSeedInOrder(tasks, seedByTask); sharedSeed != "" {
|
||
seedByTask[key] = sharedSeed
|
||
}
|
||
}
|
||
return seedByTask
|
||
}
|
||
|
||
func laneSpecForSeed(specByKey map[string]leaderTaskSpec, seed string, fallback leaderTaskSpec) leaderTaskSpec {
|
||
if spec, ok := specByKey[strings.TrimSpace(seed)]; ok {
|
||
return spec
|
||
}
|
||
return fallback
|
||
}
|
||
|
||
// isNonCodeLeaderTask reports whether the trimmed kind is "gate" or
// "milestone". The comparison is case-sensitive.
func isNonCodeLeaderTask(kind string) bool {
	trimmed := strings.TrimSpace(kind)
	return trimmed == "gate" || trimmed == "milestone"
}
|
||
|
||
// firstDependencySeed returns the trimmed seed of the first dependency, in the
// given order, that has a non-blank seed, or "" when there is none.
func firstDependencySeed(seedByTask map[string]string, deps []string) string {
	for i := range deps {
		candidate := strings.TrimSpace(seedByTask[strings.TrimSpace(deps[i])])
		if candidate == "" {
			continue
		}
		return candidate
	}
	return ""
}
|
||
|
||
func firstDescendantCodeSeed(
|
||
seedByTask map[string]string,
|
||
specByKey map[string]leaderTaskSpec,
|
||
childrenByKey map[string][]string,
|
||
key string,
|
||
seen map[string]struct{},
|
||
) string {
|
||
key = strings.TrimSpace(key)
|
||
if key == "" {
|
||
return ""
|
||
}
|
||
if _, ok := seen[key]; ok {
|
||
return ""
|
||
}
|
||
seen[key] = struct{}{}
|
||
for _, child := range childrenByKey[key] {
|
||
child = strings.TrimSpace(child)
|
||
if child == "" {
|
||
continue
|
||
}
|
||
childSpec, ok := specByKey[child]
|
||
if ok && !isNonCodeLeaderTask(childSpec.Kind) {
|
||
if seed := strings.TrimSpace(seedByTask[child]); seed != "" {
|
||
return seed
|
||
}
|
||
}
|
||
if seed := firstDescendantCodeSeed(seedByTask, specByKey, childrenByKey, child, seen); seed != "" {
|
||
return seed
|
||
}
|
||
}
|
||
return ""
|
||
}
|
||
|
||
func firstCodeTaskSeedInOrder(tasks []leaderTaskSpec, seedByTask map[string]string) string {
|
||
for _, item := range tasks {
|
||
if isNonCodeLeaderTask(item.Kind) {
|
||
continue
|
||
}
|
||
if seed := strings.TrimSpace(seedByTask[strings.TrimSpace(item.Key)]); seed != "" {
|
||
return seed
|
||
}
|
||
}
|
||
return ""
|
||
}
|
||
|
||
func reduceLeaderTaskDependencies(tasks []leaderTaskSpec) map[string][]string {
|
||
depsByKey := make(map[string][]string, len(tasks))
|
||
taskSet := make(map[string]struct{}, len(tasks))
|
||
children := map[string][]string{}
|
||
for _, spec := range tasks {
|
||
key := strings.TrimSpace(spec.Key)
|
||
if key == "" {
|
||
continue
|
||
}
|
||
taskSet[key] = struct{}{}
|
||
normalized := normalizeTaskRefs(spec.DependsOn)
|
||
depsByKey[key] = normalized
|
||
for _, dep := range normalized {
|
||
children[dep] = append(children[dep], key)
|
||
}
|
||
}
|
||
|
||
reachable := map[string]map[string]struct{}{}
|
||
var visit func(string) map[string]struct{}
|
||
visit = func(key string) map[string]struct{} {
|
||
if cached, ok := reachable[key]; ok {
|
||
return cached
|
||
}
|
||
out := map[string]struct{}{}
|
||
for _, child := range children[key] {
|
||
out[child] = struct{}{}
|
||
for grandChild := range visit(child) {
|
||
out[grandChild] = struct{}{}
|
||
}
|
||
}
|
||
reachable[key] = out
|
||
return out
|
||
}
|
||
|
||
for _, spec := range tasks {
|
||
key := strings.TrimSpace(spec.Key)
|
||
normalized := depsByKey[key]
|
||
if len(normalized) < 2 {
|
||
continue
|
||
}
|
||
reduced := make([]string, 0, len(normalized))
|
||
for _, dep := range normalized {
|
||
redundant := false
|
||
for _, other := range normalized {
|
||
if dep == other {
|
||
continue
|
||
}
|
||
if _, ok := taskSet[other]; !ok {
|
||
continue
|
||
}
|
||
if descendants := visit(dep); descendants != nil {
|
||
if _, ok := descendants[other]; ok {
|
||
redundant = true
|
||
break
|
||
}
|
||
}
|
||
}
|
||
if !redundant {
|
||
reduced = append(reduced, dep)
|
||
}
|
||
}
|
||
depsByKey[key] = reduced
|
||
}
|
||
return depsByKey
|
||
}
|
||
|
||
func topoSortLeaderTasks(tasks []leaderTaskSpec, depsByKey map[string][]string) ([]leaderTaskSpec, error) {
|
||
byKey := make(map[string]leaderTaskSpec, len(tasks))
|
||
inDegree := make(map[string]int, len(tasks))
|
||
children := make(map[string][]string, len(tasks))
|
||
for _, spec := range tasks {
|
||
key := strings.TrimSpace(spec.Key)
|
||
byKey[key] = spec
|
||
inDegree[key] = 0
|
||
}
|
||
for _, spec := range tasks {
|
||
key := strings.TrimSpace(spec.Key)
|
||
for _, dep := range depsByKey[key] {
|
||
if _, ok := byKey[dep]; !ok {
|
||
continue
|
||
}
|
||
inDegree[key]++
|
||
children[dep] = append(children[dep], key)
|
||
}
|
||
}
|
||
queue := make([]string, 0, len(tasks))
|
||
for _, spec := range tasks {
|
||
key := strings.TrimSpace(spec.Key)
|
||
if inDegree[key] == 0 {
|
||
queue = append(queue, key)
|
||
}
|
||
}
|
||
ordered := make([]leaderTaskSpec, 0, len(tasks))
|
||
for len(queue) > 0 {
|
||
key := queue[0]
|
||
queue = queue[1:]
|
||
ordered = append(ordered, byKey[key])
|
||
for _, child := range children[key] {
|
||
inDegree[child]--
|
||
if inDegree[child] == 0 {
|
||
queue = append(queue, child)
|
||
}
|
||
}
|
||
}
|
||
if len(ordered) != len(tasks) {
|
||
return nil, fmt.Errorf("leader task graph contains a cycle or unresolved dependency ordering")
|
||
}
|
||
return ordered, nil
|
||
}
|
||
|
||
// countLeaderTaskChildren counts, for every key that appears as a dependency,
// how many dependency-list entries name it. Keys that are never depended on
// are absent from the result.
func countLeaderTaskChildren(depsByKey map[string][]string) map[string]int {
	childCount := make(map[string]int)
	for key := range depsByKey {
		for _, parent := range depsByKey[key] {
			childCount[parent] = childCount[parent] + 1
		}
	}
	return childCount
}
|
||
|
||
// normalizeTaskRefs trims every reference, drops blank ones and removes
// duplicates while keeping the order of first occurrence. The result is never
// nil.
func normalizeTaskRefs(values []string) []string {
	refs := make([]string, 0, len(values))
	known := make(map[string]struct{})
	for i := range values {
		ref := strings.TrimSpace(values[i])
		if ref == "" {
			continue
		}
		if _, dup := known[ref]; !dup {
			known[ref] = struct{}{}
			refs = append(refs, ref)
		}
	}
	return refs
}
|
||
|
||
func (s *Service) resolveAutoStartLanes(ctx context.Context, topicID string, exclude map[string]struct{}) ([]lane.Record, error) {
|
||
tasksByTopic, err := s.repo.ListTasksByTopic(ctx, topicID)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
if hasUnresolvedGateTasks(tasksByTopic) {
|
||
return nil, nil
|
||
}
|
||
lanesByTopic, err := s.repo.ListLanesByTopic(ctx, topicID)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
tasksByLane := make(map[string][]task.Record)
|
||
for _, item := range tasksByTopic {
|
||
tasksByLane[item.LaneID] = append(tasksByLane[item.LaneID], item)
|
||
}
|
||
out := make([]lane.Record, 0)
|
||
for _, item := range lanesByTopic {
|
||
if _, skip := exclude[item.ID]; skip {
|
||
continue
|
||
}
|
||
if item.Status != lane.StatusReady {
|
||
continue
|
||
}
|
||
if !hasReadyTask(tasksByLane[item.ID]) {
|
||
continue
|
||
}
|
||
out = append(out, item)
|
||
}
|
||
return out, nil
|
||
}
|
||
|
||
func hasUnresolvedGateTasks(items []task.Record) bool {
|
||
for _, item := range items {
|
||
if item.Kind != task.KindGate {
|
||
continue
|
||
}
|
||
switch item.Status {
|
||
case task.StatusSucceeded, task.StatusCancelled:
|
||
continue
|
||
default:
|
||
return true
|
||
}
|
||
}
|
||
return false
|
||
}
|
||
|
||
func hasReadyTask(items []task.Record) bool {
|
||
for _, item := range items {
|
||
if item.Status == task.StatusReady {
|
||
return true
|
||
}
|
||
}
|
||
return false
|
||
}
|
||
|
||
func (s *Service) setTaskDependencies(ctx context.Context, item task.Record, deps []task.Dependency) (task.Record, error) {
|
||
for idx := range deps {
|
||
deps[idx].TaskID = item.ID
|
||
}
|
||
txRepo, ok := s.repo.(interface {
|
||
UpdateTaskWithDependencies(ctx context.Context, value task.Record, dependencies *[]task.Dependency) (task.Record, error)
|
||
})
|
||
if !ok {
|
||
return item, fmt.Errorf("leaderloop repository cannot update task dependencies")
|
||
}
|
||
return txRepo.UpdateTaskWithDependencies(ctx, item, &deps)
|
||
}
|
||
|
||
// defaultLeaderSystemPrompt is the built-in leader instruction text that
// buildLeaderPrompt passes to runtimeconfig.RenderInstructions.
//
// The text is Chinese. In summary it tells the model that it is the leader:
// it runs on the host as the only orchestrator of the topic, talks to the
// user, clarifies goals, maintains the task graph, starts execution and
// summarizes results, while lanes are derived from the task graph by the
// system. Its rules: understand the goal before splitting tasks; every task
// needs a goal, dependencies and acceptance criteria; worker questions and
// summaries are orchestration input, not proof of completion; workers
// escalate blockers only to the leader; the leader may manage lanes and tasks
// but must not do the workers' implementation inside the container.
const defaultLeaderSystemPrompt = `你是 leader。

你运行在宿主机上,是当前主题唯一的编排者。你负责与用户沟通、澄清目标、维护 task graph、启动执行,并汇总最终结果。lane 由系统从任务图自动派生。

规则:
- 先理解用户目标,再拆分任务;不要直接跳过澄清。
- 所有 task 都必须明确目标、依赖、验收标准。
- worker 的问题和总结都只作为编排输入,不要把它们直接当作完成证明。
- worker 只能向你升级阻塞;是否打扰用户由你决定。
- 你可以创建、更新、启动、停止 lane 与 task,但不要代替 worker 在容器里完成实现。`
|
||
|
||
func buildLeaderPrompt(resolved runtimeconfig.ResolvedRole, record topic.Record, msg message.Record, messages []message.Record, lanes []lane.Record, tasks []task.Record, initialFreeze bool) string {
|
||
var builder strings.Builder
|
||
builder.WriteString("## Role\n")
|
||
builder.WriteString(runtimeconfig.RenderInstructions(resolved, defaultLeaderSystemPrompt))
|
||
builder.WriteString("\n\n## Planning Contract\n")
|
||
builder.WriteString("规则:\n")
|
||
builder.WriteString("- 不要直接编写实现代码。\n")
|
||
builder.WriteString("- 如果信息不足,优先回复用户澄清,不要盲目创建 lane/task。\n")
|
||
if initialFreeze {
|
||
builder.WriteString("- 当前是初始建图阶段。只输出结构化 JSON:plan_mode、execution_mode、plan_summary_markdown、leader_reply、tasks。\n")
|
||
} else {
|
||
builder.WriteString("- 一旦需要调整任务图,只输出结构化 JSON:plan_mode、execution_mode、plan_summary_markdown、leader_reply、tasks。系统会自动从任务图派生 lanes。\n")
|
||
}
|
||
builder.WriteString("- worker 的总结只作为编排输入,不要把它当作最终完成证明。\n")
|
||
builder.WriteString("- 输出必须严格符合 JSON schema,不要添加额外文本。\n")
|
||
builder.WriteString("- 每个 task 至少要写清 key、title、body_markdown、acceptance_markdown、kind、depends_on;非 milestone task 还要给出 deliverables。\n")
|
||
builder.WriteString("- priority、task_order 只在你需要覆盖系统默认排序时再填写。\n")
|
||
builder.WriteString("- kind=milestone 表示汇聚节点,不会派发给 worker;它只在依赖都成功后自动完成。\n")
|
||
builder.WriteString("- 空图首次规划必须使用 plan_mode=initial;已有 lane/task 时必须使用 plan_mode=patch。\n")
|
||
builder.WriteString("- plan_mode=patch 时必须填写 replan_reason,简要说明为什么要调整现有任务图。\n")
|
||
if initialFreeze {
|
||
builder.WriteString("- execution_mode 只能是 clarify / plan_only。初始建图阶段禁止 plan_and_start。\n")
|
||
} else {
|
||
builder.WriteString("- execution_mode 只能是 clarify / plan_only / plan_and_start。\n")
|
||
}
|
||
builder.WriteString("- 当 execution_mode=clarify 时,不要创建 tasks。\n")
|
||
builder.WriteString("- gate task 用于执行前置检查;系统会自动把 graph 派生成 execution lanes,并保证 gate 优先。\n")
|
||
if initialFreeze {
|
||
builder.WriteString("- 当前 topic 还没有执行图。请先给出可确认的初始 DAG;如果信息不足就用 clarify,否则使用 execution_mode=plan_only。系统会在用户确认后再启动执行。\n")
|
||
}
|
||
|
||
builder.WriteString("\n## Topic\n")
|
||
builder.WriteString(fmt.Sprintf("- slug: %s\n- status: %s\n", record.Slug, record.Status))
|
||
builder.WriteString("\n## Incoming Message\n")
|
||
builder.WriteString(fmt.Sprintf("- from: %s\n- type: %s\n- stage: %s\n%s\n", msg.FromRoleName, msg.Type, msg.Stage, msg.BodyMarkdown))
|
||
|
||
if len(lanes) > 0 {
|
||
builder.WriteString("\n## Existing Execution Lanes\n")
|
||
for _, item := range lanes {
|
||
builder.WriteString(fmt.Sprintf("- %s (%s): lane_id=%s\n", item.Name, item.Status, item.ID))
|
||
}
|
||
}
|
||
|
||
if len(tasks) > 0 {
|
||
builder.WriteString("\n## Existing Tasks\n")
|
||
for _, item := range tasks {
|
||
builder.WriteString(fmt.Sprintf("### %s [%s]\n", item.Title, item.Status))
|
||
builder.WriteString(fmt.Sprintf("- task_id: %s\n- lane_id: %s\n- kind: %s\n", item.ID, item.LaneID, item.Kind))
|
||
if strings.TrimSpace(item.AcceptanceMarkdown) != "" {
|
||
builder.WriteString(fmt.Sprintf("- acceptance: %s\n", item.AcceptanceMarkdown))
|
||
}
|
||
if len(item.Deliverables) > 0 {
|
||
builder.WriteString(fmt.Sprintf("- deliverables: %s\n", strings.Join(item.Deliverables, ", ")))
|
||
}
|
||
if strings.TrimSpace(item.BlockingReasonMarkdown) != "" {
|
||
builder.WriteString(fmt.Sprintf("- blocking_reason: %s\n", item.BlockingReasonMarkdown))
|
||
}
|
||
builder.WriteString(item.BodyMarkdown)
|
||
builder.WriteString("\n")
|
||
}
|
||
}
|
||
|
||
if len(messages) > 1 {
|
||
builder.WriteString("\n## Recent Messages\n")
|
||
start := 0
|
||
if len(messages) > 8 {
|
||
start = len(messages) - 8
|
||
}
|
||
for _, item := range messages[start:] {
|
||
builder.WriteString(fmt.Sprintf("### %s -> %s (%s)\n%s\n", item.FromRoleName, item.ToExpr, item.Stage, item.BodyMarkdown))
|
||
}
|
||
}
|
||
return builder.String()
|
||
}
|
||
|
||
// leaderOutputSchema returns the JSON schema (as text) that constrains the
// leader's structured output.
//
// The schema is the same for both modes except for two enums:
//   - initialFreeze == true:  plan_mode is limited to "initial" and
//     execution_mode to "clarify" / "plan_only".
//   - initialFreeze == false: plan_mode also allows "patch" and
//     execution_mode also allows "plan_and_start".
//
// Both variants are rendered from one template so the shared parts cannot
// drift apart. Whitespace inside the returned document is not significant to
// JSON.
func leaderOutputSchema(initialFreeze bool) string {
	planModes := `"initial","patch"`
	executionModes := `"clarify","plan_only","plan_and_start"`
	if initialFreeze {
		planModes = `"initial"`
		executionModes = `"clarify","plan_only"`
	}

	return `{
	"type":"object",
	"additionalProperties":false,
	"required":["plan_summary_markdown","plan_mode","replan_reason","execution_mode","leader_reply","tasks"],
	"properties":{
		"plan_summary_markdown":{"type":"string"},
		"plan_mode":{"type":"string","enum":[` + planModes + `]},
		"replan_reason":{"type":"string"},
		"execution_mode":{"type":"string","enum":[` + executionModes + `]},
		"leader_reply":{
			"type":"object",
			"additionalProperties":false,
			"required":["markdown","type"],
			"properties":{
				"markdown":{"type":"string"},
				"type":{"type":"string","enum":["chat","question","summary","decision"]}
			}
		},
		"tasks":{
			"type":"array",
			"items":{
				"type":"object",
				"additionalProperties":false,
				"required":["key","title","body_markdown","acceptance_markdown","kind","deliverables","priority","task_order","depends_on"],
				"properties":{
					"key":{"type":"string"},
					"title":{"type":"string"},
					"body_markdown":{"type":"string"},
					"acceptance_markdown":{"type":"string"},
					"kind":{"type":"string","enum":["execution","gate","verification","milestone"]},
					"deliverables":{"type":"array","items":{"type":"string"}},
					"priority":{"type":"integer"},
					"task_order":{"type":"integer"},
					"depends_on":{"type":"array","items":{"type":"string"}}
				}
			}
		}
	}
}`
}
|
||
|
||
func validateLeaderOutput(output leaderOutput, initialFreeze bool) error {
|
||
switch strings.TrimSpace(output.PlanMode) {
|
||
case "initial", "patch":
|
||
default:
|
||
return fmt.Errorf("leader output has invalid plan_mode %q", output.PlanMode)
|
||
}
|
||
switch strings.TrimSpace(output.ExecutionMode) {
|
||
case "clarify", "plan_only":
|
||
case "plan_and_start":
|
||
if initialFreeze {
|
||
return fmt.Errorf("leader initial planning cannot use execution_mode %q", output.ExecutionMode)
|
||
}
|
||
default:
|
||
return fmt.Errorf("leader output has invalid execution_mode %q", output.ExecutionMode)
|
||
}
|
||
switch strings.TrimSpace(output.LeaderReply.Type) {
|
||
case "chat", "question", "summary", "decision":
|
||
default:
|
||
return fmt.Errorf("leader output has invalid leader_reply.type %q", output.LeaderReply.Type)
|
||
}
|
||
if strings.TrimSpace(output.LeaderReply.Markdown) == "" && strings.TrimSpace(output.PlanSummaryMarkdown) == "" {
|
||
return fmt.Errorf("leader output must include leader_reply.markdown or plan_summary_markdown")
|
||
}
|
||
if output.ExecutionMode == "clarify" {
|
||
if len(output.Tasks) > 0 {
|
||
return fmt.Errorf("leader clarify mode cannot create tasks")
|
||
}
|
||
return nil
|
||
}
|
||
|
||
hasGateTasks := false
|
||
seenKeys := map[string]struct{}{}
|
||
for idx, raw := range output.Tasks {
|
||
item := normalizeLeaderTaskSpec(raw, idx)
|
||
key := strings.TrimSpace(item.Key)
|
||
if key == "" {
|
||
return fmt.Errorf("leader task key is required")
|
||
}
|
||
if _, exists := seenKeys[key]; exists {
|
||
return fmt.Errorf("leader task key %q must be unique", key)
|
||
}
|
||
seenKeys[key] = struct{}{}
|
||
if strings.TrimSpace(item.Title) == "" {
|
||
return fmt.Errorf("leader task %q title is required", item.Key)
|
||
}
|
||
if strings.TrimSpace(item.BodyMarkdown) == "" {
|
||
return fmt.Errorf("leader task %q body_markdown is required", item.Key)
|
||
}
|
||
if strings.TrimSpace(item.AcceptanceMarkdown) == "" {
|
||
return fmt.Errorf("leader task %q acceptance_markdown is required", item.Key)
|
||
}
|
||
if strings.TrimSpace(item.Kind) != "milestone" && len(item.Deliverables) == 0 {
|
||
return fmt.Errorf("leader task %q must declare deliverables", item.Key)
|
||
}
|
||
switch strings.TrimSpace(item.Kind) {
|
||
case "execution", "gate", "verification", "milestone":
|
||
default:
|
||
return fmt.Errorf("leader task %q has invalid kind %q", item.Key, item.Kind)
|
||
}
|
||
if strings.TrimSpace(item.Kind) == "gate" {
|
||
hasGateTasks = true
|
||
}
|
||
}
|
||
_ = hasGateTasks
|
||
return nil
|
||
}
|
||
|
||
func mustEncodeLeaderCommand(output leaderOutput) string {
|
||
data, err := json.Marshal(map[string]any{
|
||
"mode": "leader",
|
||
"planning": output,
|
||
})
|
||
if err != nil {
|
||
return `{"mode":"leader"}`
|
||
}
|
||
return string(data)
|
||
}
|
||
|
||
func normalizeLeaderTaskSpec(spec leaderTaskSpec, index int) leaderTaskSpec {
|
||
spec.Key = strings.TrimSpace(spec.Key)
|
||
spec.Title = strings.TrimSpace(spec.Title)
|
||
spec.BodyMarkdown = strings.TrimSpace(spec.BodyMarkdown)
|
||
spec.AcceptanceMarkdown = strings.TrimSpace(spec.AcceptanceMarkdown)
|
||
spec.Kind = strings.TrimSpace(spec.Kind)
|
||
spec.Deliverables = append([]string(nil), spec.Deliverables...)
|
||
spec.DependsOn = normalizeTaskRefs(spec.DependsOn)
|
||
if spec.TaskOrder <= 0 {
|
||
spec.TaskOrder = index + 1
|
||
}
|
||
return spec
|
||
}
|
||
|
||
// mustEncodeLeaderCommandWithGraphStatus returns commandJSON with its
// top-level "graph_status" field set to graphStatus (overwriting any existing
// value).
//
// The rewrite is best-effort: when commandJSON is not a JSON object (invalid
// JSON, a non-object value, or the literal null) or re-encoding fails, the
// input is returned unchanged. Keys of the re-encoded object are emitted in
// sorted order by encoding/json.
func mustEncodeLeaderCommandWithGraphStatus(commandJSON, graphStatus string) string {
	var payload map[string]any
	if err := json.Unmarshal([]byte(commandJSON), &payload); err != nil {
		return commandJSON
	}
	// json.Unmarshal accepts "null" without error and leaves the map nil;
	// assigning into a nil map would panic.
	if payload == nil {
		return commandJSON
	}

	payload["graph_status"] = graphStatus
	data, err := json.Marshal(payload)
	if err != nil {
		return commandJSON
	}
	return string(data)
}
|
||
|
||
func (s *Service) recordTaskGraphVersion(ctx context.Context, topicID, createdBy string, output leaderOutput, freezeInitial bool) error {
|
||
if len(output.Tasks) == 0 {
|
||
return nil
|
||
}
|
||
var latest taskgraph.Record
|
||
version := 1
|
||
supersedesID := ""
|
||
if item, err := s.repo.GetLatestTaskGraphVersionByTopic(ctx, topicID); err == nil {
|
||
latest = item
|
||
version = item.Version + 1
|
||
if item.Status == taskgraph.StatusActive || item.Status == taskgraph.StatusDraft {
|
||
supersedesID = item.ID
|
||
}
|
||
} else if err != sql.ErrNoRows {
|
||
return err
|
||
}
|
||
if supersedesID != "" {
|
||
latest.Status = taskgraph.StatusSuperseded
|
||
if _, err := s.repo.UpdateTaskGraphVersion(ctx, latest); err != nil {
|
||
return err
|
||
}
|
||
}
|
||
status := taskgraph.StatusActive
|
||
if freezeInitial {
|
||
status = taskgraph.StatusDraft
|
||
}
|
||
planJSON, err := json.Marshal(output)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
_, err = s.repo.CreateTaskGraphVersion(ctx, taskgraph.Record{
|
||
TopicID: topicID,
|
||
Version: version,
|
||
Status: status,
|
||
PlanJSON: string(planJSON),
|
||
PlanSummaryMarkdown: strings.TrimSpace(output.PlanSummaryMarkdown),
|
||
CreatedByRoleName: firstNonEmpty(strings.TrimSpace(createdBy), "leader"),
|
||
SupersedesGraphVersionID: supersedesID,
|
||
ConfirmedAt: confirmedAtValue(status, s.clock.Now()),
|
||
})
|
||
return err
|
||
}
|
||
|
||
func confirmedAtValue(status taskgraph.Status, now time.Time) string {
|
||
if status != taskgraph.StatusActive {
|
||
return ""
|
||
}
|
||
return timeutil.FormatRFC3339(now)
|
||
}
|
||
|
||
// firstNonEmpty returns the first argument that contains something other than
// whitespace. The value is returned as given (not trimmed). It returns "" when
// no argument qualifies.
func firstNonEmpty(values ...string) string {
	for i := 0; i < len(values); i++ {
		if len(strings.TrimSpace(values[i])) > 0 {
			return values[i]
		}
	}
	return ""
}
|
||
|
||
// hostCodexRunner runs the leader by invoking the `codex` command-line tool
// as a child process (see its Run method).
type hostCodexRunner struct {
	// projectRoot is handed to leaderCommandEnv to derive the HOME and
	// CODEX_HOME directories of the child process.
	projectRoot string
}
|
||
|
||
func (r hostCodexRunner) Run(ctx context.Context, workspaceRoot string, resolved runtimeconfig.ResolvedRole, prompt, schemaJSON string) (RunResult, error) {
|
||
tempDir, err := os.MkdirTemp("", "leaderloop-*")
|
||
if err != nil {
|
||
return RunResult{}, err
|
||
}
|
||
defer os.RemoveAll(tempDir)
|
||
|
||
resultFile := filepath.Join(tempDir, "result.json")
|
||
schemaFile := filepath.Join(tempDir, "schema.json")
|
||
if err := os.WriteFile(schemaFile, []byte(schemaJSON), 0644); err != nil {
|
||
return RunResult{}, err
|
||
}
|
||
|
||
args := []string{
|
||
"exec",
|
||
"--skip-git-repo-check",
|
||
"-C", workspaceRoot,
|
||
"-s", "workspace-write",
|
||
"-o", resultFile,
|
||
"--output-schema", schemaFile,
|
||
"-",
|
||
}
|
||
|
||
cmd := exec.CommandContext(ctx, "codex", args...)
|
||
cmd.Dir = workspaceRoot
|
||
cmd.Stdin = strings.NewReader(prompt)
|
||
cmd.Env = leaderCommandEnv(r.projectRoot, resolved)
|
||
|
||
var stdout, stderr strings.Builder
|
||
cmd.Stdout = &stdout
|
||
cmd.Stderr = &stderr
|
||
|
||
runResult := RunResult{
|
||
Status: workflow.RunStatusSucceeded,
|
||
ExitCode: 0,
|
||
}
|
||
err = cmd.Run()
|
||
if err != nil {
|
||
runResult.Status = workflow.RunStatusFailed
|
||
runResult.ErrorMessage = err.Error()
|
||
if exitErr, ok := err.(*exec.ExitError); ok {
|
||
runResult.ExitCode = exitErr.ExitCode()
|
||
}
|
||
}
|
||
runResult.StdoutLines = splitNonEmptyLines(stdout.String())
|
||
runResult.StderrLines = splitNonEmptyLines(stderr.String())
|
||
if resultBytes, readErr := os.ReadFile(resultFile); readErr == nil {
|
||
runResult.ResultJSON = strings.TrimSpace(string(resultBytes))
|
||
}
|
||
if runResult.Status == workflow.RunStatusSucceeded && !json.Valid([]byte(runResult.ResultJSON)) {
|
||
runResult.Status = workflow.RunStatusFailed
|
||
runResult.ErrorMessage = "leader returned invalid json output"
|
||
}
|
||
return runResult, nil
|
||
}
|
||
|
||
func leaderCommandEnv(projectRoot string, resolved runtimeconfig.ResolvedRole) []string {
|
||
roleHome, codexDir := leaderRuntimePaths(projectRoot, resolved)
|
||
_, _ = runtimecodex.WriteResolvedRoleHome(runtimecodex.HostLeaderRuntimeRoot(projectRoot, resolved.WorkspaceID), resolved)
|
||
env := append([]string{}, os.Environ()...)
|
||
env = append(env, "HOME="+roleHome)
|
||
env = append(env, "CODEX_HOME="+codexDir)
|
||
return env
|
||
}
|
||
|
||
func leaderRuntimePaths(projectRoot string, resolved runtimeconfig.ResolvedRole) (string, string) {
|
||
roleHome := runtimecodex.HostLeaderHomeDir(projectRoot, resolved.WorkspaceID, resolved.Role.Name)
|
||
codexDir := runtimecodex.HostLeaderCodexDir(projectRoot, resolved.WorkspaceID, resolved.Role.Name)
|
||
return roleHome, codexDir
|
||
}
|
||
|
||
// splitNonEmptyLines breaks value on "\n", trims surrounding whitespace from
// each piece and returns the pieces that are not empty, in order. The result
// is always a non-nil slice.
func splitNonEmptyLines(value string) []string {
	pieces := strings.Split(value, "\n")
	kept := make([]string, 0, len(pieces))
	for i := range pieces {
		if text := strings.TrimSpace(pieces[i]); text != "" {
			kept = append(kept, text)
		}
	}
	return kept
}
|