package main import ( "bufio" "context" "encoding/json" "fmt" "io" "log" "net/http" "os" "os/exec" "path/filepath" "strings" "sync" "time" "inbox/internal/app/runtimecodex" taskexecapp "inbox/internal/app/taskexec" "inbox/internal/client" "inbox/internal/domain/workflow" ) type config struct { apiURL string workspaceID string laneID string workspaceDir string runnerID string pollInterval time.Duration heartbeatInterval time.Duration healthAddr string codexBin string } func main() { cfg, err := loadConfig() if err != nil { log.Fatalf("lane-worker config error: %v", err) } if err := run(cfg); err != nil { log.Fatalf("lane-worker failed: %v", err) } } func loadConfig() (config, error) { cfg := config{ apiURL: strings.TrimSpace(os.Getenv("INBOX_API_URL")), workspaceID: strings.TrimSpace(os.Getenv("INBOX_WORKSPACE_ID")), laneID: strings.TrimSpace(os.Getenv("INBOX_LANE_ID")), workspaceDir: firstNonEmpty(strings.TrimSpace(os.Getenv("INBOX_WORKSPACE")), "/workspace"), runnerID: firstNonEmpty(strings.TrimSpace(os.Getenv("INBOX_RUNTIME_AGENT_ID")), strings.TrimSpace(os.Getenv("HOSTNAME")), "lane-worker"), pollInterval: 3 * time.Second, heartbeatInterval: 15 * time.Second, healthAddr: ":31417", codexBin: firstNonEmpty(strings.TrimSpace(os.Getenv("CODEX_BIN")), "codex"), } if cfg.apiURL == "" { return config{}, fmt.Errorf("INBOX_API_URL is required") } if cfg.workspaceID == "" { return config{}, fmt.Errorf("INBOX_WORKSPACE_ID is required") } if cfg.laneID == "" { return config{}, fmt.Errorf("INBOX_LANE_ID is required") } return cfg, nil } func run(cfg config) error { if err := os.MkdirAll(cfg.workspaceDir, 0755); err != nil { return fmt.Errorf("ensure workspace dir: %w", err) } go serveHealth(cfg.healthAddr) httpClient := client.New(cfg.apiURL, nil) for { assignment, found, err := claimNext(context.Background(), httpClient, cfg) if err != nil { log.Printf("claim next task: %v", err) time.Sleep(cfg.pollInterval) continue } if !found { time.Sleep(cfg.pollInterval) continue } if err := executeAssignment(context.Background(), httpClient, cfg, assignment); err != nil { log.Printf("execute task %s: %v", assignment.Run.ID, err) } } } func serveHealth(addr string) { mux := http.NewServeMux() mux.HandleFunc("/healthz", func(w http.ResponseWriter, _ *http.Request) { w.Header().Set("Content-Type", "application/json") _, _ = w.Write([]byte(`{"ok":true,"service":"lane-worker"}`)) }) if err := http.ListenAndServe(addr, mux); err != nil { log.Fatalf("lane-worker health server failed: %v", err) } } func claimNext(ctx context.Context, httpClient *client.Client, cfg config) (taskexecapp.Assignment, bool, error) { body, err := json.Marshal(map[string]any{"runner_id": cfg.runnerID}) if err != nil { return taskexecapp.Assignment{}, false, err } resp, err := httpClient.Do(ctx, http.MethodPost, "/api/v2/runtime/lanes/"+cfg.laneID+"/tasks/next", http.Header{ "Content-Type": []string{"application/json"}, }, body) if err != nil { return taskexecapp.Assignment{}, false, err } if resp.StatusCode == http.StatusNoContent { return taskexecapp.Assignment{}, false, nil } if resp.StatusCode != http.StatusOK { return taskexecapp.Assignment{}, false, fmt.Errorf("claim task status %d: %s", resp.StatusCode, strings.TrimSpace(string(resp.Body))) } var payload struct { Assignment taskexecapp.Assignment `json:"assignment"` } if err := json.Unmarshal(resp.Body, &payload); err != nil { return taskexecapp.Assignment{}, false, fmt.Errorf("decode assignment: %w", err) } return payload.Assignment, true, nil } func executeAssignment(ctx context.Context, httpClient *client.Client, cfg config, assignment taskexecapp.Assignment) error { log.Printf("starting task run=%s lane=%s task=%s", assignment.Run.ID, assignment.Lane.ID, assignment.Task.ID) tempDir, err := os.MkdirTemp("", "lane-worker-*") if err != nil { return err } defer os.RemoveAll(tempDir) resultFile := filepath.Join(tempDir, "result.txt") args := []string{ "exec", "--skip-git-repo-check", "-C", cfg.workspaceDir, "--dangerously-bypass-approvals-and-sandbox", "-o", resultFile, } if strings.TrimSpace(assignment.Model) != "" { args = append(args, "-m", assignment.Model) } args = append(args, "-") cmd := exec.CommandContext(ctx, cfg.codexBin, args...) cmd.Dir = cfg.workspaceDir cmd.Stdin = strings.NewReader(assignment.Prompt) cmd.Env = roleCommandEnv(assignment) stdout, err := cmd.StdoutPipe() if err != nil { return fmt.Errorf("stdout pipe: %w", err) } stderr, err := cmd.StderrPipe() if err != nil { return fmt.Errorf("stderr pipe: %w", err) } if err := cmd.Start(); err != nil { return fmt.Errorf("start codex exec: %w", err) } var streamWG sync.WaitGroup streamWG.Add(2) go streamLogs(&streamWG, stdout, workflow.LogStreamStdout, httpClient, assignment.Run.ID) go streamLogs(&streamWG, stderr, workflow.LogStreamStderr, httpClient, assignment.Run.ID) waitErr := cmd.Wait() streamWG.Wait() resultBytes, _ := os.ReadFile(resultFile) resultText := strings.TrimSpace(string(resultBytes)) completion := taskexecapp.Completion{ RunID: assignment.Run.ID, ExitCode: 0, ResultMarkdown: resultText, Status: workflow.RunStatusSucceeded, } if waitErr != nil { completion.Status = workflow.RunStatusFailed completion.ErrorMessage = waitErr.Error() if exitErr, ok := waitErr.(*exec.ExitError); ok { completion.ExitCode = exitErr.ExitCode() } } if _, err := completeRun(ctx, httpClient, completion); err != nil { return err } log.Printf("completed task run=%s status=%s", assignment.Run.ID, completion.Status) return nil } func streamLogs(wg *sync.WaitGroup, pipeReader io.Reader, stream workflow.LogStream, httpClient *client.Client, runID string) { defer wg.Done() scanner := bufio.NewScanner(pipeReader) scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024) for scanner.Scan() { line := strings.TrimSpace(scanner.Text()) if line == "" { continue } if err := appendLog(context.Background(), httpClient, runID, stream, line); err != nil { log.Printf("append log run=%s stream=%s: %v", runID, stream, err) } } if err := scanner.Err(); err != nil { log.Printf("log scan run=%s stream=%s: %v", runID, stream, err) } } func appendLog(ctx context.Context, httpClient *client.Client, runID string, stream workflow.LogStream, content string) error { body, err := json.Marshal(map[string]any{ "stream": stream, "content": content, }) if err != nil { return err } resp, err := httpClient.Do(ctx, http.MethodPost, "/api/v2/workflow-runs/"+runID+"/logs", http.Header{ "Content-Type": []string{"application/json"}, }, body) if err != nil { return err } if resp.StatusCode != http.StatusCreated { return fmt.Errorf("append log status %d: %s", resp.StatusCode, strings.TrimSpace(string(resp.Body))) } return nil } func completeRun(ctx context.Context, httpClient *client.Client, completion taskexecapp.Completion) (workflow.Run, error) { body, err := json.Marshal(map[string]any{ "status": completion.Status, "exit_code": completion.ExitCode, "result_markdown": completion.ResultMarkdown, "error_message": completion.ErrorMessage, }) if err != nil { return workflow.Run{}, err } resp, err := httpClient.Do(ctx, http.MethodPost, "/api/v2/runtime/task-executions/"+completion.RunID+"/complete", http.Header{ "Content-Type": []string{"application/json"}, }, body) if err != nil { return workflow.Run{}, err } if resp.StatusCode != http.StatusOK { return workflow.Run{}, fmt.Errorf("complete run status %d: %s", resp.StatusCode, strings.TrimSpace(string(resp.Body))) } var run workflow.Run if err := json.Unmarshal(resp.Body, &run); err != nil { return workflow.Run{}, fmt.Errorf("decode completed run: %w", err) } return run, nil } func roleCommandEnv(assignment taskexecapp.Assignment) []string { roleHome := runtimecodex.ContainerUserHomeDir() codexDir := runtimecodex.ContainerCodexDir() _ = os.MkdirAll(codexDir, 0755) env := append([]string{}, os.Environ()...) env = append(env, "HOME="+roleHome) env = append(env, "CODEX_HOME="+codexDir) return env } func firstNonEmpty(values ...string) string { for _, value := range values { if strings.TrimSpace(value) != "" { return value } } return "" }