Files

388 lines
11 KiB
Go

package topics
import (
"context"
"testing"
"time"
"inbox/internal/base/timeutil"
"inbox/internal/domain/lane"
"inbox/internal/domain/message"
"inbox/internal/domain/task"
"inbox/internal/domain/taskgraph"
"inbox/internal/domain/topic"
"inbox/internal/domain/workflow"
"inbox/internal/domain/workspace"
sqlitestore "inbox/internal/store/sqlite"
)
func TestStopCancelsTopicExecutionState(t *testing.T) {
ctx := context.Background()
clock := timeutil.FixedClock{Time: time.Date(2026, 3, 17, 7, 30, 0, 0, time.UTC)}
store, err := sqlitestore.OpenInMemory(clock)
if err != nil {
t.Fatalf("OpenInMemory() error = %v", err)
}
defer store.Close()
now := timeutil.FormatRFC3339(clock.Now())
project, err := store.CreateProject(ctx, workspace.Project{
Slug: "demo",
Name: "Demo",
RootPath: t.TempDir(),
DefaultBranch: "main",
Status: "active",
CreatedAt: now,
UpdatedAt: now,
})
if err != nil {
t.Fatalf("CreateProject() error = %v", err)
}
ws, err := store.CreateWorkspace(ctx, workspace.Workspace{
ProjectID: project.ID,
Slug: "demo",
Name: "demo",
RootPath: t.TempDir(),
BaseBranch: "main",
WorktreeBranch: "worktree/demo",
RuntimeBackend: "host",
Status: "active",
CreatedAt: now,
UpdatedAt: now,
})
if err != nil {
t.Fatalf("CreateWorkspace() error = %v", err)
}
topicRecord, err := store.CreateTopic(ctx, topic.Record{
WorkspaceID: ws.ID,
Slug: "sample",
Title: "sample",
Space: topic.SpaceWorkflow,
Status: "execution",
CreatedAt: now,
UpdatedAt: now,
})
if err != nil {
t.Fatalf("CreateTopic() error = %v", err)
}
runningLane, err := store.CreateLane(ctx, lane.Record{
WorkspaceID: ws.ID,
TopicID: topicRecord.ID,
Name: "Execution Chain",
Slug: "execution-chain",
Status: lane.StatusRunning,
BranchName: "lane/demo/execution-lane",
WorktreePath: t.TempDir() + "/execution-chain",
ContainerName: "lane-demo",
RuntimeEndpoint: "http://127.0.0.1:40123",
CreatedByRoleName: "leader",
CreatedAt: now,
UpdatedAt: now,
StartedAt: now,
})
if err != nil {
t.Fatalf("CreateLane() error = %v", err)
}
readyLane, err := store.CreateLane(ctx, lane.Record{
WorkspaceID: ws.ID,
TopicID: topicRecord.ID,
Name: "Ready Chain",
Slug: "ready-chain",
Status: lane.StatusReady,
BranchName: "lane/demo/ready-lane",
WorktreePath: t.TempDir() + "/ready-chain",
ContainerName: "lane-ready",
CreatedByRoleName: "leader",
CreatedAt: now,
UpdatedAt: now,
})
if err != nil {
t.Fatalf("CreateLane(ready) error = %v", err)
}
run, err := store.CreateWorkflowRun(ctx, workflow.Run{
WorkspaceID: ws.ID,
TopicID: topicRecord.ID,
RoleName: "worker",
Stage: workflow.StageExecution,
Mode: "task",
Status: workflow.RunStatusRunning,
CommandJSON: "{}",
StartedAt: now,
})
if err != nil {
t.Fatalf("CreateWorkflowRun() error = %v", err)
}
if _, err := store.CreateTask(ctx, task.Record{
WorkspaceID: ws.ID,
TopicID: topicRecord.ID,
LaneID: runningLane.ID,
Title: "Implement feature",
BodyMarkdown: "Ship the feature.",
Kind: task.KindExecution,
Status: task.StatusRunning,
AssignedRunID: run.ID,
CreatedByRoleName: "leader",
CreatedAt: now,
UpdatedAt: now,
StartedAt: now,
}, nil); err != nil {
t.Fatalf("CreateTask(running) error = %v", err)
}
if _, err := store.CreateTask(ctx, task.Record{
WorkspaceID: ws.ID,
TopicID: topicRecord.ID,
LaneID: readyLane.ID,
Title: "Verify feature",
BodyMarkdown: "Verify the feature.",
Kind: task.KindVerification,
Status: task.StatusReady,
CreatedByRoleName: "leader",
CreatedAt: now,
UpdatedAt: now,
}, nil); err != nil {
t.Fatalf("CreateTask(ready) error = %v", err)
}
msg, err := store.CreateMessage(ctx, message.Record{
WorkspaceID: ws.ID,
TopicID: topicRecord.ID,
FromRoleName: "leader",
ToExpr: "worker",
Type: message.TypeSummary,
Stage: "execution",
BodyMarkdown: "Continue execution.",
CreatedAt: now,
})
if err != nil {
t.Fatalf("CreateMessage() error = %v", err)
}
service := NewService(store, &fakeTopicRuntime{store: store, now: now}, clock)
stopped, err := service.Stop(ctx, topicRecord.ID)
if err != nil {
t.Fatalf("Stop() error = %v", err)
}
if stopped.Status != "cancelled" {
t.Fatalf("expected cancelled topic, got %#v", stopped)
}
if stopped.ClosedAt == "" {
t.Fatalf("expected closed_at to be set")
}
lanes, err := store.ListLanesByTopic(ctx, topicRecord.ID)
if err != nil {
t.Fatalf("ListLanesByTopic() error = %v", err)
}
for _, item := range lanes {
if item.Status != lane.StatusCancelled {
t.Fatalf("expected cancelled lane, got %#v", item)
}
}
tasks, err := store.ListTasksByTopic(ctx, topicRecord.ID)
if err != nil {
t.Fatalf("ListTasksByTopic() error = %v", err)
}
for _, item := range tasks {
if item.Status != task.StatusCancelled {
t.Fatalf("expected cancelled task, got %#v", item)
}
if item.CompletedAt == "" {
t.Fatalf("expected completed_at on task %#v", item)
}
}
updatedRun, err := store.GetWorkflowRun(ctx, run.ID)
if err != nil {
t.Fatalf("GetWorkflowRun() error = %v", err)
}
if updatedRun.Status != workflow.RunStatusCancelled {
t.Fatalf("expected cancelled run, got %#v", updatedRun)
}
if updatedRun.CompletedAt == "" {
t.Fatalf("expected completed_at on run %#v", updatedRun)
}
var deliveryState string
if err := store.DB().QueryRowContext(ctx, `
SELECT state
FROM message_deliveries
WHERE message_id = ? AND recipient_role_name = 'worker'
`, msg.ID).Scan(&deliveryState); err != nil {
t.Fatalf("select delivery state: %v", err)
}
if deliveryState != string(message.DeliveryArchived) {
t.Fatalf("expected archived delivery, got %q", deliveryState)
}
}
func TestConfirmPlanActivatesDraftGraphAndStartsReadyLanes(t *testing.T) {
ctx := context.Background()
clock := timeutil.FixedClock{Time: time.Date(2026, 3, 17, 8, 0, 0, 0, time.UTC)}
store, err := sqlitestore.OpenInMemory(clock)
if err != nil {
t.Fatalf("OpenInMemory() error = %v", err)
}
defer store.Close()
now := timeutil.FormatRFC3339(clock.Now())
project, err := store.CreateProject(ctx, workspace.Project{
Slug: "demo",
Name: "Demo",
RootPath: t.TempDir(),
DefaultBranch: "main",
Status: "active",
CreatedAt: now,
UpdatedAt: now,
})
if err != nil {
t.Fatalf("CreateProject() error = %v", err)
}
ws, err := store.CreateWorkspace(ctx, workspace.Workspace{
ProjectID: project.ID,
Slug: "demo",
Name: "demo",
RootPath: t.TempDir(),
BaseBranch: "main",
WorktreeBranch: "worktree/demo",
RuntimeBackend: "host",
Status: "active",
CreatedAt: now,
UpdatedAt: now,
})
if err != nil {
t.Fatalf("CreateWorkspace() error = %v", err)
}
topicRecord, err := store.CreateTopic(ctx, topic.Record{
WorkspaceID: ws.ID,
Slug: "sample",
Title: "sample",
Space: topic.SpaceWorkflow,
Status: "awaiting_confirmation",
CreatedAt: now,
UpdatedAt: now,
})
if err != nil {
t.Fatalf("CreateTopic() error = %v", err)
}
gateLane, err := store.CreateLane(ctx, lane.Record{
WorkspaceID: ws.ID,
TopicID: topicRecord.ID,
Name: "Gate Chain",
Slug: "gate-chain",
Status: lane.StatusReady,
BranchName: "lane/demo/gate-lane",
WorktreePath: t.TempDir() + "/gate-chain",
ContainerName: "lane-gate",
CreatedByRoleName: "leader",
CreatedAt: now,
UpdatedAt: now,
})
if err != nil {
t.Fatalf("CreateLane(gate) error = %v", err)
}
readyLane, err := store.CreateLane(ctx, lane.Record{
WorkspaceID: ws.ID,
TopicID: topicRecord.ID,
Name: "Ready Chain",
Slug: "ready-chain",
Status: lane.StatusReady,
BranchName: "lane/demo/ready-lane",
WorktreePath: t.TempDir() + "/ready-chain",
ContainerName: "lane-ready",
CreatedByRoleName: "leader",
CreatedAt: now,
UpdatedAt: now,
})
if err != nil {
t.Fatalf("CreateLane(ready) error = %v", err)
}
if _, err := store.CreateTask(ctx, task.Record{
WorkspaceID: ws.ID,
TopicID: topicRecord.ID,
LaneID: gateLane.ID,
Title: "Inspect workspace",
BodyMarkdown: "Inspect workspace.",
Kind: task.KindGate,
Status: task.StatusReady,
CreatedByRoleName: "leader",
CreatedAt: now,
UpdatedAt: now,
}, nil); err != nil {
t.Fatalf("CreateTask(gate) error = %v", err)
}
if _, err := store.CreateTask(ctx, task.Record{
WorkspaceID: ws.ID,
TopicID: topicRecord.ID,
LaneID: readyLane.ID,
Title: "Implement feature",
BodyMarkdown: "Implement feature.",
Kind: task.KindExecution,
Status: task.StatusReady,
CreatedByRoleName: "leader",
CreatedAt: now,
UpdatedAt: now,
}, nil); err != nil {
t.Fatalf("CreateTask(ready) error = %v", err)
}
if _, err := store.CreateTaskGraphVersion(ctx, taskgraph.Record{
TopicID: topicRecord.ID,
Version: 1,
Status: taskgraph.StatusDraft,
PlanJSON: `{"plan_version":"1"}`,
PlanSummaryMarkdown: "Initial graph.",
CreatedByRoleName: "leader",
CreatedAt: now,
}); err != nil {
t.Fatalf("CreateTaskGraphVersion() error = %v", err)
}
runtime := &fakeTopicRuntime{store: store, now: now}
service := NewService(store, runtime, clock)
confirmed, err := service.ConfirmPlan(ctx, topicRecord.ID)
if err != nil {
t.Fatalf("ConfirmPlan() error = %v", err)
}
if confirmed.Status != "execution" {
t.Fatalf("expected execution topic, got %#v", confirmed)
}
if len(runtime.startedLaneIDs) != 1 || runtime.startedLaneIDs[0] != gateLane.ID {
t.Fatalf("expected only gate lane to start, got %#v", runtime.startedLaneIDs)
}
graphVersion, err := store.GetLatestTaskGraphVersionByTopic(ctx, topicRecord.ID)
if err != nil {
t.Fatalf("GetLatestTaskGraphVersionByTopic() error = %v", err)
}
if graphVersion.Status != taskgraph.StatusActive || graphVersion.ConfirmedAt == "" {
t.Fatalf("expected active confirmed graph version, got %#v", graphVersion)
}
}
type fakeTopicRuntime struct {
store *sqlitestore.Store
now string
startedLaneIDs []string
}
func (f *fakeTopicRuntime) StopLane(ctx context.Context, laneID string) (lane.Record, error) {
item, err := f.store.GetLane(ctx, laneID)
if err != nil {
return lane.Record{}, err
}
item.Status = lane.StatusCancelled
item.RuntimeEndpoint = ""
item.ErrorMessage = "Stopped manually on user request."
item.CompletedAt = f.now
return f.store.UpdateLane(ctx, item)
}
func (f *fakeTopicRuntime) EnsureLane(ctx context.Context, laneID string) (lane.Record, error) {
f.startedLaneIDs = append(f.startedLaneIDs, laneID)
item, err := f.store.GetLane(ctx, laneID)
if err != nil {
return lane.Record{}, err
}
item.Status = lane.StatusRunning
item.StartedAt = f.now
return f.store.UpdateLane(ctx, item)
}