package store import ( "context" "database/sql" "encoding/json" "errors" "fmt" "sort" "strings" "time" ) var councilReviewerRoles = []string{ "architecture-reviewer", "implementation-reviewer", "risk-reviewer", } type CouncilRun struct { RunID string `json:"run_id"` Mode string `json:"mode"` TargetType string `json:"target_type"` OutputMode string `json:"output_mode"` OnlyUnanimous bool `json:"only_unanimous"` } type CouncilInput struct { RunID string `json:"run_id"` Prompt string `json:"prompt,omitempty"` TargetFile string `json:"target_file,omitempty"` RepoPath string `json:"repo_path,omitempty"` TargetTaskID string `json:"task_id,omitempty"` } type CouncilReviewer struct { ReviewerRole string `json:"reviewer_role"` TaskID string `json:"task_id"` Status string `json:"status"` } type CouncilStartInput struct { RunID string Target string TargetFile string RepoPath string TargetTaskID string TargetType string Mode string OutputMode string OnlyUnanimous bool } type CouncilStartResult struct { Run CouncilRun `json:"run"` Input CouncilInput `json:"input"` Reviewers []CouncilReviewer `json:"reviewers"` } type CouncilWaitInput struct { RunID string Timeout time.Duration } type CouncilWaitResult struct { Woke bool `json:"woke"` RunID string `json:"run_id"` AllComplete bool `json:"all_complete"` ReviewerStatuses []CouncilReviewer `json:"reviewers"` } type CouncilFinding struct { RunID string `json:"run_id"` ReviewerRole string `json:"reviewer_role"` FindingID string `json:"finding_id"` Title string `json:"title"` Summary string `json:"summary"` Proposal string `json:"proposal"` Rationale string `json:"rationale"` Confidence string `json:"confidence"` TagsJSON json.RawMessage `json:"tags_json"` TargetRefsJSON json.RawMessage `json:"target_refs_json"` } type CouncilGroup struct { RunID string `json:"run_id"` GroupID string `json:"group_id"` Proposal string `json:"proposal"` Bucket string `json:"bucket"` SupportCount int `json:"support_count"` SupportersJSON json.RawMessage `json:"supporters_json"` DissentersJSON json.RawMessage `json:"dissenters_json"` RationaleSummary string `json:"rationale_summary"` TagsJSON json.RawMessage `json:"tags_json"` SourceFindingIDsJSON json.RawMessage `json:"source_finding_ids_json"` } type CouncilTallyInput struct { RunID string Similarity string } type CouncilTallyResult struct { RunID string `json:"run_id"` Similarity string `json:"similarity"` Counts map[string]int `json:"counts"` GroupedRecommendations []CouncilGroup `json:"grouped_recommendations"` } type CouncilReportInput struct { RunID string Show string } type CouncilReportArtifact struct { Kind string `json:"kind"` Path string `json:"path"` } type CouncilReportResult struct { RunID string `json:"run_id"` Show []string `json:"show"` Summary map[string]int `json:"summary"` GroupedRecommendations []CouncilGroup `json:"grouped_recommendations"` Markdown string `json:"markdown,omitempty"` ReportArtifacts []CouncilReportArtifact `json:"report_artifacts,omitempty"` } type CouncilPersistReportInput struct { RunID string Show []string Summary map[string]int MarkdownPath string } type councilReviewerOutput struct { ReviewerRole string `json:"reviewer_role"` Findings []councilFindingOutput `json:"findings"` } type councilFindingOutput struct { Title string `json:"title"` Summary string `json:"summary"` Proposal string `json:"proposal"` Rationale string `json:"rationale"` Confidence string `json:"confidence"` Tags json.RawMessage `json:"tags"` TargetRefs json.RawMessage `json:"target_refs"` } func (s *OrchStore) StartCouncil(ctx context.Context, input CouncilStartInput) (CouncilStartResult, error) { runID := strings.TrimSpace(input.RunID) if runID == "" { return CouncilStartResult{}, fmt.Errorf("%w: run id is required", ErrInvalidInput) } councilInput, err := normalizeCouncilInput(input) if err != nil { return CouncilStartResult{}, err } councilRun, err := normalizeCouncilRun(input) if err != nil { return CouncilStartResult{}, err } now := nowUTC() tx, err := s.db.BeginTx(ctx, nil) if err != nil { return CouncilStartResult{}, fmt.Errorf("begin council start transaction: %w", err) } defer tx.Rollback() if _, err := selectRun(ctx, tx, runID); err == nil { return CouncilStartResult{}, fmt.Errorf("%w: run %s already exists", ErrInvalidState, runID) } else if !errors.Is(err, ErrRunNotFound) { return CouncilStartResult{}, err } goal := buildCouncilRunGoal(councilInput) summary := buildCouncilRunSummary(councilRun, councilInput) _, err = tx.ExecContext( ctx, `INSERT INTO runs (run_id, goal, summary, status, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?)`, runID, goal, summary, "active", formatTime(now), formatTime(now), ) if err != nil { return CouncilStartResult{}, fmt.Errorf("insert council run into runs: %w", err) } if err := insertEvent(ctx, tx, eventInput{ RunID: runID, Source: "orch", EventType: "run_initialized", Summary: summary, PayloadJSON: marshalJSON(map[string]any{"goal": goal, "summary": summary}), CreatedAt: now, }); err != nil { return CouncilStartResult{}, err } _, err = tx.ExecContext( ctx, `INSERT INTO council_runs ( run_id, mode, target_type, output_mode, only_unanimous, created_at, updated_at ) VALUES (?, ?, ?, ?, ?, ?, ?)`, runID, councilRun.Mode, councilRun.TargetType, councilRun.OutputMode, boolToInt(councilRun.OnlyUnanimous), formatTime(now), formatTime(now), ) if err != nil { return CouncilStartResult{}, fmt.Errorf("insert council run metadata: %w", err) } _, err = tx.ExecContext( ctx, `INSERT INTO council_inputs ( run_id, prompt, target_file, repo_path, target_task_id, created_at, updated_at ) VALUES (?, ?, ?, ?, ?, ?, ?)`, runID, councilInput.Prompt, councilInput.TargetFile, councilInput.RepoPath, councilInput.TargetTaskID, formatTime(now), formatTime(now), ) if err != nil { return CouncilStartResult{}, fmt.Errorf("insert council input metadata: %w", err) } reviewers := make([]CouncilReviewer, 0, len(councilReviewerRoles)) for i, reviewerRole := range councilReviewerRoles { taskID := fmt.Sprintf("CR%d", i+1) task := Task{ RunID: runID, TaskID: taskID, Title: buildCouncilTaskTitle(reviewerRole), Summary: buildCouncilTaskSummary(reviewerRole), Status: "ready", DefaultTo: reviewerRole, Priority: "normal", AcceptanceJSON: []byte(buildCouncilTaskAcceptanceJSON(councilRun, councilInput, reviewerRole)), CreatedAt: now, UpdatedAt: now, } _, err = tx.ExecContext( ctx, `INSERT INTO tasks ( run_id, task_id, title, summary, status, default_to, priority, acceptance_json, latest_attempt_no, created_at, updated_at ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, NULL, ?, ?)`, task.RunID, task.TaskID, task.Title, task.Summary, task.Status, nullIfEmpty(task.DefaultTo), task.Priority, string(task.AcceptanceJSON), formatTime(task.CreatedAt), formatTime(task.UpdatedAt), ) if err != nil { return CouncilStartResult{}, fmt.Errorf("insert council reviewer task: %w", err) } if err := insertEvent(ctx, tx, eventInput{ RunID: runID, TaskID: taskID, Source: "orch", EventType: "task_added", Summary: task.Title, PayloadJSON: marshalJSON(map[string]any{"title": task.Title, "priority": task.Priority}), CreatedAt: now, }); err != nil { return CouncilStartResult{}, err } if err := insertEvent(ctx, tx, eventInput{ RunID: runID, TaskID: taskID, Source: "orch", EventType: "task_ready", Summary: task.Title, PayloadJSON: marshalJSON(map[string]any{"task_id": taskID}), CreatedAt: now, }); err != nil { return CouncilStartResult{}, err } dispatchResult, finalizeWorkspace, err := s.dispatchTaskTx( ctx, tx, task, reviewerRole, buildCouncilTaskBody(councilRun, councilInput, reviewerRole), "", nil, now, ) if err != nil { return CouncilStartResult{}, err } defer finalizeWorkspace(false) _, err = tx.ExecContext( ctx, `INSERT INTO council_reviewers (run_id, reviewer_role, task_id, status) VALUES (?, ?, ?, ?)`, runID, reviewerRole, taskID, dispatchResult.Task.Status, ) if err != nil { return CouncilStartResult{}, fmt.Errorf("insert council reviewer row: %w", err) } reviewers = append(reviewers, CouncilReviewer{ ReviewerRole: reviewerRole, TaskID: taskID, Status: dispatchResult.Task.Status, }) } if err := insertEvent(ctx, tx, eventInput{ RunID: runID, Source: "orch", EventType: "council_started", Summary: "council reviewers dispatched", PayloadJSON: marshalJSON(map[string]any{ "mode": councilRun.Mode, "target_type": councilRun.TargetType, "output_mode": councilRun.OutputMode, "only_unanimous": councilRun.OnlyUnanimous, "reviewers": reviewers, }), CreatedAt: now, }); err != nil { return CouncilStartResult{}, err } if err := updateRunAggregateStatus(ctx, tx, runID, now); err != nil { return CouncilStartResult{}, err } if err := tx.Commit(); err != nil { return CouncilStartResult{}, fmt.Errorf("commit council start transaction: %w", err) } return CouncilStartResult{ Run: councilRun, Input: councilInput, Reviewers: reviewers, }, nil } func normalizeCouncilRun(input CouncilStartInput) (CouncilRun, error) { mode := defaultString(strings.TrimSpace(input.Mode), "brainstorm") switch mode { case "brainstorm", "review": default: return CouncilRun{}, fmt.Errorf("%w: mode must be brainstorm or review", ErrInvalidInput) } targetType := defaultString(strings.TrimSpace(input.TargetType), "mixed") switch targetType { case "text", "repo", "mixed": default: return CouncilRun{}, fmt.Errorf("%w: target-type must be text, repo, or mixed", ErrInvalidInput) } outputMode := defaultString(strings.TrimSpace(input.OutputMode), "both") switch outputMode { case "markdown", "json", "both": default: return CouncilRun{}, fmt.Errorf("%w: output must be markdown, json, or both", ErrInvalidInput) } return CouncilRun{ RunID: strings.TrimSpace(input.RunID), Mode: mode, TargetType: targetType, OutputMode: outputMode, OnlyUnanimous: input.OnlyUnanimous, }, nil } func normalizeCouncilInput(input CouncilStartInput) (CouncilInput, error) { result := CouncilInput{ RunID: strings.TrimSpace(input.RunID), Prompt: strings.TrimSpace(input.Target), TargetFile: strings.TrimSpace(input.TargetFile), RepoPath: strings.TrimSpace(input.RepoPath), TargetTaskID: strings.TrimSpace(input.TargetTaskID), } if result.Prompt == "" && result.TargetFile == "" && result.RepoPath == "" && result.TargetTaskID == "" { return CouncilInput{}, fmt.Errorf("%w: at least one of target, target-file, repo-path, or task-id is required", ErrInvalidInput) } return result, nil } func buildCouncilRunGoal(input CouncilInput) string { switch { case input.Prompt != "": return "Council review: " + truncateSingleLine(input.Prompt, 80) case input.TargetTaskID != "": return "Council review for task " + input.TargetTaskID case input.TargetFile != "": return "Council review for " + input.TargetFile case input.RepoPath != "": return "Council review for repo " + input.RepoPath default: return "Council review" } } func buildCouncilRunSummary(run CouncilRun, input CouncilInput) string { return fmt.Sprintf("%s council (%s)", run.Mode, run.TargetType) } func buildCouncilTaskTitle(reviewerRole string) string { switch reviewerRole { case "architecture-reviewer": return "Council architecture review" case "implementation-reviewer": return "Council implementation review" case "risk-reviewer": return "Council risk review" default: return "Council review" } } func buildCouncilTaskSummary(reviewerRole string) string { switch reviewerRole { case "architecture-reviewer": return "Review the target for architecture, boundaries, and interfaces" case "implementation-reviewer": return "Review the target for simplicity, maintainability, and practicality" case "risk-reviewer": return "Review the target for regressions, correctness, and operability risks" default: return "Review the target" } } func buildCouncilTaskAcceptanceJSON(run CouncilRun, input CouncilInput, reviewerRole string) string { return marshalJSON(map[string]any{ "mode": "analysis", "council": map[string]any{ "reviewer_role": reviewerRole, "council_mode": run.Mode, "target_type": run.TargetType, "output_mode": run.OutputMode, "only_unanimous": run.OnlyUnanimous, "target": map[string]any{ "prompt": input.Prompt, "target_file": input.TargetFile, "repo_path": input.RepoPath, "task_id": input.TargetTaskID, }, "response_format": map[string]any{ "reviewer_role": reviewerRole, "findings": []map[string]any{ { "title": "string", "summary": "string", "proposal": "string", "rationale": "string", "confidence": "low|medium|high", "tags": []string{}, "target_refs": map[string]any{}, }, }, }, }, }) } func buildCouncilTaskBody(run CouncilRun, input CouncilInput, reviewerRole string) string { parts := []string{ fmt.Sprintf("Reviewer role: %s", reviewerRole), fmt.Sprintf("Council mode: %s", run.Mode), fmt.Sprintf("Target type: %s", run.TargetType), "Analyze the target from your assigned reviewer perspective.", "Return structured findings with title, summary, proposal, rationale, confidence, tags, and optional target references.", } if input.Prompt != "" { parts = append(parts, "", "Prompt:", input.Prompt) } if input.TargetFile != "" { parts = append(parts, "", "Target file:", input.TargetFile) } if input.RepoPath != "" { parts = append(parts, "", "Repo path:", input.RepoPath) } if input.TargetTaskID != "" { parts = append(parts, "", "Related task id:", input.TargetTaskID) } return strings.Join(parts, "\n") } func truncateSingleLine(value string, maxLen int) string { value = strings.TrimSpace(value) value = strings.ReplaceAll(value, "\n", " ") value = strings.ReplaceAll(value, "\r", " ") value = strings.Join(strings.Fields(value), " ") if maxLen <= 0 || len(value) <= maxLen { return value } if maxLen <= 3 { return value[:maxLen] } return value[:maxLen-3] + "..." } func boolToInt(value bool) int { if value { return 1 } return 0 } func (s *OrchStore) WaitForCouncil(ctx context.Context, input CouncilWaitInput) (CouncilWaitResult, error) { runID := strings.TrimSpace(input.RunID) if runID == "" { return CouncilWaitResult{}, fmt.Errorf("%w: run id is required", ErrInvalidInput) } if _, err := s.GetCouncilRun(ctx, runID); err != nil { return CouncilWaitResult{}, err } waitCtx := ctx cancel := func() {} if input.Timeout > 0 { waitCtx, cancel = context.WithTimeout(ctx, input.Timeout) } defer cancel() for { reviewers, allComplete, err := s.GetCouncilReviewerStatuses(waitCtx, runID) if err != nil { if isDeadlineExceeded(waitCtx) { return CouncilWaitResult{ Woke: false, RunID: runID, AllComplete: false, ReviewerStatuses: reviewers, }, nil } return CouncilWaitResult{}, err } if allComplete { return CouncilWaitResult{ Woke: true, RunID: runID, AllComplete: true, ReviewerStatuses: reviewers, }, nil } if _, err := s.ReconcileRun(waitCtx, runID); err != nil { if isSQLiteBusyError(err) { ok, waitErr := waitForNextPoll(waitCtx, 25*time.Millisecond) if waitErr != nil { if errors.Is(waitErr, context.DeadlineExceeded) { reviewers, _, _ := s.GetCouncilReviewerStatuses(ctx, runID) return CouncilWaitResult{ Woke: false, RunID: runID, AllComplete: false, ReviewerStatuses: reviewers, }, nil } return CouncilWaitResult{}, waitErr } if !ok { reviewers, _, _ := s.GetCouncilReviewerStatuses(ctx, runID) return CouncilWaitResult{ Woke: false, RunID: runID, AllComplete: false, ReviewerStatuses: reviewers, }, nil } continue } if isDeadlineExceeded(waitCtx) { reviewers, _, _ := s.GetCouncilReviewerStatuses(ctx, runID) return CouncilWaitResult{ Woke: false, RunID: runID, AllComplete: false, ReviewerStatuses: reviewers, }, nil } return CouncilWaitResult{}, err } ok, err := waitForNextPoll(waitCtx, 200*time.Millisecond) if err != nil { if errors.Is(err, context.DeadlineExceeded) { reviewers, _, _ := s.GetCouncilReviewerStatuses(ctx, runID) return CouncilWaitResult{ Woke: false, RunID: runID, AllComplete: false, ReviewerStatuses: reviewers, }, nil } return CouncilWaitResult{}, err } if !ok { reviewers, _, _ := s.GetCouncilReviewerStatuses(ctx, runID) return CouncilWaitResult{ Woke: false, RunID: runID, AllComplete: false, ReviewerStatuses: reviewers, }, nil } } } func (s *OrchStore) GetCouncilRun(ctx context.Context, runID string) (CouncilRun, error) { row := s.db.QueryRowContext( ctx, `SELECT run_id, mode, target_type, output_mode, only_unanimous FROM council_runs WHERE run_id = ?`, runID, ) var ( run CouncilRun onlyUnanimous int ) err := row.Scan(&run.RunID, &run.Mode, &run.TargetType, &run.OutputMode, &onlyUnanimous) if errors.Is(err, sql.ErrNoRows) { return CouncilRun{}, fmt.Errorf("%w: council run %s not found", ErrRunNotFound, runID) } if err != nil { return CouncilRun{}, fmt.Errorf("scan council run: %w", err) } run.OnlyUnanimous = onlyUnanimous != 0 return run, nil } func (s *OrchStore) GetCouncilReviewerStatuses(ctx context.Context, runID string) ([]CouncilReviewer, bool, error) { rows, err := s.db.QueryContext( ctx, `SELECT cr.reviewer_role, cr.task_id, t.status FROM council_reviewers cr JOIN tasks t ON t.run_id = cr.run_id AND t.task_id = cr.task_id WHERE cr.run_id = ? ORDER BY cr.reviewer_role ASC`, runID, ) if err != nil { return nil, false, fmt.Errorf("query council reviewer statuses: %w", err) } defer rows.Close() reviewers := make([]CouncilReviewer, 0, len(councilReviewerRoles)) allComplete := true for rows.Next() { var reviewer CouncilReviewer if err := rows.Scan(&reviewer.ReviewerRole, &reviewer.TaskID, &reviewer.Status); err != nil { return nil, false, fmt.Errorf("scan council reviewer status: %w", err) } if reviewer.Status != "done" && reviewer.Status != "failed" && reviewer.Status != "cancelled" { allComplete = false } reviewers = append(reviewers, reviewer) } if err := rows.Err(); err != nil { return nil, false, fmt.Errorf("iterate council reviewer statuses: %w", err) } if len(reviewers) == 0 { return nil, false, fmt.Errorf("%w: council reviewers for run %s not found", ErrRunNotFound, runID) } return reviewers, allComplete, nil } func (s *OrchStore) TallyCouncil(ctx context.Context, input CouncilTallyInput) (CouncilTallyResult, error) { runID := strings.TrimSpace(input.RunID) if runID == "" { return CouncilTallyResult{}, fmt.Errorf("%w: run id is required", ErrInvalidInput) } similarity := defaultString(strings.TrimSpace(input.Similarity), "normal") if similarity != "normal" && similarity != "strict" { return CouncilTallyResult{}, fmt.Errorf("%w: similarity must be strict or normal", ErrInvalidInput) } if _, err := s.GetCouncilRun(ctx, runID); err != nil { return CouncilTallyResult{}, err } if _, err := s.ReconcileRun(ctx, runID); err != nil && !isSQLiteBusyError(err) { return CouncilTallyResult{}, err } reviewers, allComplete, err := s.GetCouncilReviewerStatuses(ctx, runID) if err != nil { return CouncilTallyResult{}, err } if !allComplete { return CouncilTallyResult{}, fmt.Errorf("%w: council reviewers are not complete yet", ErrInvalidState) } findings, err := s.collectCouncilFindings(ctx, runID, reviewers) if err != nil { return CouncilTallyResult{}, err } groups := groupCouncilFindings(runID, findings, reviewers, similarity) counts := make(map[string]int) for _, group := range groups { counts[group.Bucket]++ } tx, err := s.db.BeginTx(ctx, nil) if err != nil { return CouncilTallyResult{}, fmt.Errorf("begin council tally transaction: %w", err) } defer tx.Rollback() if _, err := tx.ExecContext(ctx, `DELETE FROM council_findings WHERE run_id = ?`, runID); err != nil { return CouncilTallyResult{}, fmt.Errorf("clear council findings: %w", err) } if _, err := tx.ExecContext(ctx, `DELETE FROM council_groups WHERE run_id = ?`, runID); err != nil { return CouncilTallyResult{}, fmt.Errorf("clear council groups: %w", err) } for _, finding := range findings { if _, err := tx.ExecContext( ctx, `INSERT INTO council_findings ( run_id, reviewer_role, finding_id, title, summary, proposal, rationale, confidence, tags_json, target_refs_json ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, finding.RunID, finding.ReviewerRole, finding.FindingID, finding.Title, finding.Summary, finding.Proposal, finding.Rationale, finding.Confidence, string(finding.TagsJSON), string(finding.TargetRefsJSON), ); err != nil { return CouncilTallyResult{}, fmt.Errorf("insert council finding: %w", err) } } for _, group := range groups { if _, err := tx.ExecContext( ctx, `INSERT INTO council_groups ( run_id, group_id, proposal, bucket, support_count, supporters_json, dissenters_json, rationale_summary, tags_json, source_finding_ids_json ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, group.RunID, group.GroupID, group.Proposal, group.Bucket, group.SupportCount, string(group.SupportersJSON), string(group.DissentersJSON), group.RationaleSummary, string(group.TagsJSON), string(group.SourceFindingIDsJSON), ); err != nil { return CouncilTallyResult{}, fmt.Errorf("insert council group: %w", err) } } if err := insertEvent(ctx, tx, eventInput{ RunID: runID, Source: "orch", EventType: "council_tallied", Summary: "council recommendations grouped", PayloadJSON: marshalJSON(map[string]any{ "similarity": similarity, "counts": counts, }), CreatedAt: nowUTC(), }); err != nil { return CouncilTallyResult{}, err } if err := tx.Commit(); err != nil { return CouncilTallyResult{}, fmt.Errorf("commit council tally transaction: %w", err) } return CouncilTallyResult{ RunID: runID, Similarity: similarity, Counts: counts, GroupedRecommendations: groups, }, nil } func (s *OrchStore) BuildCouncilReport(ctx context.Context, input CouncilReportInput) (CouncilReportResult, error) { runID := strings.TrimSpace(input.RunID) if runID == "" { return CouncilReportResult{}, fmt.Errorf("%w: run id is required", ErrInvalidInput) } run, err := s.GetCouncilRun(ctx, runID) if err != nil { return CouncilReportResult{}, err } councilInput, err := s.GetCouncilInput(ctx, runID) if err != nil { return CouncilReportResult{}, err } show, err := normalizeCouncilReportShow(input.Show, run.OnlyUnanimous) if err != nil { return CouncilReportResult{}, err } groups, tallied, err := s.ListCouncilGroups(ctx, runID) if err != nil { return CouncilReportResult{}, err } if !tallied { return CouncilReportResult{}, fmt.Errorf("%w: council groups are not available; run council tally first", ErrInvalidState) } summary := councilGroupSummary(groups) selectedGroups := selectCouncilGroupsForReport(groups, show) markdown := renderCouncilReportMarkdown(run, councilInput, show, summary, selectedGroups) return CouncilReportResult{ RunID: runID, Show: show, Summary: summary, GroupedRecommendations: selectedGroups, Markdown: markdown, }, nil } func (s *OrchStore) PersistCouncilReport(ctx context.Context, input CouncilPersistReportInput) error { runID := strings.TrimSpace(input.RunID) if runID == "" { return fmt.Errorf("%w: run id is required", ErrInvalidInput) } if _, err := s.GetCouncilRun(ctx, runID); err != nil { return err } showJSON := marshalJSON(input.Show) summaryJSON := marshalJSON(normalizeCouncilSummary(input.Summary)) markdownPath := strings.TrimSpace(input.MarkdownPath) now := nowUTC() tx, err := s.db.BeginTx(ctx, nil) if err != nil { return fmt.Errorf("begin council report transaction: %w", err) } defer tx.Rollback() if _, err := tx.ExecContext( ctx, `INSERT INTO council_reports ( run_id, show_json, summary_json, markdown_path, created_at, updated_at ) VALUES (?, ?, ?, ?, ?, ?) ON CONFLICT(run_id) DO UPDATE SET show_json = excluded.show_json, summary_json = excluded.summary_json, markdown_path = excluded.markdown_path, updated_at = excluded.updated_at`, runID, showJSON, summaryJSON, markdownPath, formatTime(now), formatTime(now), ); err != nil { return fmt.Errorf("persist council report metadata: %w", err) } if err := insertEvent(ctx, tx, eventInput{ RunID: runID, Source: "orch", EventType: "council_reported", Summary: "council report generated", PayloadJSON: marshalJSON(map[string]any{ "show": input.Show, "markdown_path": markdownPath, }), CreatedAt: now, }); err != nil { return err } if err := tx.Commit(); err != nil { return fmt.Errorf("commit council report transaction: %w", err) } return nil } func (s *OrchStore) collectCouncilFindings(ctx context.Context, runID string, reviewers []CouncilReviewer) ([]CouncilFinding, error) { findings := make([]CouncilFinding, 0) for _, reviewer := range reviewers { if reviewer.Status != "done" { return nil, fmt.Errorf("%w: reviewer %s did not finish successfully", ErrInvalidState, reviewer.ReviewerRole) } message, err := s.loadCouncilReviewerResultMessage(ctx, runID, reviewer.TaskID) if err != nil { return nil, err } output, err := parseCouncilReviewerOutput(reviewer.ReviewerRole, message) if err != nil { return nil, err } for i, finding := range output.Findings { tagsJSON, err := normalizeOptionalJSONArray(finding.Tags) if err != nil { return nil, fmt.Errorf("%w: reviewer %s finding %d tags must be a JSON array", ErrInvalidInput, reviewer.ReviewerRole, i+1) } targetRefsJSON, err := normalizeOptionalJSONObject(finding.TargetRefs) if err != nil { return nil, fmt.Errorf("%w: reviewer %s finding %d target_refs must be a JSON object", ErrInvalidInput, reviewer.ReviewerRole, i+1) } confidence := strings.TrimSpace(finding.Confidence) switch confidence { case "low", "medium", "high": default: return nil, fmt.Errorf("%w: reviewer %s finding %d confidence must be low, medium, or high", ErrInvalidInput, reviewer.ReviewerRole, i+1) } if strings.TrimSpace(finding.Proposal) == "" { return nil, fmt.Errorf("%w: reviewer %s finding %d proposal is required", ErrInvalidInput, reviewer.ReviewerRole, i+1) } findings = append(findings, CouncilFinding{ RunID: runID, ReviewerRole: reviewer.ReviewerRole, FindingID: fmt.Sprintf("f%02d", i+1), Title: strings.TrimSpace(finding.Title), Summary: strings.TrimSpace(finding.Summary), Proposal: strings.TrimSpace(finding.Proposal), Rationale: strings.TrimSpace(finding.Rationale), Confidence: confidence, TagsJSON: json.RawMessage(tagsJSON), TargetRefsJSON: json.RawMessage(targetRefsJSON), }) } } return findings, nil } func (s *OrchStore) GetCouncilInput(ctx context.Context, runID string) (CouncilInput, error) { row := s.db.QueryRowContext( ctx, `SELECT run_id, prompt, target_file, repo_path, target_task_id FROM council_inputs WHERE run_id = ?`, runID, ) var input CouncilInput if err := row.Scan( &input.RunID, &input.Prompt, &input.TargetFile, &input.RepoPath, &input.TargetTaskID, ); errors.Is(err, sql.ErrNoRows) { return CouncilInput{RunID: runID}, nil } else if err != nil { return CouncilInput{}, fmt.Errorf("scan council input: %w", err) } return input, nil } func (s *OrchStore) ListCouncilGroups(ctx context.Context, runID string) ([]CouncilGroup, bool, error) { rows, err := s.db.QueryContext( ctx, `SELECT run_id, group_id, proposal, bucket, support_count, supporters_json, dissenters_json, rationale_summary, tags_json, source_finding_ids_json FROM council_groups WHERE run_id = ? ORDER BY CASE bucket WHEN 'consensus' THEN 1 WHEN 'majority' THEN 2 WHEN 'minority' THEN 3 ELSE 4 END, support_count DESC, group_id ASC`, runID, ) if err != nil { return nil, false, fmt.Errorf("query council groups: %w", err) } defer rows.Close() groups := make([]CouncilGroup, 0) for rows.Next() { var group CouncilGroup var supportersJSON string var dissentersJSON string var tagsJSON string var sourceFindingIDsJSON string if err := rows.Scan( &group.RunID, &group.GroupID, &group.Proposal, &group.Bucket, &group.SupportCount, &supportersJSON, &dissentersJSON, &group.RationaleSummary, &tagsJSON, &sourceFindingIDsJSON, ); err != nil { return nil, false, fmt.Errorf("scan council group: %w", err) } group.SupportersJSON = json.RawMessage(supportersJSON) group.DissentersJSON = json.RawMessage(dissentersJSON) group.TagsJSON = json.RawMessage(tagsJSON) group.SourceFindingIDsJSON = json.RawMessage(sourceFindingIDsJSON) groups = append(groups, group) } if err := rows.Err(); err != nil { return nil, false, fmt.Errorf("iterate council groups: %w", err) } if len(groups) > 0 { return groups, true, nil } tallied, err := s.hasCouncilTallyEvent(ctx, runID) if err != nil { return nil, false, err } return groups, tallied, nil } func (s *OrchStore) hasCouncilTallyEvent(ctx context.Context, runID string) (bool, error) { var count int if err := s.db.QueryRowContext( ctx, `SELECT COUNT(*) FROM events WHERE run_id = ? AND event_type = 'council_tallied'`, runID, ).Scan(&count); err != nil { return false, fmt.Errorf("query council tally events: %w", err) } return count > 0, nil } func (s *OrchStore) loadCouncilReviewerResultMessage(ctx context.Context, runID, taskID string) (Message, error) { task, err := selectTask(ctx, s.db, runID, taskID) if err != nil { return Message{}, err } if task.LatestAttemptNo == 0 { return Message{}, fmt.Errorf("%w: reviewer task %s has no attempt", ErrInvalidState, taskID) } attempt, err := selectAttempt(ctx, s.db, runID, taskID, task.LatestAttemptNo) if err != nil { return Message{}, err } row := s.db.QueryRowContext( ctx, `SELECT message_id, thread_id, from_agent, to_agent, kind, summary, body, payload_json, created_at FROM messages WHERE thread_id = ? AND kind = 'result' ORDER BY created_at DESC LIMIT 1`, attempt.ThreadID, ) message, err := scanMessage(row) if errors.Is(err, sql.ErrNoRows) { return Message{}, fmt.Errorf("%w: reviewer task %s has no result message", ErrInvalidState, taskID) } if err != nil { return Message{}, err } return message, nil } func parseCouncilReviewerOutput(expectedRole string, message Message) (councilReviewerOutput, error) { candidates := []string{strings.TrimSpace(message.Body), strings.TrimSpace(string(message.PayloadJSON))} var lastErr error for _, candidate := range candidates { if candidate == "" || candidate == "{}" { continue } var output councilReviewerOutput if err := json.Unmarshal([]byte(candidate), &output); err != nil { lastErr = err continue } if strings.TrimSpace(output.ReviewerRole) == "" { return councilReviewerOutput{}, fmt.Errorf("%w: reviewer output must include reviewer_role", ErrInvalidInput) } if output.ReviewerRole != expectedRole { return councilReviewerOutput{}, fmt.Errorf("%w: reviewer output role %s does not match expected %s", ErrInvalidInput, output.ReviewerRole, expectedRole) } return output, nil } if lastErr != nil { return councilReviewerOutput{}, fmt.Errorf("%w: reviewer output must be valid JSON", ErrInvalidInput) } return councilReviewerOutput{}, fmt.Errorf("%w: reviewer result message did not contain council output JSON", ErrInvalidInput) } func normalizeOptionalJSONArray(raw json.RawMessage) (string, error) { if len(raw) == 0 || strings.TrimSpace(string(raw)) == "" || strings.TrimSpace(string(raw)) == "null" { return "[]", nil } var value []any if err := json.Unmarshal(raw, &value); err != nil { return "", err } return marshalJSON(value), nil } func normalizeOptionalJSONObject(raw json.RawMessage) (string, error) { if len(raw) == 0 || strings.TrimSpace(string(raw)) == "" || strings.TrimSpace(string(raw)) == "null" { return "{}", nil } var value map[string]any if err := json.Unmarshal(raw, &value); err != nil { return "", err } return marshalJSON(value), nil } func groupCouncilFindings(runID string, findings []CouncilFinding, reviewers []CouncilReviewer, similarity string) []CouncilGroup { type groupedFinding struct { key string proposal string findings []CouncilFinding } order := make([]string, 0) groupsByKey := make(map[string]*groupedFinding) for _, finding := range findings { key := councilProposalGroupKey(finding.Proposal, similarity) group, ok := groupsByKey[key] if !ok { group = &groupedFinding{ key: key, proposal: finding.Proposal, } groupsByKey[key] = group order = append(order, key) } group.findings = append(group.findings, finding) } sortedReviewers := make([]string, 0, len(reviewers)) for _, reviewer := range reviewers { sortedReviewers = append(sortedReviewers, reviewer.ReviewerRole) } sort.Strings(sortedReviewers) result := make([]CouncilGroup, 0, len(order)) for idx, key := range order { group := groupsByKey[key] supporterSet := make(map[string]struct{}) tagSet := make(map[string]struct{}) sourceFindingIDs := make([]string, 0, len(group.findings)) rationaleSummary := "" for _, finding := range group.findings { supporterSet[finding.ReviewerRole] = struct{}{} sourceFindingIDs = append(sourceFindingIDs, finding.ReviewerRole+":"+finding.FindingID) if rationaleSummary == "" && finding.Rationale != "" { rationaleSummary = finding.Rationale } var tags []string if len(finding.TagsJSON) > 0 { _ = json.Unmarshal(finding.TagsJSON, &tags) } for _, tag := range tags { tag = strings.TrimSpace(tag) if tag != "" { tagSet[tag] = struct{}{} } } } supporters := make([]string, 0, len(supporterSet)) for _, reviewer := range sortedReviewers { if _, ok := supporterSet[reviewer]; ok { supporters = append(supporters, reviewer) } } dissenters := make([]string, 0, len(sortedReviewers)-len(supporters)) for _, reviewer := range sortedReviewers { if _, ok := supporterSet[reviewer]; !ok { dissenters = append(dissenters, reviewer) } } tags := make([]string, 0, len(tagSet)) for tag := range tagSet { tags = append(tags, tag) } sort.Strings(tags) sort.Strings(sourceFindingIDs) supportCount := len(supporters) bucket := "minority" if supportCount == 3 { bucket = "consensus" } else if supportCount == 2 { bucket = "majority" } result = append(result, CouncilGroup{ RunID: runID, GroupID: fmt.Sprintf("grp_%02d", idx+1), Proposal: group.proposal, Bucket: bucket, SupportCount: supportCount, SupportersJSON: json.RawMessage(marshalJSON(supporters)), DissentersJSON: json.RawMessage(marshalJSON(dissenters)), RationaleSummary: rationaleSummary, TagsJSON: json.RawMessage(marshalJSON(tags)), SourceFindingIDsJSON: json.RawMessage(marshalJSON(sourceFindingIDs)), }) } sort.SliceStable(result, func(i, j int) bool { if result[i].SupportCount != result[j].SupportCount { return result[i].SupportCount > result[j].SupportCount } return result[i].Proposal < result[j].Proposal }) for i := range result { result[i].GroupID = fmt.Sprintf("grp_%02d", i+1) } return result } func normalizeCouncilReportShow(raw string, onlyUnanimous bool) ([]string, error) { if strings.TrimSpace(raw) == "" { if onlyUnanimous { return []string{"consensus"}, nil } return []string{"consensus", "majority"}, nil } parts := strings.Split(raw, ",") show := make([]string, 0, len(parts)) seen := make(map[string]struct{}, len(parts)) for _, part := range parts { value := strings.ToLower(strings.TrimSpace(part)) if value == "" { continue } if value == "all" { return []string{"consensus", "majority", "minority"}, nil } switch value { case "consensus", "majority", "minority": default: return nil, fmt.Errorf("%w: show must contain consensus, majority, minority, or all", ErrInvalidInput) } if _, ok := seen[value]; ok { continue } seen[value] = struct{}{} show = append(show, value) } if len(show) == 0 { return nil, fmt.Errorf("%w: show must contain at least one bucket", ErrInvalidInput) } return show, nil } func councilGroupSummary(groups []CouncilGroup) map[string]int { summary := normalizeCouncilSummary(nil) for _, group := range groups { summary[group.Bucket]++ } return summary } func normalizeCouncilSummary(summary map[string]int) map[string]int { result := map[string]int{ "consensus": 0, "majority": 0, "minority": 0, } for key, value := range summary { result[key] = value } return result } func selectCouncilGroupsForReport(groups []CouncilGroup, show []string) []CouncilGroup { groupedByBucket := make(map[string][]CouncilGroup, len(show)) for _, group := range groups { groupedByBucket[group.Bucket] = append(groupedByBucket[group.Bucket], group) } selected := make([]CouncilGroup, 0, len(groups)) for _, bucket := range show { selected = append(selected, groupedByBucket[bucket]...) } return selected } func renderCouncilReportMarkdown(run CouncilRun, input CouncilInput, show []string, summary map[string]int, groups []CouncilGroup) string { var builder strings.Builder builder.WriteString("# Council Review Report\n\n") builder.WriteString(fmt.Sprintf("- Run ID: `%s`\n", run.RunID)) builder.WriteString(fmt.Sprintf("- Mode: `%s`\n", run.Mode)) builder.WriteString(fmt.Sprintf("- Target Type: `%s`\n", run.TargetType)) builder.WriteString(fmt.Sprintf("- Report Buckets: `%s`\n\n", strings.Join(show, "`, `"))) builder.WriteString("## Target\n\n") if strings.TrimSpace(input.Prompt) != "" { builder.WriteString(fmt.Sprintf("- Prompt: %s\n", input.Prompt)) } if strings.TrimSpace(input.TargetFile) != "" { builder.WriteString(fmt.Sprintf("- Target File: `%s`\n", input.TargetFile)) } if strings.TrimSpace(input.RepoPath) != "" { builder.WriteString(fmt.Sprintf("- Repo Path: `%s`\n", input.RepoPath)) } if strings.TrimSpace(input.TargetTaskID) != "" { builder.WriteString(fmt.Sprintf("- Task ID: `%s`\n", input.TargetTaskID)) } if strings.TrimSpace(input.Prompt) == "" && strings.TrimSpace(input.TargetFile) == "" && strings.TrimSpace(input.RepoPath) == "" && strings.TrimSpace(input.TargetTaskID) == "" { builder.WriteString("- No explicit target metadata was recorded.\n") } builder.WriteString("\n") builder.WriteString("## Summary\n\n") builder.WriteString(fmt.Sprintf("- Consensus: %d\n", summary["consensus"])) builder.WriteString(fmt.Sprintf("- Majority: %d\n", summary["majority"])) builder.WriteString(fmt.Sprintf("- Minority: %d\n\n", summary["minority"])) groupedByBucket := make(map[string][]CouncilGroup, len(show)) for _, group := range groups { groupedByBucket[group.Bucket] = append(groupedByBucket[group.Bucket], group) } for _, bucket := range show { builder.WriteString(fmt.Sprintf("## %s\n\n", councilBucketHeading(bucket))) bucketGroups := groupedByBucket[bucket] if len(bucketGroups) == 0 { builder.WriteString(fmt.Sprintf("No %s recommendations.\n\n", bucket)) continue } for _, group := range bucketGroups { supporters := decodeCouncilStringSlice(group.SupportersJSON) dissenters := decodeCouncilStringSlice(group.DissentersJSON) tags := decodeCouncilStringSlice(group.TagsJSON) sourceFindingIDs := decodeCouncilStringSlice(group.SourceFindingIDsJSON) builder.WriteString(fmt.Sprintf("### %s\n\n", group.GroupID)) builder.WriteString(group.Proposal) builder.WriteString("\n\n") builder.WriteString(fmt.Sprintf("- Support: %d of 3 reviewers", group.SupportCount)) if len(supporters) > 0 { builder.WriteString(fmt.Sprintf(" (`%s`)", strings.Join(supporters, "`, `"))) } builder.WriteString("\n") if len(dissenters) > 0 { builder.WriteString(fmt.Sprintf("- Dissenters: `%s`\n", strings.Join(dissenters, "`, `"))) } if strings.TrimSpace(group.RationaleSummary) != "" { builder.WriteString(fmt.Sprintf("- Rationale: %s\n", group.RationaleSummary)) } if len(tags) > 0 { builder.WriteString(fmt.Sprintf("- Tags: `%s`\n", strings.Join(tags, "`, `"))) } if len(sourceFindingIDs) > 0 { builder.WriteString(fmt.Sprintf("- Source Findings: `%s`\n", strings.Join(sourceFindingIDs, "`, `"))) } builder.WriteString("\n") } } return builder.String() } func councilBucketHeading(bucket string) string { switch bucket { case "consensus": return "Consensus" case "majority": return "Majority" case "minority": return "Minority" default: if bucket == "" { return "Recommendations" } return strings.ToUpper(bucket[:1]) + bucket[1:] } } func decodeCouncilStringSlice(raw json.RawMessage) []string { if len(raw) == 0 || strings.TrimSpace(string(raw)) == "" || strings.TrimSpace(string(raw)) == "null" { return nil } var values []string if err := json.Unmarshal(raw, &values); err != nil { return nil } result := make([]string, 0, len(values)) for _, value := range values { value = strings.TrimSpace(value) if value != "" { result = append(result, value) } } return result } func councilProposalGroupKey(proposal, similarity string) string { tokens := proposalTokens(proposal) if similarity == "strict" { return strings.Join(tokens, " ") } stopWords := map[string]struct{}{ "a": {}, "an": {}, "the": {}, "to": {}, "into": {}, "and": {}, "or": {}, "of": {}, "for": {}, "in": {}, "on": {}, "with": {}, "from": {}, "that": {}, "this": {}, "it": {}, "is": {}, "are": {}, "be": {}, "by": {}, "as": {}, "keep": {}, "use": {}, "add": {}, } set := make(map[string]struct{}) filtered := make([]string, 0, len(tokens)) for _, token := range tokens { if _, stop := stopWords[token]; stop { continue } if len(token) <= 2 { continue } if _, seen := set[token]; seen { continue } set[token] = struct{}{} filtered = append(filtered, token) } sort.Strings(filtered) if len(filtered) == 0 { return strings.Join(tokens, " ") } return strings.Join(filtered, " ") } func proposalTokens(value string) []string { lower := strings.ToLower(strings.TrimSpace(value)) fields := strings.FieldsFunc(lower, func(r rune) bool { return !((r >= 'a' && r <= 'z') || (r >= '0' && r <= '9')) }) result := make([]string, 0, len(fields)) for _, field := range fields { if field != "" { result = append(result, field) } } return result }