refactor(monorepo): extract orchd runtime
This commit is contained in:
@@ -40,6 +40,7 @@ As of now:
|
|||||||
- the first migration phase for the skill workspace monorepo is now complete: root `go.work` exists, `pnpm-workspace.yaml` now discovers `packages/*`, empty runtime module roots now exist under `packages/`, and a declarative `scripts/skill-bundles.json` plus `scripts/package_skill_runtimes.sh` scaffold now define package-oriented skill bundle metadata from the repo root
|
- the first migration phase for the skill workspace monorepo is now complete: root `go.work` exists, `pnpm-workspace.yaml` now discovers `packages/*`, empty runtime module roots now exist under `packages/`, and a declarative `scripts/skill-bundles.json` plus `scripts/package_skill_runtimes.sh` scaffold now define package-oriented skill bundle metadata from the repo root
|
||||||
- `packages/coord-core` now exists as the first real extracted runtime package, containing shared coordination DB/schema, protocol, and store code, and the active coordination runtimes now import `coord-core` instead of root `internal/db`, `internal/store`, and `internal/protocol`
|
- `packages/coord-core` now exists as the first real extracted runtime package, containing shared coordination DB/schema, protocol, and store code, and the active coordination runtimes now import `coord-core` instead of root `internal/db`, `internal/store`, and `internal/protocol`
|
||||||
- `packages/inbox-runtime` and `packages/orch-runtime` now exist as package-owned runtimes with their own `cmd/` entrypoints and package-local CLI wiring/tests, and the root skill packaging flow now builds `skills/inbox`, `skills/orch`, and `skills/council-review` from package entrypoints instead of root `cmd/` paths
|
- `packages/inbox-runtime` and `packages/orch-runtime` now exist as package-owned runtimes with their own `cmd/` entrypoints and package-local CLI wiring/tests, and the root skill packaging flow now builds `skills/inbox`, `skills/orch`, and `skills/council-review` from package entrypoints instead of root `cmd/` paths
|
||||||
|
- `packages/orchd-runtime` now exists as the package-owned HTTP/query/web backend runtime, with package-local `cmd/orchd`, app, query, and HTTP transport code plus passing package-local tests
|
||||||
- a repo-local `scripts/package_skill_clis.sh` packaging flow now builds bundled skill CLI assets for `inbox`, `orch`, and `council-review`
|
- a repo-local `scripts/package_skill_clis.sh` packaging flow now builds bundled skill CLI assets for `inbox`, `orch`, and `council-review`
|
||||||
- `orch` now implements `run init/show`, `task add`, `dep add`, `ready`, `dispatch`, `reconcile`, `wait`, `blocked`, `answer`, `retry`, `reassign`, `cancel`, `cleanup`, and `status`
|
- `orch` now implements `run init/show`, `task add`, `dep add`, `ready`, `dispatch`, `reconcile`, `wait`, `blocked`, `answer`, `retry`, `reassign`, `cancel`, `cleanup`, and `status`
|
||||||
- `orch` can create runs, gate tasks through dependencies, dispatch work through `inbox`, reconcile worker thread state back into task state, answer blocked tasks, retry or reassign work, cancel tasks or runs, clean attempt worktrees, and create per-attempt Git worktrees during strict dispatch
|
- `orch` can create runs, gate tasks through dependencies, dispatch work through `inbox`, reconcile worker thread state back into task state, answer blocked tasks, retry or reassign work, cancel tasks or runs, clean attempt worktrees, and create per-attempt Git worktrees during strict dispatch
|
||||||
@@ -507,10 +508,10 @@ Completed so far:
|
|||||||
- `scripts/skill-bundles.json` now marks `inbox`, `orch`, and `council-review` as ready package-backed bundles
|
- `scripts/skill-bundles.json` now marks `inbox`, `orch`, and `council-review` as ready package-backed bundles
|
||||||
- `scripts/package_skill_runtimes.sh package` now builds and installs `skills/inbox/assets/inbox`, `skills/orch/assets/orch`, and `skills/council-review/assets/orch` from package entrypoints
|
- `scripts/package_skill_runtimes.sh package` now builds and installs `skills/inbox/assets/inbox`, `skills/orch/assets/orch`, and `skills/council-review/assets/orch` from package entrypoints
|
||||||
- the legacy `scripts/package_skill_clis.sh` entrypoint now delegates to the declarative package-oriented packaging flow instead of hardcoding root `cmd/` paths
|
- the legacy `scripts/package_skill_clis.sh` entrypoint now delegates to the declarative package-oriented packaging flow instead of hardcoding root `cmd/` paths
|
||||||
|
- `packages/orchd-runtime/cmd/orchd` plus `packages/orchd-runtime/internal/{app,httpapi,query}` now provide a package-owned web backend runtime and pass `go test ./...`
|
||||||
|
|
||||||
Remaining:
|
Remaining:
|
||||||
|
|
||||||
- extract `orchd` into a package-owned runtime
|
|
||||||
- import `repo-memory` as its own runtime package and add the corresponding skill bundle
|
- import `repo-memory` as its own runtime package and add the corresponding skill bundle
|
||||||
- graduate the bundle scaffold into the primary packaging flow once package-owned runtime entrypoints exist
|
- graduate the bundle scaffold into the primary packaging flow once package-owned runtime entrypoints exist
|
||||||
|
|
||||||
@@ -521,11 +522,11 @@ If a new agent is taking over now, the next concrete step should be:
|
|||||||
1. treat `Milestone 9: Web Product Phase 2 Read-Only Operator UI` as complete for the initial operator surface and do not expand web feature scope further until the workspace split is decided package-by-package
|
1. treat `Milestone 9: Web Product Phase 2 Read-Only Operator UI` as complete for the initial operator surface and do not expand web feature scope further until the workspace split is decided package-by-package
|
||||||
2. treat the Phase 1 workspace bootstrap for `Milestone 10` as complete and keep the new `go.work`, `packages/`, and declarative bundle metadata as the baseline for all further migration steps
|
2. treat the Phase 1 workspace bootstrap for `Milestone 10` as complete and keep the new `go.work`, `packages/`, and declarative bundle metadata as the baseline for all further migration steps
|
||||||
3. treat the shared coordination kernel extraction into `packages/coord-core` as complete and move `inbox` plus `orch` into package-owned runtimes next
|
3. treat the shared coordination kernel extraction into `packages/coord-core` as complete and move `inbox` plus `orch` into package-owned runtimes next
|
||||||
4. treat `inbox-runtime` and `orch-runtime` as package-owned and move `orchd` into `packages/orchd-runtime` next so the web backend stops depending on root-owned runtime code
|
4. treat `inbox-runtime`, `orch-runtime`, and `orchd-runtime` as package-owned and import `repo-memory` next so the last planned runtime package joins the workspace
|
||||||
5. keep the authored skill forward-test plans under `docs/tests/*-skill/` synchronized as runtime ownership moves from root paths to package paths
|
5. add `skills/repo-memory` and the corresponding forward-test plan once the runtime import is in place
|
||||||
6. import `repo-memory` only after the package-based runtime and skill packaging pattern exists
|
6. keep the authored skill forward-test plans under `docs/tests/*-skill/` synchronized as runtime ownership moves from root paths to package paths
|
||||||
|
|
||||||
The inbox implementation and its human-readable test-plan set are already in place, `orch` supports the main scheduler loop plus the complete council start/wait/tally/report workflow, the web product now has its first real operator-facing read surfaces, and the repository has completed the workspace bootstrap, the shared coordination-kernel extraction, and the first package-owned runtime extraction phases of the skill monorepo migration, so the next step should be moving `orchd` and then importing `repo-memory`, not continuing to accrete new root-owned runtime paths.
|
The inbox implementation and its human-readable test-plan set are already in place, `orch` supports the main scheduler loop plus the complete council start/wait/tally/report workflow, the web product now has its first real operator-facing read surfaces, and the repository has completed the workspace bootstrap plus the shared coordination, inbox, orch, and orchd package-runtime extraction phases of the skill monorepo migration, so the next step should be importing `repo-memory` and then removing legacy root ownership, not continuing to accrete new root-owned runtime paths.
|
||||||
|
|
||||||
## Recommended Driver Choices
|
## Recommended Driver Choices
|
||||||
|
|
||||||
|
|||||||
@@ -28,7 +28,7 @@
|
|||||||
- [x] Phase 1: bootstrap `go.work`, expanded workspace manifests, package roots, and declarative skill bundle metadata
|
- [x] Phase 1: bootstrap `go.work`, expanded workspace manifests, package roots, and declarative skill bundle metadata
|
||||||
- [x] Phase 2: extract shared coordination code into `packages/coord-core`
|
- [x] Phase 2: extract shared coordination code into `packages/coord-core`
|
||||||
- [x] Phase 3: extract `inbox-runtime` and `orch-runtime`
|
- [x] Phase 3: extract `inbox-runtime` and `orch-runtime`
|
||||||
- [ ] Phase 4: extract `orchd-runtime`
|
- [x] Phase 4: extract `orchd-runtime`
|
||||||
- [ ] Phase 5: import `repo-memory-runtime` and add `skills/repo-memory`
|
- [ ] Phase 5: import `repo-memory-runtime` and add `skills/repo-memory`
|
||||||
- [ ] Phase 6: remove root runtime ownership and normalize package-based packaging
|
- [ ] Phase 6: remove root runtime ownership and normalize package-based packaging
|
||||||
|
|
||||||
@@ -54,4 +54,4 @@
|
|||||||
|
|
||||||
## Next Step
|
## Next Step
|
||||||
|
|
||||||
- start Phase 4 by moving the HTTP/query/web backend runtime into `packages/orchd-runtime` on top of the extracted `coord-core`, `inbox-runtime`, and `orch-runtime` packages
|
- start Phase 5 by importing the exploratory `repo-memory` runtime into `packages/repo-memory-runtime`, adding `skills/repo-memory`, and wiring it into the declarative bundle packaging flow
|
||||||
|
|||||||
@@ -0,0 +1,66 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"flag"
|
||||||
|
"log"
|
||||||
|
"net/http"
|
||||||
|
"os"
|
||||||
|
"os/signal"
|
||||||
|
"syscall"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"ai-workflow-skill/packages/orchd-runtime/internal/app"
|
||||||
|
"ai-workflow-skill/packages/coord-core/db"
|
||||||
|
"ai-workflow-skill/packages/orchd-runtime/internal/httpapi"
|
||||||
|
)
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
var (
|
||||||
|
dbPath string
|
||||||
|
listen string
|
||||||
|
shutdown time.Duration
|
||||||
|
)
|
||||||
|
|
||||||
|
flag.StringVar(&dbPath, "db", ".agents/coord.db", "SQLite database path")
|
||||||
|
flag.StringVar(&listen, "listen", ":8080", "HTTP listen address")
|
||||||
|
flag.DurationVar(&shutdown, "shutdown-timeout", 5*time.Second, "Graceful shutdown timeout")
|
||||||
|
flag.Parse()
|
||||||
|
|
||||||
|
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
|
||||||
|
defer stop()
|
||||||
|
|
||||||
|
sqlDB, err := db.Open(ctx, dbPath)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("open database: %v", err)
|
||||||
|
}
|
||||||
|
defer sqlDB.Close()
|
||||||
|
|
||||||
|
if err := db.ApplyMigrations(ctx, sqlDB); err != nil {
|
||||||
|
log.Fatalf("apply migrations: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
webApp := app.NewWebService(sqlDB)
|
||||||
|
server := &http.Server{
|
||||||
|
Addr: listen,
|
||||||
|
Handler: httpapi.NewRouter(webApp),
|
||||||
|
ReadHeaderTimeout: 5 * time.Second,
|
||||||
|
}
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
<-ctx.Done()
|
||||||
|
|
||||||
|
shutdownCtx, cancel := context.WithTimeout(context.Background(), shutdown)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
if err := server.Shutdown(shutdownCtx); err != nil && !errors.Is(err, http.ErrServerClosed) {
|
||||||
|
log.Printf("http shutdown: %v", err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
log.Printf("orchd listening on %s", listen)
|
||||||
|
if err := server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
|
||||||
|
log.Fatalf("serve http api: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,3 +1,5 @@
|
|||||||
module ai-workflow-skill/packages/orchd-runtime
|
module ai-workflow-skill/packages/orchd-runtime
|
||||||
|
|
||||||
go 1.26
|
go 1.26
|
||||||
|
|
||||||
|
require github.com/go-chi/chi/v5 v5.2.5
|
||||||
|
|||||||
@@ -0,0 +1,39 @@
|
|||||||
|
package app
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"database/sql"
|
||||||
|
|
||||||
|
"ai-workflow-skill/packages/orchd-runtime/internal/query"
|
||||||
|
"ai-workflow-skill/packages/coord-core/store"
|
||||||
|
)
|
||||||
|
|
||||||
|
type WebService struct {
|
||||||
|
reads *query.ReadService
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewWebService(db *sql.DB) *WebService {
|
||||||
|
return &WebService{
|
||||||
|
reads: query.NewReadService(db),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *WebService) ListRuns(ctx context.Context) ([]query.RunListItem, error) {
|
||||||
|
return s.reads.ListRuns(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *WebService) GetRunDetail(ctx context.Context, runID string) (query.RunDetail, error) {
|
||||||
|
return s.reads.GetRunDetail(ctx, runID)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *WebService) ListRunTasks(ctx context.Context, runID string) ([]store.Task, error) {
|
||||||
|
return s.reads.ListRunTasks(ctx, runID)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *WebService) ListBlockedTasks(ctx context.Context, runID string) ([]store.BlockedTask, error) {
|
||||||
|
return s.reads.ListBlockedTasks(ctx, runID)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *WebService) GetThreadDetail(ctx context.Context, threadID string) (store.ThreadDetail, error) {
|
||||||
|
return s.reads.GetThreadDetail(ctx, threadID)
|
||||||
|
}
|
||||||
@@ -0,0 +1,58 @@
|
|||||||
|
package httpapi
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"net/http"
|
||||||
|
|
||||||
|
"ai-workflow-skill/packages/coord-core/store"
|
||||||
|
)
|
||||||
|
|
||||||
|
type errorEnvelope struct {
|
||||||
|
Error errorPayload `json:"error"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type errorPayload struct {
|
||||||
|
Code string `json:"code"`
|
||||||
|
Message string `json:"message"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func writeJSON(w http.ResponseWriter, status int, payload any) {
|
||||||
|
w.Header().Set("Content-Type", "application/json; charset=utf-8")
|
||||||
|
w.WriteHeader(status)
|
||||||
|
|
||||||
|
enc := json.NewEncoder(w)
|
||||||
|
enc.SetIndent("", " ")
|
||||||
|
_ = enc.Encode(payload)
|
||||||
|
}
|
||||||
|
|
||||||
|
func writeError(w http.ResponseWriter, err error) {
|
||||||
|
status, code := classifyError(err)
|
||||||
|
writeJSON(w, status, errorEnvelope{
|
||||||
|
Error: errorPayload{
|
||||||
|
Code: code,
|
||||||
|
Message: errorMessage(err),
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func classifyError(err error) (int, string) {
|
||||||
|
switch {
|
||||||
|
case errors.Is(err, store.ErrInvalidInput):
|
||||||
|
return http.StatusBadRequest, "invalid_input"
|
||||||
|
case errors.Is(err, store.ErrRunNotFound), errors.Is(err, store.ErrTaskNotFound), errors.Is(err, store.ErrThreadNotFound):
|
||||||
|
return http.StatusNotFound, "not_found"
|
||||||
|
case errors.Is(err, store.ErrInvalidState):
|
||||||
|
return http.StatusConflict, "invalid_state"
|
||||||
|
default:
|
||||||
|
return http.StatusInternalServerError, "internal_error"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func errorMessage(err error) string {
|
||||||
|
if err == nil {
|
||||||
|
return "unknown error"
|
||||||
|
}
|
||||||
|
return fmt.Sprintf("%v", err)
|
||||||
|
}
|
||||||
@@ -0,0 +1,87 @@
|
|||||||
|
package httpapi
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"net/http"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/go-chi/chi/v5"
|
||||||
|
"github.com/go-chi/chi/v5/middleware"
|
||||||
|
|
||||||
|
"ai-workflow-skill/packages/orchd-runtime/internal/query"
|
||||||
|
"ai-workflow-skill/packages/coord-core/store"
|
||||||
|
)
|
||||||
|
|
||||||
|
type readService interface {
|
||||||
|
ListRuns(ctx context.Context) ([]query.RunListItem, error)
|
||||||
|
GetRunDetail(ctx context.Context, runID string) (query.RunDetail, error)
|
||||||
|
ListRunTasks(ctx context.Context, runID string) ([]store.Task, error)
|
||||||
|
ListBlockedTasks(ctx context.Context, runID string) ([]store.BlockedTask, error)
|
||||||
|
GetThreadDetail(ctx context.Context, threadID string) (store.ThreadDetail, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewRouter(service readService) http.Handler {
|
||||||
|
router := chi.NewRouter()
|
||||||
|
router.Use(middleware.RequestID)
|
||||||
|
router.Use(middleware.Recoverer)
|
||||||
|
router.Use(middleware.Timeout(30 * time.Second))
|
||||||
|
|
||||||
|
router.Get("/health", func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
writeJSON(w, http.StatusOK, map[string]any{
|
||||||
|
"status": "ok",
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
router.Route("/api", func(r chi.Router) {
|
||||||
|
r.Get("/runs", func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
runs, err := service.ListRuns(r.Context())
|
||||||
|
if err != nil {
|
||||||
|
writeError(w, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
writeJSON(w, http.StatusOK, map[string]any{"runs": runs})
|
||||||
|
})
|
||||||
|
|
||||||
|
r.Get("/runs/{runID}", func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
runID := chi.URLParam(r, "runID")
|
||||||
|
run, err := service.GetRunDetail(r.Context(), runID)
|
||||||
|
if err != nil {
|
||||||
|
writeError(w, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
writeJSON(w, http.StatusOK, map[string]any{"run": run})
|
||||||
|
})
|
||||||
|
|
||||||
|
r.Get("/runs/{runID}/tasks", func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
runID := chi.URLParam(r, "runID")
|
||||||
|
tasks, err := service.ListRunTasks(r.Context(), runID)
|
||||||
|
if err != nil {
|
||||||
|
writeError(w, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
writeJSON(w, http.StatusOK, map[string]any{"tasks": tasks})
|
||||||
|
})
|
||||||
|
|
||||||
|
r.Get("/runs/{runID}/blocked", func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
runID := chi.URLParam(r, "runID")
|
||||||
|
blocked, err := service.ListBlockedTasks(r.Context(), runID)
|
||||||
|
if err != nil {
|
||||||
|
writeError(w, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
writeJSON(w, http.StatusOK, map[string]any{"blocked": blocked})
|
||||||
|
})
|
||||||
|
|
||||||
|
r.Get("/threads/{threadID}", func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
threadID := chi.URLParam(r, "threadID")
|
||||||
|
thread, err := service.GetThreadDetail(r.Context(), threadID)
|
||||||
|
if err != nil {
|
||||||
|
writeError(w, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
writeJSON(w, http.StatusOK, map[string]any{"thread": thread})
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
return router
|
||||||
|
}
|
||||||
@@ -0,0 +1,196 @@
|
|||||||
|
package httpapi
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"net/http"
|
||||||
|
"net/http/httptest"
|
||||||
|
"path/filepath"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"ai-workflow-skill/packages/orchd-runtime/internal/app"
|
||||||
|
dbpkg "ai-workflow-skill/packages/coord-core/db"
|
||||||
|
"ai-workflow-skill/packages/coord-core/store"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestRouterExposesReadOnlyWebEndpoints(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
dbPath := filepath.Join(t.TempDir(), "coord.db")
|
||||||
|
sqlDB, err := dbpkg.Open(ctx, dbPath)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("open db: %v", err)
|
||||||
|
}
|
||||||
|
defer sqlDB.Close()
|
||||||
|
|
||||||
|
if err := dbpkg.ApplyMigrations(ctx, sqlDB); err != nil {
|
||||||
|
t.Fatalf("apply migrations: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
orchStore := store.NewOrchStore(sqlDB)
|
||||||
|
inboxStore := store.NewInboxStore(sqlDB)
|
||||||
|
|
||||||
|
_, err = orchStore.CreateRun(ctx, store.CreateRunInput{
|
||||||
|
RunID: "run_web_001",
|
||||||
|
Goal: "Build the web control plane",
|
||||||
|
Summary: "Initial HTTP slice",
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("create run: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = orchStore.AddTask(ctx, store.AddTaskInput{
|
||||||
|
RunID: "run_web_001",
|
||||||
|
TaskID: "T1",
|
||||||
|
Title: "Implement read API",
|
||||||
|
Summary: "Expose run state over HTTP",
|
||||||
|
DefaultTo: "worker-a",
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("add task T1: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = orchStore.AddTask(ctx, store.AddTaskInput{
|
||||||
|
RunID: "run_web_001",
|
||||||
|
TaskID: "T2",
|
||||||
|
Title: "Build React shell",
|
||||||
|
Summary: "Scaffold the frontend workspace",
|
||||||
|
DefaultTo: "worker-b",
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("add task T2: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
dispatch, err := orchStore.DispatchTask(ctx, store.DispatchInput{
|
||||||
|
RunID: "run_web_001",
|
||||||
|
TaskID: "T1",
|
||||||
|
ToAgent: "worker-a",
|
||||||
|
Body: "Expose the initial HTTP API.",
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("dispatch task: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err := inboxStore.ClaimThread(ctx, store.ClaimInput{
|
||||||
|
ThreadID: dispatch.Attempt.ThreadID,
|
||||||
|
Agent: "worker-a",
|
||||||
|
LeaseSeconds: 300,
|
||||||
|
}); err != nil {
|
||||||
|
t.Fatalf("claim thread: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, _, err := inboxStore.UpdateThreadStatus(ctx, store.UpdateInput{
|
||||||
|
ThreadID: dispatch.Attempt.ThreadID,
|
||||||
|
Agent: "worker-a",
|
||||||
|
Status: "blocked",
|
||||||
|
Summary: "Need the API shape",
|
||||||
|
Body: "Confirm whether run detail should include blocked tasks.",
|
||||||
|
}); err != nil {
|
||||||
|
t.Fatalf("mark thread blocked: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err := orchStore.ReconcileRun(ctx, "run_web_001"); err != nil {
|
||||||
|
t.Fatalf("reconcile run: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
handler := NewRouter(app.NewWebService(sqlDB))
|
||||||
|
|
||||||
|
assertStatusAndJSONField(t, handler, "/health", http.StatusOK, []string{"status"}, "ok")
|
||||||
|
assertStatusAndJSONField(t, handler, "/api/runs", http.StatusOK, []string{"runs", "0", "run", "run_id"}, "run_web_001")
|
||||||
|
assertStatusAndJSONField(t, handler, "/api/runs/run_web_001", http.StatusOK, []string{"run", "run", "run_id"}, "run_web_001")
|
||||||
|
assertStatusAndJSONField(t, handler, "/api/runs/run_web_001/tasks", http.StatusOK, []string{"tasks", "0", "task_id"}, "T1")
|
||||||
|
assertStatusAndJSONField(t, handler, "/api/runs/run_web_001/blocked", http.StatusOK, []string{"blocked", "0", "task", "task_id"}, "T1")
|
||||||
|
assertStatusAndJSONField(t, handler, "/api/threads/"+dispatch.Attempt.ThreadID, http.StatusOK, []string{"thread", "thread", "thread_id"}, dispatch.Attempt.ThreadID)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRouterMapsNotFoundErrors(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
dbPath := filepath.Join(t.TempDir(), "coord.db")
|
||||||
|
sqlDB, err := dbpkg.Open(ctx, dbPath)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("open db: %v", err)
|
||||||
|
}
|
||||||
|
defer sqlDB.Close()
|
||||||
|
|
||||||
|
if err := dbpkg.ApplyMigrations(ctx, sqlDB); err != nil {
|
||||||
|
t.Fatalf("apply migrations: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
handler := NewRouter(app.NewWebService(sqlDB))
|
||||||
|
rec := httptest.NewRecorder()
|
||||||
|
req := httptest.NewRequest(http.MethodGet, "/api/runs/missing-run", nil)
|
||||||
|
handler.ServeHTTP(rec, req)
|
||||||
|
|
||||||
|
if rec.Code != http.StatusNotFound {
|
||||||
|
t.Fatalf("expected 404, got %d", rec.Code)
|
||||||
|
}
|
||||||
|
|
||||||
|
var payload map[string]any
|
||||||
|
if err := json.Unmarshal(rec.Body.Bytes(), &payload); err != nil {
|
||||||
|
t.Fatalf("decode response: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
code := nestedString(t, payload, "error", "code")
|
||||||
|
if code != "not_found" {
|
||||||
|
t.Fatalf("expected not_found error code, got %q", code)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func assertStatusAndJSONField(t *testing.T, handler http.Handler, path string, wantStatus int, fieldPath []string, want string) {
|
||||||
|
t.Helper()
|
||||||
|
|
||||||
|
rec := httptest.NewRecorder()
|
||||||
|
req := httptest.NewRequest(http.MethodGet, path, nil)
|
||||||
|
handler.ServeHTTP(rec, req)
|
||||||
|
|
||||||
|
if rec.Code != wantStatus {
|
||||||
|
t.Fatalf("GET %s: expected status %d, got %d", path, wantStatus, rec.Code)
|
||||||
|
}
|
||||||
|
|
||||||
|
var payload map[string]any
|
||||||
|
if err := json.Unmarshal(rec.Body.Bytes(), &payload); err != nil {
|
||||||
|
t.Fatalf("GET %s: decode response: %v", path, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
got := nestedString(t, payload, fieldPath...)
|
||||||
|
if got != want {
|
||||||
|
t.Fatalf("GET %s: expected %q at %v, got %q", path, want, fieldPath, got)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func nestedString(t *testing.T, value any, path ...string) string {
|
||||||
|
t.Helper()
|
||||||
|
|
||||||
|
current := value
|
||||||
|
for _, part := range path {
|
||||||
|
switch typed := current.(type) {
|
||||||
|
case map[string]any:
|
||||||
|
current = typed[part]
|
||||||
|
case []any:
|
||||||
|
if len(part) != 1 || part[0] < '0' || part[0] > '9' {
|
||||||
|
t.Fatalf("path segment %q is not a numeric index", part)
|
||||||
|
}
|
||||||
|
index := int(part[0] - '0')
|
||||||
|
if index >= len(typed) {
|
||||||
|
t.Fatalf("index %d out of range for path %v", index, path)
|
||||||
|
}
|
||||||
|
current = typed[index]
|
||||||
|
default:
|
||||||
|
t.Fatalf("unsupported type %T at path %v", current, path)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
got, ok := current.(string)
|
||||||
|
if !ok {
|
||||||
|
t.Fatalf("expected string at path %v, got %T", path, current)
|
||||||
|
}
|
||||||
|
return got
|
||||||
|
}
|
||||||
@@ -0,0 +1,211 @@
|
|||||||
|
package query
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"database/sql"
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"ai-workflow-skill/packages/coord-core/store"
|
||||||
|
)
|
||||||
|
|
||||||
|
type ReadService struct {
|
||||||
|
db *sql.DB
|
||||||
|
orch *store.OrchStore
|
||||||
|
inbox *store.InboxStore
|
||||||
|
}
|
||||||
|
|
||||||
|
type RunListItem struct {
|
||||||
|
Run store.Run `json:"run"`
|
||||||
|
TaskCounts map[string]int `json:"task_counts"`
|
||||||
|
TotalTasks int `json:"total_tasks"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type RunDetail struct {
|
||||||
|
Run store.Run `json:"run"`
|
||||||
|
TaskCounts map[string]int `json:"task_counts"`
|
||||||
|
TotalTasks int `json:"total_tasks"`
|
||||||
|
Tasks []store.Task `json:"tasks"`
|
||||||
|
BlockedTasks []store.BlockedTask `json:"blocked_tasks"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewReadService(db *sql.DB) *ReadService {
|
||||||
|
return &ReadService{
|
||||||
|
db: db,
|
||||||
|
orch: store.NewOrchStore(db),
|
||||||
|
inbox: store.NewInboxStore(db),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *ReadService) ListRuns(ctx context.Context) ([]RunListItem, error) {
|
||||||
|
rows, err := s.db.QueryContext(
|
||||||
|
ctx,
|
||||||
|
`SELECT run_id, goal, summary, status, created_at, updated_at
|
||||||
|
FROM runs
|
||||||
|
ORDER BY updated_at DESC, created_at DESC`,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("query runs: %w", err)
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
|
||||||
|
var runs []store.Run
|
||||||
|
runIDs := make([]string, 0)
|
||||||
|
for rows.Next() {
|
||||||
|
var (
|
||||||
|
run store.Run
|
||||||
|
createdAt, updated string
|
||||||
|
)
|
||||||
|
if err := rows.Scan(
|
||||||
|
&run.RunID,
|
||||||
|
&run.Goal,
|
||||||
|
&run.Summary,
|
||||||
|
&run.Status,
|
||||||
|
&createdAt,
|
||||||
|
&updated,
|
||||||
|
); err != nil {
|
||||||
|
return nil, fmt.Errorf("scan run list row: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
run.CreatedAt = parseRFC3339(createdAt)
|
||||||
|
run.UpdatedAt = parseRFC3339(updated)
|
||||||
|
runs = append(runs, run)
|
||||||
|
runIDs = append(runIDs, run.RunID)
|
||||||
|
}
|
||||||
|
if err := rows.Err(); err != nil {
|
||||||
|
return nil, fmt.Errorf("iterate runs: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
countsByRunID, err := s.collectTaskCounts(ctx, runIDs)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
items := make([]RunListItem, 0, len(runs))
|
||||||
|
for _, run := range runs {
|
||||||
|
taskCounts := countsByRunID[run.RunID]
|
||||||
|
if taskCounts == nil {
|
||||||
|
taskCounts = map[string]int{}
|
||||||
|
}
|
||||||
|
|
||||||
|
items = append(items, RunListItem{
|
||||||
|
Run: run,
|
||||||
|
TaskCounts: taskCounts,
|
||||||
|
TotalTasks: totalTasks(taskCounts),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
return items, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *ReadService) GetRunDetail(ctx context.Context, runID string) (RunDetail, error) {
|
||||||
|
overview, err := s.orch.GetRunOverview(ctx, runID)
|
||||||
|
if err != nil {
|
||||||
|
return RunDetail{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
blocked, err := s.orch.ListBlockedTasks(ctx, runID)
|
||||||
|
if err != nil {
|
||||||
|
return RunDetail{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return RunDetail{
|
||||||
|
Run: overview.Run,
|
||||||
|
TaskCounts: overview.TaskCounts,
|
||||||
|
TotalTasks: totalTasks(overview.TaskCounts),
|
||||||
|
Tasks: overview.Tasks,
|
||||||
|
BlockedTasks: blocked,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *ReadService) ListRunTasks(ctx context.Context, runID string) ([]store.Task, error) {
|
||||||
|
detail, err := s.GetRunDetail(ctx, runID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return detail.Tasks, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *ReadService) ListBlockedTasks(ctx context.Context, runID string) ([]store.BlockedTask, error) {
|
||||||
|
return s.orch.ListBlockedTasks(ctx, runID)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *ReadService) GetThreadDetail(ctx context.Context, threadID string) (store.ThreadDetail, error) {
|
||||||
|
return s.inbox.GetThread(ctx, threadID)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *ReadService) collectTaskCounts(ctx context.Context, runIDs []string) (map[string]map[string]int, error) {
|
||||||
|
result := make(map[string]map[string]int, len(runIDs))
|
||||||
|
if len(runIDs) == 0 {
|
||||||
|
return result, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
args := make([]any, 0, len(runIDs))
|
||||||
|
for _, runID := range runIDs {
|
||||||
|
args = append(args, runID)
|
||||||
|
}
|
||||||
|
|
||||||
|
rows, err := s.db.QueryContext(
|
||||||
|
ctx,
|
||||||
|
`SELECT run_id, status, COUNT(*)
|
||||||
|
FROM tasks
|
||||||
|
WHERE run_id IN (`+placeholders(len(runIDs))+`)
|
||||||
|
GROUP BY run_id, status`,
|
||||||
|
args...,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("query task counts for runs: %w", err)
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
|
||||||
|
for rows.Next() {
|
||||||
|
var (
|
||||||
|
runID string
|
||||||
|
status string
|
||||||
|
count int
|
||||||
|
)
|
||||||
|
if err := rows.Scan(&runID, &status, &count); err != nil {
|
||||||
|
return nil, fmt.Errorf("scan run task count: %w", err)
|
||||||
|
}
|
||||||
|
if result[runID] == nil {
|
||||||
|
result[runID] = make(map[string]int)
|
||||||
|
}
|
||||||
|
result[runID][status] = count
|
||||||
|
}
|
||||||
|
if err := rows.Err(); err != nil {
|
||||||
|
return nil, fmt.Errorf("iterate run task counts: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return result, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func totalTasks(counts map[string]int) int {
|
||||||
|
total := 0
|
||||||
|
for _, count := range counts {
|
||||||
|
total += count
|
||||||
|
}
|
||||||
|
return total
|
||||||
|
}
|
||||||
|
|
||||||
|
func placeholders(count int) string {
|
||||||
|
if count <= 0 {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
|
buf := make([]byte, 0, count*2-1)
|
||||||
|
for i := 0; i < count; i++ {
|
||||||
|
if i > 0 {
|
||||||
|
buf = append(buf, ',')
|
||||||
|
}
|
||||||
|
buf = append(buf, '?')
|
||||||
|
}
|
||||||
|
return string(buf)
|
||||||
|
}
|
||||||
|
|
||||||
|
func parseRFC3339(value string) time.Time {
|
||||||
|
parsed, err := time.Parse(time.RFC3339Nano, value)
|
||||||
|
if err != nil {
|
||||||
|
return time.Time{}
|
||||||
|
}
|
||||||
|
return parsed
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user