From 740a7b4acdaf4328516ced2219006fe6377920fb Mon Sep 17 00:00:00 2001 From: kurihada Date: Thu, 19 Mar 2026 15:26:11 +0800 Subject: [PATCH] Add council review tally command --- docs/council-review.md | 1 + docs/implementation-roadmap.md | 23 +- docs/roadmaps/archive/orch-council-tally.md | 64 +++ internal/cli/orch/council.go | 1 + internal/cli/orch/council_tally.go | 64 +++ internal/cli/orch/integration_test.go | 204 +++++++++ internal/store/council.go | 463 ++++++++++++++++++++ 7 files changed, 810 insertions(+), 10 deletions(-) create mode 100644 docs/roadmaps/archive/orch-council-tally.md create mode 100644 internal/cli/orch/council_tally.go diff --git a/docs/council-review.md b/docs/council-review.md index 9f9edff..bb39077 100644 --- a/docs/council-review.md +++ b/docs/council-review.md @@ -161,6 +161,7 @@ Suggested flags: Behavior: - reads the three reviewer outputs +- parses structured reviewer findings from completed reviewer result messages - groups proposals by normalized intent - records supporter count and dissent - persists grouped recommendations in `orch` storage diff --git a/docs/implementation-roadmap.md b/docs/implementation-roadmap.md index a63c10a..9b06f9d 100644 --- a/docs/implementation-roadmap.md +++ b/docs/implementation-roadmap.md @@ -30,9 +30,10 @@ As of now: - `orch wait` now blocks on run-scoped task events and reconciles inbox state while polling so leader waits can wake on worker progress without manual sleep loops - `orch council start` now creates a dedicated council run, persists council target input metadata, and dispatches the three fixed reviewer roles through the existing scheduler - `orch council wait` now blocks until the three reviewer tasks reach terminal states or a timeout is reached -- automated integration tests now cover the main `orch` scheduler slice, including dependency gating, dispatch, blocked-answer flow, retry, reassign, cancel, cleanup, strict worktree creation, automatic code-task worktree enablement, dirty-repo rejection rules, wait wake/timeout behavior, and council start/wait behavior +- `orch council tally` now parses completed reviewer outputs, persists `council_findings`, groups recommendations into `consensus`, `majority`, and `minority`, and persists `council_groups` +- automated integration tests now cover the main `orch` scheduler slice, including dependency gating, dispatch, blocked-answer flow, retry, reassign, cancel, cleanup, strict worktree creation, automatic code-task worktree enablement, dirty-repo rejection rules, wait wake/timeout behavior, and council start/wait/tally behavior -This means the project now has a working `orch` core scheduler with automatic worktree selection for code-like tasks, strict worktree-backed dispatch, the main leader-side control loop, and the first two council workflow slices. +This means the project now has a working `orch` core scheduler with automatic worktree selection for code-like tasks, strict worktree-backed dispatch, the main leader-side control loop, and the first three council workflow slices. ## Source Of Truth @@ -75,9 +76,9 @@ Current implementation status: - `Milestone 4: Orch Core Scheduling` is complete for the current non-worktree scheduler scope - `Milestone 5: Strict Worktree Support` is complete - `Milestone 6: Waiting Primitives` is complete -- `Milestone 7: Council Review` is partially complete through `orch council start` and `orch council wait` +- `Milestone 7: Council Review` is partially complete through `orch council start`, `orch council wait`, and `orch council tally` -The next practical coding target is the next `Milestone 7` slice: `orch council tally`. +The next practical coding target is the final `Milestone 7` slice: `orch council report`. ### Milestone 1: Go Skeleton @@ -336,31 +337,32 @@ Definition of done: Status: -- partially complete through `orch council start` +- partially complete through `orch council start`, `orch council wait`, and `orch council tally` Completed so far: - council-specific storage now includes run metadata, reviewer assignment rows, reviewer findings/groups tables, and persisted council input references - `orch council start` - `orch council wait` +- `orch council tally` - council start creates a dedicated run, stores council target input metadata, creates reviewer tasks `CR1` through `CR3`, and dispatches the fixed reviewer roles `architecture-reviewer`, `implementation-reviewer`, and `risk-reviewer` - council wait blocks until all three reviewer tasks reach terminal states or timeout -- CLI integration tests cover council start dispatch, metadata persistence, and council wait wake/timeout behavior +- council tally parses structured reviewer outputs from completed reviewer result messages and persists grouped recommendations +- CLI integration tests cover council start dispatch, metadata persistence, council wait wake/timeout behavior, and council tally grouping in `normal` and `strict` modes Remaining: -- `orch council tally` - `orch council report` ## Immediate Next Task If a new agent is taking over now, the next concrete step should be: -1. continue `Milestone 7: Council Review` with `orch council tally` -2. define how reviewer findings are parsed from completed reviewer outputs into `council_findings` +1. continue `Milestone 7: Council Review` with `orch council report` +2. define the persisted report artifact shape and how markdown output should be rendered from grouped recommendations 3. keep the authored inbox test-plan set in `docs/tests/inbox/` synchronized if CLI behavior changes during further `orch` work -The inbox implementation and its human-readable test-plan set are already in place, and `orch` now supports the main scheduler loop plus council start/wait, so the next meaningful project step is parsing reviewer outputs and implementing tally/report. +The inbox implementation and its human-readable test-plan set are already in place, and `orch` now supports the main scheduler loop plus council start/wait/tally, so the next meaningful project step is rendering final council reports. ## Recommended Driver Choices @@ -388,6 +390,7 @@ Completed so far: - orch retry, reassign, cancel, and cleanup coverage - orch council start dispatch and persistence coverage - orch council wait wake and timeout coverage +- orch council tally grouping coverage Still recommended before the codebase grows too much: diff --git a/docs/roadmaps/archive/orch-council-tally.md b/docs/roadmaps/archive/orch-council-tally.md new file mode 100644 index 0000000..708b9a4 --- /dev/null +++ b/docs/roadmaps/archive/orch-council-tally.md @@ -0,0 +1,64 @@ +# Orch Council Tally + +## Status + +- `completed` + +## Owner + +- codex + +## Started At + +- `2026-03-19` + +## Goal + +- implement `orch council tally` so completed reviewer outputs can be parsed, grouped, and persisted as council findings and grouped recommendations + +## Scope + +- parse reviewer output JSON from completed council reviewer threads +- persist parsed findings into `council_findings` +- group recommendations into `consensus`, `majority`, and `minority` buckets and persist them into `council_groups` +- add integration tests for normal and strict similarity modes +- update the implementation roadmap and archive this workstream when complete + +## Checklist + +- [x] inspect council tally docs and current council storage +- [x] implement council tally store and CLI command +- [x] add integration coverage for normal and strict tally behavior +- [x] run `go test ./...` +- [x] update `docs/implementation-roadmap.md` +- [x] archive this roadmap with a completion summary + +## Files + +- `docs/roadmaps/archive/orch-council-tally.md` +- `docs/implementation-roadmap.md` +- `docs/council-review.md` +- `internal/store/council.go` +- `internal/cli/orch/council.go` +- `internal/cli/orch/council_tally.go` +- `internal/cli/orch/integration_test.go` + +## Decisions + +- read reviewer outputs from the latest `result` message on each reviewer thread +- require reviewer outputs to be valid structured JSON before tallying +- use a simple normalized proposal grouping strategy for v1, with `strict` preserving wording differences and `normal` grouping by normalized intent tokens + +## Blockers + +- none + +## Next Step + +- move on to `orch council report` + +## Completion Summary + +- `orch council tally` now parses structured reviewer output JSON from completed reviewer result messages +- parsed reviewer findings are persisted into `council_findings`, and grouped recommendations are persisted into `council_groups` +- tally supports `normal` and `strict` similarity modes and computes `consensus`, `majority`, and `minority` buckets diff --git a/internal/cli/orch/council.go b/internal/cli/orch/council.go index 951de10..b1084b0 100644 --- a/internal/cli/orch/council.go +++ b/internal/cli/orch/council.go @@ -10,5 +10,6 @@ func newCouncilCmd(root *rootOptions) *cobra.Command { cmd.AddCommand(newCouncilStartCmd(root)) cmd.AddCommand(newCouncilWaitCmd(root)) + cmd.AddCommand(newCouncilTallyCmd(root)) return cmd } diff --git a/internal/cli/orch/council_tally.go b/internal/cli/orch/council_tally.go new file mode 100644 index 0000000..db2ce38 --- /dev/null +++ b/internal/cli/orch/council_tally.go @@ -0,0 +1,64 @@ +package orch + +import ( + "fmt" + + "ai-workflow-skill/internal/protocol" + "ai-workflow-skill/internal/store" + + "github.com/spf13/cobra" +) + +type councilTallyOptions struct { + runID string + similarity string +} + +func newCouncilTallyCmd(root *rootOptions) *cobra.Command { + opts := &councilTallyOptions{} + + cmd := &cobra.Command{ + Use: "tally", + Short: "Group reviewer findings and compute council support counts", + RunE: func(cmd *cobra.Command, args []string) error { + ctx := cmd.Context() + + sqlDB, err := openOrchDB(ctx, root.dbPath) + if err != nil { + return err + } + defer sqlDB.Close() + + result, err := store.NewOrchStore(sqlDB).TallyCouncil(ctx, store.CouncilTallyInput{ + RunID: opts.runID, + Similarity: opts.similarity, + }) + if err != nil { + return err + } + + resp := protocol.Success{ + OK: true, + Command: "council tally", + Data: map[string]any{ + "run_id": result.RunID, + "similarity": result.Similarity, + "counts": result.Counts, + "grouped_recommendations": result.GroupedRecommendations, + }, + } + if root.json { + return protocol.WriteJSON(cmd.OutOrStdout(), resp) + } + + _, err = fmt.Fprintf(cmd.OutOrStdout(), "tallied council run %s into %d groups\n", result.RunID, len(result.GroupedRecommendations)) + return err + }, + } + + cmd.Flags().StringVar(&opts.runID, "run", "", "Council run ID") + cmd.Flags().StringVar(&opts.similarity, "similarity", "normal", "Grouping mode: strict or normal") + _ = cmd.MarkFlagRequired("run") + + return cmd +} diff --git a/internal/cli/orch/integration_test.go b/internal/cli/orch/integration_test.go index a887a4e..44f9b36 100644 --- a/internal/cli/orch/integration_test.go +++ b/internal/cli/orch/integration_test.go @@ -1712,6 +1712,210 @@ func TestOrchCouncilWaitTimesOutWhenReviewersIncomplete(t *testing.T) { } } +func TestOrchCouncilTallyGroupsReviewerFindingsNormal(t *testing.T) { + t.Parallel() + + dbPath := filepath.Join(t.TempDir(), "coord.db") + + runOrchCommand( + t, + "--db", dbPath, + "--json", + "council", "start", + "--run", "council_blog_tally_001", + "--target", "Review the current blog architecture.", + ) + + completeCouncilReviewer( + t, + dbPath, + "council_blog_tally_001", + "architecture-reviewer", + `{"reviewer_role":"architecture-reviewer","findings":[{"title":"Split contracts","summary":"Transport contracts are mixed into UI code.","proposal":"Move API contract definitions into a dedicated module.","rationale":"This lowers coupling.","confidence":"high","tags":["architecture","coupling"],"target_refs":{"repo_path":"."}}]}`, + ) + completeCouncilReviewer( + t, + dbPath, + "council_blog_tally_001", + "implementation-reviewer", + `{"reviewer_role":"implementation-reviewer","findings":[{"title":"Extract API contracts","summary":"Shared transport shapes are duplicated.","proposal":"Move API contract definitions into dedicated module","rationale":"This reduces duplication.","confidence":"medium","tags":["maintainability"],"target_refs":{"repo_path":"."}}]}`, + ) + completeCouncilReviewer( + t, + dbPath, + "council_blog_tally_001", + "risk-reviewer", + `{"reviewer_role":"risk-reviewer","findings":[{"title":"Add auth integration tests","summary":"Login regressions are hard to catch.","proposal":"Add integration tests for auth flows.","rationale":"This catches regressions earlier.","confidence":"high","tags":["risk","testing"],"target_refs":{"repo_path":"."}}]}`, + ) + + tallyOut := runOrchCommand( + t, + "--db", dbPath, + "--json", + "council", "tally", + "--run", "council_blog_tally_001", + "--similarity", "normal", + ) + + var tallyResp map[string]any + mustDecodeJSON(t, tallyOut, &tallyResp) + if got := nestedString(t, tallyResp, "data", "similarity"); got != "normal" { + t.Fatalf("expected normal similarity, got %q", got) + } + counts, ok := nestedValue(t, tallyResp, "data", "counts").(map[string]any) + if !ok { + t.Fatalf("expected counts object, got %#v", nestedValue(t, tallyResp, "data", "counts")) + } + if got, _ := counts["majority"].(float64); got != 1 { + t.Fatalf("expected one majority group, got %#v", counts["majority"]) + } + if got, _ := counts["minority"].(float64); got != 1 { + t.Fatalf("expected one minority group, got %#v", counts["minority"]) + } + + groups := nestedArray(t, tallyResp, "data", "grouped_recommendations") + if len(groups) != 2 { + t.Fatalf("expected two grouped recommendations, got %#v", groups) + } + firstGroup, ok := groups[0].(map[string]any) + if !ok { + t.Fatalf("expected group object, got %#v", groups[0]) + } + if got, _ := firstGroup["bucket"].(string); got != "majority" { + t.Fatalf("expected first group majority, got %#v", firstGroup["bucket"]) + } + if got, _ := firstGroup["support_count"].(float64); got != 2 { + t.Fatalf("expected support_count 2, got %#v", firstGroup["support_count"]) + } + + sqlDB, err := openOrchDB(t.Context(), dbPath) + if err != nil { + t.Fatalf("open orch db: %v", err) + } + defer sqlDB.Close() + + var findingsCount int + if err := sqlDB.QueryRowContext(t.Context(), `SELECT COUNT(*) FROM council_findings WHERE run_id = ?`, "council_blog_tally_001").Scan(&findingsCount); err != nil { + t.Fatalf("count council_findings: %v", err) + } + if findingsCount != 3 { + t.Fatalf("expected 3 council findings, got %d", findingsCount) + } + var groupsCount int + if err := sqlDB.QueryRowContext(t.Context(), `SELECT COUNT(*) FROM council_groups WHERE run_id = ?`, "council_blog_tally_001").Scan(&groupsCount); err != nil { + t.Fatalf("count council_groups: %v", err) + } + if groupsCount != 2 { + t.Fatalf("expected 2 council groups, got %d", groupsCount) + } +} + +func TestOrchCouncilTallyStrictKeepsDistinctProposals(t *testing.T) { + t.Parallel() + + dbPath := filepath.Join(t.TempDir(), "coord.db") + + runOrchCommand( + t, + "--db", dbPath, + "--json", + "council", "start", + "--run", "council_blog_tally_002", + "--target", "Review the current blog architecture.", + ) + + completeCouncilReviewer( + t, + dbPath, + "council_blog_tally_002", + "architecture-reviewer", + `{"reviewer_role":"architecture-reviewer","findings":[{"title":"Split contracts","summary":"Transport contracts are mixed into UI code.","proposal":"Move API contract definitions into a dedicated module.","rationale":"This lowers coupling.","confidence":"high","tags":["architecture"],"target_refs":{"repo_path":"."}}]}`, + ) + completeCouncilReviewer( + t, + dbPath, + "council_blog_tally_002", + "implementation-reviewer", + `{"reviewer_role":"implementation-reviewer","findings":[{"title":"Extract API contracts","summary":"Shared transport shapes are duplicated.","proposal":"Move API contract definitions into dedicated module","rationale":"This reduces duplication.","confidence":"medium","tags":["maintainability"],"target_refs":{"repo_path":"."}}]}`, + ) + completeCouncilReviewer( + t, + dbPath, + "council_blog_tally_002", + "risk-reviewer", + `{"reviewer_role":"risk-reviewer","findings":[{"title":"Add auth integration tests","summary":"Login regressions are hard to catch.","proposal":"Add integration tests for auth flows.","rationale":"This catches regressions earlier.","confidence":"high","tags":["risk"],"target_refs":{"repo_path":"."}}]}`, + ) + + tallyOut := runOrchCommand( + t, + "--db", dbPath, + "--json", + "council", "tally", + "--run", "council_blog_tally_002", + "--similarity", "strict", + ) + + var tallyResp map[string]any + mustDecodeJSON(t, tallyOut, &tallyResp) + counts, ok := nestedValue(t, tallyResp, "data", "counts").(map[string]any) + if !ok { + t.Fatalf("expected counts object, got %#v", nestedValue(t, tallyResp, "data", "counts")) + } + if got, _ := counts["minority"].(float64); got != 3 { + t.Fatalf("expected three minority groups in strict mode, got %#v", counts["minority"]) + } + groups := nestedArray(t, tallyResp, "data", "grouped_recommendations") + if len(groups) != 3 { + t.Fatalf("expected three distinct groups in strict mode, got %#v", groups) + } +} + +func completeCouncilReviewer(t *testing.T, dbPath, runID, reviewerRole, bodyJSON string) { + t.Helper() + + sqlDB, err := openOrchDB(t.Context(), dbPath) + if err != nil { + t.Fatalf("open orch db: %v", err) + } + + var threadID string + if err := sqlDB.QueryRowContext( + t.Context(), + `SELECT a.thread_id + FROM council_reviewers cr + JOIN task_attempts a + ON a.run_id = cr.run_id + AND a.task_id = cr.task_id + AND a.attempt_no = 1 + WHERE cr.run_id = ? AND cr.reviewer_role = ?`, + runID, + reviewerRole, + ).Scan(&threadID); err != nil { + sqlDB.Close() + t.Fatalf("query council reviewer thread: %v", err) + } + sqlDB.Close() + + runInboxCommand( + t, + "--db", dbPath, + "--json", + "claim", + "--agent", reviewerRole, + "--thread", threadID, + ) + runInboxCommand( + t, + "--db", dbPath, + "--json", + "done", + "--agent", reviewerRole, + "--thread", threadID, + "--summary", "Review complete", + "--body", bodyJSON, + ) +} + func runInboxCommandEventually(t *testing.T, args ...string) string { t.Helper() diff --git a/internal/store/council.go b/internal/store/council.go index 6df3129..eebc5d0 100644 --- a/internal/store/council.go +++ b/internal/store/council.go @@ -3,8 +3,10 @@ package store import ( "context" "database/sql" + "encoding/json" "errors" "fmt" + "sort" "strings" "time" ) @@ -67,6 +69,59 @@ type CouncilWaitResult struct { ReviewerStatuses []CouncilReviewer `json:"reviewers"` } +type CouncilFinding struct { + RunID string `json:"run_id"` + ReviewerRole string `json:"reviewer_role"` + FindingID string `json:"finding_id"` + Title string `json:"title"` + Summary string `json:"summary"` + Proposal string `json:"proposal"` + Rationale string `json:"rationale"` + Confidence string `json:"confidence"` + TagsJSON json.RawMessage `json:"tags_json"` + TargetRefsJSON json.RawMessage `json:"target_refs_json"` +} + +type CouncilGroup struct { + RunID string `json:"run_id"` + GroupID string `json:"group_id"` + Proposal string `json:"proposal"` + Bucket string `json:"bucket"` + SupportCount int `json:"support_count"` + SupportersJSON json.RawMessage `json:"supporters_json"` + DissentersJSON json.RawMessage `json:"dissenters_json"` + RationaleSummary string `json:"rationale_summary"` + TagsJSON json.RawMessage `json:"tags_json"` + SourceFindingIDsJSON json.RawMessage `json:"source_finding_ids_json"` +} + +type CouncilTallyInput struct { + RunID string + Similarity string +} + +type CouncilTallyResult struct { + RunID string `json:"run_id"` + Similarity string `json:"similarity"` + Counts map[string]int `json:"counts"` + GroupedRecommendations []CouncilGroup `json:"grouped_recommendations"` +} + +type councilReviewerOutput struct { + ReviewerRole string `json:"reviewer_role"` + Findings []councilFindingOutput `json:"findings"` +} + +type councilFindingOutput struct { + Title string `json:"title"` + Summary string `json:"summary"` + Proposal string `json:"proposal"` + Rationale string `json:"rationale"` + Confidence string `json:"confidence"` + Tags json.RawMessage `json:"tags"` + TargetRefs json.RawMessage `json:"target_refs"` +} + func (s *OrchStore) StartCouncil(ctx context.Context, input CouncilStartInput) (CouncilStartResult, error) { runID := strings.TrimSpace(input.RunID) if runID == "" { @@ -621,3 +676,411 @@ func (s *OrchStore) GetCouncilReviewerStatuses(ctx context.Context, runID string return reviewers, allComplete, nil } + +func (s *OrchStore) TallyCouncil(ctx context.Context, input CouncilTallyInput) (CouncilTallyResult, error) { + runID := strings.TrimSpace(input.RunID) + if runID == "" { + return CouncilTallyResult{}, fmt.Errorf("%w: run id is required", ErrInvalidInput) + } + + similarity := defaultString(strings.TrimSpace(input.Similarity), "normal") + if similarity != "normal" && similarity != "strict" { + return CouncilTallyResult{}, fmt.Errorf("%w: similarity must be strict or normal", ErrInvalidInput) + } + + if _, err := s.GetCouncilRun(ctx, runID); err != nil { + return CouncilTallyResult{}, err + } + + if _, err := s.ReconcileRun(ctx, runID); err != nil && !isSQLiteBusyError(err) { + return CouncilTallyResult{}, err + } + + reviewers, allComplete, err := s.GetCouncilReviewerStatuses(ctx, runID) + if err != nil { + return CouncilTallyResult{}, err + } + if !allComplete { + return CouncilTallyResult{}, fmt.Errorf("%w: council reviewers are not complete yet", ErrInvalidState) + } + + findings, err := s.collectCouncilFindings(ctx, runID, reviewers) + if err != nil { + return CouncilTallyResult{}, err + } + groups := groupCouncilFindings(runID, findings, reviewers, similarity) + counts := make(map[string]int) + for _, group := range groups { + counts[group.Bucket]++ + } + + tx, err := s.db.BeginTx(ctx, nil) + if err != nil { + return CouncilTallyResult{}, fmt.Errorf("begin council tally transaction: %w", err) + } + defer tx.Rollback() + + if _, err := tx.ExecContext(ctx, `DELETE FROM council_findings WHERE run_id = ?`, runID); err != nil { + return CouncilTallyResult{}, fmt.Errorf("clear council findings: %w", err) + } + if _, err := tx.ExecContext(ctx, `DELETE FROM council_groups WHERE run_id = ?`, runID); err != nil { + return CouncilTallyResult{}, fmt.Errorf("clear council groups: %w", err) + } + + for _, finding := range findings { + if _, err := tx.ExecContext( + ctx, + `INSERT INTO council_findings ( + run_id, reviewer_role, finding_id, title, summary, proposal, rationale, + confidence, tags_json, target_refs_json + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, + finding.RunID, + finding.ReviewerRole, + finding.FindingID, + finding.Title, + finding.Summary, + finding.Proposal, + finding.Rationale, + finding.Confidence, + string(finding.TagsJSON), + string(finding.TargetRefsJSON), + ); err != nil { + return CouncilTallyResult{}, fmt.Errorf("insert council finding: %w", err) + } + } + + for _, group := range groups { + if _, err := tx.ExecContext( + ctx, + `INSERT INTO council_groups ( + run_id, group_id, proposal, bucket, support_count, supporters_json, + dissenters_json, rationale_summary, tags_json, source_finding_ids_json + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, + group.RunID, + group.GroupID, + group.Proposal, + group.Bucket, + group.SupportCount, + string(group.SupportersJSON), + string(group.DissentersJSON), + group.RationaleSummary, + string(group.TagsJSON), + string(group.SourceFindingIDsJSON), + ); err != nil { + return CouncilTallyResult{}, fmt.Errorf("insert council group: %w", err) + } + } + + if err := insertEvent(ctx, tx, eventInput{ + RunID: runID, + Source: "orch", + EventType: "council_tallied", + Summary: "council recommendations grouped", + PayloadJSON: marshalJSON(map[string]any{ + "similarity": similarity, + "counts": counts, + }), + CreatedAt: nowUTC(), + }); err != nil { + return CouncilTallyResult{}, err + } + + if err := tx.Commit(); err != nil { + return CouncilTallyResult{}, fmt.Errorf("commit council tally transaction: %w", err) + } + + return CouncilTallyResult{ + RunID: runID, + Similarity: similarity, + Counts: counts, + GroupedRecommendations: groups, + }, nil +} + +func (s *OrchStore) collectCouncilFindings(ctx context.Context, runID string, reviewers []CouncilReviewer) ([]CouncilFinding, error) { + findings := make([]CouncilFinding, 0) + for _, reviewer := range reviewers { + if reviewer.Status != "done" { + return nil, fmt.Errorf("%w: reviewer %s did not finish successfully", ErrInvalidState, reviewer.ReviewerRole) + } + + message, err := s.loadCouncilReviewerResultMessage(ctx, runID, reviewer.TaskID) + if err != nil { + return nil, err + } + output, err := parseCouncilReviewerOutput(reviewer.ReviewerRole, message) + if err != nil { + return nil, err + } + for i, finding := range output.Findings { + tagsJSON, err := normalizeOptionalJSONArray(finding.Tags) + if err != nil { + return nil, fmt.Errorf("%w: reviewer %s finding %d tags must be a JSON array", ErrInvalidInput, reviewer.ReviewerRole, i+1) + } + targetRefsJSON, err := normalizeOptionalJSONObject(finding.TargetRefs) + if err != nil { + return nil, fmt.Errorf("%w: reviewer %s finding %d target_refs must be a JSON object", ErrInvalidInput, reviewer.ReviewerRole, i+1) + } + confidence := strings.TrimSpace(finding.Confidence) + switch confidence { + case "low", "medium", "high": + default: + return nil, fmt.Errorf("%w: reviewer %s finding %d confidence must be low, medium, or high", ErrInvalidInput, reviewer.ReviewerRole, i+1) + } + if strings.TrimSpace(finding.Proposal) == "" { + return nil, fmt.Errorf("%w: reviewer %s finding %d proposal is required", ErrInvalidInput, reviewer.ReviewerRole, i+1) + } + findings = append(findings, CouncilFinding{ + RunID: runID, + ReviewerRole: reviewer.ReviewerRole, + FindingID: fmt.Sprintf("f%02d", i+1), + Title: strings.TrimSpace(finding.Title), + Summary: strings.TrimSpace(finding.Summary), + Proposal: strings.TrimSpace(finding.Proposal), + Rationale: strings.TrimSpace(finding.Rationale), + Confidence: confidence, + TagsJSON: json.RawMessage(tagsJSON), + TargetRefsJSON: json.RawMessage(targetRefsJSON), + }) + } + } + return findings, nil +} + +func (s *OrchStore) loadCouncilReviewerResultMessage(ctx context.Context, runID, taskID string) (Message, error) { + task, err := selectTask(ctx, s.db, runID, taskID) + if err != nil { + return Message{}, err + } + if task.LatestAttemptNo == 0 { + return Message{}, fmt.Errorf("%w: reviewer task %s has no attempt", ErrInvalidState, taskID) + } + + attempt, err := selectAttempt(ctx, s.db, runID, taskID, task.LatestAttemptNo) + if err != nil { + return Message{}, err + } + + row := s.db.QueryRowContext( + ctx, + `SELECT + message_id, thread_id, from_agent, to_agent, kind, summary, body, + payload_json, created_at + FROM messages + WHERE thread_id = ? AND kind = 'result' + ORDER BY created_at DESC + LIMIT 1`, + attempt.ThreadID, + ) + message, err := scanMessage(row) + if errors.Is(err, sql.ErrNoRows) { + return Message{}, fmt.Errorf("%w: reviewer task %s has no result message", ErrInvalidState, taskID) + } + if err != nil { + return Message{}, err + } + return message, nil +} + +func parseCouncilReviewerOutput(expectedRole string, message Message) (councilReviewerOutput, error) { + candidates := []string{strings.TrimSpace(message.Body), strings.TrimSpace(string(message.PayloadJSON))} + var lastErr error + for _, candidate := range candidates { + if candidate == "" || candidate == "{}" { + continue + } + var output councilReviewerOutput + if err := json.Unmarshal([]byte(candidate), &output); err != nil { + lastErr = err + continue + } + if strings.TrimSpace(output.ReviewerRole) == "" { + return councilReviewerOutput{}, fmt.Errorf("%w: reviewer output must include reviewer_role", ErrInvalidInput) + } + if output.ReviewerRole != expectedRole { + return councilReviewerOutput{}, fmt.Errorf("%w: reviewer output role %s does not match expected %s", ErrInvalidInput, output.ReviewerRole, expectedRole) + } + return output, nil + } + if lastErr != nil { + return councilReviewerOutput{}, fmt.Errorf("%w: reviewer output must be valid JSON", ErrInvalidInput) + } + return councilReviewerOutput{}, fmt.Errorf("%w: reviewer result message did not contain council output JSON", ErrInvalidInput) +} + +func normalizeOptionalJSONArray(raw json.RawMessage) (string, error) { + if len(raw) == 0 || strings.TrimSpace(string(raw)) == "" || strings.TrimSpace(string(raw)) == "null" { + return "[]", nil + } + var value []any + if err := json.Unmarshal(raw, &value); err != nil { + return "", err + } + return marshalJSON(value), nil +} + +func normalizeOptionalJSONObject(raw json.RawMessage) (string, error) { + if len(raw) == 0 || strings.TrimSpace(string(raw)) == "" || strings.TrimSpace(string(raw)) == "null" { + return "{}", nil + } + var value map[string]any + if err := json.Unmarshal(raw, &value); err != nil { + return "", err + } + return marshalJSON(value), nil +} + +func groupCouncilFindings(runID string, findings []CouncilFinding, reviewers []CouncilReviewer, similarity string) []CouncilGroup { + type groupedFinding struct { + key string + proposal string + findings []CouncilFinding + } + + order := make([]string, 0) + groupsByKey := make(map[string]*groupedFinding) + for _, finding := range findings { + key := councilProposalGroupKey(finding.Proposal, similarity) + group, ok := groupsByKey[key] + if !ok { + group = &groupedFinding{ + key: key, + proposal: finding.Proposal, + } + groupsByKey[key] = group + order = append(order, key) + } + group.findings = append(group.findings, finding) + } + + sortedReviewers := make([]string, 0, len(reviewers)) + for _, reviewer := range reviewers { + sortedReviewers = append(sortedReviewers, reviewer.ReviewerRole) + } + sort.Strings(sortedReviewers) + + result := make([]CouncilGroup, 0, len(order)) + for idx, key := range order { + group := groupsByKey[key] + supporterSet := make(map[string]struct{}) + tagSet := make(map[string]struct{}) + sourceFindingIDs := make([]string, 0, len(group.findings)) + rationaleSummary := "" + + for _, finding := range group.findings { + supporterSet[finding.ReviewerRole] = struct{}{} + sourceFindingIDs = append(sourceFindingIDs, finding.ReviewerRole+":"+finding.FindingID) + if rationaleSummary == "" && finding.Rationale != "" { + rationaleSummary = finding.Rationale + } + + var tags []string + if len(finding.TagsJSON) > 0 { + _ = json.Unmarshal(finding.TagsJSON, &tags) + } + for _, tag := range tags { + tag = strings.TrimSpace(tag) + if tag != "" { + tagSet[tag] = struct{}{} + } + } + } + + supporters := make([]string, 0, len(supporterSet)) + for _, reviewer := range sortedReviewers { + if _, ok := supporterSet[reviewer]; ok { + supporters = append(supporters, reviewer) + } + } + dissenters := make([]string, 0, len(sortedReviewers)-len(supporters)) + for _, reviewer := range sortedReviewers { + if _, ok := supporterSet[reviewer]; !ok { + dissenters = append(dissenters, reviewer) + } + } + + tags := make([]string, 0, len(tagSet)) + for tag := range tagSet { + tags = append(tags, tag) + } + sort.Strings(tags) + sort.Strings(sourceFindingIDs) + + supportCount := len(supporters) + bucket := "minority" + if supportCount == 3 { + bucket = "consensus" + } else if supportCount == 2 { + bucket = "majority" + } + + result = append(result, CouncilGroup{ + RunID: runID, + GroupID: fmt.Sprintf("grp_%02d", idx+1), + Proposal: group.proposal, + Bucket: bucket, + SupportCount: supportCount, + SupportersJSON: json.RawMessage(marshalJSON(supporters)), + DissentersJSON: json.RawMessage(marshalJSON(dissenters)), + RationaleSummary: rationaleSummary, + TagsJSON: json.RawMessage(marshalJSON(tags)), + SourceFindingIDsJSON: json.RawMessage(marshalJSON(sourceFindingIDs)), + }) + } + + sort.SliceStable(result, func(i, j int) bool { + if result[i].SupportCount != result[j].SupportCount { + return result[i].SupportCount > result[j].SupportCount + } + return result[i].Proposal < result[j].Proposal + }) + for i := range result { + result[i].GroupID = fmt.Sprintf("grp_%02d", i+1) + } + return result +} + +func councilProposalGroupKey(proposal, similarity string) string { + tokens := proposalTokens(proposal) + if similarity == "strict" { + return strings.Join(tokens, " ") + } + + stopWords := map[string]struct{}{ + "a": {}, "an": {}, "the": {}, "to": {}, "into": {}, "and": {}, "or": {}, "of": {}, "for": {}, "in": {}, "on": {}, "with": {}, "from": {}, "that": {}, "this": {}, "it": {}, "is": {}, "are": {}, "be": {}, "by": {}, "as": {}, "keep": {}, "use": {}, "add": {}, + } + set := make(map[string]struct{}) + filtered := make([]string, 0, len(tokens)) + for _, token := range tokens { + if _, stop := stopWords[token]; stop { + continue + } + if len(token) <= 2 { + continue + } + if _, seen := set[token]; seen { + continue + } + set[token] = struct{}{} + filtered = append(filtered, token) + } + sort.Strings(filtered) + if len(filtered) == 0 { + return strings.Join(tokens, " ") + } + return strings.Join(filtered, " ") +} + +func proposalTokens(value string) []string { + lower := strings.ToLower(strings.TrimSpace(value)) + fields := strings.FieldsFunc(lower, func(r rune) bool { + return !((r >= 'a' && r <= 'z') || (r >= '0' && r <= '9')) + }) + result := make([]string, 0, len(fields)) + for _, field := range fields { + if field != "" { + result = append(result, field) + } + } + return result +}