Files

326 lines
9.9 KiB
Go

package workspaceruntime
import (
"context"
"crypto/sha256"
"encoding/json"
"fmt"
"os"
"path/filepath"
"sort"
"strconv"
"strings"
"inbox/internal/app/lanegit"
"inbox/internal/app/runtimecodex"
"inbox/internal/domain/lane"
"inbox/internal/domain/workspace"
)
const (
runtimeBaseImage = "localhost/ai-workflow-agent-runner:local"
runnerContainerPort = "31417/tcp"
runnerContainerPortNum = "31417"
)
type podmanInspect struct {
ImageName string `json:"ImageName"`
Path string `json:"Path"`
Args []string `json:"Args"`
Config struct {
Image string `json:"Image"`
User string `json:"User"`
Env []string `json:"Env"`
WorkingDir string `json:"WorkingDir"`
} `json:"Config"`
State struct {
Running bool `json:"Running"`
Status string `json:"Status"`
} `json:"State"`
NetworkSettings struct {
Ports map[string][]struct {
HostIP string `json:"HostIp"`
HostPort string `json:"HostPort"`
} `json:"Ports"`
} `json:"NetworkSettings"`
Mounts []struct {
Source string `json:"Source"`
Destination string `json:"Destination"`
} `json:"Mounts"`
}
type containerRuntime struct {
projectRoot string
serverPort int
runner lanegit.Runner
probe *endpointProbe
}
const laneWorkerContainerPath = "/usr/local/bin/lane-worker"
const inboxContainerPath = "/usr/local/bin/inbox"
func (r *containerRuntime) ensureRunnerImage(ctx context.Context) error {
out, err := r.runner.Run(ctx, r.projectRoot, nil, "podman", "image", "exists", runtimeBaseImage)
if err != nil {
if strings.TrimSpace(out) == "" {
return fmt.Errorf("runtime base image is missing: %s", runtimeBaseImage)
}
return commandError("runtime base image is missing: "+runtimeBaseImage, out, err)
}
return nil
}
func (r *containerRuntime) ensureLaneContainer(ctx context.Context, ws workspace.Workspace, item lane.Record, workerBinary, inboxBinary, workerCodexDir string) (string, error) {
codexFingerprint, err := runtimeCodexFingerprint(workerCodexDir)
if err != nil {
return "", err
}
info, found, err := r.inspect(ctx, item.ContainerName)
if err != nil {
return "", err
}
if found && r.laneDrifted(info, ws, item, workerBinary, inboxBinary, codexFingerprint) {
if err := r.stopAndRemoveContainer(ctx, item.ContainerName); err != nil {
return "", err
}
found = false
}
if !found {
args := []string{
"create",
"--name", item.ContainerName,
"--user", "root",
"--entrypoint", laneWorkerContainerPath,
"-p", "127.0.0.1::" + runnerContainerPortNum,
"-e", "INBOX_WORKSPACE=/workspace",
"-e", "INBOX_WORKSPACE_ID=" + ws.ID,
"-e", "INBOX_LANE_ID=" + item.ID,
"-e", "INBOX_API_URL=http://host.containers.internal:" + strconv.Itoa(r.serverPort),
"-e", "INBOX_RUNTIME_AGENT_ID=" + item.ContainerName,
"-e", "HOME=" + runtimecodex.ContainerUserHomeDir(),
"-v", filepath.Join(item.WorktreePath) + ":/workspace:z",
"-v", workerBinary + ":" + laneWorkerContainerPath + ":z,ro",
"-v", inboxBinary + ":" + inboxContainerPath + ":z,ro",
"-w", "/workspace",
}
if codexFingerprint != "" {
args = append(args, "-e", "INBOX_RUNTIME_CODEX_SHA="+codexFingerprint)
}
args = append(args, runtimeBaseImage)
out, err := r.runner.Run(ctx, r.projectRoot, nil, "podman", args...)
if err != nil {
return "", commandError("create lane container "+item.ContainerName, out, err)
}
if err := r.copyRuntimeCodex(ctx, item.ContainerName, workerCodexDir); err != nil {
_ = r.stopAndRemoveContainer(ctx, item.ContainerName)
return "", err
}
info, found, err = r.inspect(ctx, item.ContainerName)
if err != nil {
return "", err
}
if !found {
return "", fmt.Errorf("container %s was not created", item.ContainerName)
}
}
if !info.State.Running {
out, err := r.runner.Run(ctx, r.projectRoot, nil, "podman", "start", item.ContainerName)
if err != nil {
return "", commandError("start lane container "+item.ContainerName, out, err)
}
}
endpoint, err := r.endpoint(ctx, item.ContainerName)
if err != nil {
return "", err
}
if err := r.probe.wait(endpoint); err != nil {
return endpoint, err
}
return endpoint, nil
}
func (r *containerRuntime) inspect(ctx context.Context, name string) (podmanInspect, bool, error) {
if strings.TrimSpace(name) == "" {
return podmanInspect{}, false, nil
}
out, err := r.runner.Run(ctx, r.projectRoot, nil, "podman", "inspect", name)
if err != nil {
if strings.Contains(out, "no such object") || strings.Contains(out, "no container with name or ID") {
return podmanInspect{}, false, nil
}
return podmanInspect{}, false, fmt.Errorf("inspect container %s: %w", name, err)
}
var items []podmanInspect
if err := json.Unmarshal([]byte(out), &items); err != nil {
return podmanInspect{}, false, fmt.Errorf("decode podman inspect %s: %w", name, err)
}
if len(items) == 0 {
return podmanInspect{}, false, nil
}
return items[0], true, nil
}
func (r *containerRuntime) laneDrifted(info podmanInspect, ws workspace.Workspace, item lane.Record, workerBinary, inboxBinary, codexFingerprint string) bool {
if info.ImageName != runtimeBaseImage && info.Config.Image != runtimeBaseImage {
return true
}
if strings.TrimSpace(info.Config.User) != "root" {
return true
}
if !hasPublishedPort(info, runnerContainerPort) {
return true
}
if info.Config.WorkingDir != "/workspace" {
return true
}
if filepath.Clean(strings.TrimSpace(info.Path)) != laneWorkerContainerPath {
return true
}
mounts := make(map[string]string, len(info.Mounts))
for _, mount := range info.Mounts {
mounts[filepath.Clean(mount.Destination)] = filepath.Clean(mount.Source)
}
if _, ok := mounts[filepath.Clean(runtimecodex.ContainerCodexDir())]; ok {
return true
}
expectedMounts := map[string]string{
"/workspace": filepath.Clean(item.WorktreePath),
laneWorkerContainerPath: filepath.Clean(workerBinary),
inboxContainerPath: filepath.Clean(inboxBinary),
}
for destination, source := range expectedMounts {
if filepath.Clean(mounts[destination]) != filepath.Clean(source) {
return true
}
}
envs := make(map[string]string)
for _, item := range info.Config.Env {
key, value, ok := strings.Cut(item, "=")
if !ok {
continue
}
envs[key] = value
}
expectedEnv := map[string]string{
"INBOX_WORKSPACE": "/workspace",
"INBOX_WORKSPACE_ID": ws.ID,
"INBOX_LANE_ID": item.ID,
"INBOX_RUNTIME_AGENT_ID": item.ContainerName,
"INBOX_API_URL": "http://host.containers.internal:" + strconv.Itoa(r.serverPort),
}
if codexFingerprint != "" {
expectedEnv["INBOX_RUNTIME_CODEX_SHA"] = codexFingerprint
} else if _, ok := envs["INBOX_RUNTIME_CODEX_SHA"]; ok {
return true
}
for key, value := range expectedEnv {
if envs[key] != value {
return true
}
}
return false
}
func (r *containerRuntime) copyRuntimeCodex(ctx context.Context, containerName, workerCodexDir string) error {
if strings.TrimSpace(workerCodexDir) == "" {
return nil
}
out, err := r.runner.Run(ctx, r.projectRoot, nil, "podman", "cp", workerCodexDir, containerName+":"+runtimecodex.ContainerUserHomeDir())
if err != nil {
return commandError("copy runtime codex into "+containerName, out, err)
}
return nil
}
func runtimeCodexFingerprint(workerCodexDir string) (string, error) {
if strings.TrimSpace(workerCodexDir) == "" {
return "", nil
}
names := make([]string, 0, 8)
err := filepath.Walk(workerCodexDir, func(path string, info os.FileInfo, walkErr error) error {
if walkErr != nil {
return walkErr
}
if info.IsDir() {
return nil
}
rel, err := filepath.Rel(workerCodexDir, path)
if err != nil {
return err
}
names = append(names, filepath.ToSlash(rel))
return nil
})
if err != nil {
return "", fmt.Errorf("walk runtime codex dir %s: %w", workerCodexDir, err)
}
sort.Strings(names)
sum := sha256.New()
for _, name := range names {
body, err := os.ReadFile(filepath.Join(workerCodexDir, filepath.FromSlash(name)))
if err != nil {
return "", fmt.Errorf("read runtime codex file %s: %w", filepath.Join(workerCodexDir, filepath.FromSlash(name)), err)
}
_, _ = sum.Write([]byte(name))
_, _ = sum.Write([]byte{0})
_, _ = sum.Write(body)
_, _ = sum.Write([]byte{0})
}
return fmt.Sprintf("%x", sum.Sum(nil)), nil
}
func hasPublishedPort(info podmanInspect, containerPort string) bool {
bindings, ok := info.NetworkSettings.Ports[containerPort]
if !ok || len(bindings) == 0 {
return false
}
for _, binding := range bindings {
if strings.TrimSpace(binding.HostPort) != "" {
return true
}
}
return false
}
func (r *containerRuntime) stopContainer(ctx context.Context, containerName string) error {
info, found, err := r.inspect(ctx, containerName)
if err != nil {
return err
}
if !found {
return nil
}
if info.State.Running {
out, err := r.runner.Run(ctx, r.projectRoot, nil, "podman", "stop", containerName)
if err != nil {
return commandError("stop container "+containerName, out, err)
}
}
return nil
}
func (r *containerRuntime) stopAndRemoveContainer(ctx context.Context, containerName string) error {
out, err := r.runner.Run(ctx, r.projectRoot, nil, "podman", "rm", "-f", containerName)
if err != nil {
return commandError("remove container "+containerName, out, err)
}
return nil
}
func (r *containerRuntime) endpoint(ctx context.Context, containerName string) (string, error) {
out, err := r.runner.Run(ctx, r.projectRoot, nil, "podman", "port", containerName, runnerContainerPort)
if err != nil {
return "", fmt.Errorf("read container port for %s: %w", containerName, err)
}
lines := strings.Split(strings.TrimSpace(out), "\n")
if len(lines) == 0 || strings.TrimSpace(lines[0]) == "" {
return "", fmt.Errorf("container %s does not expose %s", containerName, runnerContainerPort)
}
line := strings.TrimSpace(lines[0])
port := line[strings.LastIndex(line, ":")+1:]
if _, err := strconv.Atoi(port); err != nil {
return "", fmt.Errorf("parse runner port %q", line)
}
return "http://127.0.0.1:" + port, nil
}