Files
ai-workflow-skill/internal/store/council.go
T

1087 lines
32 KiB
Go

package store
import (
	"context"
	"database/sql"
	"encoding/json"
	"errors"
	"fmt"
	"sort"
	"strings"
	"time"
	"unicode"
)
// councilReviewerRoles lists the reviewer roles of a council run. StartCouncil
// creates and dispatches one task per entry, in this order, with task ids
// CR1, CR2, ...
var councilReviewerRoles = []string{
"architecture-reviewer",
"implementation-reviewer",
"risk-reviewer",
}
// CouncilRun holds the council-level settings of a run, as written to and read
// back from the council_runs table.
type CouncilRun struct {
RunID string `json:"run_id"`
// Mode is "brainstorm" or "review" (validated by normalizeCouncilRun).
Mode string `json:"mode"`
// TargetType is "text", "repo", or "mixed".
TargetType string `json:"target_type"`
// OutputMode is "markdown", "json", or "both".
OutputMode string `json:"output_mode"`
// OnlyUnanimous is stored as 0/1 in council_runs.
OnlyUnanimous bool `json:"only_unanimous"`
}
// CouncilInput describes what the council is asked to look at, as written to
// the council_inputs table. normalizeCouncilInput requires at least one of the
// four target fields to be non-empty.
type CouncilInput struct {
RunID string `json:"run_id"`
Prompt string `json:"prompt,omitempty"`
TargetFile string `json:"target_file,omitempty"`
RepoPath string `json:"repo_path,omitempty"`
// TargetTaskID is serialized under the JSON key "task_id".
TargetTaskID string `json:"task_id,omitempty"`
}
// CouncilReviewer pairs a reviewer role with the task created for it and that
// task's status.
type CouncilReviewer struct {
ReviewerRole string `json:"reviewer_role"`
TaskID string `json:"task_id"`
Status string `json:"status"`
}
// CouncilStartInput carries the raw, un-normalized arguments for StartCouncil.
// String fields are whitespace-trimmed before use.
type CouncilStartInput struct {
RunID string
// Target is the free-text prompt; it becomes CouncilInput.Prompt.
Target string
TargetFile string
RepoPath string
TargetTaskID string
// TargetType defaults to "mixed" when empty.
TargetType string
// Mode defaults to "brainstorm" when empty.
Mode string
// OutputMode defaults to "both" when empty.
OutputMode string
OnlyUnanimous bool
}
// CouncilStartResult is returned by StartCouncil: the normalized run settings,
// the normalized input, and one entry per dispatched reviewer.
type CouncilStartResult struct {
Run CouncilRun `json:"run"`
Input CouncilInput `json:"input"`
Reviewers []CouncilReviewer `json:"reviewers"`
}
// CouncilWaitInput carries the arguments for WaitForCouncil.
type CouncilWaitInput struct {
RunID string
// Timeout bounds the wait when positive; zero or negative adds no deadline
// beyond whatever the caller's context already carries.
Timeout time.Duration
}
// CouncilWaitResult is returned by WaitForCouncil.
type CouncilWaitResult struct {
// Woke is true only when the wait ended because every reviewer reached a
// terminal status; WaitForCouncil sets it to false when it gives up waiting.
Woke bool `json:"woke"`
RunID string `json:"run_id"`
AllComplete bool `json:"all_complete"`
// ReviewerStatuses is serialized under the JSON key "reviewers".
ReviewerStatuses []CouncilReviewer `json:"reviewers"`
}
// CouncilFinding is one validated finding from one reviewer, as written to the
// council_findings table by TallyCouncil.
type CouncilFinding struct {
RunID string `json:"run_id"`
ReviewerRole string `json:"reviewer_role"`
// FindingID is assigned per reviewer as "f01", "f02", ... in output order.
FindingID string `json:"finding_id"`
Title string `json:"title"`
Summary string `json:"summary"`
Proposal string `json:"proposal"`
Rationale string `json:"rationale"`
// Confidence is "low", "medium", or "high".
Confidence string `json:"confidence"`
// TagsJSON holds a JSON array ("[]" when the reviewer supplied none).
TagsJSON json.RawMessage `json:"tags_json"`
// TargetRefsJSON holds a JSON object ("{}" when the reviewer supplied none).
TargetRefsJSON json.RawMessage `json:"target_refs_json"`
}
// CouncilGroup is a set of findings whose proposals share a group key, as
// produced by groupCouncilFindings and written to the council_groups table.
type CouncilGroup struct {
RunID string `json:"run_id"`
// GroupID is "grp_01", "grp_02", ... in final sorted order.
GroupID string `json:"group_id"`
// Proposal is the proposal text of the first finding placed in the group.
Proposal string `json:"proposal"`
// Bucket is "consensus", "majority", or "minority".
Bucket string `json:"bucket"`
// SupportCount is the number of distinct reviewer roles in the group.
SupportCount int `json:"support_count"`
SupportersJSON json.RawMessage `json:"supporters_json"`
DissentersJSON json.RawMessage `json:"dissenters_json"`
// RationaleSummary is the first non-empty rationale among the group's findings.
RationaleSummary string `json:"rationale_summary"`
TagsJSON json.RawMessage `json:"tags_json"`
// SourceFindingIDsJSON lists "<reviewer_role>:<finding_id>" strings.
SourceFindingIDsJSON json.RawMessage `json:"source_finding_ids_json"`
}
// CouncilTallyInput carries the arguments for TallyCouncil.
type CouncilTallyInput struct {
RunID string
// Similarity is "normal" (the default when empty) or "strict".
Similarity string
}
// CouncilTallyResult is returned by TallyCouncil.
type CouncilTallyResult struct {
RunID string `json:"run_id"`
Similarity string `json:"similarity"`
// Counts maps a bucket name to the number of groups in that bucket.
Counts map[string]int `json:"counts"`
GroupedRecommendations []CouncilGroup `json:"grouped_recommendations"`
}
// councilReviewerOutput is the JSON document a reviewer is expected to return
// in its result message; parseCouncilReviewerOutput decodes into it.
type councilReviewerOutput struct {
ReviewerRole string `json:"reviewer_role"`
Findings []councilFindingOutput `json:"findings"`
}
// councilFindingOutput is a single finding inside a reviewer's JSON output,
// before validation. Tags and TargetRefs are kept raw so that their JSON shape
// can be checked separately.
type councilFindingOutput struct {
Title string `json:"title"`
Summary string `json:"summary"`
Proposal string `json:"proposal"`
Rationale string `json:"rationale"`
Confidence string `json:"confidence"`
Tags json.RawMessage `json:"tags"`
TargetRefs json.RawMessage `json:"target_refs"`
}
// StartCouncil creates a new council run and dispatches one review task per
// entry in councilReviewerRoles, all inside a single transaction.
//
// It validates and normalizes the input, refuses to reuse an existing run id
// (ErrInvalidState), inserts the run, council_runs and council_inputs rows,
// then for each reviewer role inserts a task (CR1, CR2, ...), records
// task_added / task_ready events, dispatches the task via dispatchTaskTx and
// inserts a council_reviewers row. Finally it records a council_started event,
// refreshes the run's aggregate status and commits.
//
// Returns ErrInvalidInput-wrapped errors for a blank run id or invalid
// settings; other failures are returned as produced by the store helpers.
func (s *OrchStore) StartCouncil(ctx context.Context, input CouncilStartInput) (CouncilStartResult, error) {
runID := strings.TrimSpace(input.RunID)
if runID == "" {
return CouncilStartResult{}, fmt.Errorf("%w: run id is required", ErrInvalidInput)
}
councilInput, err := normalizeCouncilInput(input)
if err != nil {
return CouncilStartResult{}, err
}
councilRun, err := normalizeCouncilRun(input)
if err != nil {
return CouncilStartResult{}, err
}
// A single timestamp is shared by every row and event written below.
now := nowUTC()
tx, err := s.db.BeginTx(ctx, nil)
if err != nil {
return CouncilStartResult{}, fmt.Errorf("begin council start transaction: %w", err)
}
defer tx.Rollback()
// The run must not exist yet: success from selectRun is the error case here,
// and only ErrRunNotFound lets the function continue.
if _, err := selectRun(ctx, tx, runID); err == nil {
return CouncilStartResult{}, fmt.Errorf("%w: run %s already exists", ErrInvalidState, runID)
} else if !errors.Is(err, ErrRunNotFound) {
return CouncilStartResult{}, err
}
goal := buildCouncilRunGoal(councilInput)
summary := buildCouncilRunSummary(councilRun, councilInput)
_, err = tx.ExecContext(
ctx,
`INSERT INTO runs (run_id, goal, summary, status, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?)`,
runID,
goal,
summary,
"active",
formatTime(now),
formatTime(now),
)
if err != nil {
return CouncilStartResult{}, fmt.Errorf("insert council run into runs: %w", err)
}
if err := insertEvent(ctx, tx, eventInput{
RunID: runID,
Source: "orch",
EventType: "run_initialized",
Summary: summary,
PayloadJSON: marshalJSON(map[string]any{"goal": goal, "summary": summary}),
CreatedAt: now,
}); err != nil {
return CouncilStartResult{}, err
}
_, err = tx.ExecContext(
ctx,
`INSERT INTO council_runs (
run_id, mode, target_type, output_mode, only_unanimous, created_at, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?)`,
runID,
councilRun.Mode,
councilRun.TargetType,
councilRun.OutputMode,
boolToInt(councilRun.OnlyUnanimous),
formatTime(now),
formatTime(now),
)
if err != nil {
return CouncilStartResult{}, fmt.Errorf("insert council run metadata: %w", err)
}
_, err = tx.ExecContext(
ctx,
`INSERT INTO council_inputs (
run_id, prompt, target_file, repo_path, target_task_id, created_at, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?)`,
runID,
councilInput.Prompt,
councilInput.TargetFile,
councilInput.RepoPath,
councilInput.TargetTaskID,
formatTime(now),
formatTime(now),
)
if err != nil {
return CouncilStartResult{}, fmt.Errorf("insert council input metadata: %w", err)
}
reviewers := make([]CouncilReviewer, 0, len(councilReviewerRoles))
// One task per reviewer role: insert it, log its events, then dispatch it.
for i, reviewerRole := range councilReviewerRoles {
taskID := fmt.Sprintf("CR%d", i+1)
task := Task{
RunID: runID,
TaskID: taskID,
Title: buildCouncilTaskTitle(reviewerRole),
Summary: buildCouncilTaskSummary(reviewerRole),
Status: "ready",
DefaultTo: reviewerRole,
Priority: "normal",
AcceptanceJSON: []byte(buildCouncilTaskAcceptanceJSON(councilRun, councilInput, reviewerRole)),
CreatedAt: now,
UpdatedAt: now,
}
_, err = tx.ExecContext(
ctx,
`INSERT INTO tasks (
run_id, task_id, title, summary, status, default_to, priority,
acceptance_json, latest_attempt_no, created_at, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, NULL, ?, ?)`,
task.RunID,
task.TaskID,
task.Title,
task.Summary,
task.Status,
nullIfEmpty(task.DefaultTo),
task.Priority,
string(task.AcceptanceJSON),
formatTime(task.CreatedAt),
formatTime(task.UpdatedAt),
)
if err != nil {
return CouncilStartResult{}, fmt.Errorf("insert council reviewer task: %w", err)
}
if err := insertEvent(ctx, tx, eventInput{
RunID: runID,
TaskID: taskID,
Source: "orch",
EventType: "task_added",
Summary: task.Title,
PayloadJSON: marshalJSON(map[string]any{"title": task.Title, "priority": task.Priority}),
CreatedAt: now,
}); err != nil {
return CouncilStartResult{}, err
}
if err := insertEvent(ctx, tx, eventInput{
RunID: runID,
TaskID: taskID,
Source: "orch",
EventType: "task_ready",
Summary: task.Title,
PayloadJSON: marshalJSON(map[string]any{"task_id": taskID}),
CreatedAt: now,
}); err != nil {
return CouncilStartResult{}, err
}
dispatchResult, finalizeWorkspace, err := s.dispatchTaskTx(
ctx,
tx,
task,
reviewerRole,
buildCouncilTaskBody(councilRun, councilInput, reviewerRole),
"",
nil,
now,
)
if err != nil {
return CouncilStartResult{}, err
}
// NOTE(review): this defer sits inside the loop, so every finalizeWorkspace
// call runs at function return (after Commit on the success path) and is
// always passed false. Confirm against dispatchTaskTx that false is the
// intended argument once the transaction has committed.
defer finalizeWorkspace(false)
_, err = tx.ExecContext(
ctx,
`INSERT INTO council_reviewers (run_id, reviewer_role, task_id, status)
VALUES (?, ?, ?, ?)`,
runID,
reviewerRole,
taskID,
dispatchResult.Task.Status,
)
if err != nil {
return CouncilStartResult{}, fmt.Errorf("insert council reviewer row: %w", err)
}
reviewers = append(reviewers, CouncilReviewer{
ReviewerRole: reviewerRole,
TaskID: taskID,
Status: dispatchResult.Task.Status,
})
}
if err := insertEvent(ctx, tx, eventInput{
RunID: runID,
Source: "orch",
EventType: "council_started",
Summary: "council reviewers dispatched",
PayloadJSON: marshalJSON(map[string]any{
"mode": councilRun.Mode,
"target_type": councilRun.TargetType,
"output_mode": councilRun.OutputMode,
"only_unanimous": councilRun.OnlyUnanimous,
"reviewers": reviewers,
}),
CreatedAt: now,
}); err != nil {
return CouncilStartResult{}, err
}
if err := updateRunAggregateStatus(ctx, tx, runID, now); err != nil {
return CouncilStartResult{}, err
}
if err := tx.Commit(); err != nil {
return CouncilStartResult{}, fmt.Errorf("commit council start transaction: %w", err)
}
return CouncilStartResult{
Run: councilRun,
Input: councilInput,
Reviewers: reviewers,
}, nil
}
// normalizeCouncilRun trims the run-level settings of input, fills in defaults
// ("brainstorm", "mixed", "both") for blank values, and rejects anything
// outside the allowed sets with an ErrInvalidInput-wrapped error. Settings are
// checked in the order mode, target type, output mode.
func normalizeCouncilRun(input CouncilStartInput) (CouncilRun, error) {
	run := CouncilRun{
		RunID:         strings.TrimSpace(input.RunID),
		Mode:          defaultString(strings.TrimSpace(input.Mode), "brainstorm"),
		TargetType:    defaultString(strings.TrimSpace(input.TargetType), "mixed"),
		OutputMode:    defaultString(strings.TrimSpace(input.OutputMode), "both"),
		OnlyUnanimous: input.OnlyUnanimous,
	}

	if run.Mode != "brainstorm" && run.Mode != "review" {
		return CouncilRun{}, fmt.Errorf("%w: mode must be brainstorm or review", ErrInvalidInput)
	}
	if run.TargetType != "text" && run.TargetType != "repo" && run.TargetType != "mixed" {
		return CouncilRun{}, fmt.Errorf("%w: target-type must be text, repo, or mixed", ErrInvalidInput)
	}
	if run.OutputMode != "markdown" && run.OutputMode != "json" && run.OutputMode != "both" {
		return CouncilRun{}, fmt.Errorf("%w: output must be markdown, json, or both", ErrInvalidInput)
	}
	return run, nil
}
// normalizeCouncilInput trims the target fields of input and returns them as a
// CouncilInput. It fails with an ErrInvalidInput-wrapped error when the prompt,
// target file, repo path and task id are all blank.
func normalizeCouncilInput(input CouncilStartInput) (CouncilInput, error) {
	var normalized CouncilInput
	normalized.RunID = strings.TrimSpace(input.RunID)
	normalized.Prompt = strings.TrimSpace(input.Target)
	normalized.TargetFile = strings.TrimSpace(input.TargetFile)
	normalized.RepoPath = strings.TrimSpace(input.RepoPath)
	normalized.TargetTaskID = strings.TrimSpace(input.TargetTaskID)

	hasTarget := normalized.Prompt != "" ||
		normalized.TargetFile != "" ||
		normalized.RepoPath != "" ||
		normalized.TargetTaskID != ""
	if !hasTarget {
		return CouncilInput{}, fmt.Errorf("%w: at least one of target, target-file, repo-path, or task-id is required", ErrInvalidInput)
	}
	return normalized, nil
}
// buildCouncilRunGoal derives the run's goal text from the first populated
// target, checked in the order prompt, task id, target file, repo path. The
// prompt is collapsed to a single line and capped at 80 bytes.
func buildCouncilRunGoal(input CouncilInput) string {
	if input.Prompt != "" {
		return "Council review: " + truncateSingleLine(input.Prompt, 80)
	}
	if input.TargetTaskID != "" {
		return "Council review for task " + input.TargetTaskID
	}
	if input.TargetFile != "" {
		return "Council review for " + input.TargetFile
	}
	if input.RepoPath != "" {
		return "Council review for repo " + input.RepoPath
	}
	return "Council review"
}
// buildCouncilRunSummary renders the short run summary
// "<mode> council (<target type>)". The input argument is not consulted.
func buildCouncilRunSummary(run CouncilRun, input CouncilInput) string {
	summary := run.Mode + " council (" + run.TargetType + ")"
	return summary
}
// buildCouncilTaskTitle returns the task title for a reviewer role, e.g.
// "Council risk review". Unknown roles get the generic "Council review".
func buildCouncilTaskTitle(reviewerRole string) string {
	// focus is the word inserted between "Council" and "review"; it carries
	// its own trailing space so the generic title has no double space.
	focus := ""
	switch reviewerRole {
	case "architecture-reviewer":
		focus = "architecture "
	case "implementation-reviewer":
		focus = "implementation "
	case "risk-reviewer":
		focus = "risk "
	}
	return "Council " + focus + "review"
}
// buildCouncilTaskSummary returns the one-line task summary describing what a
// reviewer role should focus on. Unknown roles get "Review the target".
func buildCouncilTaskSummary(reviewerRole string) string {
	summaries := map[string]string{
		"architecture-reviewer":   "Review the target for architecture, boundaries, and interfaces",
		"implementation-reviewer": "Review the target for simplicity, maintainability, and practicality",
		"risk-reviewer":           "Review the target for regressions, correctness, and operability risks",
	}
	if summary, known := summaries[reviewerRole]; known {
		return summary
	}
	return "Review the target"
}
// buildCouncilTaskAcceptanceJSON serializes the acceptance document attached to
// a reviewer task: the council settings, the review target, and a template of
// the response format the reviewer is expected to produce.
func buildCouncilTaskAcceptanceJSON(run CouncilRun, input CouncilInput, reviewerRole string) string {
	target := map[string]any{
		"prompt":      input.Prompt,
		"target_file": input.TargetFile,
		"repo_path":   input.RepoPath,
		"task_id":     input.TargetTaskID,
	}

	// Placeholder finding showing the expected field names and value shapes.
	findingTemplate := map[string]any{
		"title":       "string",
		"summary":     "string",
		"proposal":    "string",
		"rationale":   "string",
		"confidence":  "low|medium|high",
		"tags":        []string{},
		"target_refs": map[string]any{},
	}
	responseFormat := map[string]any{
		"reviewer_role": reviewerRole,
		"findings":      []map[string]any{findingTemplate},
	}

	council := map[string]any{
		"reviewer_role":   reviewerRole,
		"council_mode":    run.Mode,
		"target_type":     run.TargetType,
		"output_mode":     run.OutputMode,
		"only_unanimous":  run.OnlyUnanimous,
		"target":          target,
		"response_format": responseFormat,
	}

	return marshalJSON(map[string]any{
		"mode":    "analysis",
		"council": council,
	})
}
// buildCouncilTaskBody renders the plain-text instructions sent to a reviewer:
// a fixed header followed by one blank-line-separated section for each
// populated target field (prompt, target file, repo path, related task id).
func buildCouncilTaskBody(run CouncilRun, input CouncilInput, reviewerRole string) string {
	lines := []string{
		fmt.Sprintf("Reviewer role: %s", reviewerRole),
		fmt.Sprintf("Council mode: %s", run.Mode),
		fmt.Sprintf("Target type: %s", run.TargetType),
		"Analyze the target from your assigned reviewer perspective.",
		"Return structured findings with title, summary, proposal, rationale, confidence, tags, and optional target references.",
	}

	sections := []struct {
		heading string
		value   string
	}{
		{"Prompt:", input.Prompt},
		{"Target file:", input.TargetFile},
		{"Repo path:", input.RepoPath},
		{"Related task id:", input.TargetTaskID},
	}
	for _, section := range sections {
		if section.value == "" {
			continue
		}
		lines = append(lines, "", section.heading, section.value)
	}

	return strings.Join(lines, "\n")
}
// truncateSingleLine collapses value onto one line (every run of whitespace,
// including newlines, becomes a single space; leading and trailing whitespace
// is dropped) and limits the result to at most maxLen bytes.
//
// When truncation is needed and maxLen is greater than 3, the result ends in
// "..." and the ellipsis counts toward maxLen. A maxLen of zero or less
// disables truncation. The cut never falls inside a multi-byte UTF-8 sequence:
// it is moved back to the preceding rune boundary, so the result may be a few
// bytes shorter than maxLen but is never invalid UTF-8.
func truncateSingleLine(value string, maxLen int) string {
	// strings.Fields splits on all Unicode whitespace and drops empty fields,
	// which covers trimming and newline/carriage-return replacement in one pass.
	value = strings.Join(strings.Fields(value), " ")
	if maxLen <= 0 || len(value) <= maxLen {
		return value
	}

	suffix := "..."
	budget := maxLen - len(suffix)
	if maxLen <= len(suffix) {
		// Too short to fit an ellipsis: hard-cut without one.
		suffix = ""
		budget = maxLen
	}

	// Ranging over a string yields the byte offset of each rune start; keep the
	// last offset that does not exceed the byte budget.
	cut := 0
	for offset := range value {
		if offset > budget {
			break
		}
		cut = offset
	}
	return value[:cut] + suffix
}
// boolToInt maps true to 1 and false to 0, the form in which booleans are
// bound to SQL parameters in this file.
func boolToInt(value bool) int {
	flag := 0
	if value {
		flag = 1
	}
	return flag
}
// WaitForCouncil polls until every reviewer task of the council run has reached
// a terminal status, or until the wait is given up.
//
// The run must exist as a council run. When input.Timeout is positive the wait
// is bounded by it; otherwise only the caller's ctx limits the wait. Each
// iteration reads the reviewer statuses, returns Woke=true if all are complete,
// otherwise reconciles the run and sleeps for one poll interval (200ms, or 25ms
// after a SQLite busy error from ReconcileRun).
//
// When the wait ends without completion (deadline exceeded, or waitForNextPoll
// reporting not-ok) the result has Woke=false, a nil error, and a best-effort
// snapshot of the reviewer statuses. Any other error is returned as is.
func (s *OrchStore) WaitForCouncil(ctx context.Context, input CouncilWaitInput) (CouncilWaitResult, error) {
	runID := strings.TrimSpace(input.RunID)
	if runID == "" {
		return CouncilWaitResult{}, fmt.Errorf("%w: run id is required", ErrInvalidInput)
	}
	if _, err := s.GetCouncilRun(ctx, runID); err != nil {
		return CouncilWaitResult{}, err
	}

	waitCtx := ctx
	cancel := func() {}
	if input.Timeout > 0 {
		waitCtx, cancel = context.WithTimeout(ctx, input.Timeout)
	}
	defer cancel()

	// notWoken builds the "gave up waiting" result. It reads the statuses with
	// the parent ctx because waitCtx has typically expired at this point. The
	// read is best-effort: if it fails the snapshot is simply left empty, so
	// its error is intentionally ignored.
	notWoken := func() (CouncilWaitResult, error) {
		snapshot, _, _ := s.GetCouncilReviewerStatuses(ctx, runID)
		return CouncilWaitResult{
			Woke:             false,
			RunID:            runID,
			AllComplete:      false,
			ReviewerStatuses: snapshot,
		}, nil
	}

	for {
		reviewers, allComplete, err := s.GetCouncilReviewerStatuses(waitCtx, runID)
		if err != nil {
			if isDeadlineExceeded(waitCtx) {
				// The failed read returned no reviewers, so take a fresh
				// snapshot like every other timeout path does.
				return notWoken()
			}
			return CouncilWaitResult{}, err
		}
		if allComplete {
			return CouncilWaitResult{
				Woke:             true,
				RunID:            runID,
				AllComplete:      true,
				ReviewerStatuses: reviewers,
			}, nil
		}

		pollDelay := 200 * time.Millisecond
		if _, err := s.ReconcileRun(waitCtx, runID); err != nil {
			switch {
			case isSQLiteBusyError(err):
				// Database contention is transient: retry sooner.
				pollDelay = 25 * time.Millisecond
			case isDeadlineExceeded(waitCtx):
				return notWoken()
			default:
				return CouncilWaitResult{}, err
			}
		}

		ok, err := waitForNextPoll(waitCtx, pollDelay)
		if err != nil {
			if errors.Is(err, context.DeadlineExceeded) {
				return notWoken()
			}
			return CouncilWaitResult{}, err
		}
		if !ok {
			return notWoken()
		}
	}
}
// GetCouncilRun loads the council settings row for runID from council_runs.
// It returns an ErrRunNotFound-wrapped error when no such row exists.
func (s *OrchStore) GetCouncilRun(ctx context.Context, runID string) (CouncilRun, error) {
	const query = `SELECT run_id, mode, target_type, output_mode, only_unanimous
FROM council_runs
WHERE run_id = ?`

	var (
		found         CouncilRun
		unanimousFlag int
	)
	err := s.db.QueryRowContext(ctx, query, runID).Scan(
		&found.RunID,
		&found.Mode,
		&found.TargetType,
		&found.OutputMode,
		&unanimousFlag,
	)
	switch {
	case errors.Is(err, sql.ErrNoRows):
		return CouncilRun{}, fmt.Errorf("%w: council run %s not found", ErrRunNotFound, runID)
	case err != nil:
		return CouncilRun{}, fmt.Errorf("scan council run: %w", err)
	}

	// only_unanimous is stored as an integer flag.
	found.OnlyUnanimous = unanimousFlag != 0
	return found, nil
}
// GetCouncilReviewerStatuses returns the reviewers of a council run, ordered by
// role, each with the current status of its task, plus a flag that is true
// when every task status is "done", "failed", or "cancelled".
//
// It returns an ErrRunNotFound-wrapped error when the run has no reviewer rows.
func (s *OrchStore) GetCouncilReviewerStatuses(ctx context.Context, runID string) ([]CouncilReviewer, bool, error) {
	rows, err := s.db.QueryContext(
		ctx,
		`SELECT cr.reviewer_role, cr.task_id, t.status
FROM council_reviewers cr
JOIN tasks t
ON t.run_id = cr.run_id
AND t.task_id = cr.task_id
WHERE cr.run_id = ?
ORDER BY cr.reviewer_role ASC`,
		runID,
	)
	if err != nil {
		return nil, false, fmt.Errorf("query council reviewer statuses: %w", err)
	}
	defer rows.Close()

	statuses := make([]CouncilReviewer, 0, len(councilReviewerRoles))
	everyoneFinished := true
	for rows.Next() {
		var entry CouncilReviewer
		if scanErr := rows.Scan(&entry.ReviewerRole, &entry.TaskID, &entry.Status); scanErr != nil {
			return nil, false, fmt.Errorf("scan council reviewer status: %w", scanErr)
		}
		switch entry.Status {
		case "done", "failed", "cancelled":
			// Terminal status: nothing left to wait for on this reviewer.
		default:
			everyoneFinished = false
		}
		statuses = append(statuses, entry)
	}
	if iterErr := rows.Err(); iterErr != nil {
		return nil, false, fmt.Errorf("iterate council reviewer statuses: %w", iterErr)
	}
	if len(statuses) == 0 {
		return nil, false, fmt.Errorf("%w: council reviewers for run %s not found", ErrRunNotFound, runID)
	}
	return statuses, everyoneFinished, nil
}
// TallyCouncil collects the findings of a finished council run, groups them by
// proposal similarity, and persists the result.
//
// Similarity must be "normal" (the default) or "strict". The run is reconciled
// first, and all reviewers must have reached a terminal status (otherwise
// ErrInvalidState). collectCouncilFindings additionally requires every reviewer
// to be "done". The stored findings and groups of the run are then replaced in
// one transaction and a council_tallied event is recorded.
func (s *OrchStore) TallyCouncil(ctx context.Context, input CouncilTallyInput) (CouncilTallyResult, error) {
runID := strings.TrimSpace(input.RunID)
if runID == "" {
return CouncilTallyResult{}, fmt.Errorf("%w: run id is required", ErrInvalidInput)
}
similarity := defaultString(strings.TrimSpace(input.Similarity), "normal")
if similarity != "normal" && similarity != "strict" {
return CouncilTallyResult{}, fmt.Errorf("%w: similarity must be strict or normal", ErrInvalidInput)
}
if _, err := s.GetCouncilRun(ctx, runID); err != nil {
return CouncilTallyResult{}, err
}
// A SQLite busy error from reconciliation is tolerated here; the reviewer
// statuses are read next regardless.
if _, err := s.ReconcileRun(ctx, runID); err != nil && !isSQLiteBusyError(err) {
return CouncilTallyResult{}, err
}
reviewers, allComplete, err := s.GetCouncilReviewerStatuses(ctx, runID)
if err != nil {
return CouncilTallyResult{}, err
}
if !allComplete {
return CouncilTallyResult{}, fmt.Errorf("%w: council reviewers are not complete yet", ErrInvalidState)
}
findings, err := s.collectCouncilFindings(ctx, runID, reviewers)
if err != nil {
return CouncilTallyResult{}, err
}
groups := groupCouncilFindings(runID, findings, reviewers, similarity)
// counts: number of groups per bucket name.
counts := make(map[string]int)
for _, group := range groups {
counts[group.Bucket]++
}
tx, err := s.db.BeginTx(ctx, nil)
if err != nil {
return CouncilTallyResult{}, fmt.Errorf("begin council tally transaction: %w", err)
}
defer tx.Rollback()
// Clear the previously stored tally for this run so that re-running the tally
// replaces it instead of adding to it.
if _, err := tx.ExecContext(ctx, `DELETE FROM council_findings WHERE run_id = ?`, runID); err != nil {
return CouncilTallyResult{}, fmt.Errorf("clear council findings: %w", err)
}
if _, err := tx.ExecContext(ctx, `DELETE FROM council_groups WHERE run_id = ?`, runID); err != nil {
return CouncilTallyResult{}, fmt.Errorf("clear council groups: %w", err)
}
for _, finding := range findings {
if _, err := tx.ExecContext(
ctx,
`INSERT INTO council_findings (
run_id, reviewer_role, finding_id, title, summary, proposal, rationale,
confidence, tags_json, target_refs_json
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
finding.RunID,
finding.ReviewerRole,
finding.FindingID,
finding.Title,
finding.Summary,
finding.Proposal,
finding.Rationale,
finding.Confidence,
string(finding.TagsJSON),
string(finding.TargetRefsJSON),
); err != nil {
return CouncilTallyResult{}, fmt.Errorf("insert council finding: %w", err)
}
}
for _, group := range groups {
if _, err := tx.ExecContext(
ctx,
`INSERT INTO council_groups (
run_id, group_id, proposal, bucket, support_count, supporters_json,
dissenters_json, rationale_summary, tags_json, source_finding_ids_json
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
group.RunID,
group.GroupID,
group.Proposal,
group.Bucket,
group.SupportCount,
string(group.SupportersJSON),
string(group.DissentersJSON),
group.RationaleSummary,
string(group.TagsJSON),
string(group.SourceFindingIDsJSON),
); err != nil {
return CouncilTallyResult{}, fmt.Errorf("insert council group: %w", err)
}
}
if err := insertEvent(ctx, tx, eventInput{
RunID: runID,
Source: "orch",
EventType: "council_tallied",
Summary: "council recommendations grouped",
PayloadJSON: marshalJSON(map[string]any{
"similarity": similarity,
"counts": counts,
}),
CreatedAt: nowUTC(),
}); err != nil {
return CouncilTallyResult{}, err
}
if err := tx.Commit(); err != nil {
return CouncilTallyResult{}, fmt.Errorf("commit council tally transaction: %w", err)
}
return CouncilTallyResult{
RunID: runID,
Similarity: similarity,
Counts: counts,
GroupedRecommendations: groups,
}, nil
}
// collectCouncilFindings loads each reviewer's latest result message, parses it
// as council output, and returns the validated findings in reviewer order.
//
// Every reviewer must have status "done" (ErrInvalidState otherwise). Each
// finding must carry tags that are a JSON array (or absent), target_refs that
// are a JSON object (or absent), a confidence of low/medium/high, and a
// non-blank proposal; violations are reported as ErrInvalidInput with the
// reviewer role and the 1-based finding position. Finding ids are assigned per
// reviewer as f01, f02, ...
func (s *OrchStore) collectCouncilFindings(ctx context.Context, runID string, reviewers []CouncilReviewer) ([]CouncilFinding, error) {
	collected := []CouncilFinding{}
	for _, rv := range reviewers {
		role := rv.ReviewerRole
		if rv.Status != "done" {
			return nil, fmt.Errorf("%w: reviewer %s did not finish successfully", ErrInvalidState, role)
		}

		resultMessage, err := s.loadCouncilReviewerResultMessage(ctx, runID, rv.TaskID)
		if err != nil {
			return nil, err
		}
		parsed, err := parseCouncilReviewerOutput(role, resultMessage)
		if err != nil {
			return nil, err
		}

		for idx, raw := range parsed.Findings {
			position := idx + 1

			tags, err := normalizeOptionalJSONArray(raw.Tags)
			if err != nil {
				return nil, fmt.Errorf("%w: reviewer %s finding %d tags must be a JSON array", ErrInvalidInput, role, position)
			}
			refs, err := normalizeOptionalJSONObject(raw.TargetRefs)
			if err != nil {
				return nil, fmt.Errorf("%w: reviewer %s finding %d target_refs must be a JSON object", ErrInvalidInput, role, position)
			}

			confidence := strings.TrimSpace(raw.Confidence)
			if confidence != "low" && confidence != "medium" && confidence != "high" {
				return nil, fmt.Errorf("%w: reviewer %s finding %d confidence must be low, medium, or high", ErrInvalidInput, role, position)
			}

			proposal := strings.TrimSpace(raw.Proposal)
			if proposal == "" {
				return nil, fmt.Errorf("%w: reviewer %s finding %d proposal is required", ErrInvalidInput, role, position)
			}

			collected = append(collected, CouncilFinding{
				RunID:          runID,
				ReviewerRole:   role,
				FindingID:      fmt.Sprintf("f%02d", position),
				Title:          strings.TrimSpace(raw.Title),
				Summary:        strings.TrimSpace(raw.Summary),
				Proposal:       proposal,
				Rationale:      strings.TrimSpace(raw.Rationale),
				Confidence:     confidence,
				TagsJSON:       json.RawMessage(tags),
				TargetRefsJSON: json.RawMessage(refs),
			})
		}
	}
	return collected, nil
}
// loadCouncilReviewerResultMessage returns the most recent message of kind
// 'result' on the thread of the reviewer task's latest attempt.
//
// It returns an ErrInvalidState-wrapped error when the task has no attempt yet
// or the thread holds no result message.
func (s *OrchStore) loadCouncilReviewerResultMessage(ctx context.Context, runID, taskID string) (Message, error) {
	const latestResultQuery = `SELECT
message_id, thread_id, from_agent, to_agent, kind, summary, body,
payload_json, created_at
FROM messages
WHERE thread_id = ? AND kind = 'result'
ORDER BY created_at DESC
LIMIT 1`

	reviewerTask, err := selectTask(ctx, s.db, runID, taskID)
	if err != nil {
		return Message{}, err
	}
	if reviewerTask.LatestAttemptNo == 0 {
		return Message{}, fmt.Errorf("%w: reviewer task %s has no attempt", ErrInvalidState, taskID)
	}

	latestAttempt, err := selectAttempt(ctx, s.db, runID, taskID, reviewerTask.LatestAttemptNo)
	if err != nil {
		return Message{}, err
	}

	resultMessage, err := scanMessage(s.db.QueryRowContext(ctx, latestResultQuery, latestAttempt.ThreadID))
	switch {
	case errors.Is(err, sql.ErrNoRows):
		return Message{}, fmt.Errorf("%w: reviewer task %s has no result message", ErrInvalidState, taskID)
	case err != nil:
		return Message{}, err
	}
	return resultMessage, nil
}
// parseCouncilReviewerOutput extracts the reviewer's council output from a
// result message. The message body is tried first, then the payload; blank
// candidates and the empty object "{}" are skipped, and a candidate that is
// not valid JSON for the output shape falls through to the next one.
//
// The first candidate that decodes decides the outcome: it must name a
// reviewer_role equal to expectedRole, otherwise an ErrInvalidInput-wrapped
// error is returned without trying further candidates.
func parseCouncilReviewerOutput(expectedRole string, message Message) (councilReviewerOutput, error) {
	sawInvalidJSON := false
	for _, source := range [...]string{message.Body, string(message.PayloadJSON)} {
		text := strings.TrimSpace(source)
		if text == "" || text == "{}" {
			continue
		}

		var parsed councilReviewerOutput
		if err := json.Unmarshal([]byte(text), &parsed); err != nil {
			sawInvalidJSON = true
			continue
		}

		switch {
		case strings.TrimSpace(parsed.ReviewerRole) == "":
			return councilReviewerOutput{}, fmt.Errorf("%w: reviewer output must include reviewer_role", ErrInvalidInput)
		case parsed.ReviewerRole != expectedRole:
			return councilReviewerOutput{}, fmt.Errorf("%w: reviewer output role %s does not match expected %s", ErrInvalidInput, parsed.ReviewerRole, expectedRole)
		}
		return parsed, nil
	}

	if sawInvalidJSON {
		return councilReviewerOutput{}, fmt.Errorf("%w: reviewer output must be valid JSON", ErrInvalidInput)
	}
	return councilReviewerOutput{}, fmt.Errorf("%w: reviewer result message did not contain council output JSON", ErrInvalidInput)
}
// normalizeOptionalJSONArray re-encodes raw as a JSON array. Missing, blank, or
// null input yields "[]"; anything that does not decode as an array is an
// error.
func normalizeOptionalJSONArray(raw json.RawMessage) (string, error) {
	switch strings.TrimSpace(string(raw)) {
	case "", "null":
		return "[]", nil
	}

	var items []any
	if err := json.Unmarshal(raw, &items); err != nil {
		return "", err
	}
	return marshalJSON(items), nil
}
// normalizeOptionalJSONObject re-encodes raw as a JSON object. Missing, blank,
// or null input yields "{}"; anything that does not decode as an object is an
// error.
func normalizeOptionalJSONObject(raw json.RawMessage) (string, error) {
	switch strings.TrimSpace(string(raw)) {
	case "", "null":
		return "{}", nil
	}

	var fields map[string]any
	if err := json.Unmarshal(raw, &fields); err != nil {
		return "", err
	}
	return marshalJSON(fields), nil
}
// groupCouncilFindings merges findings whose proposals share a group key (see
// councilProposalGroupKey) into CouncilGroups.
//
// For each group it records the proposal text of the first finding, the
// supporting and dissenting reviewer roles (both sorted), the first non-empty
// rationale, the sorted union of tags, and the sorted "<role>:<finding id>"
// source ids. A group's bucket is derived from the number of reviewers passed
// in: "consensus" when every reviewer supports it, "majority" when more than
// half do, "minority" otherwise. For the three-reviewer council this yields
// 3 -> consensus, 2 -> majority, 1 -> minority.
//
// Groups are returned ordered by descending support and then by proposal text,
// with GroupIDs grp_01, grp_02, ... assigned in that final order.
func groupCouncilFindings(runID string, findings []CouncilFinding, reviewers []CouncilReviewer, similarity string) []CouncilGroup {
	type groupedFinding struct {
		proposal string
		findings []CouncilFinding
	}

	// order remembers first-seen key order so that the stable sort below has a
	// deterministic starting point.
	order := make([]string, 0)
	groupsByKey := make(map[string]*groupedFinding)
	for _, finding := range findings {
		key := councilProposalGroupKey(finding.Proposal, similarity)
		group, ok := groupsByKey[key]
		if !ok {
			group = &groupedFinding{proposal: finding.Proposal}
			groupsByKey[key] = group
			order = append(order, key)
		}
		group.findings = append(group.findings, finding)
	}

	sortedReviewers := make([]string, 0, len(reviewers))
	for _, reviewer := range reviewers {
		sortedReviewers = append(sortedReviewers, reviewer.ReviewerRole)
	}
	sort.Strings(sortedReviewers)
	totalReviewers := len(sortedReviewers)

	result := make([]CouncilGroup, 0, len(order))
	for _, key := range order {
		group := groupsByKey[key]
		supporterSet := make(map[string]struct{})
		tagSet := make(map[string]struct{})
		sourceFindingIDs := make([]string, 0, len(group.findings))
		rationaleSummary := ""
		for _, finding := range group.findings {
			supporterSet[finding.ReviewerRole] = struct{}{}
			sourceFindingIDs = append(sourceFindingIDs, finding.ReviewerRole+":"+finding.FindingID)
			if rationaleSummary == "" && finding.Rationale != "" {
				rationaleSummary = finding.Rationale
			}
			var tags []string
			if len(finding.TagsJSON) > 0 {
				// Best-effort: tags that are not strings are skipped rather
				// than failing the whole tally, so the error is ignored.
				_ = json.Unmarshal(finding.TagsJSON, &tags)
			}
			for _, tag := range tags {
				tag = strings.TrimSpace(tag)
				if tag != "" {
					tagSet[tag] = struct{}{}
				}
			}
		}

		// Split the sorted reviewer list into supporters and dissenters.
		supporters := make([]string, 0, len(supporterSet))
		dissenters := make([]string, 0, totalReviewers)
		for _, reviewer := range sortedReviewers {
			if _, ok := supporterSet[reviewer]; ok {
				supporters = append(supporters, reviewer)
			} else {
				dissenters = append(dissenters, reviewer)
			}
		}

		tags := make([]string, 0, len(tagSet))
		for tag := range tagSet {
			tags = append(tags, tag)
		}
		sort.Strings(tags)
		sort.Strings(sourceFindingIDs)

		supportCount := len(supporters)
		bucket := "minority"
		switch {
		case supportCount > 0 && supportCount == totalReviewers:
			bucket = "consensus"
		case supportCount*2 > totalReviewers:
			bucket = "majority"
		}

		// GroupID is filled in after sorting, once the final order is known.
		result = append(result, CouncilGroup{
			RunID:                runID,
			Proposal:             group.proposal,
			Bucket:               bucket,
			SupportCount:         supportCount,
			SupportersJSON:       json.RawMessage(marshalJSON(supporters)),
			DissentersJSON:       json.RawMessage(marshalJSON(dissenters)),
			RationaleSummary:     rationaleSummary,
			TagsJSON:             json.RawMessage(marshalJSON(tags)),
			SourceFindingIDsJSON: json.RawMessage(marshalJSON(sourceFindingIDs)),
		})
	}

	sort.SliceStable(result, func(i, j int) bool {
		if result[i].SupportCount != result[j].SupportCount {
			return result[i].SupportCount > result[j].SupportCount
		}
		return result[i].Proposal < result[j].Proposal
	})
	for i := range result {
		result[i].GroupID = fmt.Sprintf("grp_%02d", i+1)
	}
	return result
}
// councilProposalGroupKey reduces a proposal to the key used to decide whether
// two findings propose the same thing.
//
// With similarity "strict" the key is the proposal's tokens joined by spaces,
// in their original order. Otherwise stop words, tokens of at most two bytes
// and repeated tokens are dropped and the remainder is sorted, so word order
// and filler words do not matter. If nothing survives the filtering, the
// strict key is used instead.
func councilProposalGroupKey(proposal, similarity string) string {
	tokens := proposalTokens(proposal)
	strictKey := strings.Join(tokens, " ")
	if similarity == "strict" {
		return strictKey
	}

	isStopWord := func(word string) bool {
		switch word {
		case "a", "an", "the", "to", "into", "and", "or", "of", "for", "in",
			"on", "with", "from", "that", "this", "it", "is", "are", "be",
			"by", "as", "keep", "use", "add":
			return true
		}
		return false
	}

	seen := make(map[string]struct{})
	kept := make([]string, 0, len(tokens))
	for _, token := range tokens {
		if isStopWord(token) || len(token) <= 2 {
			continue
		}
		if _, dup := seen[token]; dup {
			continue
		}
		seen[token] = struct{}{}
		kept = append(kept, token)
	}

	if len(kept) == 0 {
		return strictKey
	}
	sort.Strings(kept)
	return strings.Join(kept, " ")
}
// proposalTokens lower-cases value and splits it into words, treating every
// rune that is not a letter, digit, or combining mark as a separator. The
// result never contains empty strings.
//
// Letters and digits are recognized by their Unicode class rather than by
// ASCII range, so accented and non-Latin words stay intact ("caché" is one
// token) instead of being broken apart or dropped.
func proposalTokens(value string) []string {
	lowered := strings.ToLower(strings.TrimSpace(value))
	return strings.FieldsFunc(lowered, func(r rune) bool {
		// Marks are kept so that combining accents and vowel signs do not
		// split a word in two.
		return !(unicode.IsLetter(r) || unicode.IsDigit(r) || unicode.IsMark(r))
	})
}