Files

282 lines
8.4 KiB
Go

package main
import (
"bufio"
"context"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"os"
"os/exec"
"path/filepath"
"strings"
"sync"
"time"
"inbox/internal/app/runtimecodex"
taskexecapp "inbox/internal/app/taskexec"
"inbox/internal/client"
"inbox/internal/domain/workflow"
)
type config struct {
apiURL string
workspaceID string
laneID string
workspaceDir string
runnerID string
pollInterval time.Duration
heartbeatInterval time.Duration
healthAddr string
codexBin string
}
func main() {
cfg, err := loadConfig()
if err != nil {
log.Fatalf("lane-worker config error: %v", err)
}
if err := run(cfg); err != nil {
log.Fatalf("lane-worker failed: %v", err)
}
}
func loadConfig() (config, error) {
cfg := config{
apiURL: strings.TrimSpace(os.Getenv("INBOX_API_URL")),
workspaceID: strings.TrimSpace(os.Getenv("INBOX_WORKSPACE_ID")),
laneID: strings.TrimSpace(os.Getenv("INBOX_LANE_ID")),
workspaceDir: firstNonEmpty(strings.TrimSpace(os.Getenv("INBOX_WORKSPACE")), "/workspace"),
runnerID: firstNonEmpty(strings.TrimSpace(os.Getenv("INBOX_RUNTIME_AGENT_ID")), strings.TrimSpace(os.Getenv("HOSTNAME")), "lane-worker"),
pollInterval: 3 * time.Second,
heartbeatInterval: 15 * time.Second,
healthAddr: ":31417",
codexBin: firstNonEmpty(strings.TrimSpace(os.Getenv("CODEX_BIN")), "codex"),
}
if cfg.apiURL == "" {
return config{}, fmt.Errorf("INBOX_API_URL is required")
}
if cfg.workspaceID == "" {
return config{}, fmt.Errorf("INBOX_WORKSPACE_ID is required")
}
if cfg.laneID == "" {
return config{}, fmt.Errorf("INBOX_LANE_ID is required")
}
return cfg, nil
}
func run(cfg config) error {
if err := os.MkdirAll(cfg.workspaceDir, 0755); err != nil {
return fmt.Errorf("ensure workspace dir: %w", err)
}
go serveHealth(cfg.healthAddr)
httpClient := client.New(cfg.apiURL, nil)
for {
assignment, found, err := claimNext(context.Background(), httpClient, cfg)
if err != nil {
log.Printf("claim next task: %v", err)
time.Sleep(cfg.pollInterval)
continue
}
if !found {
time.Sleep(cfg.pollInterval)
continue
}
if err := executeAssignment(context.Background(), httpClient, cfg, assignment); err != nil {
log.Printf("execute task %s: %v", assignment.Run.ID, err)
}
}
}
func serveHealth(addr string) {
mux := http.NewServeMux()
mux.HandleFunc("/healthz", func(w http.ResponseWriter, _ *http.Request) {
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(`{"ok":true,"service":"lane-worker"}`))
})
if err := http.ListenAndServe(addr, mux); err != nil {
log.Fatalf("lane-worker health server failed: %v", err)
}
}
func claimNext(ctx context.Context, httpClient *client.Client, cfg config) (taskexecapp.Assignment, bool, error) {
body, err := json.Marshal(map[string]any{"runner_id": cfg.runnerID})
if err != nil {
return taskexecapp.Assignment{}, false, err
}
resp, err := httpClient.Do(ctx, http.MethodPost, "/api/v2/runtime/lanes/"+cfg.laneID+"/tasks/next", http.Header{
"Content-Type": []string{"application/json"},
}, body)
if err != nil {
return taskexecapp.Assignment{}, false, err
}
if resp.StatusCode == http.StatusNoContent {
return taskexecapp.Assignment{}, false, nil
}
if resp.StatusCode != http.StatusOK {
return taskexecapp.Assignment{}, false, fmt.Errorf("claim task status %d: %s", resp.StatusCode, strings.TrimSpace(string(resp.Body)))
}
var payload struct {
Assignment taskexecapp.Assignment `json:"assignment"`
}
if err := json.Unmarshal(resp.Body, &payload); err != nil {
return taskexecapp.Assignment{}, false, fmt.Errorf("decode assignment: %w", err)
}
return payload.Assignment, true, nil
}
func executeAssignment(ctx context.Context, httpClient *client.Client, cfg config, assignment taskexecapp.Assignment) error {
log.Printf("starting task run=%s lane=%s task=%s", assignment.Run.ID, assignment.Lane.ID, assignment.Task.ID)
tempDir, err := os.MkdirTemp("", "lane-worker-*")
if err != nil {
return err
}
defer os.RemoveAll(tempDir)
resultFile := filepath.Join(tempDir, "result.txt")
args := []string{
"exec",
"--skip-git-repo-check",
"-C", cfg.workspaceDir,
"--dangerously-bypass-approvals-and-sandbox",
"-o", resultFile,
}
if strings.TrimSpace(assignment.Model) != "" {
args = append(args, "-m", assignment.Model)
}
args = append(args, "-")
cmd := exec.CommandContext(ctx, cfg.codexBin, args...)
cmd.Dir = cfg.workspaceDir
cmd.Stdin = strings.NewReader(assignment.Prompt)
cmd.Env = roleCommandEnv(assignment)
stdout, err := cmd.StdoutPipe()
if err != nil {
return fmt.Errorf("stdout pipe: %w", err)
}
stderr, err := cmd.StderrPipe()
if err != nil {
return fmt.Errorf("stderr pipe: %w", err)
}
if err := cmd.Start(); err != nil {
return fmt.Errorf("start codex exec: %w", err)
}
var streamWG sync.WaitGroup
streamWG.Add(2)
go streamLogs(&streamWG, stdout, workflow.LogStreamStdout, httpClient, assignment.Run.ID)
go streamLogs(&streamWG, stderr, workflow.LogStreamStderr, httpClient, assignment.Run.ID)
waitErr := cmd.Wait()
streamWG.Wait()
resultBytes, _ := os.ReadFile(resultFile)
resultText := strings.TrimSpace(string(resultBytes))
completion := taskexecapp.Completion{
RunID: assignment.Run.ID,
ExitCode: 0,
ResultMarkdown: resultText,
Status: workflow.RunStatusSucceeded,
}
if waitErr != nil {
completion.Status = workflow.RunStatusFailed
completion.ErrorMessage = waitErr.Error()
if exitErr, ok := waitErr.(*exec.ExitError); ok {
completion.ExitCode = exitErr.ExitCode()
}
}
if _, err := completeRun(ctx, httpClient, completion); err != nil {
return err
}
log.Printf("completed task run=%s status=%s", assignment.Run.ID, completion.Status)
return nil
}
func streamLogs(wg *sync.WaitGroup, pipeReader io.Reader, stream workflow.LogStream, httpClient *client.Client, runID string) {
defer wg.Done()
scanner := bufio.NewScanner(pipeReader)
scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024)
for scanner.Scan() {
line := strings.TrimSpace(scanner.Text())
if line == "" {
continue
}
if err := appendLog(context.Background(), httpClient, runID, stream, line); err != nil {
log.Printf("append log run=%s stream=%s: %v", runID, stream, err)
}
}
if err := scanner.Err(); err != nil {
log.Printf("log scan run=%s stream=%s: %v", runID, stream, err)
}
}
func appendLog(ctx context.Context, httpClient *client.Client, runID string, stream workflow.LogStream, content string) error {
body, err := json.Marshal(map[string]any{
"stream": stream,
"content": content,
})
if err != nil {
return err
}
resp, err := httpClient.Do(ctx, http.MethodPost, "/api/v2/workflow-runs/"+runID+"/logs", http.Header{
"Content-Type": []string{"application/json"},
}, body)
if err != nil {
return err
}
if resp.StatusCode != http.StatusCreated {
return fmt.Errorf("append log status %d: %s", resp.StatusCode, strings.TrimSpace(string(resp.Body)))
}
return nil
}
func completeRun(ctx context.Context, httpClient *client.Client, completion taskexecapp.Completion) (workflow.Run, error) {
body, err := json.Marshal(map[string]any{
"status": completion.Status,
"exit_code": completion.ExitCode,
"result_markdown": completion.ResultMarkdown,
"error_message": completion.ErrorMessage,
})
if err != nil {
return workflow.Run{}, err
}
resp, err := httpClient.Do(ctx, http.MethodPost, "/api/v2/runtime/task-executions/"+completion.RunID+"/complete", http.Header{
"Content-Type": []string{"application/json"},
}, body)
if err != nil {
return workflow.Run{}, err
}
if resp.StatusCode != http.StatusOK {
return workflow.Run{}, fmt.Errorf("complete run status %d: %s", resp.StatusCode, strings.TrimSpace(string(resp.Body)))
}
var run workflow.Run
if err := json.Unmarshal(resp.Body, &run); err != nil {
return workflow.Run{}, fmt.Errorf("decode completed run: %w", err)
}
return run, nil
}
func roleCommandEnv(assignment taskexecapp.Assignment) []string {
roleHome := runtimecodex.ContainerUserHomeDir()
codexDir := runtimecodex.ContainerCodexDir()
_ = os.MkdirAll(codexDir, 0755)
env := append([]string{}, os.Environ()...)
env = append(env, "HOME="+roleHome)
env = append(env, "CODEX_HOME="+codexDir)
return env
}
func firstNonEmpty(values ...string) string {
for _, value := range values {
if strings.TrimSpace(value) != "" {
return value
}
}
return ""
}