Add web product Phase 1 skeleton

This commit is contained in:
2026-03-20 00:20:38 +08:00
parent 0355d7a847
commit a7ef1e0154
24 changed files with 2287 additions and 6 deletions
+39
View File
@@ -0,0 +1,39 @@
package app
import (
"context"
"database/sql"
"ai-workflow-skill/internal/query"
"ai-workflow-skill/internal/store"
)
type WebService struct {
reads *query.ReadService
}
func NewWebService(db *sql.DB) *WebService {
return &WebService{
reads: query.NewReadService(db),
}
}
func (s *WebService) ListRuns(ctx context.Context) ([]query.RunListItem, error) {
return s.reads.ListRuns(ctx)
}
func (s *WebService) GetRunDetail(ctx context.Context, runID string) (query.RunDetail, error) {
return s.reads.GetRunDetail(ctx, runID)
}
func (s *WebService) ListRunTasks(ctx context.Context, runID string) ([]store.Task, error) {
return s.reads.ListRunTasks(ctx, runID)
}
func (s *WebService) ListBlockedTasks(ctx context.Context, runID string) ([]store.BlockedTask, error) {
return s.reads.ListBlockedTasks(ctx, runID)
}
func (s *WebService) GetThreadDetail(ctx context.Context, threadID string) (store.ThreadDetail, error) {
return s.reads.GetThreadDetail(ctx, threadID)
}
+58
View File
@@ -0,0 +1,58 @@
package httpapi
import (
"encoding/json"
"errors"
"fmt"
"net/http"
"ai-workflow-skill/internal/store"
)
type errorEnvelope struct {
Error errorPayload `json:"error"`
}
type errorPayload struct {
Code string `json:"code"`
Message string `json:"message"`
}
func writeJSON(w http.ResponseWriter, status int, payload any) {
w.Header().Set("Content-Type", "application/json; charset=utf-8")
w.WriteHeader(status)
enc := json.NewEncoder(w)
enc.SetIndent("", " ")
_ = enc.Encode(payload)
}
func writeError(w http.ResponseWriter, err error) {
status, code := classifyError(err)
writeJSON(w, status, errorEnvelope{
Error: errorPayload{
Code: code,
Message: errorMessage(err),
},
})
}
func classifyError(err error) (int, string) {
switch {
case errors.Is(err, store.ErrInvalidInput):
return http.StatusBadRequest, "invalid_input"
case errors.Is(err, store.ErrRunNotFound), errors.Is(err, store.ErrTaskNotFound), errors.Is(err, store.ErrThreadNotFound):
return http.StatusNotFound, "not_found"
case errors.Is(err, store.ErrInvalidState):
return http.StatusConflict, "invalid_state"
default:
return http.StatusInternalServerError, "internal_error"
}
}
func errorMessage(err error) string {
if err == nil {
return "unknown error"
}
return fmt.Sprintf("%v", err)
}
+87
View File
@@ -0,0 +1,87 @@
package httpapi
import (
"context"
"net/http"
"time"
"github.com/go-chi/chi/v5"
"github.com/go-chi/chi/v5/middleware"
"ai-workflow-skill/internal/query"
"ai-workflow-skill/internal/store"
)
type readService interface {
ListRuns(ctx context.Context) ([]query.RunListItem, error)
GetRunDetail(ctx context.Context, runID string) (query.RunDetail, error)
ListRunTasks(ctx context.Context, runID string) ([]store.Task, error)
ListBlockedTasks(ctx context.Context, runID string) ([]store.BlockedTask, error)
GetThreadDetail(ctx context.Context, threadID string) (store.ThreadDetail, error)
}
func NewRouter(service readService) http.Handler {
router := chi.NewRouter()
router.Use(middleware.RequestID)
router.Use(middleware.Recoverer)
router.Use(middleware.Timeout(30 * time.Second))
router.Get("/health", func(w http.ResponseWriter, r *http.Request) {
writeJSON(w, http.StatusOK, map[string]any{
"status": "ok",
})
})
router.Route("/api", func(r chi.Router) {
r.Get("/runs", func(w http.ResponseWriter, r *http.Request) {
runs, err := service.ListRuns(r.Context())
if err != nil {
writeError(w, err)
return
}
writeJSON(w, http.StatusOK, map[string]any{"runs": runs})
})
r.Get("/runs/{runID}", func(w http.ResponseWriter, r *http.Request) {
runID := chi.URLParam(r, "runID")
run, err := service.GetRunDetail(r.Context(), runID)
if err != nil {
writeError(w, err)
return
}
writeJSON(w, http.StatusOK, map[string]any{"run": run})
})
r.Get("/runs/{runID}/tasks", func(w http.ResponseWriter, r *http.Request) {
runID := chi.URLParam(r, "runID")
tasks, err := service.ListRunTasks(r.Context(), runID)
if err != nil {
writeError(w, err)
return
}
writeJSON(w, http.StatusOK, map[string]any{"tasks": tasks})
})
r.Get("/runs/{runID}/blocked", func(w http.ResponseWriter, r *http.Request) {
runID := chi.URLParam(r, "runID")
blocked, err := service.ListBlockedTasks(r.Context(), runID)
if err != nil {
writeError(w, err)
return
}
writeJSON(w, http.StatusOK, map[string]any{"blocked": blocked})
})
r.Get("/threads/{threadID}", func(w http.ResponseWriter, r *http.Request) {
threadID := chi.URLParam(r, "threadID")
thread, err := service.GetThreadDetail(r.Context(), threadID)
if err != nil {
writeError(w, err)
return
}
writeJSON(w, http.StatusOK, map[string]any{"thread": thread})
})
})
return router
}
+196
View File
@@ -0,0 +1,196 @@
package httpapi
import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"path/filepath"
"testing"
"time"
"ai-workflow-skill/internal/app"
dbpkg "ai-workflow-skill/internal/db"
"ai-workflow-skill/internal/store"
)
func TestRouterExposesReadOnlyWebEndpoints(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
dbPath := filepath.Join(t.TempDir(), "coord.db")
sqlDB, err := dbpkg.Open(ctx, dbPath)
if err != nil {
t.Fatalf("open db: %v", err)
}
defer sqlDB.Close()
if err := dbpkg.ApplyMigrations(ctx, sqlDB); err != nil {
t.Fatalf("apply migrations: %v", err)
}
orchStore := store.NewOrchStore(sqlDB)
inboxStore := store.NewInboxStore(sqlDB)
_, err = orchStore.CreateRun(ctx, store.CreateRunInput{
RunID: "run_web_001",
Goal: "Build the web control plane",
Summary: "Initial HTTP slice",
})
if err != nil {
t.Fatalf("create run: %v", err)
}
_, err = orchStore.AddTask(ctx, store.AddTaskInput{
RunID: "run_web_001",
TaskID: "T1",
Title: "Implement read API",
Summary: "Expose run state over HTTP",
DefaultTo: "worker-a",
})
if err != nil {
t.Fatalf("add task T1: %v", err)
}
_, err = orchStore.AddTask(ctx, store.AddTaskInput{
RunID: "run_web_001",
TaskID: "T2",
Title: "Build React shell",
Summary: "Scaffold the frontend workspace",
DefaultTo: "worker-b",
})
if err != nil {
t.Fatalf("add task T2: %v", err)
}
dispatch, err := orchStore.DispatchTask(ctx, store.DispatchInput{
RunID: "run_web_001",
TaskID: "T1",
ToAgent: "worker-a",
Body: "Expose the initial HTTP API.",
})
if err != nil {
t.Fatalf("dispatch task: %v", err)
}
if _, err := inboxStore.ClaimThread(ctx, store.ClaimInput{
ThreadID: dispatch.Attempt.ThreadID,
Agent: "worker-a",
LeaseSeconds: 300,
}); err != nil {
t.Fatalf("claim thread: %v", err)
}
if _, _, err := inboxStore.UpdateThreadStatus(ctx, store.UpdateInput{
ThreadID: dispatch.Attempt.ThreadID,
Agent: "worker-a",
Status: "blocked",
Summary: "Need the API shape",
Body: "Confirm whether run detail should include blocked tasks.",
}); err != nil {
t.Fatalf("mark thread blocked: %v", err)
}
if _, err := orchStore.ReconcileRun(ctx, "run_web_001"); err != nil {
t.Fatalf("reconcile run: %v", err)
}
handler := NewRouter(app.NewWebService(sqlDB))
assertStatusAndJSONField(t, handler, "/health", http.StatusOK, []string{"status"}, "ok")
assertStatusAndJSONField(t, handler, "/api/runs", http.StatusOK, []string{"runs", "0", "run", "run_id"}, "run_web_001")
assertStatusAndJSONField(t, handler, "/api/runs/run_web_001", http.StatusOK, []string{"run", "run", "run_id"}, "run_web_001")
assertStatusAndJSONField(t, handler, "/api/runs/run_web_001/tasks", http.StatusOK, []string{"tasks", "0", "task_id"}, "T1")
assertStatusAndJSONField(t, handler, "/api/runs/run_web_001/blocked", http.StatusOK, []string{"blocked", "0", "task", "task_id"}, "T1")
assertStatusAndJSONField(t, handler, "/api/threads/"+dispatch.Attempt.ThreadID, http.StatusOK, []string{"thread", "thread", "thread_id"}, dispatch.Attempt.ThreadID)
}
func TestRouterMapsNotFoundErrors(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
dbPath := filepath.Join(t.TempDir(), "coord.db")
sqlDB, err := dbpkg.Open(ctx, dbPath)
if err != nil {
t.Fatalf("open db: %v", err)
}
defer sqlDB.Close()
if err := dbpkg.ApplyMigrations(ctx, sqlDB); err != nil {
t.Fatalf("apply migrations: %v", err)
}
handler := NewRouter(app.NewWebService(sqlDB))
rec := httptest.NewRecorder()
req := httptest.NewRequest(http.MethodGet, "/api/runs/missing-run", nil)
handler.ServeHTTP(rec, req)
if rec.Code != http.StatusNotFound {
t.Fatalf("expected 404, got %d", rec.Code)
}
var payload map[string]any
if err := json.Unmarshal(rec.Body.Bytes(), &payload); err != nil {
t.Fatalf("decode response: %v", err)
}
code := nestedString(t, payload, "error", "code")
if code != "not_found" {
t.Fatalf("expected not_found error code, got %q", code)
}
}
func assertStatusAndJSONField(t *testing.T, handler http.Handler, path string, wantStatus int, fieldPath []string, want string) {
t.Helper()
rec := httptest.NewRecorder()
req := httptest.NewRequest(http.MethodGet, path, nil)
handler.ServeHTTP(rec, req)
if rec.Code != wantStatus {
t.Fatalf("GET %s: expected status %d, got %d", path, wantStatus, rec.Code)
}
var payload map[string]any
if err := json.Unmarshal(rec.Body.Bytes(), &payload); err != nil {
t.Fatalf("GET %s: decode response: %v", path, err)
}
got := nestedString(t, payload, fieldPath...)
if got != want {
t.Fatalf("GET %s: expected %q at %v, got %q", path, want, fieldPath, got)
}
}
func nestedString(t *testing.T, value any, path ...string) string {
t.Helper()
current := value
for _, part := range path {
switch typed := current.(type) {
case map[string]any:
current = typed[part]
case []any:
if len(part) != 1 || part[0] < '0' || part[0] > '9' {
t.Fatalf("path segment %q is not a numeric index", part)
}
index := int(part[0] - '0')
if index >= len(typed) {
t.Fatalf("index %d out of range for path %v", index, path)
}
current = typed[index]
default:
t.Fatalf("unsupported type %T at path %v", current, path)
}
}
got, ok := current.(string)
if !ok {
t.Fatalf("expected string at path %v, got %T", path, current)
}
return got
}
+211
View File
@@ -0,0 +1,211 @@
package query
import (
"context"
"database/sql"
"fmt"
"time"
"ai-workflow-skill/internal/store"
)
type ReadService struct {
db *sql.DB
orch *store.OrchStore
inbox *store.InboxStore
}
type RunListItem struct {
Run store.Run `json:"run"`
TaskCounts map[string]int `json:"task_counts"`
TotalTasks int `json:"total_tasks"`
}
type RunDetail struct {
Run store.Run `json:"run"`
TaskCounts map[string]int `json:"task_counts"`
TotalTasks int `json:"total_tasks"`
Tasks []store.Task `json:"tasks"`
BlockedTasks []store.BlockedTask `json:"blocked_tasks"`
}
func NewReadService(db *sql.DB) *ReadService {
return &ReadService{
db: db,
orch: store.NewOrchStore(db),
inbox: store.NewInboxStore(db),
}
}
func (s *ReadService) ListRuns(ctx context.Context) ([]RunListItem, error) {
rows, err := s.db.QueryContext(
ctx,
`SELECT run_id, goal, summary, status, created_at, updated_at
FROM runs
ORDER BY updated_at DESC, created_at DESC`,
)
if err != nil {
return nil, fmt.Errorf("query runs: %w", err)
}
defer rows.Close()
var runs []store.Run
runIDs := make([]string, 0)
for rows.Next() {
var (
run store.Run
createdAt, updated string
)
if err := rows.Scan(
&run.RunID,
&run.Goal,
&run.Summary,
&run.Status,
&createdAt,
&updated,
); err != nil {
return nil, fmt.Errorf("scan run list row: %w", err)
}
run.CreatedAt = parseRFC3339(createdAt)
run.UpdatedAt = parseRFC3339(updated)
runs = append(runs, run)
runIDs = append(runIDs, run.RunID)
}
if err := rows.Err(); err != nil {
return nil, fmt.Errorf("iterate runs: %w", err)
}
countsByRunID, err := s.collectTaskCounts(ctx, runIDs)
if err != nil {
return nil, err
}
items := make([]RunListItem, 0, len(runs))
for _, run := range runs {
taskCounts := countsByRunID[run.RunID]
if taskCounts == nil {
taskCounts = map[string]int{}
}
items = append(items, RunListItem{
Run: run,
TaskCounts: taskCounts,
TotalTasks: totalTasks(taskCounts),
})
}
return items, nil
}
func (s *ReadService) GetRunDetail(ctx context.Context, runID string) (RunDetail, error) {
overview, err := s.orch.GetRunOverview(ctx, runID)
if err != nil {
return RunDetail{}, err
}
blocked, err := s.orch.ListBlockedTasks(ctx, runID)
if err != nil {
return RunDetail{}, err
}
return RunDetail{
Run: overview.Run,
TaskCounts: overview.TaskCounts,
TotalTasks: totalTasks(overview.TaskCounts),
Tasks: overview.Tasks,
BlockedTasks: blocked,
}, nil
}
func (s *ReadService) ListRunTasks(ctx context.Context, runID string) ([]store.Task, error) {
detail, err := s.GetRunDetail(ctx, runID)
if err != nil {
return nil, err
}
return detail.Tasks, nil
}
func (s *ReadService) ListBlockedTasks(ctx context.Context, runID string) ([]store.BlockedTask, error) {
return s.orch.ListBlockedTasks(ctx, runID)
}
func (s *ReadService) GetThreadDetail(ctx context.Context, threadID string) (store.ThreadDetail, error) {
return s.inbox.GetThread(ctx, threadID)
}
func (s *ReadService) collectTaskCounts(ctx context.Context, runIDs []string) (map[string]map[string]int, error) {
result := make(map[string]map[string]int, len(runIDs))
if len(runIDs) == 0 {
return result, nil
}
args := make([]any, 0, len(runIDs))
for _, runID := range runIDs {
args = append(args, runID)
}
rows, err := s.db.QueryContext(
ctx,
`SELECT run_id, status, COUNT(*)
FROM tasks
WHERE run_id IN (`+placeholders(len(runIDs))+`)
GROUP BY run_id, status`,
args...,
)
if err != nil {
return nil, fmt.Errorf("query task counts for runs: %w", err)
}
defer rows.Close()
for rows.Next() {
var (
runID string
status string
count int
)
if err := rows.Scan(&runID, &status, &count); err != nil {
return nil, fmt.Errorf("scan run task count: %w", err)
}
if result[runID] == nil {
result[runID] = make(map[string]int)
}
result[runID][status] = count
}
if err := rows.Err(); err != nil {
return nil, fmt.Errorf("iterate run task counts: %w", err)
}
return result, nil
}
func totalTasks(counts map[string]int) int {
total := 0
for _, count := range counts {
total += count
}
return total
}
func placeholders(count int) string {
if count <= 0 {
return ""
}
buf := make([]byte, 0, count*2-1)
for i := 0; i < count; i++ {
if i > 0 {
buf = append(buf, ',')
}
buf = append(buf, '?')
}
return string(buf)
}
func parseRFC3339(value string) time.Time {
parsed, err := time.Parse(time.RFC3339Nano, value)
if err != nil {
return time.Time{}
}
return parsed
}