282 lines
8.4 KiB
Go
282 lines
8.4 KiB
Go
package main
|
|
|
|
import (
|
|
"bufio"
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"log"
|
|
"net/http"
|
|
"os"
|
|
"os/exec"
|
|
"path/filepath"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"inbox/internal/app/runtimecodex"
|
|
taskexecapp "inbox/internal/app/taskexec"
|
|
"inbox/internal/client"
|
|
"inbox/internal/domain/workflow"
|
|
)
|
|
|
|
// config carries the lane worker's runtime settings. loadConfig fills it in
// from environment variables and built-in defaults.
type config struct {
	// API endpoint and the identifiers this worker operates under.
	apiURL      string
	workspaceID string
	laneID      string
	runnerID    string // sent as "runner_id" when claiming a task

	// Local execution settings.
	workspaceDir string // working directory handed to the codex process
	codexBin     string // codex executable name or path

	// Timing and health endpoint settings.
	pollInterval      time.Duration // pause between claim attempts when idle or after a claim error
	heartbeatInterval time.Duration
	healthAddr        string // listen address for the /healthz endpoint
}
|
|
|
|
func main() {
|
|
cfg, err := loadConfig()
|
|
if err != nil {
|
|
log.Fatalf("lane-worker config error: %v", err)
|
|
}
|
|
if err := run(cfg); err != nil {
|
|
log.Fatalf("lane-worker failed: %v", err)
|
|
}
|
|
}
|
|
|
|
func loadConfig() (config, error) {
|
|
cfg := config{
|
|
apiURL: strings.TrimSpace(os.Getenv("INBOX_API_URL")),
|
|
workspaceID: strings.TrimSpace(os.Getenv("INBOX_WORKSPACE_ID")),
|
|
laneID: strings.TrimSpace(os.Getenv("INBOX_LANE_ID")),
|
|
workspaceDir: firstNonEmpty(strings.TrimSpace(os.Getenv("INBOX_WORKSPACE")), "/workspace"),
|
|
runnerID: firstNonEmpty(strings.TrimSpace(os.Getenv("INBOX_RUNTIME_AGENT_ID")), strings.TrimSpace(os.Getenv("HOSTNAME")), "lane-worker"),
|
|
pollInterval: 3 * time.Second,
|
|
heartbeatInterval: 15 * time.Second,
|
|
healthAddr: ":31417",
|
|
codexBin: firstNonEmpty(strings.TrimSpace(os.Getenv("CODEX_BIN")), "codex"),
|
|
}
|
|
if cfg.apiURL == "" {
|
|
return config{}, fmt.Errorf("INBOX_API_URL is required")
|
|
}
|
|
if cfg.workspaceID == "" {
|
|
return config{}, fmt.Errorf("INBOX_WORKSPACE_ID is required")
|
|
}
|
|
if cfg.laneID == "" {
|
|
return config{}, fmt.Errorf("INBOX_LANE_ID is required")
|
|
}
|
|
return cfg, nil
|
|
}
|
|
|
|
func run(cfg config) error {
|
|
if err := os.MkdirAll(cfg.workspaceDir, 0755); err != nil {
|
|
return fmt.Errorf("ensure workspace dir: %w", err)
|
|
}
|
|
go serveHealth(cfg.healthAddr)
|
|
|
|
httpClient := client.New(cfg.apiURL, nil)
|
|
for {
|
|
assignment, found, err := claimNext(context.Background(), httpClient, cfg)
|
|
if err != nil {
|
|
log.Printf("claim next task: %v", err)
|
|
time.Sleep(cfg.pollInterval)
|
|
continue
|
|
}
|
|
if !found {
|
|
time.Sleep(cfg.pollInterval)
|
|
continue
|
|
}
|
|
if err := executeAssignment(context.Background(), httpClient, cfg, assignment); err != nil {
|
|
log.Printf("execute task %s: %v", assignment.Run.ID, err)
|
|
}
|
|
}
|
|
}
|
|
|
|
// serveHealth serves the worker's health endpoint on addr and blocks for as
// long as the server is running. Requests to /healthz receive a fixed JSON
// body identifying the service. If the server stops with an error (for
// example because the address cannot be bound) the process is terminated via
// log.Fatalf.
//
// The server is configured with explicit timeouts: the zero-value defaults
// used by http.ListenAndServe never time out, which lets a slow or stalled
// client hold a connection open indefinitely.
func serveHealth(addr string) {
	mux := http.NewServeMux()
	mux.HandleFunc("/healthz", func(w http.ResponseWriter, _ *http.Request) {
		w.Header().Set("Content-Type", "application/json")
		// A failed write only means the caller went away; there is nothing
		// useful to do about it for a health probe.
		_, _ = w.Write([]byte(`{"ok":true,"service":"lane-worker"}`))
	})

	srv := &http.Server{
		Addr:              addr,
		Handler:           mux,
		ReadHeaderTimeout: 5 * time.Second,
		ReadTimeout:       10 * time.Second,
		WriteTimeout:      10 * time.Second,
		IdleTimeout:       60 * time.Second,
	}
	if err := srv.ListenAndServe(); err != nil {
		log.Fatalf("lane-worker health server failed: %v", err)
	}
}
|
|
|
|
func claimNext(ctx context.Context, httpClient *client.Client, cfg config) (taskexecapp.Assignment, bool, error) {
|
|
body, err := json.Marshal(map[string]any{"runner_id": cfg.runnerID})
|
|
if err != nil {
|
|
return taskexecapp.Assignment{}, false, err
|
|
}
|
|
resp, err := httpClient.Do(ctx, http.MethodPost, "/api/v2/runtime/lanes/"+cfg.laneID+"/tasks/next", http.Header{
|
|
"Content-Type": []string{"application/json"},
|
|
}, body)
|
|
if err != nil {
|
|
return taskexecapp.Assignment{}, false, err
|
|
}
|
|
if resp.StatusCode == http.StatusNoContent {
|
|
return taskexecapp.Assignment{}, false, nil
|
|
}
|
|
if resp.StatusCode != http.StatusOK {
|
|
return taskexecapp.Assignment{}, false, fmt.Errorf("claim task status %d: %s", resp.StatusCode, strings.TrimSpace(string(resp.Body)))
|
|
}
|
|
var payload struct {
|
|
Assignment taskexecapp.Assignment `json:"assignment"`
|
|
}
|
|
if err := json.Unmarshal(resp.Body, &payload); err != nil {
|
|
return taskexecapp.Assignment{}, false, fmt.Errorf("decode assignment: %w", err)
|
|
}
|
|
return payload.Assignment, true, nil
|
|
}
|
|
|
|
func executeAssignment(ctx context.Context, httpClient *client.Client, cfg config, assignment taskexecapp.Assignment) error {
|
|
log.Printf("starting task run=%s lane=%s task=%s", assignment.Run.ID, assignment.Lane.ID, assignment.Task.ID)
|
|
tempDir, err := os.MkdirTemp("", "lane-worker-*")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer os.RemoveAll(tempDir)
|
|
|
|
resultFile := filepath.Join(tempDir, "result.txt")
|
|
args := []string{
|
|
"exec",
|
|
"--skip-git-repo-check",
|
|
"-C", cfg.workspaceDir,
|
|
"--dangerously-bypass-approvals-and-sandbox",
|
|
"-o", resultFile,
|
|
}
|
|
if strings.TrimSpace(assignment.Model) != "" {
|
|
args = append(args, "-m", assignment.Model)
|
|
}
|
|
args = append(args, "-")
|
|
|
|
cmd := exec.CommandContext(ctx, cfg.codexBin, args...)
|
|
cmd.Dir = cfg.workspaceDir
|
|
cmd.Stdin = strings.NewReader(assignment.Prompt)
|
|
cmd.Env = roleCommandEnv(assignment)
|
|
|
|
stdout, err := cmd.StdoutPipe()
|
|
if err != nil {
|
|
return fmt.Errorf("stdout pipe: %w", err)
|
|
}
|
|
stderr, err := cmd.StderrPipe()
|
|
if err != nil {
|
|
return fmt.Errorf("stderr pipe: %w", err)
|
|
}
|
|
if err := cmd.Start(); err != nil {
|
|
return fmt.Errorf("start codex exec: %w", err)
|
|
}
|
|
|
|
var streamWG sync.WaitGroup
|
|
streamWG.Add(2)
|
|
go streamLogs(&streamWG, stdout, workflow.LogStreamStdout, httpClient, assignment.Run.ID)
|
|
go streamLogs(&streamWG, stderr, workflow.LogStreamStderr, httpClient, assignment.Run.ID)
|
|
|
|
waitErr := cmd.Wait()
|
|
streamWG.Wait()
|
|
|
|
resultBytes, _ := os.ReadFile(resultFile)
|
|
resultText := strings.TrimSpace(string(resultBytes))
|
|
completion := taskexecapp.Completion{
|
|
RunID: assignment.Run.ID,
|
|
ExitCode: 0,
|
|
ResultMarkdown: resultText,
|
|
Status: workflow.RunStatusSucceeded,
|
|
}
|
|
if waitErr != nil {
|
|
completion.Status = workflow.RunStatusFailed
|
|
completion.ErrorMessage = waitErr.Error()
|
|
if exitErr, ok := waitErr.(*exec.ExitError); ok {
|
|
completion.ExitCode = exitErr.ExitCode()
|
|
}
|
|
}
|
|
if _, err := completeRun(ctx, httpClient, completion); err != nil {
|
|
return err
|
|
}
|
|
log.Printf("completed task run=%s status=%s", assignment.Run.ID, completion.Status)
|
|
return nil
|
|
}
|
|
|
|
func streamLogs(wg *sync.WaitGroup, pipeReader io.Reader, stream workflow.LogStream, httpClient *client.Client, runID string) {
|
|
defer wg.Done()
|
|
scanner := bufio.NewScanner(pipeReader)
|
|
scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024)
|
|
for scanner.Scan() {
|
|
line := strings.TrimSpace(scanner.Text())
|
|
if line == "" {
|
|
continue
|
|
}
|
|
if err := appendLog(context.Background(), httpClient, runID, stream, line); err != nil {
|
|
log.Printf("append log run=%s stream=%s: %v", runID, stream, err)
|
|
}
|
|
}
|
|
if err := scanner.Err(); err != nil {
|
|
log.Printf("log scan run=%s stream=%s: %v", runID, stream, err)
|
|
}
|
|
}
|
|
|
|
func appendLog(ctx context.Context, httpClient *client.Client, runID string, stream workflow.LogStream, content string) error {
|
|
body, err := json.Marshal(map[string]any{
|
|
"stream": stream,
|
|
"content": content,
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
resp, err := httpClient.Do(ctx, http.MethodPost, "/api/v2/workflow-runs/"+runID+"/logs", http.Header{
|
|
"Content-Type": []string{"application/json"},
|
|
}, body)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if resp.StatusCode != http.StatusCreated {
|
|
return fmt.Errorf("append log status %d: %s", resp.StatusCode, strings.TrimSpace(string(resp.Body)))
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func completeRun(ctx context.Context, httpClient *client.Client, completion taskexecapp.Completion) (workflow.Run, error) {
|
|
body, err := json.Marshal(map[string]any{
|
|
"status": completion.Status,
|
|
"exit_code": completion.ExitCode,
|
|
"result_markdown": completion.ResultMarkdown,
|
|
"error_message": completion.ErrorMessage,
|
|
})
|
|
if err != nil {
|
|
return workflow.Run{}, err
|
|
}
|
|
resp, err := httpClient.Do(ctx, http.MethodPost, "/api/v2/runtime/task-executions/"+completion.RunID+"/complete", http.Header{
|
|
"Content-Type": []string{"application/json"},
|
|
}, body)
|
|
if err != nil {
|
|
return workflow.Run{}, err
|
|
}
|
|
if resp.StatusCode != http.StatusOK {
|
|
return workflow.Run{}, fmt.Errorf("complete run status %d: %s", resp.StatusCode, strings.TrimSpace(string(resp.Body)))
|
|
}
|
|
var run workflow.Run
|
|
if err := json.Unmarshal(resp.Body, &run); err != nil {
|
|
return workflow.Run{}, fmt.Errorf("decode completed run: %w", err)
|
|
}
|
|
return run, nil
|
|
}
|
|
|
|
func roleCommandEnv(assignment taskexecapp.Assignment) []string {
|
|
roleHome := runtimecodex.ContainerUserHomeDir()
|
|
codexDir := runtimecodex.ContainerCodexDir()
|
|
_ = os.MkdirAll(codexDir, 0755)
|
|
|
|
env := append([]string{}, os.Environ()...)
|
|
env = append(env, "HOME="+roleHome)
|
|
env = append(env, "CODEX_HOME="+codexDir)
|
|
return env
|
|
}
|
|
|
|
// firstNonEmpty returns the first argument that contains something other
// than whitespace. The value is returned exactly as passed in; trimming is
// used only to decide whether it counts as empty. It returns "" when every
// argument is blank or when called with no arguments.
func firstNonEmpty(values ...string) string {
	for i := range values {
		if candidate := values[i]; len(strings.TrimSpace(candidate)) > 0 {
			return candidate
		}
	}
	return ""
}
|