package taskexec import ( "context" "database/sql" "encoding/json" "fmt" "sort" "strings" "inbox/internal/app/lanegit" "inbox/internal/app/lanematerialize" "inbox/internal/app/lanesnapshot" "inbox/internal/app/runtimeconfig" "inbox/internal/base/timeutil" "inbox/internal/domain/lane" "inbox/internal/domain/message" "inbox/internal/domain/task" "inbox/internal/domain/topic" "inbox/internal/domain/workflow" ) type Repository interface { GetLane(ctx context.Context, laneID string) (lane.Record, error) UpdateLane(ctx context.Context, value lane.Record) (lane.Record, error) GetTask(ctx context.Context, taskID string) (task.Record, error) ListTasksByLane(ctx context.Context, laneID string) ([]task.Record, error) UpdateTask(ctx context.Context, value task.Record) (task.Record, error) ListTaskDependencies(ctx context.Context, taskID string) ([]task.Dependency, error) AppendTaskEvent(ctx context.Context, value task.Event) (task.Event, error) GetTopic(ctx context.Context, topicID string) (topic.Record, error) UpdateTopic(ctx context.Context, value topic.Record) (topic.Record, error) ListMessagesByTopic(ctx context.Context, topicID string) ([]message.Record, error) ListLanesByTopic(ctx context.Context, topicID string) ([]lane.Record, error) ListTasksByTopic(ctx context.Context, topicID string) ([]task.Record, error) CreateMessage(ctx context.Context, value message.Record) (message.Record, error) CreateWorkflowRun(ctx context.Context, value workflow.Run) (workflow.Run, error) GetWorkflowRun(ctx context.Context, runID string) (workflow.Run, error) ClaimTaskExecution(ctx context.Context, run workflow.Run, taskID, startedAt string) (workflow.Run, task.Record, error) UpdateWorkflowRun(ctx context.Context, value workflow.Run) (workflow.Run, error) CompleteTaskExecution(ctx context.Context, runID, taskID, laneID string, status workflow.RunStatus, exitCode int, resultMarkdown, errorMessage, completedAt string) (workflow.Run, task.Record, lane.Record, []task.Record, error) AppendWorkflowRunLog(ctx context.Context, value workflow.RunLog) (workflow.RunLog, error) } type RuntimeResolver interface { ResolveRole(ctx context.Context, workspaceID, roleName string) (runtimeconfig.ResolvedRole, error) } type Snapshotter interface { Capture(ctx context.Context, item lane.Record, taskRecord task.Record) (string, error) } type Materializer interface { Materialize(ctx context.Context, downstream lane.Record, taskID string, upstreams []lanematerialize.Upstream) error } type LaneRuntimeReleaser interface { ReleaseLaneRuntime(ctx context.Context, laneID string) (lane.Record, error) } type Option func(*Service) type Assignment struct { Run workflow.Run `json:"run"` Lane lane.Record `json:"lane"` Task task.Record `json:"task"` Role runtimeconfig.ResolvedRole `json:"role"` Prompt string `json:"prompt"` Model string `json:"model,omitempty"` OutputMode string `json:"output_mode"` } type Completion struct { RunID string `json:"run_id"` Status workflow.RunStatus `json:"status"` ExitCode int `json:"exit_code"` ResultMarkdown string `json:"result_markdown"` ErrorMessage string `json:"error_message"` } type commandMetadata struct { LaneID string `json:"lane_id"` TaskID string `json:"task_id"` RunnerID string `json:"runner_id,omitempty"` } type Service struct { repo Repository resolver RuntimeResolver clock timeutil.Clock snapshotter Snapshotter materializer Materializer runtime LaneRuntimeReleaser } func NewService(repo Repository, resolver RuntimeResolver, clock timeutil.Clock, opts ...Option) *Service { if clock == nil { clock = timeutil.SystemClock{} } svc := &Service{ repo: repo, resolver: resolver, clock: clock, snapshotter: lanesnapshot.NewService(lanegit.ExecRunner{}, clock), } if recorder, ok := repo.(lanematerialize.SyncRecorder); ok { svc.materializer = lanematerialize.NewService(recorder, lanegit.ExecRunner{}, clock) } for _, opt := range opts { opt(svc) } return svc } func WithSnapshotter(snapshotter Snapshotter) Option { return func(s *Service) { s.snapshotter = snapshotter } } func WithMaterializer(materializer Materializer) Option { return func(s *Service) { s.materializer = materializer } } func WithLaneRuntimeReleaser(runtime LaneRuntimeReleaser) Option { return func(s *Service) { s.runtime = runtime } } func (s *Service) ClaimNext(ctx context.Context, laneID, runnerID string) (Assignment, error) { laneRecord, err := s.repo.GetLane(ctx, laneID) if err != nil { return Assignment{}, err } topicRecord, err := s.repo.GetTopic(ctx, laneRecord.TopicID) if err != nil { return Assignment{}, err } if strings.TrimSpace(topicRecord.Status) != "execution" { return Assignment{}, sql.ErrNoRows } tasks, err := s.repo.ListTasksByLane(ctx, laneID) if err != nil { return Assignment{}, err } if hasRunningTask(tasks) { return Assignment{}, sql.ErrNoRows } candidate, err := s.nextReadyTask(ctx, tasks) if err != nil { return Assignment{}, err } if candidate.ID == "" { return Assignment{}, sql.ErrNoRows } if err := s.materializeTaskInputs(ctx, laneRecord, candidate); err != nil { return Assignment{}, err } resolved, err := s.resolver.ResolveRole(ctx, laneRecord.WorkspaceID, "worker") if err != nil { return Assignment{}, fmt.Errorf("resolve worker config: %w", err) } snapshot, err := resolved.SnapshotJSON() if err != nil { return Assignment{}, err } commandJSON, err := marshalCommandMetadata(commandMetadata{ LaneID: laneID, TaskID: candidate.ID, RunnerID: strings.TrimSpace(runnerID), }) if err != nil { return Assignment{}, err } runTemplate := workflow.Run{ WorkspaceID: candidate.WorkspaceID, TopicID: candidate.TopicID, RoleName: "worker", Stage: workflow.StageExecution, Mode: "task", Status: workflow.RunStatusRunning, ConfigSnapshotJSON: snapshot, CommandJSON: commandJSON, } startedAt := timeutil.FormatRFC3339(s.clock.Now()) run, candidate, err := s.repo.ClaimTaskExecution(ctx, runTemplate, candidate.ID, startedAt) if err != nil { return Assignment{}, err } if _, err := s.repo.AppendTaskEvent(ctx, task.Event{ TaskID: candidate.ID, EventType: "started", BodyMarkdown: "Worker started execution.", CreatedByRoleName: "worker", CreatedAt: startedAt, }); err != nil { _, _ = s.repo.AppendWorkflowRunLog(ctx, workflow.RunLog{ RunID: run.ID, Stream: workflow.LogStreamSystem, Content: "claim side effects failed after main state commit:\n- append started event for task " + candidate.ID + ": " + err.Error(), CreatedAt: startedAt, }) } topicRecord, err = s.repo.GetTopic(ctx, candidate.TopicID) if err != nil { return Assignment{}, err } messages, err := s.repo.ListMessagesByTopic(ctx, candidate.TopicID) if err != nil { return Assignment{}, err } assignment := Assignment{ Run: run, Lane: laneRecord, Task: candidate, Role: resolved, Prompt: buildWorkerPrompt(topicRecord, laneRecord, candidate, resolved, messages), OutputMode: "markdown", } return assignment, nil } func (s *Service) Complete(ctx context.Context, value Completion) (workflow.Run, error) { if err := workflow.ValidateRunStatus(value.Status); err != nil { return workflow.Run{}, err } if value.Status == workflow.RunStatusRunning { return workflow.Run{}, fmt.Errorf("completion status must be terminal") } run, err := s.repo.GetWorkflowRun(ctx, value.RunID) if err != nil { return workflow.Run{}, err } command, err := parseCommandMetadata(run.CommandJSON) if err != nil { return workflow.Run{}, err } topicRecord, err := s.repo.GetTopic(ctx, run.TopicID) if err != nil { return workflow.Run{}, err } if strings.TrimSpace(topicRecord.Status) == "cancelled" { run.Status = workflow.RunStatusCancelled run.ExitCode = 130 run.CompletedAt = timeutil.FormatRFC3339(s.clock.Now()) run.ErrorMessage = "Topic was stopped before task completion could be applied." return s.repo.UpdateWorkflowRun(ctx, run) } now := timeutil.FormatRFC3339(s.clock.Now()) value.ResultMarkdown = strings.TrimSpace(value.ResultMarkdown) value.ErrorMessage = strings.TrimSpace(value.ErrorMessage) taskRecord, err := s.repo.GetTask(ctx, command.TaskID) if err != nil { return workflow.Run{}, err } laneRecord, err := s.repo.GetLane(ctx, command.LaneID) if err != nil { return workflow.Run{}, err } if value.Status == workflow.RunStatusSucceeded && shouldSnapshotTask(taskRecord) && s.snapshotter != nil { headCommit, snapshotErr := s.snapshotter.Capture(ctx, laneRecord, taskRecord) if snapshotErr != nil { value.Status = workflow.RunStatusFailed value.ExitCode = failedExitCode(value.ExitCode) value.ResultMarkdown = "" value.ErrorMessage = "Lane snapshot failed: " + strings.TrimSpace(snapshotErr.Error()) } else if headCommit != "" && laneRecord.HeadCommit != headCommit { laneRecord.HeadCommit = headCommit if _, err := s.repo.UpdateLane(ctx, laneRecord); err != nil { value.Status = workflow.RunStatusFailed value.ExitCode = failedExitCode(value.ExitCode) value.ResultMarkdown = "" value.ErrorMessage = "Persist lane head commit: " + strings.TrimSpace(err.Error()) } } } updatedRun, taskRecord, chainRecord, promotedTasks, err := s.repo.CompleteTaskExecution( ctx, run.ID, command.TaskID, command.LaneID, value.Status, value.ExitCode, value.ResultMarkdown, value.ErrorMessage, now, ) if err != nil { return workflow.Run{}, err } if err := s.syncTopicStatus(ctx, run.TopicID); err != nil { return workflow.Run{}, err } s.emitCompletionSideEffects(ctx, updatedRun, taskRecord, chainRecord, promotedTasks, value, now) return updatedRun, nil } func (s *Service) AppendLog(ctx context.Context, runID string, stream workflow.LogStream, content string) (workflow.RunLog, error) { return s.repo.AppendWorkflowRunLog(ctx, workflow.RunLog{ RunID: runID, Stream: stream, Content: strings.TrimSpace(content), }) } func (s *Service) nextReadyTask(ctx context.Context, items []task.Record) (task.Record, error) { for _, item := range items { if item.Status != task.StatusReady { continue } if item.Kind == task.KindMilestone { continue } ready, err := s.dependenciesSatisfied(ctx, item.ID) if err != nil { return task.Record{}, err } if ready { return item, nil } } return task.Record{}, nil } func (s *Service) materializeTaskInputs(ctx context.Context, laneRecord lane.Record, taskRecord task.Record) error { if s.materializer == nil { return nil } upstreams, err := s.upstreamLanesForTask(ctx, laneRecord, taskRecord) if err != nil { return err } if len(upstreams) == 0 { return nil } if err := s.materializer.Materialize(ctx, laneRecord, taskRecord.ID, upstreams); err != nil { if blockErr := s.blockTaskForMaterializationFailure(ctx, laneRecord, taskRecord, err); blockErr != nil { return fmt.Errorf("%v; block task: %w", err, blockErr) } return sql.ErrNoRows } return nil } func (s *Service) upstreamLanesForTask(ctx context.Context, downstream lane.Record, taskRecord task.Record) ([]lanematerialize.Upstream, error) { deps, err := s.repo.ListTaskDependencies(ctx, taskRecord.ID) if err != nil { return nil, err } seen := map[string]struct{}{} out := make([]lanematerialize.Upstream, 0, len(deps)) for _, dep := range deps { upstreamTask, err := s.repo.GetTask(ctx, dep.DependsOnTaskID) if err != nil { return nil, err } if upstreamTask.LaneID == downstream.ID { continue } if _, ok := seen[upstreamTask.LaneID]; ok { continue } upstreamLane, err := s.repo.GetLane(ctx, upstreamTask.LaneID) if err != nil { return nil, err } seen[upstreamTask.LaneID] = struct{}{} out = append(out, lanematerialize.Upstream{TaskID: upstreamTask.ID, Lane: upstreamLane}) } sort.SliceStable(out, func(i, j int) bool { return out[i].Lane.ID < out[j].Lane.ID }) return out, nil } func (s *Service) blockTaskForMaterializationFailure(ctx context.Context, laneRecord lane.Record, taskRecord task.Record, cause error) error { now := timeutil.FormatRFC3339(s.clock.Now()) reason := "Lane input materialization failed.\n\n" + strings.TrimSpace(cause.Error()) taskRecord.Status = task.StatusBlocked taskRecord.BlockingReasonMarkdown = reason taskRecord.ResultSummaryMarkdown = "" taskRecord.AssignedRunID = "" taskRecord.UpdatedAt = now taskRecord.StartedAt = "" if _, err := s.repo.UpdateTask(ctx, taskRecord); err != nil { return err } laneRecord.Status = lane.StatusBlocked laneRecord.ErrorMessage = strings.TrimSpace(cause.Error()) laneRecord.UpdatedAt = now if _, err := s.repo.UpdateLane(ctx, laneRecord); err != nil { return err } if _, err := s.repo.AppendTaskEvent(ctx, task.Event{ TaskID: taskRecord.ID, EventType: "blocked", BodyMarkdown: reason, CreatedByRoleName: "worker", CreatedAt: now, }); err != nil { return err } if _, err := s.repo.CreateMessage(ctx, message.Record{ WorkspaceID: taskRecord.WorkspaceID, TopicID: taskRecord.TopicID, FromRoleName: "worker", ToExpr: "leader", Type: message.TypeSummary, Stage: string(workflow.StageExecution), BodyMarkdown: fmt.Sprintf("Lane `%s` blocked before task `%s` could start.\n\n%s", laneRecord.Name, taskRecord.Title, reason), CreatedAt: now, }); err != nil { return err } return s.syncTopicStatus(ctx, taskRecord.TopicID) } func shouldSnapshotTask(taskRecord task.Record) bool { switch taskRecord.Kind { case task.KindExecution, task.KindVerification: return true default: return false } } func failedExitCode(exitCode int) int { if exitCode != 0 { return exitCode } return 1 } func (s *Service) dependenciesSatisfied(ctx context.Context, taskID string) (bool, error) { deps, err := s.repo.ListTaskDependencies(ctx, taskID) if err != nil { return false, err } for _, dep := range deps { item, err := s.repo.GetTask(ctx, dep.DependsOnTaskID) if err != nil { return false, err } if item.Status != task.StatusSucceeded { return false, nil } } return true, nil } func hasRunningTask(items []task.Record) bool { for _, item := range items { if item.Status == task.StatusRunning { return true } } return false } func (s *Service) emitCompletionSideEffects( ctx context.Context, run workflow.Run, taskRecord task.Record, chainRecord lane.Record, promotedTasks []task.Record, value Completion, now string, ) { var failures []string eventType := "completed" eventBody := taskRecord.ResultSummaryMarkdown if taskRecord.Status == task.StatusFailed { eventType = "failed" eventBody = taskRecord.BlockingReasonMarkdown } if _, err := s.repo.AppendTaskEvent(ctx, task.Event{ TaskID: taskRecord.ID, EventType: eventType, BodyMarkdown: eventBody, CreatedByRoleName: "worker", CreatedAt: now, }); err != nil { failures = append(failures, fmt.Sprintf("append %s event for task %s: %v", eventType, taskRecord.ID, err)) } for _, item := range promotedTasks { if _, err := s.repo.AppendTaskEvent(ctx, task.Event{ TaskID: item.ID, EventType: "ready", BodyMarkdown: "Dependencies satisfied. Task is ready for execution.", CreatedByRoleName: "leader", CreatedAt: now, }); err != nil { failures = append(failures, fmt.Sprintf("append ready event for task %s: %v", item.ID, err)) } } topicRecord, err := s.repo.GetTopic(ctx, taskRecord.TopicID) if err == nil && strings.TrimSpace(topicRecord.Status) == "cancelled" { if len(failures) == 0 { return } _, _ = s.repo.AppendWorkflowRunLog(ctx, workflow.RunLog{ RunID: run.ID, Stream: workflow.LogStreamSystem, Content: "completion side effects failed after main state commit:\n- " + strings.Join(failures, "\n- "), CreatedAt: now, }) return } if _, err := s.repo.CreateMessage(ctx, message.Record{ WorkspaceID: taskRecord.WorkspaceID, TopicID: taskRecord.TopicID, FromRoleName: "worker", ToExpr: "leader", Type: message.TypeSummary, Stage: string(workflow.StageExecution), BodyMarkdown: workerSummaryMarkdown(chainRecord, taskRecord, value), ReplyToMessageID: "", CreatedAt: now, }); err != nil { failures = append(failures, fmt.Sprintf("create summary message for run %s: %v", run.ID, err)) } if len(failures) == 0 { return } _, _ = s.repo.AppendWorkflowRunLog(ctx, workflow.RunLog{ RunID: run.ID, Stream: workflow.LogStreamSystem, Content: "completion side effects failed after main state commit:\n- " + strings.Join(failures, "\n- "), CreatedAt: now, }) } func (s *Service) syncTopicStatus(ctx context.Context, topicID string) error { record, err := s.repo.GetTopic(ctx, topicID) if err != nil { return err } switch strings.TrimSpace(record.Status) { case "cancelled", "awaiting_confirmation": return nil } tasks, err := s.repo.ListTasksByTopic(ctx, topicID) if err != nil { return err } chains, err := s.repo.ListLanesByTopic(ctx, topicID) if err != nil { return err } nextStatus := "execution" if len(tasks) > 0 && graphCompleted(tasks, chains) { nextStatus = "completed" } else if graphBlocked(tasks, chains) { nextStatus = "blocked" } if record.Status == nextStatus { return nil } record.Status = nextStatus if nextStatus == "completed" && record.ClosedAt == "" { record.ClosedAt = timeutil.FormatRFC3339(s.clock.Now()) } if _, err := s.repo.UpdateTopic(ctx, record); err != nil { return err } if nextStatus == "completed" && s.runtime != nil { s.releaseTopicLaneRuntimes(ctx, chains) } return nil } func (s *Service) releaseTopicLaneRuntimes(ctx context.Context, lanes []lane.Record) { for _, item := range lanes { if strings.TrimSpace(item.ContainerName) == "" && strings.TrimSpace(item.RuntimeEndpoint) == "" { continue } _, _ = s.runtime.ReleaseLaneRuntime(ctx, item.ID) } } func graphBlocked(tasks []task.Record, chains []lane.Record) bool { for _, item := range tasks { if item.Status == task.StatusFailed || item.Status == task.StatusBlocked { return true } } for _, item := range chains { if item.Status == lane.StatusBlocked || item.Status == lane.StatusFailed { return true } } return false } func graphCompleted(tasks []task.Record, chains []lane.Record) bool { hasTasks := false for _, item := range tasks { hasTasks = true switch item.Status { case task.StatusSucceeded, task.StatusCancelled: default: return false } } if !hasTasks { return false } for _, item := range chains { switch item.Status { case lane.StatusSucceeded, lane.StatusCancelled: default: return false } } return true } func workerSummaryMarkdown(chainRecord lane.Record, taskRecord task.Record, value Completion) string { if value.Status == workflow.RunStatusSucceeded { body := strings.TrimSpace(value.ResultMarkdown) if body == "" { body = "Task completed without an explicit summary." } return fmt.Sprintf("Lane `%s` completed task `%s`.\n\n%s", chainRecord.Name, taskRecord.Title, body) } body := strings.TrimSpace(value.ErrorMessage) if body == "" { body = "Task failed without an explicit error message." } return fmt.Sprintf("Lane `%s` failed task `%s`.\n\n%s", chainRecord.Name, taskRecord.Title, body) } func buildWorkerPrompt(topicRecord topic.Record, chainRecord lane.Record, taskRecord task.Record, resolved runtimeconfig.ResolvedRole, messages []message.Record) string { var builder strings.Builder builder.WriteString("## Role\n") builder.WriteString(runtimeconfig.RenderInstructions(resolved, "")) builder.WriteString("\n") builder.WriteString("\n## Execution Context\n") builder.WriteString(fmt.Sprintf("- topic: %s\n- lane: %s\n- task: %s\n- task kind: %s\n- branch: %s\n", topicRecord.Slug, chainRecord.Name, taskRecord.Title, taskRecord.Kind, chainRecord.BranchName)) builder.WriteString("\n## Task\n") builder.WriteString(taskRecord.BodyMarkdown) if strings.TrimSpace(taskRecord.AcceptanceMarkdown) != "" { builder.WriteString("\n\n## Acceptance\n") builder.WriteString(taskRecord.AcceptanceMarkdown) } if len(messages) > 0 { builder.WriteString("\n## Recent Messages\n") start := 0 if len(messages) > 6 { start = len(messages) - 6 } for _, item := range messages[start:] { builder.WriteString(fmt.Sprintf("### %s -> %s (%s)\n%s\n", item.FromRoleName, item.ToExpr, item.Stage, item.BodyMarkdown)) } } builder.WriteString("\n## Blockers\n") builder.WriteString("If you are blocked, send exactly one concrete question to the leader via the inbox skill with the current execution stage, then stop and make the final markdown summary describe the blocker and the decision you need.\n") builder.WriteString("\nReturn a concise markdown execution summary suitable to send back to the leader. Do not add front matter.\n") return builder.String() } func marshalCommandMetadata(value commandMetadata) (string, error) { data, err := json.Marshal(value) if err != nil { return "", fmt.Errorf("marshal task command metadata: %w", err) } return string(data), nil } func parseCommandMetadata(value string) (commandMetadata, error) { var item commandMetadata if err := json.Unmarshal([]byte(value), &item); err != nil { return commandMetadata{}, fmt.Errorf("decode task command metadata: %w", err) } if strings.TrimSpace(item.TaskID) == "" { return commandMetadata{}, fmt.Errorf("task command metadata missing task id") } if strings.TrimSpace(item.LaneID) == "" { return commandMetadata{}, fmt.Errorf("task command metadata missing lane id") } return item, nil }