388 lines
11 KiB
Go
388 lines
11 KiB
Go
package topics
|
|
|
|
import (
|
|
"context"
|
|
"testing"
|
|
"time"
|
|
|
|
"inbox/internal/base/timeutil"
|
|
"inbox/internal/domain/lane"
|
|
"inbox/internal/domain/message"
|
|
"inbox/internal/domain/task"
|
|
"inbox/internal/domain/taskgraph"
|
|
"inbox/internal/domain/topic"
|
|
"inbox/internal/domain/workflow"
|
|
"inbox/internal/domain/workspace"
|
|
sqlitestore "inbox/internal/store/sqlite"
|
|
)
|
|
|
|
func TestStopCancelsTopicExecutionState(t *testing.T) {
|
|
ctx := context.Background()
|
|
clock := timeutil.FixedClock{Time: time.Date(2026, 3, 17, 7, 30, 0, 0, time.UTC)}
|
|
store, err := sqlitestore.OpenInMemory(clock)
|
|
if err != nil {
|
|
t.Fatalf("OpenInMemory() error = %v", err)
|
|
}
|
|
defer store.Close()
|
|
|
|
now := timeutil.FormatRFC3339(clock.Now())
|
|
project, err := store.CreateProject(ctx, workspace.Project{
|
|
Slug: "demo",
|
|
Name: "Demo",
|
|
RootPath: t.TempDir(),
|
|
DefaultBranch: "main",
|
|
Status: "active",
|
|
CreatedAt: now,
|
|
UpdatedAt: now,
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("CreateProject() error = %v", err)
|
|
}
|
|
ws, err := store.CreateWorkspace(ctx, workspace.Workspace{
|
|
ProjectID: project.ID,
|
|
Slug: "demo",
|
|
Name: "demo",
|
|
RootPath: t.TempDir(),
|
|
BaseBranch: "main",
|
|
WorktreeBranch: "worktree/demo",
|
|
RuntimeBackend: "host",
|
|
Status: "active",
|
|
CreatedAt: now,
|
|
UpdatedAt: now,
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("CreateWorkspace() error = %v", err)
|
|
}
|
|
topicRecord, err := store.CreateTopic(ctx, topic.Record{
|
|
WorkspaceID: ws.ID,
|
|
Slug: "sample",
|
|
Title: "sample",
|
|
Space: topic.SpaceWorkflow,
|
|
Status: "execution",
|
|
CreatedAt: now,
|
|
UpdatedAt: now,
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("CreateTopic() error = %v", err)
|
|
}
|
|
runningLane, err := store.CreateLane(ctx, lane.Record{
|
|
WorkspaceID: ws.ID,
|
|
TopicID: topicRecord.ID,
|
|
Name: "Execution Chain",
|
|
Slug: "execution-chain",
|
|
Status: lane.StatusRunning,
|
|
BranchName: "lane/demo/execution-lane",
|
|
WorktreePath: t.TempDir() + "/execution-chain",
|
|
ContainerName: "lane-demo",
|
|
RuntimeEndpoint: "http://127.0.0.1:40123",
|
|
CreatedByRoleName: "leader",
|
|
CreatedAt: now,
|
|
UpdatedAt: now,
|
|
StartedAt: now,
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("CreateLane() error = %v", err)
|
|
}
|
|
readyLane, err := store.CreateLane(ctx, lane.Record{
|
|
WorkspaceID: ws.ID,
|
|
TopicID: topicRecord.ID,
|
|
Name: "Ready Chain",
|
|
Slug: "ready-chain",
|
|
Status: lane.StatusReady,
|
|
BranchName: "lane/demo/ready-lane",
|
|
WorktreePath: t.TempDir() + "/ready-chain",
|
|
ContainerName: "lane-ready",
|
|
CreatedByRoleName: "leader",
|
|
CreatedAt: now,
|
|
UpdatedAt: now,
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("CreateLane(ready) error = %v", err)
|
|
}
|
|
run, err := store.CreateWorkflowRun(ctx, workflow.Run{
|
|
WorkspaceID: ws.ID,
|
|
TopicID: topicRecord.ID,
|
|
RoleName: "worker",
|
|
Stage: workflow.StageExecution,
|
|
Mode: "task",
|
|
Status: workflow.RunStatusRunning,
|
|
CommandJSON: "{}",
|
|
StartedAt: now,
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("CreateWorkflowRun() error = %v", err)
|
|
}
|
|
if _, err := store.CreateTask(ctx, task.Record{
|
|
WorkspaceID: ws.ID,
|
|
TopicID: topicRecord.ID,
|
|
LaneID: runningLane.ID,
|
|
Title: "Implement feature",
|
|
BodyMarkdown: "Ship the feature.",
|
|
Kind: task.KindExecution,
|
|
Status: task.StatusRunning,
|
|
AssignedRunID: run.ID,
|
|
CreatedByRoleName: "leader",
|
|
CreatedAt: now,
|
|
UpdatedAt: now,
|
|
StartedAt: now,
|
|
}, nil); err != nil {
|
|
t.Fatalf("CreateTask(running) error = %v", err)
|
|
}
|
|
if _, err := store.CreateTask(ctx, task.Record{
|
|
WorkspaceID: ws.ID,
|
|
TopicID: topicRecord.ID,
|
|
LaneID: readyLane.ID,
|
|
Title: "Verify feature",
|
|
BodyMarkdown: "Verify the feature.",
|
|
Kind: task.KindVerification,
|
|
Status: task.StatusReady,
|
|
CreatedByRoleName: "leader",
|
|
CreatedAt: now,
|
|
UpdatedAt: now,
|
|
}, nil); err != nil {
|
|
t.Fatalf("CreateTask(ready) error = %v", err)
|
|
}
|
|
msg, err := store.CreateMessage(ctx, message.Record{
|
|
WorkspaceID: ws.ID,
|
|
TopicID: topicRecord.ID,
|
|
FromRoleName: "leader",
|
|
ToExpr: "worker",
|
|
Type: message.TypeSummary,
|
|
Stage: "execution",
|
|
BodyMarkdown: "Continue execution.",
|
|
CreatedAt: now,
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("CreateMessage() error = %v", err)
|
|
}
|
|
|
|
service := NewService(store, &fakeTopicRuntime{store: store, now: now}, clock)
|
|
stopped, err := service.Stop(ctx, topicRecord.ID)
|
|
if err != nil {
|
|
t.Fatalf("Stop() error = %v", err)
|
|
}
|
|
if stopped.Status != "cancelled" {
|
|
t.Fatalf("expected cancelled topic, got %#v", stopped)
|
|
}
|
|
if stopped.ClosedAt == "" {
|
|
t.Fatalf("expected closed_at to be set")
|
|
}
|
|
|
|
lanes, err := store.ListLanesByTopic(ctx, topicRecord.ID)
|
|
if err != nil {
|
|
t.Fatalf("ListLanesByTopic() error = %v", err)
|
|
}
|
|
for _, item := range lanes {
|
|
if item.Status != lane.StatusCancelled {
|
|
t.Fatalf("expected cancelled lane, got %#v", item)
|
|
}
|
|
}
|
|
|
|
tasks, err := store.ListTasksByTopic(ctx, topicRecord.ID)
|
|
if err != nil {
|
|
t.Fatalf("ListTasksByTopic() error = %v", err)
|
|
}
|
|
for _, item := range tasks {
|
|
if item.Status != task.StatusCancelled {
|
|
t.Fatalf("expected cancelled task, got %#v", item)
|
|
}
|
|
if item.CompletedAt == "" {
|
|
t.Fatalf("expected completed_at on task %#v", item)
|
|
}
|
|
}
|
|
|
|
updatedRun, err := store.GetWorkflowRun(ctx, run.ID)
|
|
if err != nil {
|
|
t.Fatalf("GetWorkflowRun() error = %v", err)
|
|
}
|
|
if updatedRun.Status != workflow.RunStatusCancelled {
|
|
t.Fatalf("expected cancelled run, got %#v", updatedRun)
|
|
}
|
|
if updatedRun.CompletedAt == "" {
|
|
t.Fatalf("expected completed_at on run %#v", updatedRun)
|
|
}
|
|
|
|
var deliveryState string
|
|
if err := store.DB().QueryRowContext(ctx, `
|
|
SELECT state
|
|
FROM message_deliveries
|
|
WHERE message_id = ? AND recipient_role_name = 'worker'
|
|
`, msg.ID).Scan(&deliveryState); err != nil {
|
|
t.Fatalf("select delivery state: %v", err)
|
|
}
|
|
if deliveryState != string(message.DeliveryArchived) {
|
|
t.Fatalf("expected archived delivery, got %q", deliveryState)
|
|
}
|
|
}
|
|
|
|
func TestConfirmPlanActivatesDraftGraphAndStartsReadyLanes(t *testing.T) {
|
|
ctx := context.Background()
|
|
clock := timeutil.FixedClock{Time: time.Date(2026, 3, 17, 8, 0, 0, 0, time.UTC)}
|
|
store, err := sqlitestore.OpenInMemory(clock)
|
|
if err != nil {
|
|
t.Fatalf("OpenInMemory() error = %v", err)
|
|
}
|
|
defer store.Close()
|
|
|
|
now := timeutil.FormatRFC3339(clock.Now())
|
|
project, err := store.CreateProject(ctx, workspace.Project{
|
|
Slug: "demo",
|
|
Name: "Demo",
|
|
RootPath: t.TempDir(),
|
|
DefaultBranch: "main",
|
|
Status: "active",
|
|
CreatedAt: now,
|
|
UpdatedAt: now,
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("CreateProject() error = %v", err)
|
|
}
|
|
ws, err := store.CreateWorkspace(ctx, workspace.Workspace{
|
|
ProjectID: project.ID,
|
|
Slug: "demo",
|
|
Name: "demo",
|
|
RootPath: t.TempDir(),
|
|
BaseBranch: "main",
|
|
WorktreeBranch: "worktree/demo",
|
|
RuntimeBackend: "host",
|
|
Status: "active",
|
|
CreatedAt: now,
|
|
UpdatedAt: now,
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("CreateWorkspace() error = %v", err)
|
|
}
|
|
topicRecord, err := store.CreateTopic(ctx, topic.Record{
|
|
WorkspaceID: ws.ID,
|
|
Slug: "sample",
|
|
Title: "sample",
|
|
Space: topic.SpaceWorkflow,
|
|
Status: "awaiting_confirmation",
|
|
CreatedAt: now,
|
|
UpdatedAt: now,
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("CreateTopic() error = %v", err)
|
|
}
|
|
gateLane, err := store.CreateLane(ctx, lane.Record{
|
|
WorkspaceID: ws.ID,
|
|
TopicID: topicRecord.ID,
|
|
Name: "Gate Chain",
|
|
Slug: "gate-chain",
|
|
Status: lane.StatusReady,
|
|
BranchName: "lane/demo/gate-lane",
|
|
WorktreePath: t.TempDir() + "/gate-chain",
|
|
ContainerName: "lane-gate",
|
|
CreatedByRoleName: "leader",
|
|
CreatedAt: now,
|
|
UpdatedAt: now,
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("CreateLane(gate) error = %v", err)
|
|
}
|
|
readyLane, err := store.CreateLane(ctx, lane.Record{
|
|
WorkspaceID: ws.ID,
|
|
TopicID: topicRecord.ID,
|
|
Name: "Ready Chain",
|
|
Slug: "ready-chain",
|
|
Status: lane.StatusReady,
|
|
BranchName: "lane/demo/ready-lane",
|
|
WorktreePath: t.TempDir() + "/ready-chain",
|
|
ContainerName: "lane-ready",
|
|
CreatedByRoleName: "leader",
|
|
CreatedAt: now,
|
|
UpdatedAt: now,
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("CreateLane(ready) error = %v", err)
|
|
}
|
|
if _, err := store.CreateTask(ctx, task.Record{
|
|
WorkspaceID: ws.ID,
|
|
TopicID: topicRecord.ID,
|
|
LaneID: gateLane.ID,
|
|
Title: "Inspect workspace",
|
|
BodyMarkdown: "Inspect workspace.",
|
|
Kind: task.KindGate,
|
|
Status: task.StatusReady,
|
|
CreatedByRoleName: "leader",
|
|
CreatedAt: now,
|
|
UpdatedAt: now,
|
|
}, nil); err != nil {
|
|
t.Fatalf("CreateTask(gate) error = %v", err)
|
|
}
|
|
if _, err := store.CreateTask(ctx, task.Record{
|
|
WorkspaceID: ws.ID,
|
|
TopicID: topicRecord.ID,
|
|
LaneID: readyLane.ID,
|
|
Title: "Implement feature",
|
|
BodyMarkdown: "Implement feature.",
|
|
Kind: task.KindExecution,
|
|
Status: task.StatusReady,
|
|
CreatedByRoleName: "leader",
|
|
CreatedAt: now,
|
|
UpdatedAt: now,
|
|
}, nil); err != nil {
|
|
t.Fatalf("CreateTask(ready) error = %v", err)
|
|
}
|
|
if _, err := store.CreateTaskGraphVersion(ctx, taskgraph.Record{
|
|
TopicID: topicRecord.ID,
|
|
Version: 1,
|
|
Status: taskgraph.StatusDraft,
|
|
PlanJSON: `{"plan_version":"1"}`,
|
|
PlanSummaryMarkdown: "Initial graph.",
|
|
CreatedByRoleName: "leader",
|
|
CreatedAt: now,
|
|
}); err != nil {
|
|
t.Fatalf("CreateTaskGraphVersion() error = %v", err)
|
|
}
|
|
|
|
runtime := &fakeTopicRuntime{store: store, now: now}
|
|
service := NewService(store, runtime, clock)
|
|
confirmed, err := service.ConfirmPlan(ctx, topicRecord.ID)
|
|
if err != nil {
|
|
t.Fatalf("ConfirmPlan() error = %v", err)
|
|
}
|
|
if confirmed.Status != "execution" {
|
|
t.Fatalf("expected execution topic, got %#v", confirmed)
|
|
}
|
|
if len(runtime.startedLaneIDs) != 1 || runtime.startedLaneIDs[0] != gateLane.ID {
|
|
t.Fatalf("expected only gate lane to start, got %#v", runtime.startedLaneIDs)
|
|
}
|
|
graphVersion, err := store.GetLatestTaskGraphVersionByTopic(ctx, topicRecord.ID)
|
|
if err != nil {
|
|
t.Fatalf("GetLatestTaskGraphVersionByTopic() error = %v", err)
|
|
}
|
|
if graphVersion.Status != taskgraph.StatusActive || graphVersion.ConfirmedAt == "" {
|
|
t.Fatalf("expected active confirmed graph version, got %#v", graphVersion)
|
|
}
|
|
}
|
|
|
|
type fakeTopicRuntime struct {
|
|
store *sqlitestore.Store
|
|
now string
|
|
startedLaneIDs []string
|
|
}
|
|
|
|
func (f *fakeTopicRuntime) StopLane(ctx context.Context, laneID string) (lane.Record, error) {
|
|
item, err := f.store.GetLane(ctx, laneID)
|
|
if err != nil {
|
|
return lane.Record{}, err
|
|
}
|
|
item.Status = lane.StatusCancelled
|
|
item.RuntimeEndpoint = ""
|
|
item.ErrorMessage = "Stopped manually on user request."
|
|
item.CompletedAt = f.now
|
|
return f.store.UpdateLane(ctx, item)
|
|
}
|
|
|
|
func (f *fakeTopicRuntime) EnsureLane(ctx context.Context, laneID string) (lane.Record, error) {
|
|
f.startedLaneIDs = append(f.startedLaneIDs, laneID)
|
|
item, err := f.store.GetLane(ctx, laneID)
|
|
if err != nil {
|
|
return lane.Record{}, err
|
|
}
|
|
item.Status = lane.StatusRunning
|
|
item.StartedAt = f.now
|
|
return f.store.UpdateLane(ctx, item)
|
|
}
|