package topics

import (
	"context"
	"path/filepath"
	"testing"
	"time"

	"inbox/internal/base/timeutil"
	"inbox/internal/domain/lane"
	"inbox/internal/domain/message"
	"inbox/internal/domain/task"
	"inbox/internal/domain/taskgraph"
	"inbox/internal/domain/topic"
	"inbox/internal/domain/workflow"
	"inbox/internal/domain/workspace"
	sqlitestore "inbox/internal/store/sqlite"
)

// TestStopCancelsTopicExecutionState seeds a topic in the "execution" status
// with one running lane, one ready lane, a running workflow run, a running
// task, a ready task and a message addressed to the worker role, then calls
// Service.Stop and asserts that:
//   - the returned topic is "cancelled" and has ClosedAt set,
//   - every lane of the topic is cancelled,
//   - every task of the topic is cancelled and has CompletedAt set,
//   - the workflow run is cancelled and has CompletedAt set,
//   - the worker's delivery row for the message is archived.
func TestStopCancelsTopicExecutionState(t *testing.T) {
	ctx := context.Background()
	clock := timeutil.FixedClock{Time: time.Date(2026, 3, 17, 7, 30, 0, 0, time.UTC)}

	store, err := sqlitestore.OpenInMemory(clock)
	if err != nil {
		t.Fatalf("OpenInMemory() error = %v", err)
	}
	defer store.Close()

	// All seeded records share the fixed clock's timestamp.
	now := timeutil.FormatRFC3339(clock.Now())

	project, err := store.CreateProject(ctx, workspace.Project{
		Slug:          "demo",
		Name:          "Demo",
		RootPath:      t.TempDir(),
		DefaultBranch: "main",
		Status:        "active",
		CreatedAt:     now,
		UpdatedAt:     now,
	})
	if err != nil {
		t.Fatalf("CreateProject() error = %v", err)
	}

	ws, err := store.CreateWorkspace(ctx, workspace.Workspace{
		ProjectID:      project.ID,
		Slug:           "demo",
		Name:           "demo",
		RootPath:       t.TempDir(),
		BaseBranch:     "main",
		WorktreeBranch: "worktree/demo",
		RuntimeBackend: "host",
		Status:         "active",
		CreatedAt:      now,
		UpdatedAt:      now,
	})
	if err != nil {
		t.Fatalf("CreateWorkspace() error = %v", err)
	}

	topicRecord, err := store.CreateTopic(ctx, topic.Record{
		WorkspaceID: ws.ID,
		Slug:        "sample",
		Title:       "sample",
		Space:       topic.SpaceWorkflow,
		Status:      "execution",
		CreatedAt:   now,
		UpdatedAt:   now,
	})
	if err != nil {
		t.Fatalf("CreateTopic() error = %v", err)
	}

	runningLane, err := store.CreateLane(ctx, lane.Record{
		WorkspaceID:       ws.ID,
		TopicID:           topicRecord.ID,
		Name:              "Execution Chain",
		Slug:              "execution-chain",
		Status:            lane.StatusRunning,
		BranchName:        "lane/demo/execution-lane",
		WorktreePath:      filepath.Join(t.TempDir(), "execution-chain"),
		ContainerName:     "lane-demo",
		RuntimeEndpoint:   "http://127.0.0.1:40123",
		CreatedByRoleName: "leader",
		CreatedAt:         now,
		UpdatedAt:         now,
		StartedAt:         now,
	})
	if err != nil {
		t.Fatalf("CreateLane() error = %v", err)
	}

	readyLane, err := store.CreateLane(ctx, lane.Record{
		WorkspaceID:       ws.ID,
		TopicID:           topicRecord.ID,
		Name:              "Ready Chain",
		Slug:              "ready-chain",
		Status:            lane.StatusReady,
		BranchName:        "lane/demo/ready-lane",
		WorktreePath:      filepath.Join(t.TempDir(), "ready-chain"),
		ContainerName:     "lane-ready",
		CreatedByRoleName: "leader",
		CreatedAt:         now,
		UpdatedAt:         now,
	})
	if err != nil {
		t.Fatalf("CreateLane(ready) error = %v", err)
	}

	run, err := store.CreateWorkflowRun(ctx, workflow.Run{
		WorkspaceID: ws.ID,
		TopicID:     topicRecord.ID,
		RoleName:    "worker",
		Stage:       workflow.StageExecution,
		Mode:        "task",
		Status:      workflow.RunStatusRunning,
		CommandJSON: "{}",
		StartedAt:   now,
	})
	if err != nil {
		t.Fatalf("CreateWorkflowRun() error = %v", err)
	}

	if _, err := store.CreateTask(ctx, task.Record{
		WorkspaceID:       ws.ID,
		TopicID:           topicRecord.ID,
		LaneID:            runningLane.ID,
		Title:             "Implement feature",
		BodyMarkdown:      "Ship the feature.",
		Kind:              task.KindExecution,
		Status:            task.StatusRunning,
		AssignedRunID:     run.ID,
		CreatedByRoleName: "leader",
		CreatedAt:         now,
		UpdatedAt:         now,
		StartedAt:         now,
	}, nil); err != nil {
		t.Fatalf("CreateTask(running) error = %v", err)
	}

	if _, err := store.CreateTask(ctx, task.Record{
		WorkspaceID:       ws.ID,
		TopicID:           topicRecord.ID,
		LaneID:            readyLane.ID,
		Title:             "Verify feature",
		BodyMarkdown:      "Verify the feature.",
		Kind:              task.KindVerification,
		Status:            task.StatusReady,
		CreatedByRoleName: "leader",
		CreatedAt:         now,
		UpdatedAt:         now,
	}, nil); err != nil {
		t.Fatalf("CreateTask(ready) error = %v", err)
	}

	msg, err := store.CreateMessage(ctx, message.Record{
		WorkspaceID:  ws.ID,
		TopicID:      topicRecord.ID,
		FromRoleName: "leader",
		ToExpr:       "worker",
		Type:         message.TypeSummary,
		Stage:        "execution",
		BodyMarkdown: "Continue execution.",
		CreatedAt:    now,
	})
	if err != nil {
		t.Fatalf("CreateMessage() error = %v", err)
	}

	service := NewService(store, &fakeTopicRuntime{store: store, now: now}, clock)

	stopped, err := service.Stop(ctx, topicRecord.ID)
	if err != nil {
		t.Fatalf("Stop() error = %v", err)
	}
	if stopped.Status != "cancelled" {
		t.Fatalf("expected cancelled topic, got %#v", stopped)
	}
	if stopped.ClosedAt == "" {
		t.Fatalf("expected closed_at to be set")
	}

	lanes, err := store.ListLanesByTopic(ctx, topicRecord.ID)
	if err != nil {
		t.Fatalf("ListLanesByTopic() error = %v", err)
	}
	// Guard against the per-lane loop below passing vacuously: both lanes
	// seeded above must be returned.
	if len(lanes) < 2 {
		t.Fatalf("expected at least 2 lanes for topic, got %d", len(lanes))
	}
	for _, item := range lanes {
		if item.Status != lane.StatusCancelled {
			t.Fatalf("expected cancelled lane, got %#v", item)
		}
	}

	tasks, err := store.ListTasksByTopic(ctx, topicRecord.ID)
	if err != nil {
		t.Fatalf("ListTasksByTopic() error = %v", err)
	}
	// Same guard for tasks: both tasks seeded above must be returned.
	if len(tasks) < 2 {
		t.Fatalf("expected at least 2 tasks for topic, got %d", len(tasks))
	}
	for _, item := range tasks {
		if item.Status != task.StatusCancelled {
			t.Fatalf("expected cancelled task, got %#v", item)
		}
		if item.CompletedAt == "" {
			t.Fatalf("expected completed_at on task %#v", item)
		}
	}

	updatedRun, err := store.GetWorkflowRun(ctx, run.ID)
	if err != nil {
		t.Fatalf("GetWorkflowRun() error = %v", err)
	}
	if updatedRun.Status != workflow.RunStatusCancelled {
		t.Fatalf("expected cancelled run, got %#v", updatedRun)
	}
	if updatedRun.CompletedAt == "" {
		t.Fatalf("expected completed_at on run %#v", updatedRun)
	}

	// The delivery state is read straight from the message_deliveries table.
	var deliveryState string
	if err := store.DB().QueryRowContext(ctx, `
		SELECT state
		FROM message_deliveries
		WHERE message_id = ?
		  AND recipient_role_name = 'worker'
	`, msg.ID).Scan(&deliveryState); err != nil {
		t.Fatalf("select delivery state: %v", err)
	}
	if deliveryState != string(message.DeliveryArchived) {
		t.Fatalf("expected archived delivery, got %q", deliveryState)
	}
}

// TestConfirmPlanActivatesDraftGraphAndStartsReadyLanes seeds a topic in the
// "awaiting_confirmation" status with two ready lanes (one holding a gate
// task, one holding an execution task) and a draft task graph version, then
// calls Service.ConfirmPlan and asserts that:
//   - the returned topic is in the "execution" status,
//   - the runtime was asked to start exactly one lane, the gate lane,
//   - the latest task graph version is active and has ConfirmedAt set.
func TestConfirmPlanActivatesDraftGraphAndStartsReadyLanes(t *testing.T) {
	ctx := context.Background()
	clock := timeutil.FixedClock{Time: time.Date(2026, 3, 17, 8, 0, 0, 0, time.UTC)}

	store, err := sqlitestore.OpenInMemory(clock)
	if err != nil {
		t.Fatalf("OpenInMemory() error = %v", err)
	}
	defer store.Close()

	// All seeded records share the fixed clock's timestamp.
	now := timeutil.FormatRFC3339(clock.Now())

	project, err := store.CreateProject(ctx, workspace.Project{
		Slug:          "demo",
		Name:          "Demo",
		RootPath:      t.TempDir(),
		DefaultBranch: "main",
		Status:        "active",
		CreatedAt:     now,
		UpdatedAt:     now,
	})
	if err != nil {
		t.Fatalf("CreateProject() error = %v", err)
	}

	ws, err := store.CreateWorkspace(ctx, workspace.Workspace{
		ProjectID:      project.ID,
		Slug:           "demo",
		Name:           "demo",
		RootPath:       t.TempDir(),
		BaseBranch:     "main",
		WorktreeBranch: "worktree/demo",
		RuntimeBackend: "host",
		Status:         "active",
		CreatedAt:      now,
		UpdatedAt:      now,
	})
	if err != nil {
		t.Fatalf("CreateWorkspace() error = %v", err)
	}

	topicRecord, err := store.CreateTopic(ctx, topic.Record{
		WorkspaceID: ws.ID,
		Slug:        "sample",
		Title:       "sample",
		Space:       topic.SpaceWorkflow,
		Status:      "awaiting_confirmation",
		CreatedAt:   now,
		UpdatedAt:   now,
	})
	if err != nil {
		t.Fatalf("CreateTopic() error = %v", err)
	}

	gateLane, err := store.CreateLane(ctx, lane.Record{
		WorkspaceID:       ws.ID,
		TopicID:           topicRecord.ID,
		Name:              "Gate Chain",
		Slug:              "gate-chain",
		Status:            lane.StatusReady,
		BranchName:        "lane/demo/gate-lane",
		WorktreePath:      filepath.Join(t.TempDir(), "gate-chain"),
		ContainerName:     "lane-gate",
		CreatedByRoleName: "leader",
		CreatedAt:         now,
		UpdatedAt:         now,
	})
	if err != nil {
		t.Fatalf("CreateLane(gate) error = %v", err)
	}

	readyLane, err := store.CreateLane(ctx, lane.Record{
		WorkspaceID:       ws.ID,
		TopicID:           topicRecord.ID,
		Name:              "Ready Chain",
		Slug:              "ready-chain",
		Status:            lane.StatusReady,
		BranchName:        "lane/demo/ready-lane",
		WorktreePath:      filepath.Join(t.TempDir(), "ready-chain"),
		ContainerName:     "lane-ready",
		CreatedByRoleName: "leader",
		CreatedAt:         now,
		UpdatedAt:         now,
	})
	if err != nil {
		t.Fatalf("CreateLane(ready) error = %v", err)
	}

	if _, err := store.CreateTask(ctx, task.Record{
		WorkspaceID:       ws.ID,
		TopicID:           topicRecord.ID,
		LaneID:            gateLane.ID,
		Title:             "Inspect workspace",
		BodyMarkdown:      "Inspect workspace.",
		Kind:              task.KindGate,
		Status:            task.StatusReady,
		CreatedByRoleName: "leader",
		CreatedAt:         now,
		UpdatedAt:         now,
	}, nil); err != nil {
		t.Fatalf("CreateTask(gate) error = %v", err)
	}

	if _, err := store.CreateTask(ctx, task.Record{
		WorkspaceID:       ws.ID,
		TopicID:           topicRecord.ID,
		LaneID:            readyLane.ID,
		Title:             "Implement feature",
		BodyMarkdown:      "Implement feature.",
		Kind:              task.KindExecution,
		Status:            task.StatusReady,
		CreatedByRoleName: "leader",
		CreatedAt:         now,
		UpdatedAt:         now,
	}, nil); err != nil {
		t.Fatalf("CreateTask(ready) error = %v", err)
	}

	if _, err := store.CreateTaskGraphVersion(ctx, taskgraph.Record{
		TopicID:             topicRecord.ID,
		Version:             1,
		Status:              taskgraph.StatusDraft,
		PlanJSON:            `{"plan_version":"1"}`,
		PlanSummaryMarkdown: "Initial graph.",
		CreatedByRoleName:   "leader",
		CreatedAt:           now,
	}); err != nil {
		t.Fatalf("CreateTaskGraphVersion() error = %v", err)
	}

	runtime := &fakeTopicRuntime{store: store, now: now}
	service := NewService(store, runtime, clock)

	confirmed, err := service.ConfirmPlan(ctx, topicRecord.ID)
	if err != nil {
		t.Fatalf("ConfirmPlan() error = %v", err)
	}
	if confirmed.Status != "execution" {
		t.Fatalf("expected execution topic, got %#v", confirmed)
	}
	if len(runtime.startedLaneIDs) != 1 || runtime.startedLaneIDs[0] != gateLane.ID {
		t.Fatalf("expected only gate lane to start, got %#v", runtime.startedLaneIDs)
	}

	graphVersion, err := store.GetLatestTaskGraphVersionByTopic(ctx, topicRecord.ID)
	if err != nil {
		t.Fatalf("GetLatestTaskGraphVersionByTopic() error = %v", err)
	}
	if graphVersion.Status != taskgraph.StatusActive || graphVersion.ConfirmedAt == "" {
		t.Fatalf("expected active confirmed graph version, got %#v", graphVersion)
	}
}

// fakeTopicRuntime is the test double handed to NewService as its runtime
// argument. It performs no real lane start/stop work; it only rewrites the
// lane record in the backing store and remembers which lanes were started.
type fakeTopicRuntime struct {
	store          *sqlitestore.Store // store the lane records are read from and written to
	now            string             // timestamp stamped onto StartedAt / CompletedAt
	startedLaneIDs []string           // lane IDs passed to EnsureLane, in call order
}

// StopLane loads the lane, marks it cancelled, clears its runtime endpoint,
// records a manual-stop error message and completion time, and persists it.
// It returns the updated record, or a zero record and the error if the lane
// cannot be loaded.
func (f *fakeTopicRuntime) StopLane(ctx context.Context, laneID string) (lane.Record, error) {
	item, err := f.store.GetLane(ctx, laneID)
	if err != nil {
		return lane.Record{}, err
	}
	item.Status = lane.StatusCancelled
	item.RuntimeEndpoint = ""
	item.ErrorMessage = "Stopped manually on user request."
	item.CompletedAt = f.now
	return f.store.UpdateLane(ctx, item)
}

// EnsureLane records laneID in startedLaneIDs, then loads the lane, marks it
// running with StartedAt set, and persists it. The ID is recorded before the
// lookup, so a failed lookup still counts as a start request.
func (f *fakeTopicRuntime) EnsureLane(ctx context.Context, laneID string) (lane.Record, error) {
	f.startedLaneIDs = append(f.startedLaneIDs, laneID)
	item, err := f.store.GetLane(ctx, laneID)
	if err != nil {
		return lane.Record{}, err
	}
	item.Status = lane.StatusRunning
	item.StartedAt = f.now
	return f.store.UpdateLane(ctx, item)
}