diff --git a/docs/implementation-roadmap.md b/docs/implementation-roadmap.md index e8383c0..ad1b5b6 100644 --- a/docs/implementation-roadmap.md +++ b/docs/implementation-roadmap.md @@ -40,6 +40,7 @@ As of now: - the first migration phase for the skill workspace monorepo is now complete: root `go.work` exists, `pnpm-workspace.yaml` now discovers `packages/*`, empty runtime module roots now exist under `packages/`, and a declarative `scripts/skill-bundles.json` plus `scripts/package_skill_runtimes.sh` scaffold now define package-oriented skill bundle metadata from the repo root - `packages/coord-core` now exists as the first real extracted runtime package, containing shared coordination DB/schema, protocol, and store code, and the active coordination runtimes now import `coord-core` instead of root `internal/db`, `internal/store`, and `internal/protocol` - `packages/inbox-runtime` and `packages/orch-runtime` now exist as package-owned runtimes with their own `cmd/` entrypoints and package-local CLI wiring/tests, and the root skill packaging flow now builds `skills/inbox`, `skills/orch`, and `skills/council-review` from package entrypoints instead of root `cmd/` paths +- `packages/orchd-runtime` now exists as the package-owned HTTP/query/web backend runtime, with package-local `cmd/orchd`, app, query, and HTTP transport code plus passing package-local tests - a repo-local `scripts/package_skill_clis.sh` packaging flow now builds bundled skill CLI assets for `inbox`, `orch`, and `council-review` - `orch` now implements `run init/show`, `task add`, `dep add`, `ready`, `dispatch`, `reconcile`, `wait`, `blocked`, `answer`, `retry`, `reassign`, `cancel`, `cleanup`, and `status` - `orch` can create runs, gate tasks through dependencies, dispatch work through `inbox`, reconcile worker thread state back into task state, answer blocked tasks, retry or reassign work, cancel tasks or runs, clean attempt worktrees, and create per-attempt Git worktrees during strict dispatch @@ -507,10 +508,10 @@ Completed so far: - `scripts/skill-bundles.json` now marks `inbox`, `orch`, and `council-review` as ready package-backed bundles - `scripts/package_skill_runtimes.sh package` now builds and installs `skills/inbox/assets/inbox`, `skills/orch/assets/orch`, and `skills/council-review/assets/orch` from package entrypoints - the legacy `scripts/package_skill_clis.sh` entrypoint now delegates to the declarative package-oriented packaging flow instead of hardcoding root `cmd/` paths +- `packages/orchd-runtime/cmd/orchd` plus `packages/orchd-runtime/internal/{app,httpapi,query}` now provide a package-owned web backend runtime and pass `go test ./...` Remaining: -- extract `orchd` into a package-owned runtime - import `repo-memory` as its own runtime package and add the corresponding skill bundle - graduate the bundle scaffold into the primary packaging flow once package-owned runtime entrypoints exist @@ -521,11 +522,11 @@ If a new agent is taking over now, the next concrete step should be: 1. treat `Milestone 9: Web Product Phase 2 Read-Only Operator UI` as complete for the initial operator surface and do not expand web feature scope further until the workspace split is decided package-by-package 2. treat the Phase 1 workspace bootstrap for `Milestone 10` as complete and keep the new `go.work`, `packages/`, and declarative bundle metadata as the baseline for all further migration steps 3. treat the shared coordination kernel extraction into `packages/coord-core` as complete and move `inbox` plus `orch` into package-owned runtimes next -4. treat `inbox-runtime` and `orch-runtime` as package-owned and move `orchd` into `packages/orchd-runtime` next so the web backend stops depending on root-owned runtime code -5. keep the authored skill forward-test plans under `docs/tests/*-skill/` synchronized as runtime ownership moves from root paths to package paths -6. import `repo-memory` only after the package-based runtime and skill packaging pattern exists +4. treat `inbox-runtime`, `orch-runtime`, and `orchd-runtime` as package-owned and import `repo-memory` next so the last planned runtime package joins the workspace +5. add `skills/repo-memory` and the corresponding forward-test plan once the runtime import is in place +6. keep the authored skill forward-test plans under `docs/tests/*-skill/` synchronized as runtime ownership moves from root paths to package paths -The inbox implementation and its human-readable test-plan set are already in place, `orch` supports the main scheduler loop plus the complete council start/wait/tally/report workflow, the web product now has its first real operator-facing read surfaces, and the repository has completed the workspace bootstrap, the shared coordination-kernel extraction, and the first package-owned runtime extraction phases of the skill monorepo migration, so the next step should be moving `orchd` and then importing `repo-memory`, not continuing to accrete new root-owned runtime paths. +The inbox implementation and its human-readable test-plan set are already in place, `orch` supports the main scheduler loop plus the complete council start/wait/tally/report workflow, the web product now has its first real operator-facing read surfaces, and the repository has completed the workspace bootstrap plus the shared coordination, inbox, orch, and orchd package-runtime extraction phases of the skill monorepo migration, so the next step should be importing `repo-memory` and then removing legacy root ownership, not continuing to accrete new root-owned runtime paths. ## Recommended Driver Choices diff --git a/docs/roadmaps/active/skill-workspace-monorepo-migration.md b/docs/roadmaps/active/skill-workspace-monorepo-migration.md index bccd186..91fbeee 100644 --- a/docs/roadmaps/active/skill-workspace-monorepo-migration.md +++ b/docs/roadmaps/active/skill-workspace-monorepo-migration.md @@ -28,7 +28,7 @@ - [x] Phase 1: bootstrap `go.work`, expanded workspace manifests, package roots, and declarative skill bundle metadata - [x] Phase 2: extract shared coordination code into `packages/coord-core` - [x] Phase 3: extract `inbox-runtime` and `orch-runtime` -- [ ] Phase 4: extract `orchd-runtime` +- [x] Phase 4: extract `orchd-runtime` - [ ] Phase 5: import `repo-memory-runtime` and add `skills/repo-memory` - [ ] Phase 6: remove root runtime ownership and normalize package-based packaging @@ -54,4 +54,4 @@ ## Next Step -- start Phase 4 by moving the HTTP/query/web backend runtime into `packages/orchd-runtime` on top of the extracted `coord-core`, `inbox-runtime`, and `orch-runtime` packages +- start Phase 5 by importing the exploratory `repo-memory` runtime into `packages/repo-memory-runtime`, adding `skills/repo-memory`, and wiring it into the declarative bundle packaging flow diff --git a/packages/orchd-runtime/cmd/orchd/main.go b/packages/orchd-runtime/cmd/orchd/main.go new file mode 100644 index 0000000..d07af22 --- /dev/null +++ b/packages/orchd-runtime/cmd/orchd/main.go @@ -0,0 +1,66 @@ +package main + +import ( + "context" + "errors" + "flag" + "log" + "net/http" + "os" + "os/signal" + "syscall" + "time" + + "ai-workflow-skill/packages/orchd-runtime/internal/app" + "ai-workflow-skill/packages/coord-core/db" + "ai-workflow-skill/packages/orchd-runtime/internal/httpapi" +) + +func main() { + var ( + dbPath string + listen string + shutdown time.Duration + ) + + flag.StringVar(&dbPath, "db", ".agents/coord.db", "SQLite database path") + flag.StringVar(&listen, "listen", ":8080", "HTTP listen address") + flag.DurationVar(&shutdown, "shutdown-timeout", 5*time.Second, "Graceful shutdown timeout") + flag.Parse() + + ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) + defer stop() + + sqlDB, err := db.Open(ctx, dbPath) + if err != nil { + log.Fatalf("open database: %v", err) + } + defer sqlDB.Close() + + if err := db.ApplyMigrations(ctx, sqlDB); err != nil { + log.Fatalf("apply migrations: %v", err) + } + + webApp := app.NewWebService(sqlDB) + server := &http.Server{ + Addr: listen, + Handler: httpapi.NewRouter(webApp), + ReadHeaderTimeout: 5 * time.Second, + } + + go func() { + <-ctx.Done() + + shutdownCtx, cancel := context.WithTimeout(context.Background(), shutdown) + defer cancel() + + if err := server.Shutdown(shutdownCtx); err != nil && !errors.Is(err, http.ErrServerClosed) { + log.Printf("http shutdown: %v", err) + } + }() + + log.Printf("orchd listening on %s", listen) + if err := server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { + log.Fatalf("serve http api: %v", err) + } +} diff --git a/packages/orchd-runtime/go.mod b/packages/orchd-runtime/go.mod index a50b07b..1830c81 100644 --- a/packages/orchd-runtime/go.mod +++ b/packages/orchd-runtime/go.mod @@ -1,3 +1,5 @@ module ai-workflow-skill/packages/orchd-runtime go 1.26 + +require github.com/go-chi/chi/v5 v5.2.5 diff --git a/packages/orchd-runtime/internal/app/web.go b/packages/orchd-runtime/internal/app/web.go new file mode 100644 index 0000000..56ce592 --- /dev/null +++ b/packages/orchd-runtime/internal/app/web.go @@ -0,0 +1,39 @@ +package app + +import ( + "context" + "database/sql" + + "ai-workflow-skill/packages/orchd-runtime/internal/query" + "ai-workflow-skill/packages/coord-core/store" +) + +type WebService struct { + reads *query.ReadService +} + +func NewWebService(db *sql.DB) *WebService { + return &WebService{ + reads: query.NewReadService(db), + } +} + +func (s *WebService) ListRuns(ctx context.Context) ([]query.RunListItem, error) { + return s.reads.ListRuns(ctx) +} + +func (s *WebService) GetRunDetail(ctx context.Context, runID string) (query.RunDetail, error) { + return s.reads.GetRunDetail(ctx, runID) +} + +func (s *WebService) ListRunTasks(ctx context.Context, runID string) ([]store.Task, error) { + return s.reads.ListRunTasks(ctx, runID) +} + +func (s *WebService) ListBlockedTasks(ctx context.Context, runID string) ([]store.BlockedTask, error) { + return s.reads.ListBlockedTasks(ctx, runID) +} + +func (s *WebService) GetThreadDetail(ctx context.Context, threadID string) (store.ThreadDetail, error) { + return s.reads.GetThreadDetail(ctx, threadID) +} diff --git a/packages/orchd-runtime/internal/httpapi/response.go b/packages/orchd-runtime/internal/httpapi/response.go new file mode 100644 index 0000000..1b3aa3e --- /dev/null +++ b/packages/orchd-runtime/internal/httpapi/response.go @@ -0,0 +1,58 @@ +package httpapi + +import ( + "encoding/json" + "errors" + "fmt" + "net/http" + + "ai-workflow-skill/packages/coord-core/store" +) + +type errorEnvelope struct { + Error errorPayload `json:"error"` +} + +type errorPayload struct { + Code string `json:"code"` + Message string `json:"message"` +} + +func writeJSON(w http.ResponseWriter, status int, payload any) { + w.Header().Set("Content-Type", "application/json; charset=utf-8") + w.WriteHeader(status) + + enc := json.NewEncoder(w) + enc.SetIndent("", " ") + _ = enc.Encode(payload) +} + +func writeError(w http.ResponseWriter, err error) { + status, code := classifyError(err) + writeJSON(w, status, errorEnvelope{ + Error: errorPayload{ + Code: code, + Message: errorMessage(err), + }, + }) +} + +func classifyError(err error) (int, string) { + switch { + case errors.Is(err, store.ErrInvalidInput): + return http.StatusBadRequest, "invalid_input" + case errors.Is(err, store.ErrRunNotFound), errors.Is(err, store.ErrTaskNotFound), errors.Is(err, store.ErrThreadNotFound): + return http.StatusNotFound, "not_found" + case errors.Is(err, store.ErrInvalidState): + return http.StatusConflict, "invalid_state" + default: + return http.StatusInternalServerError, "internal_error" + } +} + +func errorMessage(err error) string { + if err == nil { + return "unknown error" + } + return fmt.Sprintf("%v", err) +} diff --git a/packages/orchd-runtime/internal/httpapi/router.go b/packages/orchd-runtime/internal/httpapi/router.go new file mode 100644 index 0000000..633763f --- /dev/null +++ b/packages/orchd-runtime/internal/httpapi/router.go @@ -0,0 +1,87 @@ +package httpapi + +import ( + "context" + "net/http" + "time" + + "github.com/go-chi/chi/v5" + "github.com/go-chi/chi/v5/middleware" + + "ai-workflow-skill/packages/orchd-runtime/internal/query" + "ai-workflow-skill/packages/coord-core/store" +) + +type readService interface { + ListRuns(ctx context.Context) ([]query.RunListItem, error) + GetRunDetail(ctx context.Context, runID string) (query.RunDetail, error) + ListRunTasks(ctx context.Context, runID string) ([]store.Task, error) + ListBlockedTasks(ctx context.Context, runID string) ([]store.BlockedTask, error) + GetThreadDetail(ctx context.Context, threadID string) (store.ThreadDetail, error) +} + +func NewRouter(service readService) http.Handler { + router := chi.NewRouter() + router.Use(middleware.RequestID) + router.Use(middleware.Recoverer) + router.Use(middleware.Timeout(30 * time.Second)) + + router.Get("/health", func(w http.ResponseWriter, r *http.Request) { + writeJSON(w, http.StatusOK, map[string]any{ + "status": "ok", + }) + }) + + router.Route("/api", func(r chi.Router) { + r.Get("/runs", func(w http.ResponseWriter, r *http.Request) { + runs, err := service.ListRuns(r.Context()) + if err != nil { + writeError(w, err) + return + } + writeJSON(w, http.StatusOK, map[string]any{"runs": runs}) + }) + + r.Get("/runs/{runID}", func(w http.ResponseWriter, r *http.Request) { + runID := chi.URLParam(r, "runID") + run, err := service.GetRunDetail(r.Context(), runID) + if err != nil { + writeError(w, err) + return + } + writeJSON(w, http.StatusOK, map[string]any{"run": run}) + }) + + r.Get("/runs/{runID}/tasks", func(w http.ResponseWriter, r *http.Request) { + runID := chi.URLParam(r, "runID") + tasks, err := service.ListRunTasks(r.Context(), runID) + if err != nil { + writeError(w, err) + return + } + writeJSON(w, http.StatusOK, map[string]any{"tasks": tasks}) + }) + + r.Get("/runs/{runID}/blocked", func(w http.ResponseWriter, r *http.Request) { + runID := chi.URLParam(r, "runID") + blocked, err := service.ListBlockedTasks(r.Context(), runID) + if err != nil { + writeError(w, err) + return + } + writeJSON(w, http.StatusOK, map[string]any{"blocked": blocked}) + }) + + r.Get("/threads/{threadID}", func(w http.ResponseWriter, r *http.Request) { + threadID := chi.URLParam(r, "threadID") + thread, err := service.GetThreadDetail(r.Context(), threadID) + if err != nil { + writeError(w, err) + return + } + writeJSON(w, http.StatusOK, map[string]any{"thread": thread}) + }) + }) + + return router +} diff --git a/packages/orchd-runtime/internal/httpapi/router_test.go b/packages/orchd-runtime/internal/httpapi/router_test.go new file mode 100644 index 0000000..921d93e --- /dev/null +++ b/packages/orchd-runtime/internal/httpapi/router_test.go @@ -0,0 +1,196 @@ +package httpapi + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "path/filepath" + "testing" + "time" + + "ai-workflow-skill/packages/orchd-runtime/internal/app" + dbpkg "ai-workflow-skill/packages/coord-core/db" + "ai-workflow-skill/packages/coord-core/store" +) + +func TestRouterExposesReadOnlyWebEndpoints(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + dbPath := filepath.Join(t.TempDir(), "coord.db") + sqlDB, err := dbpkg.Open(ctx, dbPath) + if err != nil { + t.Fatalf("open db: %v", err) + } + defer sqlDB.Close() + + if err := dbpkg.ApplyMigrations(ctx, sqlDB); err != nil { + t.Fatalf("apply migrations: %v", err) + } + + orchStore := store.NewOrchStore(sqlDB) + inboxStore := store.NewInboxStore(sqlDB) + + _, err = orchStore.CreateRun(ctx, store.CreateRunInput{ + RunID: "run_web_001", + Goal: "Build the web control plane", + Summary: "Initial HTTP slice", + }) + if err != nil { + t.Fatalf("create run: %v", err) + } + + _, err = orchStore.AddTask(ctx, store.AddTaskInput{ + RunID: "run_web_001", + TaskID: "T1", + Title: "Implement read API", + Summary: "Expose run state over HTTP", + DefaultTo: "worker-a", + }) + if err != nil { + t.Fatalf("add task T1: %v", err) + } + + _, err = orchStore.AddTask(ctx, store.AddTaskInput{ + RunID: "run_web_001", + TaskID: "T2", + Title: "Build React shell", + Summary: "Scaffold the frontend workspace", + DefaultTo: "worker-b", + }) + if err != nil { + t.Fatalf("add task T2: %v", err) + } + + dispatch, err := orchStore.DispatchTask(ctx, store.DispatchInput{ + RunID: "run_web_001", + TaskID: "T1", + ToAgent: "worker-a", + Body: "Expose the initial HTTP API.", + }) + if err != nil { + t.Fatalf("dispatch task: %v", err) + } + + if _, err := inboxStore.ClaimThread(ctx, store.ClaimInput{ + ThreadID: dispatch.Attempt.ThreadID, + Agent: "worker-a", + LeaseSeconds: 300, + }); err != nil { + t.Fatalf("claim thread: %v", err) + } + + if _, _, err := inboxStore.UpdateThreadStatus(ctx, store.UpdateInput{ + ThreadID: dispatch.Attempt.ThreadID, + Agent: "worker-a", + Status: "blocked", + Summary: "Need the API shape", + Body: "Confirm whether run detail should include blocked tasks.", + }); err != nil { + t.Fatalf("mark thread blocked: %v", err) + } + + if _, err := orchStore.ReconcileRun(ctx, "run_web_001"); err != nil { + t.Fatalf("reconcile run: %v", err) + } + + handler := NewRouter(app.NewWebService(sqlDB)) + + assertStatusAndJSONField(t, handler, "/health", http.StatusOK, []string{"status"}, "ok") + assertStatusAndJSONField(t, handler, "/api/runs", http.StatusOK, []string{"runs", "0", "run", "run_id"}, "run_web_001") + assertStatusAndJSONField(t, handler, "/api/runs/run_web_001", http.StatusOK, []string{"run", "run", "run_id"}, "run_web_001") + assertStatusAndJSONField(t, handler, "/api/runs/run_web_001/tasks", http.StatusOK, []string{"tasks", "0", "task_id"}, "T1") + assertStatusAndJSONField(t, handler, "/api/runs/run_web_001/blocked", http.StatusOK, []string{"blocked", "0", "task", "task_id"}, "T1") + assertStatusAndJSONField(t, handler, "/api/threads/"+dispatch.Attempt.ThreadID, http.StatusOK, []string{"thread", "thread", "thread_id"}, dispatch.Attempt.ThreadID) +} + +func TestRouterMapsNotFoundErrors(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + dbPath := filepath.Join(t.TempDir(), "coord.db") + sqlDB, err := dbpkg.Open(ctx, dbPath) + if err != nil { + t.Fatalf("open db: %v", err) + } + defer sqlDB.Close() + + if err := dbpkg.ApplyMigrations(ctx, sqlDB); err != nil { + t.Fatalf("apply migrations: %v", err) + } + + handler := NewRouter(app.NewWebService(sqlDB)) + rec := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodGet, "/api/runs/missing-run", nil) + handler.ServeHTTP(rec, req) + + if rec.Code != http.StatusNotFound { + t.Fatalf("expected 404, got %d", rec.Code) + } + + var payload map[string]any + if err := json.Unmarshal(rec.Body.Bytes(), &payload); err != nil { + t.Fatalf("decode response: %v", err) + } + + code := nestedString(t, payload, "error", "code") + if code != "not_found" { + t.Fatalf("expected not_found error code, got %q", code) + } +} + +func assertStatusAndJSONField(t *testing.T, handler http.Handler, path string, wantStatus int, fieldPath []string, want string) { + t.Helper() + + rec := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodGet, path, nil) + handler.ServeHTTP(rec, req) + + if rec.Code != wantStatus { + t.Fatalf("GET %s: expected status %d, got %d", path, wantStatus, rec.Code) + } + + var payload map[string]any + if err := json.Unmarshal(rec.Body.Bytes(), &payload); err != nil { + t.Fatalf("GET %s: decode response: %v", path, err) + } + + got := nestedString(t, payload, fieldPath...) + if got != want { + t.Fatalf("GET %s: expected %q at %v, got %q", path, want, fieldPath, got) + } +} + +func nestedString(t *testing.T, value any, path ...string) string { + t.Helper() + + current := value + for _, part := range path { + switch typed := current.(type) { + case map[string]any: + current = typed[part] + case []any: + if len(part) != 1 || part[0] < '0' || part[0] > '9' { + t.Fatalf("path segment %q is not a numeric index", part) + } + index := int(part[0] - '0') + if index >= len(typed) { + t.Fatalf("index %d out of range for path %v", index, path) + } + current = typed[index] + default: + t.Fatalf("unsupported type %T at path %v", current, path) + } + } + + got, ok := current.(string) + if !ok { + t.Fatalf("expected string at path %v, got %T", path, current) + } + return got +} diff --git a/packages/orchd-runtime/internal/query/read_service.go b/packages/orchd-runtime/internal/query/read_service.go new file mode 100644 index 0000000..bc4093d --- /dev/null +++ b/packages/orchd-runtime/internal/query/read_service.go @@ -0,0 +1,211 @@ +package query + +import ( + "context" + "database/sql" + "fmt" + "time" + + "ai-workflow-skill/packages/coord-core/store" +) + +type ReadService struct { + db *sql.DB + orch *store.OrchStore + inbox *store.InboxStore +} + +type RunListItem struct { + Run store.Run `json:"run"` + TaskCounts map[string]int `json:"task_counts"` + TotalTasks int `json:"total_tasks"` +} + +type RunDetail struct { + Run store.Run `json:"run"` + TaskCounts map[string]int `json:"task_counts"` + TotalTasks int `json:"total_tasks"` + Tasks []store.Task `json:"tasks"` + BlockedTasks []store.BlockedTask `json:"blocked_tasks"` +} + +func NewReadService(db *sql.DB) *ReadService { + return &ReadService{ + db: db, + orch: store.NewOrchStore(db), + inbox: store.NewInboxStore(db), + } +} + +func (s *ReadService) ListRuns(ctx context.Context) ([]RunListItem, error) { + rows, err := s.db.QueryContext( + ctx, + `SELECT run_id, goal, summary, status, created_at, updated_at + FROM runs + ORDER BY updated_at DESC, created_at DESC`, + ) + if err != nil { + return nil, fmt.Errorf("query runs: %w", err) + } + defer rows.Close() + + var runs []store.Run + runIDs := make([]string, 0) + for rows.Next() { + var ( + run store.Run + createdAt, updated string + ) + if err := rows.Scan( + &run.RunID, + &run.Goal, + &run.Summary, + &run.Status, + &createdAt, + &updated, + ); err != nil { + return nil, fmt.Errorf("scan run list row: %w", err) + } + + run.CreatedAt = parseRFC3339(createdAt) + run.UpdatedAt = parseRFC3339(updated) + runs = append(runs, run) + runIDs = append(runIDs, run.RunID) + } + if err := rows.Err(); err != nil { + return nil, fmt.Errorf("iterate runs: %w", err) + } + + countsByRunID, err := s.collectTaskCounts(ctx, runIDs) + if err != nil { + return nil, err + } + + items := make([]RunListItem, 0, len(runs)) + for _, run := range runs { + taskCounts := countsByRunID[run.RunID] + if taskCounts == nil { + taskCounts = map[string]int{} + } + + items = append(items, RunListItem{ + Run: run, + TaskCounts: taskCounts, + TotalTasks: totalTasks(taskCounts), + }) + } + + return items, nil +} + +func (s *ReadService) GetRunDetail(ctx context.Context, runID string) (RunDetail, error) { + overview, err := s.orch.GetRunOverview(ctx, runID) + if err != nil { + return RunDetail{}, err + } + + blocked, err := s.orch.ListBlockedTasks(ctx, runID) + if err != nil { + return RunDetail{}, err + } + + return RunDetail{ + Run: overview.Run, + TaskCounts: overview.TaskCounts, + TotalTasks: totalTasks(overview.TaskCounts), + Tasks: overview.Tasks, + BlockedTasks: blocked, + }, nil +} + +func (s *ReadService) ListRunTasks(ctx context.Context, runID string) ([]store.Task, error) { + detail, err := s.GetRunDetail(ctx, runID) + if err != nil { + return nil, err + } + return detail.Tasks, nil +} + +func (s *ReadService) ListBlockedTasks(ctx context.Context, runID string) ([]store.BlockedTask, error) { + return s.orch.ListBlockedTasks(ctx, runID) +} + +func (s *ReadService) GetThreadDetail(ctx context.Context, threadID string) (store.ThreadDetail, error) { + return s.inbox.GetThread(ctx, threadID) +} + +func (s *ReadService) collectTaskCounts(ctx context.Context, runIDs []string) (map[string]map[string]int, error) { + result := make(map[string]map[string]int, len(runIDs)) + if len(runIDs) == 0 { + return result, nil + } + + args := make([]any, 0, len(runIDs)) + for _, runID := range runIDs { + args = append(args, runID) + } + + rows, err := s.db.QueryContext( + ctx, + `SELECT run_id, status, COUNT(*) + FROM tasks + WHERE run_id IN (`+placeholders(len(runIDs))+`) + GROUP BY run_id, status`, + args..., + ) + if err != nil { + return nil, fmt.Errorf("query task counts for runs: %w", err) + } + defer rows.Close() + + for rows.Next() { + var ( + runID string + status string + count int + ) + if err := rows.Scan(&runID, &status, &count); err != nil { + return nil, fmt.Errorf("scan run task count: %w", err) + } + if result[runID] == nil { + result[runID] = make(map[string]int) + } + result[runID][status] = count + } + if err := rows.Err(); err != nil { + return nil, fmt.Errorf("iterate run task counts: %w", err) + } + + return result, nil +} + +func totalTasks(counts map[string]int) int { + total := 0 + for _, count := range counts { + total += count + } + return total +} + +func placeholders(count int) string { + if count <= 0 { + return "" + } + + buf := make([]byte, 0, count*2-1) + for i := 0; i < count; i++ { + if i > 0 { + buf = append(buf, ',') + } + buf = append(buf, '?') + } + return string(buf) +} + +func parseRFC3339(value string) time.Time { + parsed, err := time.Parse(time.RFC3339Nano, value) + if err != nil { + return time.Time{} + } + return parsed +}