Files

489 lines
15 KiB
Go

package dashboard
import (
	"context"
	"database/sql"
	"errors"
	"sort"
	"strings"

	"inbox/internal/domain/humantask"
	"inbox/internal/domain/lane"
	"inbox/internal/domain/lanesync"
	"inbox/internal/domain/message"
	"inbox/internal/domain/role"
	"inbox/internal/domain/task"
	"inbox/internal/domain/topic"
	"inbox/internal/domain/workflow"
	"inbox/internal/domain/workspace"
)
// WorkflowBoard returns the workflow dashboard payload for a workspace.
//
// The response always carries one summary per topic in the loaded snapshot,
// ordered by LatestTime descending with the topic name as ascending
// tie-breaker. When activeSlug is empty, the first topic after sorting becomes
// the active one; if no slug can be determined the response is returned with a
// nil Board. Otherwise the active topic is resolved through
// GetTopicBySlugOrTitle (workflow space) and its detailed board is attached.
//
// Any snapshot, lookup, or board-building error is returned together with a
// zero-value response.
func (s *Service) WorkflowBoard(ctx context.Context, ws workspace.Workspace, activeSlug string) (dashboardWorkflowBoardResponse, error) {
	var none dashboardWorkflowBoardResponse

	snap, err := s.loadWorkflowBoardSnapshot(ctx, ws.ID)
	if err != nil {
		return none, err
	}

	topics := make([]dashboardWorkflowTopicSummary, 0, len(snap.topics))
	for _, t := range snap.topics {
		msgs := snap.messagesByTopic[t.ID]
		runs := snap.runsByTopic[t.ID]
		// Agent and human pending counts are shown as one combined figure.
		pending := mergePendingRoleCounts(snap.pendingByTopicRole[t.ID], snap.pendingHumanByTopicRole[t.ID])

		topics = append(topics, dashboardWorkflowTopicSummary{
			Name:         t.Slug,
			Status:       t.Status,
			MessageCount: len(msgs),
			LatestStage:  latestTopicStage(t, msgs, runs),
			LatestTime:   latestTopicTimeWithLanes(t, msgs, runs, snap.lanesByTopic[t.ID]),
			RunningRoles: runningRolesForRuns(runs),
			WaitingRoles: pendingRolesForTopic(pending, snap.roles),
		})
	}

	// Most recently active topics first; names break ties.
	sort.Slice(topics, func(a, b int) bool {
		if topics[a].LatestTime != topics[b].LatestTime {
			return topics[a].LatestTime > topics[b].LatestTime
		}
		return topics[a].Name < topics[b].Name
	})

	resp := dashboardWorkflowBoardResponse{Topics: topics}

	slug := activeSlug
	if slug == "" && len(topics) > 0 {
		slug = topics[0].Name
	}
	if slug == "" {
		// Nothing to focus on: return the topic list without a board.
		return resp, nil
	}

	record, err := s.repo.GetTopicBySlugOrTitle(ctx, ws.ID, slug, topic.SpaceWorkflow)
	if err != nil {
		return none, err
	}
	resp.ActiveTopic = record.Slug

	topicID := record.ID
	board, err := s.buildWorkflowBoard(
		ctx,
		record,
		snap.roles,
		snap.lanesByTopic[topicID],
		snap.messagesByTopic[topicID],
		snap.runsByTopic[topicID],
		snap.pendingByTopicRole[topicID],
		snap.pendingByRole,
		snap.pendingHumanByTopicRole[topicID],
		snap.pendingHumanByRole,
		snap.pendingHumanByTopic[topicID],
	)
	if err != nil {
		return none, err
	}
	resp.Board = board
	return resp, nil
}
// buildWorkflowBoard assembles the detailed board for a single workflow topic.
//
// Inputs already loaded by the caller (roles, lanes, messages, runs, pending
// counters, human tasks) are combined with data fetched here: the topic's
// tasks, its lane syncs, and its latest task-graph version (the "plan").
//
// pendingByRole / pendingHumanByRole are the topic-scoped counters and
// pendingGlobal / pendingHumanGlobal the cross-topic ones; each pair is summed
// before being handed to the agent and link builders.
//
// The returned board contains:
//   - Events: every message and every run as one timeline, sorted by timestamp
//     ascending with the record ID as tie-breaker.
//   - Summary: RunningCount counts agents in state "running" plus lanes with
//     status running; WaitingCount counts agents in state "queued" plus lanes
//     that are ready or blocked; ActiveRoles lists running, queued, and recent
//     agents.
//   - Plan: nil when the topic has no task-graph version yet.
//
// Any repository error other than "no plan" aborts the build.
func (s *Service) buildWorkflowBoard(
	ctx context.Context,
	record topic.Record,
	roles []role.Definition,
	lanes []lane.Record,
	messages []message.Record,
	runs []workflow.Run,
	pendingByRole map[string]int,
	pendingGlobal map[string]int,
	pendingHumanByRole map[string]int,
	pendingHumanGlobal map[string]int,
	humanTasks []humantask.Record,
) (*dashboardWorkflowBoard, error) {
	tasks, err := s.repo.ListTasksByTopic(ctx, record.ID)
	if err != nil {
		return nil, err
	}
	laneSyncs, err := s.repo.ListLaneSyncsByTopic(ctx, record.ID)
	if err != nil {
		return nil, err
	}
	latestPlan, planErr := s.repo.GetLatestTaskGraphVersionByTopic(ctx, record.ID)
	// A missing plan is expected for new topics. Use errors.Is so that a
	// wrapped sql.ErrNoRows is still treated as "no plan" rather than a failure.
	if planErr != nil && !errors.Is(planErr, sql.ErrNoRows) {
		return nil, planErr
	}

	events := make([]workflowEventEnvelope, 0, len(messages)+len(runs))
	messageByID := make(map[string]message.Record, len(messages))
	for _, item := range messages {
		messageByID[item.ID] = item
		events = append(events, workflowEventEnvelope{
			Key:       item.ID,
			Timestamp: item.CreatedAt,
			Event: dashboardWorkflowEvent{
				Kind:      "message",
				ID:        item.ID,
				Timestamp: item.CreatedAt,
				From:      item.FromRoleName,
				To:        item.ToExpr,
				Stage:     item.Stage,
				Type:      string(item.Type),
				Body:      item.BodyMarkdown,
				ReplyTo:   item.ReplyToMessageID,
			},
		})
	}
	for _, item := range runs {
		timestamp := latestString(item.CompletedAt, item.StartedAt)
		reply := ""
		if item.ReplyMessageID != "" {
			// A reply ID that is not among this topic's messages yields an
			// empty reply body (zero-value map lookup).
			reply = strings.TrimSpace(messageByID[item.ReplyMessageID].BodyMarkdown)
		}
		events = append(events, workflowEventEnvelope{
			Key:       item.ID,
			Timestamp: timestamp,
			Event: dashboardWorkflowEvent{
				Kind:         "dispatch",
				ID:           item.ID,
				Timestamp:    timestamp,
				Role:         item.RoleName,
				Stage:        string(item.Stage),
				Mode:         item.Mode,
				Running:      item.Status == workflow.RunStatusRunning,
				StartedAt:    item.StartedAt,
				CompletedAt:  item.CompletedAt,
				ExitCode:     item.ExitCode,
				Reply:        reply,
				ErrorMessage: item.ErrorMessage,
			},
		})
	}
	// Chronological timeline; the key keeps same-timestamp events in a fixed order.
	sort.Slice(events, func(i, j int) bool {
		if events[i].Timestamp == events[j].Timestamp {
			return events[i].Key < events[j].Key
		}
		return events[i].Timestamp < events[j].Timestamp
	})

	combinedPendingByRole := mergePendingRoleCounts(pendingByRole, pendingHumanByRole)
	combinedPendingGlobal := mergePendingRoleCounts(pendingGlobal, pendingHumanGlobal)
	links := buildWorkflowLinks(messages, combinedPendingByRole, runs)
	agents := buildWorkflowAgents(roles, messages, runs, combinedPendingByRole, combinedPendingGlobal)

	// ActiveRoles starts as an empty (non-nil) slice.
	summary := dashboardWorkflowSummary{ActiveRoles: []string{}}
	for _, agent := range agents {
		switch agent.State {
		case "running":
			summary.RunningCount++
			summary.ActiveRoles = append(summary.ActiveRoles, agent.Name)
		case "queued":
			summary.WaitingCount++
			summary.ActiveRoles = append(summary.ActiveRoles, agent.Name)
		case "recent":
			summary.ActiveRoles = append(summary.ActiveRoles, agent.Name)
		}
		summary.LastEventAt = latestString(summary.LastEventAt, agent.SessionLastUsedAt)
	}
	summary.LastEventAt = latestString(summary.LastEventAt, record.UpdatedAt)
	summary.LastEventAt = latestString(summary.LastEventAt, latestLaneTime(lanes))

	taskItems, err := s.buildWorkflowTasks(ctx, tasks)
	if err != nil {
		return nil, err
	}

	// Lanes contribute to the same running/waiting counters as agents.
	laneItems := buildWorkflowLanes(lanes, laneSyncs)
	for _, item := range laneItems {
		switch item.Status {
		case string(lane.StatusRunning):
			summary.RunningCount++
		case string(lane.StatusReady), string(lane.StatusBlocked):
			summary.WaitingCount++
		}
	}

	payloadEvents := make([]dashboardWorkflowEvent, 0, len(events))
	for _, item := range events {
		payloadEvents = append(payloadEvents, item.Event)
	}
	payloadHumanTasks := buildWorkflowHumanTasks(humanTasks, messageByID)

	// planErr is either nil (plan found) or "no rows" at this point.
	var planPayload *dashboardWorkflowPlan
	if planErr == nil {
		planPayload = &dashboardWorkflowPlan{
			Version:             latestPlan.Version,
			Status:              string(latestPlan.Status),
			SummaryMarkdown:     latestPlan.PlanSummaryMarkdown,
			CreatedAt:           latestPlan.CreatedAt,
			ConfirmedAt:         latestPlan.ConfirmedAt,
			CreatedByRoleName:   latestPlan.CreatedByRoleName,
			SupersedesVersionID: latestPlan.SupersedesGraphVersionID,
		}
	}

	return &dashboardWorkflowBoard{
		Topic: dashboardWorkflowTopicDetail{
			Name:         record.Slug,
			LatestStage:  latestTopicStage(record, messages, runs),
			MessageCount: len(messages),
			CreatedAt:    record.CreatedAt,
			UpdatedAt:    record.UpdatedAt,
			Status:       record.Status,
		},
		Plan:              planPayload,
		Summary:           summary,
		Agents:            agents,
		Lanes:             laneItems,
		Tasks:             taskItems,
		Links:             links,
		Events:            payloadEvents,
		PendingHumanTasks: payloadHumanTasks,
	}, nil
}
// workflowEventEnvelope pairs a timeline event with the values used to order
// it while the message and dispatch events of a topic are merged.
type workflowEventEnvelope struct {
	// Key is the ID of the underlying message or run; it breaks timestamp ties.
	Key string
	// Timestamp is the primary sort value of the event.
	Timestamp string
	// Event is the payload that ends up in the board response.
	Event dashboardWorkflowEvent
}
// buildWorkflowAgents produces one dashboard entry per role that should be
// visible on the board, sorted by SortOrder and then by name.
//
// Visibility rules:
//   - non-human roles are listed only when enabled;
//   - human roles are listed only once they have sent or received a message in
//     the topic, or have pending work in pendingByRole.
//
// State is "running" for a non-human role whose latest run is running,
// otherwise "queued" when the role has pending work, otherwise "recent" when
// any run or message exists for it, otherwise "idle". Human roles get category
// "user" and never carry a CurrentDispatch; all other roles get category
// "workflow" and a CurrentDispatch describing their latest run, if any.
func buildWorkflowAgents(
	roles []role.Definition,
	messages []message.Record,
	runs []workflow.Run,
	pendingByRole map[string]int,
	pendingGlobal map[string]int,
) []dashboardWorkflowAgent {
	agents := make([]dashboardWorkflowAgent, 0, len(roles)+1)

	for _, def := range roles {
		lastIn := latestInboundMessage(messages, def.Name)
		lastOut := latestOutboundMessage(messages, def.Name)
		lastRun := latestRunForRole(runs, def.Name)

		human := def.ExecutorKind == role.ExecutorKindHuman
		pending := pendingByRole[def.Name]
		hasTraffic := lastIn.ID != "" || lastOut.ID != ""

		if human {
			// Humans only appear once they are involved in the topic.
			if !hasTraffic && pending == 0 {
				continue
			}
		} else if !def.IsEnabled {
			continue
		}

		running := lastRun != nil && lastRun.Status == workflow.RunStatusRunning

		state := "idle"
		switch {
		case !human && running:
			state = "running"
		case pending > 0:
			state = "queued"
		case lastRun != nil || hasTraffic:
			state = "recent"
		}

		category := "workflow"
		if human {
			category = "user"
		}

		entry := dashboardWorkflowAgent{
			Name:                  def.Name,
			Category:              category,
			SortOrder:             def.SortOrder,
			Description:           def.Description,
			PendingGlobal:         pendingGlobal[def.Name],
			SessionLastUsedAt:     latestString(latestRunTime(lastRun), lastIn.CreatedAt, lastOut.CreatedAt),
			State:                 state,
			LatestInboundAt:       lastIn.CreatedAt,
			LatestOutboundAt:      lastOut.CreatedAt,
			LatestInboundPreview:  previewText(lastIn.BodyMarkdown),
			LatestOutboundPreview: previewText(lastOut.BodyMarkdown),
		}
		if !human && lastRun != nil {
			entry.CurrentDispatch = &dashboardWorkflowDispatchSummary{
				Stage:       string(lastRun.Stage),
				Mode:        lastRun.Mode,
				StartedAt:   lastRun.StartedAt,
				CompletedAt: lastRun.CompletedAt,
				Running:     running,
				ExitCode:    lastRun.ExitCode,
			}
		}
		agents = append(agents, entry)
	}

	sort.Slice(agents, func(a, b int) bool {
		if agents[a].SortOrder != agents[b].SortOrder {
			return agents[a].SortOrder < agents[b].SortOrder
		}
		return agents[a].Name < agents[b].Name
	})
	return agents
}
// buildWorkflowLinks aggregates messages into directed sender -> recipient
// links, one per distinct pair, where recipients come from splitRecipients
// applied to the message's ToExpr.
//
// Each link carries the number of messages on that edge, the timestamp of the
// latest one, and the stage and type of that latest message. A link is "hot"
// when its recipient has pending work in pendingByRole or a running run.
// Links are returned newest first, with From and then To as tie-breakers.
func buildWorkflowLinks(messages []message.Record, pendingByRole map[string]int, runs []workflow.Run) []dashboardWorkflowLink {
	type edge struct {
		from, to string
	}

	running := make(map[string]bool)
	for i := range runs {
		if runs[i].Status == workflow.RunStatusRunning {
			running[runs[i].RoleName] = true
		}
	}

	byEdge := make(map[edge]*dashboardWorkflowLink)
	for _, msg := range messages {
		for _, to := range splitRecipients(msg.ToExpr) {
			id := edge{from: msg.FromRoleName, to: to}
			link, seen := byEdge[id]
			if !seen {
				link = &dashboardWorkflowLink{From: msg.FromRoleName, To: to}
				byEdge[id] = link
			}

			link.Count++
			link.LastMessageAt = latestString(link.LastMessageAt, msg.CreatedAt)
			// LastMessageAt has just been advanced, so this holds only when the
			// current message is at least as new as everything seen so far on
			// this edge; only then does it define the stage and type shown.
			if msg.CreatedAt >= link.LastMessageAt {
				link.LastStage = msg.Stage
				link.LastType = string(msg.Type)
			}
			link.IsHot = pendingByRole[to] > 0 || running[to]
		}
	}

	links := make([]dashboardWorkflowLink, 0, len(byEdge))
	for _, link := range byEdge {
		links = append(links, *link)
	}

	// Map iteration order is random; the full (time, from, to) ordering makes
	// the output deterministic.
	sort.Slice(links, func(a, b int) bool {
		x, y := &links[a], &links[b]
		switch {
		case x.LastMessageAt != y.LastMessageAt:
			return x.LastMessageAt > y.LastMessageAt
		case x.From != y.From:
			return x.From < y.From
		default:
			return x.To < y.To
		}
	})
	return links
}
// buildWorkflowHumanTasks converts human task records into dashboard entries,
// enriching each with the sender, stage, and body of its prompt message looked
// up in messageByID. A prompt that is not in the map leaves those three fields
// at their zero values.
//
// The result is never nil and is ordered by UpdatedAt descending, with ID
// descending as tie-breaker.
func buildWorkflowHumanTasks(tasks []humantask.Record, messageByID map[string]message.Record) []dashboardWorkflowHumanTask {
	views := make([]dashboardWorkflowHumanTask, len(tasks))
	for i, rec := range tasks {
		prompt := messageByID[rec.PromptMessageID]
		views[i] = dashboardWorkflowHumanTask{
			ID:                rec.ID,
			RoleName:          rec.RoleName,
			Status:            rec.Status,
			PromptMessageID:   rec.PromptMessageID,
			PromptFrom:        prompt.FromRoleName,
			PromptStage:       prompt.Stage,
			PromptBody:        prompt.BodyMarkdown,
			AnsweredMessageID: rec.AnsweredMessageID,
			CreatedAt:         rec.CreatedAt,
			UpdatedAt:         rec.UpdatedAt,
		}
	}

	sort.Slice(views, func(a, b int) bool {
		if views[a].UpdatedAt != views[b].UpdatedAt {
			return views[a].UpdatedAt > views[b].UpdatedAt
		}
		return views[a].ID > views[b].ID
	})
	return views
}
// latestTopicTimeWithLanes combines the topic's own latest time (as computed by
// latestTopicTime from the record, messages, and runs) with the latest lane
// time, passing both through latestString.
func latestTopicTimeWithLanes(record topic.Record, messages []message.Record, runs []workflow.Run, lanes []lane.Record) string {
	return latestString(latestTopicTime(record, messages, runs), latestLaneTime(lanes))
}
// latestLaneTime folds the CompletedAt, StartedAt, UpdatedAt, and CreatedAt
// values of every lane through latestString and returns the result. It returns
// "" when items is empty.
func latestLaneTime(items []lane.Record) string {
	var newest string
	for i := range items {
		l := &items[i]
		newest = latestString(newest, l.CompletedAt, l.StartedAt, l.UpdatedAt, l.CreatedAt)
	}
	return newest
}
// buildWorkflowLanes converts lane records into dashboard lanes sorted by name.
//
// Lane syncs are grouped by their downstream lane ID, preserving the order in
// which laneSyncs was supplied, and attached to the matching lane as
// SyncHistory. LastSync points at the first entry of that history and is nil
// for lanes without any sync.
// NOTE(review): "last" is only accurate if the caller passes laneSyncs newest
// first — confirm against the repository query.
func buildWorkflowLanes(items []lane.Record, laneSyncs []lanesync.Record) []dashboardWorkflowLane {
	historyByLane := make(map[string][]dashboardWorkflowLaneSync)
	for _, rec := range laneSyncs {
		historyByLane[rec.DownstreamLaneID] = append(historyByLane[rec.DownstreamLaneID], dashboardWorkflowLaneSync{
			UpstreamLaneID: rec.UpstreamLaneID,
			TaskID:         rec.TaskID,
			UpstreamCommit: rec.UpstreamCommit,
			MergeCommit:    rec.MergeCommit,
			Status:         string(rec.Status),
			ErrorMessage:   rec.ErrorMessage,
			CreatedAt:      rec.CreatedAt,
		})
	}

	lanes := make([]dashboardWorkflowLane, 0, len(items))
	for _, rec := range items {
		view := dashboardWorkflowLane{
			ID:              rec.ID,
			Name:            rec.Name,
			Slug:            rec.Slug,
			Purpose:         rec.Purpose,
			Status:          string(rec.Status),
			BranchName:      rec.BranchName,
			HeadCommit:      rec.HeadCommit,
			WorktreePath:    rec.WorktreePath,
			ContainerName:   rec.ContainerName,
			RuntimeEndpoint: rec.RuntimeEndpoint,
			StartedAt:       rec.StartedAt,
			CompletedAt:     rec.CompletedAt,
			ErrorMessage:    rec.ErrorMessage,
			SyncHistory:     historyByLane[rec.ID],
		}
		if len(view.SyncHistory) > 0 {
			// Shares storage with SyncHistory rather than copying the entry.
			view.LastSync = &view.SyncHistory[0]
		}
		lanes = append(lanes, view)
	}

	sort.Slice(lanes, func(a, b int) bool {
		return lanes[a].Name < lanes[b].Name
	})
	return lanes
}
// buildWorkflowTasks converts task records into dashboard tasks, loading each
// task's dependencies from the repository, and returns them ordered by lane
// ID, then task order, then title.
//
// Dependencies are fetched with one ListTaskDependencies call per task; the
// first failing call aborts the build and its error is returned unchanged.
// Deliverables are copied so the payload does not alias the record's slice.
func (s *Service) buildWorkflowTasks(ctx context.Context, items []task.Record) ([]dashboardWorkflowTask, error) {
	views := make([]dashboardWorkflowTask, 0, len(items))

	for _, rec := range items {
		deps, err := s.repo.ListTaskDependencies(ctx, rec.ID)
		if err != nil {
			return nil, err
		}

		edges := make([]dashboardWorkflowTaskDependency, 0, len(deps))
		for _, dep := range deps {
			edges = append(edges, dashboardWorkflowTaskDependency{DependsOnTaskID: dep.DependsOnTaskID})
		}

		views = append(views, dashboardWorkflowTask{
			ID:                     rec.ID,
			LaneID:                 rec.LaneID,
			Title:                  rec.Title,
			Kind:                   string(rec.Kind),
			Deliverables:           append([]string(nil), rec.Deliverables...),
			BatchKey:               rec.BatchKey,
			Status:                 string(rec.Status),
			Priority:               rec.Priority,
			TaskOrder:              rec.TaskOrder,
			AcceptanceMarkdown:     rec.AcceptanceMarkdown,
			BlockingReasonMarkdown: rec.BlockingReasonMarkdown,
			ResultSummaryMarkdown:  rec.ResultSummaryMarkdown,
			Dependencies:           edges,
		})
	}

	sort.Slice(views, func(a, b int) bool {
		x, y := &views[a], &views[b]
		switch {
		case x.LaneID != y.LaneID:
			return x.LaneID < y.LaneID
		case x.TaskOrder != y.TaskOrder:
			return x.TaskOrder < y.TaskOrder
		default:
			return x.Title < y.Title
		}
	})
	return views, nil
}
// mergePendingRoleCounts returns a new map holding, for every role present in
// either input, the sum of its counts from left and right. Neither input is
// modified, nil inputs are treated as empty, and the result is never nil.
func mergePendingRoleCounts(left, right map[string]int) map[string]int {
	merged := make(map[string]int, len(left)+len(right))
	for _, counts := range []map[string]int{left, right} {
		for name, n := range counts {
			merged[name] += n
		}
	}
	return merged
}