326 lines
9.9 KiB
Go
326 lines
9.9 KiB
Go
package workspaceruntime
|
|
|
|
import (
|
|
"context"
|
|
"crypto/sha256"
|
|
"encoding/json"
|
|
"fmt"
|
|
"os"
|
|
"path/filepath"
|
|
"sort"
|
|
"strconv"
|
|
"strings"
|
|
|
|
"inbox/internal/app/lanegit"
|
|
"inbox/internal/app/runtimecodex"
|
|
"inbox/internal/domain/lane"
|
|
"inbox/internal/domain/workspace"
|
|
)
|
|
|
|
// Podman image and port settings used when creating and inspecting lane
// containers.
const (
	// runtimeBaseImage is the image every lane container is created from.
	runtimeBaseImage = "localhost/ai-workflow-agent-runner:local"

	// runnerContainerPortNum is the in-container port number that gets
	// published on a host loopback port.
	runnerContainerPortNum = "31417"

	// runnerContainerPort is the same port in "<port>/<proto>" form, which is
	// how it is passed to `podman port` and looked up in the inspect output's
	// port map.
	runnerContainerPort = runnerContainerPortNum + "/tcp"
)
|
|
|
|
// podmanInspect holds the fields decoded from one element of the JSON array
// printed by `podman inspect`. Only the keys tagged below are decoded.
type podmanInspect struct {
	// ImageName is compared against the runtime base image during drift
	// detection (either ImageName or Config.Image may match).
	ImageName string `json:"ImageName"`
	// Path is compared against the lane-worker entrypoint path during drift
	// detection.
	Path string `json:"Path"`
	// Args is decoded but not consulted by the code visible in this file.
	Args []string `json:"Args"`
	// Config carries the container's configured image, user, environment
	// ("KEY=VALUE" entries) and working directory.
	Config struct {
		Image      string   `json:"Image"`
		User       string   `json:"User"`
		Env        []string `json:"Env"`
		WorkingDir string   `json:"WorkingDir"`
	} `json:"Config"`
	// State reports whether the container is currently running.
	State struct {
		Running bool   `json:"Running"`
		Status  string `json:"Status"`
	} `json:"State"`
	// NetworkSettings.Ports maps a container port key such as "31417/tcp" to
	// its host bindings.
	NetworkSettings struct {
		Ports map[string][]struct {
			HostIP   string `json:"HostIp"`
			HostPort string `json:"HostPort"`
		} `json:"Ports"`
	} `json:"NetworkSettings"`
	// Mounts lists source -> destination pairs for the container's mounts.
	Mounts []struct {
		Source      string `json:"Source"`
		Destination string `json:"Destination"`
	} `json:"Mounts"`
}
|
|
|
|
// containerRuntime manages lane containers by invoking the podman CLI through
// runner.
type containerRuntime struct {
	// projectRoot is passed as the second argument of every runner.Run call;
	// presumably the working directory for the command (confirm against
	// lanegit.Runner).
	projectRoot string
	// serverPort is the port used to build the INBOX_API_URL value
	// (http://host.containers.internal:<serverPort>) given to containers.
	serverPort int
	// runner executes the podman commands.
	runner lanegit.Runner
	// probe is used to wait on a container's endpoint once it has been started.
	probe *endpointProbe
}
|
|
|
|
// In-container paths at which the host-side binaries are bind-mounted
// read-only when a lane container is created.
const (
	// laneWorkerContainerPath is also used as the container entrypoint.
	laneWorkerContainerPath = "/usr/local/bin/lane-worker"
	inboxContainerPath      = "/usr/local/bin/inbox"
)
|
|
|
|
func (r *containerRuntime) ensureRunnerImage(ctx context.Context) error {
|
|
out, err := r.runner.Run(ctx, r.projectRoot, nil, "podman", "image", "exists", runtimeBaseImage)
|
|
if err != nil {
|
|
if strings.TrimSpace(out) == "" {
|
|
return fmt.Errorf("runtime base image is missing: %s", runtimeBaseImage)
|
|
}
|
|
return commandError("runtime base image is missing: "+runtimeBaseImage, out, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (r *containerRuntime) ensureLaneContainer(ctx context.Context, ws workspace.Workspace, item lane.Record, workerBinary, inboxBinary, workerCodexDir string) (string, error) {
|
|
codexFingerprint, err := runtimeCodexFingerprint(workerCodexDir)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
info, found, err := r.inspect(ctx, item.ContainerName)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
if found && r.laneDrifted(info, ws, item, workerBinary, inboxBinary, codexFingerprint) {
|
|
if err := r.stopAndRemoveContainer(ctx, item.ContainerName); err != nil {
|
|
return "", err
|
|
}
|
|
found = false
|
|
}
|
|
if !found {
|
|
args := []string{
|
|
"create",
|
|
"--name", item.ContainerName,
|
|
"--user", "root",
|
|
"--entrypoint", laneWorkerContainerPath,
|
|
"-p", "127.0.0.1::" + runnerContainerPortNum,
|
|
"-e", "INBOX_WORKSPACE=/workspace",
|
|
"-e", "INBOX_WORKSPACE_ID=" + ws.ID,
|
|
"-e", "INBOX_LANE_ID=" + item.ID,
|
|
"-e", "INBOX_API_URL=http://host.containers.internal:" + strconv.Itoa(r.serverPort),
|
|
"-e", "INBOX_RUNTIME_AGENT_ID=" + item.ContainerName,
|
|
"-e", "HOME=" + runtimecodex.ContainerUserHomeDir(),
|
|
"-v", filepath.Join(item.WorktreePath) + ":/workspace:z",
|
|
"-v", workerBinary + ":" + laneWorkerContainerPath + ":z,ro",
|
|
"-v", inboxBinary + ":" + inboxContainerPath + ":z,ro",
|
|
"-w", "/workspace",
|
|
}
|
|
if codexFingerprint != "" {
|
|
args = append(args, "-e", "INBOX_RUNTIME_CODEX_SHA="+codexFingerprint)
|
|
}
|
|
args = append(args, runtimeBaseImage)
|
|
out, err := r.runner.Run(ctx, r.projectRoot, nil, "podman", args...)
|
|
if err != nil {
|
|
return "", commandError("create lane container "+item.ContainerName, out, err)
|
|
}
|
|
if err := r.copyRuntimeCodex(ctx, item.ContainerName, workerCodexDir); err != nil {
|
|
_ = r.stopAndRemoveContainer(ctx, item.ContainerName)
|
|
return "", err
|
|
}
|
|
info, found, err = r.inspect(ctx, item.ContainerName)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
if !found {
|
|
return "", fmt.Errorf("container %s was not created", item.ContainerName)
|
|
}
|
|
}
|
|
if !info.State.Running {
|
|
out, err := r.runner.Run(ctx, r.projectRoot, nil, "podman", "start", item.ContainerName)
|
|
if err != nil {
|
|
return "", commandError("start lane container "+item.ContainerName, out, err)
|
|
}
|
|
}
|
|
endpoint, err := r.endpoint(ctx, item.ContainerName)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
if err := r.probe.wait(endpoint); err != nil {
|
|
return endpoint, err
|
|
}
|
|
return endpoint, nil
|
|
}
|
|
|
|
func (r *containerRuntime) inspect(ctx context.Context, name string) (podmanInspect, bool, error) {
|
|
if strings.TrimSpace(name) == "" {
|
|
return podmanInspect{}, false, nil
|
|
}
|
|
out, err := r.runner.Run(ctx, r.projectRoot, nil, "podman", "inspect", name)
|
|
if err != nil {
|
|
if strings.Contains(out, "no such object") || strings.Contains(out, "no container with name or ID") {
|
|
return podmanInspect{}, false, nil
|
|
}
|
|
return podmanInspect{}, false, fmt.Errorf("inspect container %s: %w", name, err)
|
|
}
|
|
var items []podmanInspect
|
|
if err := json.Unmarshal([]byte(out), &items); err != nil {
|
|
return podmanInspect{}, false, fmt.Errorf("decode podman inspect %s: %w", name, err)
|
|
}
|
|
if len(items) == 0 {
|
|
return podmanInspect{}, false, nil
|
|
}
|
|
return items[0], true, nil
|
|
}
|
|
|
|
func (r *containerRuntime) laneDrifted(info podmanInspect, ws workspace.Workspace, item lane.Record, workerBinary, inboxBinary, codexFingerprint string) bool {
|
|
if info.ImageName != runtimeBaseImage && info.Config.Image != runtimeBaseImage {
|
|
return true
|
|
}
|
|
if strings.TrimSpace(info.Config.User) != "root" {
|
|
return true
|
|
}
|
|
if !hasPublishedPort(info, runnerContainerPort) {
|
|
return true
|
|
}
|
|
if info.Config.WorkingDir != "/workspace" {
|
|
return true
|
|
}
|
|
if filepath.Clean(strings.TrimSpace(info.Path)) != laneWorkerContainerPath {
|
|
return true
|
|
}
|
|
mounts := make(map[string]string, len(info.Mounts))
|
|
for _, mount := range info.Mounts {
|
|
mounts[filepath.Clean(mount.Destination)] = filepath.Clean(mount.Source)
|
|
}
|
|
if _, ok := mounts[filepath.Clean(runtimecodex.ContainerCodexDir())]; ok {
|
|
return true
|
|
}
|
|
expectedMounts := map[string]string{
|
|
"/workspace": filepath.Clean(item.WorktreePath),
|
|
laneWorkerContainerPath: filepath.Clean(workerBinary),
|
|
inboxContainerPath: filepath.Clean(inboxBinary),
|
|
}
|
|
for destination, source := range expectedMounts {
|
|
if filepath.Clean(mounts[destination]) != filepath.Clean(source) {
|
|
return true
|
|
}
|
|
}
|
|
envs := make(map[string]string)
|
|
for _, item := range info.Config.Env {
|
|
key, value, ok := strings.Cut(item, "=")
|
|
if !ok {
|
|
continue
|
|
}
|
|
envs[key] = value
|
|
}
|
|
expectedEnv := map[string]string{
|
|
"INBOX_WORKSPACE": "/workspace",
|
|
"INBOX_WORKSPACE_ID": ws.ID,
|
|
"INBOX_LANE_ID": item.ID,
|
|
"INBOX_RUNTIME_AGENT_ID": item.ContainerName,
|
|
"INBOX_API_URL": "http://host.containers.internal:" + strconv.Itoa(r.serverPort),
|
|
}
|
|
if codexFingerprint != "" {
|
|
expectedEnv["INBOX_RUNTIME_CODEX_SHA"] = codexFingerprint
|
|
} else if _, ok := envs["INBOX_RUNTIME_CODEX_SHA"]; ok {
|
|
return true
|
|
}
|
|
for key, value := range expectedEnv {
|
|
if envs[key] != value {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func (r *containerRuntime) copyRuntimeCodex(ctx context.Context, containerName, workerCodexDir string) error {
|
|
if strings.TrimSpace(workerCodexDir) == "" {
|
|
return nil
|
|
}
|
|
out, err := r.runner.Run(ctx, r.projectRoot, nil, "podman", "cp", workerCodexDir, containerName+":"+runtimecodex.ContainerUserHomeDir())
|
|
if err != nil {
|
|
return commandError("copy runtime codex into "+containerName, out, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// runtimeCodexFingerprint returns a hex-encoded SHA-256 digest over the
// contents of workerCodexDir, or "" when workerCodexDir is blank.
//
// Every non-directory entry found by walking the tree contributes, in sorted
// order of its slash-separated relative path, the byte sequence
//
//	<relative path> 0x00 <file contents> 0x00
//
// to the hash. Walk and read failures are returned wrapped with the offending
// path.
func runtimeCodexFingerprint(workerCodexDir string) (string, error) {
	if strings.TrimSpace(workerCodexDir) == "" {
		return "", nil
	}

	relPaths := make([]string, 0, 8)
	collect := func(path string, info os.FileInfo, walkErr error) error {
		if walkErr != nil {
			return walkErr
		}
		if info.IsDir() {
			return nil
		}
		rel, relErr := filepath.Rel(workerCodexDir, path)
		if relErr != nil {
			return relErr
		}
		relPaths = append(relPaths, filepath.ToSlash(rel))
		return nil
	}
	if err := filepath.Walk(workerCodexDir, collect); err != nil {
		return "", fmt.Errorf("walk runtime codex dir %s: %w", workerCodexDir, err)
	}
	// Sort the slash form so the digest does not depend on the OS separator.
	sort.Strings(relPaths)

	digest := sha256.New()
	separator := []byte{0}
	for _, rel := range relPaths {
		fullPath := filepath.Join(workerCodexDir, filepath.FromSlash(rel))
		content, err := os.ReadFile(fullPath)
		if err != nil {
			return "", fmt.Errorf("read runtime codex file %s: %w", fullPath, err)
		}
		// hash.Hash.Write is documented never to return an error.
		digest.Write([]byte(rel))
		digest.Write(separator)
		digest.Write(content)
		digest.Write(separator)
	}

	return fmt.Sprintf("%x", digest.Sum(nil)), nil
}
|
|
|
|
func hasPublishedPort(info podmanInspect, containerPort string) bool {
|
|
bindings, ok := info.NetworkSettings.Ports[containerPort]
|
|
if !ok || len(bindings) == 0 {
|
|
return false
|
|
}
|
|
for _, binding := range bindings {
|
|
if strings.TrimSpace(binding.HostPort) != "" {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func (r *containerRuntime) stopContainer(ctx context.Context, containerName string) error {
|
|
info, found, err := r.inspect(ctx, containerName)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if !found {
|
|
return nil
|
|
}
|
|
if info.State.Running {
|
|
out, err := r.runner.Run(ctx, r.projectRoot, nil, "podman", "stop", containerName)
|
|
if err != nil {
|
|
return commandError("stop container "+containerName, out, err)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (r *containerRuntime) stopAndRemoveContainer(ctx context.Context, containerName string) error {
|
|
out, err := r.runner.Run(ctx, r.projectRoot, nil, "podman", "rm", "-f", containerName)
|
|
if err != nil {
|
|
return commandError("remove container "+containerName, out, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (r *containerRuntime) endpoint(ctx context.Context, containerName string) (string, error) {
|
|
out, err := r.runner.Run(ctx, r.projectRoot, nil, "podman", "port", containerName, runnerContainerPort)
|
|
if err != nil {
|
|
return "", fmt.Errorf("read container port for %s: %w", containerName, err)
|
|
}
|
|
lines := strings.Split(strings.TrimSpace(out), "\n")
|
|
if len(lines) == 0 || strings.TrimSpace(lines[0]) == "" {
|
|
return "", fmt.Errorf("container %s does not expose %s", containerName, runnerContainerPort)
|
|
}
|
|
line := strings.TrimSpace(lines[0])
|
|
port := line[strings.LastIndex(line, ":")+1:]
|
|
if _, err := strconv.Atoi(port); err != nil {
|
|
return "", fmt.Errorf("parse runner port %q", line)
|
|
}
|
|
return "http://127.0.0.1:" + port, nil
|
|
}
|