489 lines
15 KiB
Go
489 lines
15 KiB
Go
package dashboard
|
|
|
|
import (
	"context"
	"database/sql"
	"errors"
	"sort"
	"strings"

	"inbox/internal/domain/humantask"
	"inbox/internal/domain/lane"
	"inbox/internal/domain/lanesync"
	"inbox/internal/domain/message"
	"inbox/internal/domain/role"
	"inbox/internal/domain/task"
	"inbox/internal/domain/topic"
	"inbox/internal/domain/workflow"
	"inbox/internal/domain/workspace"
)
|
|
|
|
func (s *Service) WorkflowBoard(ctx context.Context, ws workspace.Workspace, activeSlug string) (dashboardWorkflowBoardResponse, error) {
|
|
snapshot, err := s.loadWorkflowBoardSnapshot(ctx, ws.ID)
|
|
if err != nil {
|
|
return dashboardWorkflowBoardResponse{}, err
|
|
}
|
|
summaries := make([]dashboardWorkflowTopicSummary, 0, len(snapshot.topics))
|
|
for _, item := range snapshot.topics {
|
|
topicMessages := snapshot.messagesByTopic[item.ID]
|
|
topicRuns := snapshot.runsByTopic[item.ID]
|
|
summaries = append(summaries, dashboardWorkflowTopicSummary{
|
|
Name: item.Slug,
|
|
Status: item.Status,
|
|
MessageCount: len(topicMessages),
|
|
LatestStage: latestTopicStage(item, topicMessages, topicRuns),
|
|
LatestTime: latestTopicTimeWithLanes(item, topicMessages, topicRuns, snapshot.lanesByTopic[item.ID]),
|
|
RunningRoles: runningRolesForRuns(topicRuns),
|
|
WaitingRoles: pendingRolesForTopic(
|
|
mergePendingRoleCounts(snapshot.pendingByTopicRole[item.ID], snapshot.pendingHumanByTopicRole[item.ID]),
|
|
snapshot.roles,
|
|
),
|
|
})
|
|
}
|
|
sort.Slice(summaries, func(i, j int) bool {
|
|
if summaries[i].LatestTime == summaries[j].LatestTime {
|
|
return summaries[i].Name < summaries[j].Name
|
|
}
|
|
return summaries[i].LatestTime > summaries[j].LatestTime
|
|
})
|
|
|
|
if activeSlug == "" && len(summaries) > 0 {
|
|
activeSlug = summaries[0].Name
|
|
}
|
|
|
|
response := dashboardWorkflowBoardResponse{
|
|
Topics: summaries,
|
|
Board: nil,
|
|
}
|
|
if activeSlug == "" {
|
|
return response, nil
|
|
}
|
|
record, err := s.repo.GetTopicBySlugOrTitle(ctx, ws.ID, activeSlug, topic.SpaceWorkflow)
|
|
if err != nil {
|
|
return dashboardWorkflowBoardResponse{}, err
|
|
}
|
|
response.ActiveTopic = record.Slug
|
|
board, err := s.buildWorkflowBoard(
|
|
ctx,
|
|
record,
|
|
snapshot.roles,
|
|
snapshot.lanesByTopic[record.ID],
|
|
snapshot.messagesByTopic[record.ID],
|
|
snapshot.runsByTopic[record.ID],
|
|
snapshot.pendingByTopicRole[record.ID],
|
|
snapshot.pendingByRole,
|
|
snapshot.pendingHumanByTopicRole[record.ID],
|
|
snapshot.pendingHumanByRole,
|
|
snapshot.pendingHumanByTopic[record.ID],
|
|
)
|
|
if err != nil {
|
|
return dashboardWorkflowBoardResponse{}, err
|
|
}
|
|
response.Board = board
|
|
return response, nil
|
|
}
|
|
|
|
func (s *Service) buildWorkflowBoard(
|
|
ctx context.Context,
|
|
record topic.Record,
|
|
roles []role.Definition,
|
|
lanes []lane.Record,
|
|
messages []message.Record,
|
|
runs []workflow.Run,
|
|
pendingByRole map[string]int,
|
|
pendingGlobal map[string]int,
|
|
pendingHumanByRole map[string]int,
|
|
pendingHumanGlobal map[string]int,
|
|
humanTasks []humantask.Record,
|
|
) (*dashboardWorkflowBoard, error) {
|
|
tasks, err := s.repo.ListTasksByTopic(ctx, record.ID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
laneSyncs, err := s.repo.ListLaneSyncsByTopic(ctx, record.ID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
latestPlan, planErr := s.repo.GetLatestTaskGraphVersionByTopic(ctx, record.ID)
|
|
if planErr != nil && planErr != sql.ErrNoRows {
|
|
return nil, planErr
|
|
}
|
|
events := make([]workflowEventEnvelope, 0, len(messages)+len(runs))
|
|
messageByID := make(map[string]message.Record, len(messages))
|
|
for _, item := range messages {
|
|
messageByID[item.ID] = item
|
|
events = append(events, workflowEventEnvelope{
|
|
Key: item.ID,
|
|
Timestamp: item.CreatedAt,
|
|
Event: dashboardWorkflowEvent{
|
|
Kind: "message",
|
|
ID: item.ID,
|
|
Timestamp: item.CreatedAt,
|
|
From: item.FromRoleName,
|
|
To: item.ToExpr,
|
|
Stage: item.Stage,
|
|
Type: string(item.Type),
|
|
Body: item.BodyMarkdown,
|
|
ReplyTo: item.ReplyToMessageID,
|
|
},
|
|
})
|
|
}
|
|
for _, item := range runs {
|
|
timestamp := latestString(item.CompletedAt, item.StartedAt)
|
|
reply := ""
|
|
if item.ReplyMessageID != "" {
|
|
reply = strings.TrimSpace(messageByID[item.ReplyMessageID].BodyMarkdown)
|
|
}
|
|
events = append(events, workflowEventEnvelope{
|
|
Key: item.ID,
|
|
Timestamp: timestamp,
|
|
Event: dashboardWorkflowEvent{
|
|
Kind: "dispatch",
|
|
ID: item.ID,
|
|
Timestamp: timestamp,
|
|
Role: item.RoleName,
|
|
Stage: string(item.Stage),
|
|
Mode: item.Mode,
|
|
Running: item.Status == workflow.RunStatusRunning,
|
|
StartedAt: item.StartedAt,
|
|
CompletedAt: item.CompletedAt,
|
|
ExitCode: item.ExitCode,
|
|
Reply: reply,
|
|
ErrorMessage: item.ErrorMessage,
|
|
},
|
|
})
|
|
}
|
|
sort.Slice(events, func(i, j int) bool {
|
|
if events[i].Timestamp == events[j].Timestamp {
|
|
return events[i].Key < events[j].Key
|
|
}
|
|
return events[i].Timestamp < events[j].Timestamp
|
|
})
|
|
|
|
combinedPendingByRole := mergePendingRoleCounts(pendingByRole, pendingHumanByRole)
|
|
combinedPendingGlobal := mergePendingRoleCounts(pendingGlobal, pendingHumanGlobal)
|
|
links := buildWorkflowLinks(messages, combinedPendingByRole, runs)
|
|
agents := buildWorkflowAgents(roles, messages, runs, combinedPendingByRole, combinedPendingGlobal)
|
|
summary := dashboardWorkflowSummary{ActiveRoles: []string{}}
|
|
for _, agent := range agents {
|
|
switch agent.State {
|
|
case "running":
|
|
summary.RunningCount++
|
|
summary.ActiveRoles = append(summary.ActiveRoles, agent.Name)
|
|
case "queued", "recent":
|
|
if agent.State == "queued" {
|
|
summary.WaitingCount++
|
|
}
|
|
summary.ActiveRoles = append(summary.ActiveRoles, agent.Name)
|
|
}
|
|
summary.LastEventAt = latestString(summary.LastEventAt, agent.SessionLastUsedAt)
|
|
}
|
|
summary.LastEventAt = latestString(summary.LastEventAt, record.UpdatedAt)
|
|
summary.LastEventAt = latestString(summary.LastEventAt, latestLaneTime(lanes))
|
|
taskItems, err := s.buildWorkflowTasks(ctx, tasks)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
laneItems := buildWorkflowLanes(lanes, laneSyncs)
|
|
for _, item := range laneItems {
|
|
switch item.Status {
|
|
case string(lane.StatusRunning):
|
|
summary.RunningCount++
|
|
case string(lane.StatusReady), string(lane.StatusBlocked):
|
|
summary.WaitingCount++
|
|
}
|
|
}
|
|
|
|
payloadEvents := make([]dashboardWorkflowEvent, 0, len(events))
|
|
for _, item := range events {
|
|
payloadEvents = append(payloadEvents, item.Event)
|
|
}
|
|
payloadHumanTasks := buildWorkflowHumanTasks(humanTasks, messageByID)
|
|
var planPayload *dashboardWorkflowPlan
|
|
if planErr == nil {
|
|
planPayload = &dashboardWorkflowPlan{
|
|
Version: latestPlan.Version,
|
|
Status: string(latestPlan.Status),
|
|
SummaryMarkdown: latestPlan.PlanSummaryMarkdown,
|
|
CreatedAt: latestPlan.CreatedAt,
|
|
ConfirmedAt: latestPlan.ConfirmedAt,
|
|
CreatedByRoleName: latestPlan.CreatedByRoleName,
|
|
SupersedesVersionID: latestPlan.SupersedesGraphVersionID,
|
|
}
|
|
}
|
|
|
|
return &dashboardWorkflowBoard{
|
|
Topic: dashboardWorkflowTopicDetail{
|
|
Name: record.Slug,
|
|
LatestStage: latestTopicStage(record, messages, runs),
|
|
MessageCount: len(messages),
|
|
CreatedAt: record.CreatedAt,
|
|
UpdatedAt: record.UpdatedAt,
|
|
Status: record.Status,
|
|
},
|
|
Plan: planPayload,
|
|
Summary: summary,
|
|
Agents: agents,
|
|
Lanes: laneItems,
|
|
Tasks: taskItems,
|
|
Links: links,
|
|
Events: payloadEvents,
|
|
PendingHumanTasks: payloadHumanTasks,
|
|
}, nil
|
|
}
|
|
|
|
type workflowEventEnvelope struct {
|
|
Key string
|
|
Timestamp string
|
|
Event dashboardWorkflowEvent
|
|
}
|
|
|
|
func buildWorkflowAgents(
|
|
roles []role.Definition,
|
|
messages []message.Record,
|
|
runs []workflow.Run,
|
|
pendingByRole map[string]int,
|
|
pendingGlobal map[string]int,
|
|
) []dashboardWorkflowAgent {
|
|
items := make([]dashboardWorkflowAgent, 0, len(roles)+1)
|
|
for _, item := range roles {
|
|
inbound := latestInboundMessage(messages, item.Name)
|
|
outbound := latestOutboundMessage(messages, item.Name)
|
|
run := latestRunForRole(runs, item.Name)
|
|
isHuman := item.ExecutorKind == role.ExecutorKindHuman
|
|
if !item.IsEnabled && !isHuman {
|
|
continue
|
|
}
|
|
if isHuman && inbound.ID == "" && outbound.ID == "" && pendingByRole[item.Name] == 0 {
|
|
continue
|
|
}
|
|
state := "idle"
|
|
if !isHuman && run != nil && run.Status == workflow.RunStatusRunning {
|
|
state = "running"
|
|
} else if pendingByRole[item.Name] > 0 {
|
|
state = "queued"
|
|
} else if run != nil || inbound.ID != "" || outbound.ID != "" {
|
|
state = "recent"
|
|
}
|
|
agent := dashboardWorkflowAgent{
|
|
Name: item.Name,
|
|
Category: "workflow",
|
|
SortOrder: item.SortOrder,
|
|
Description: item.Description,
|
|
PendingGlobal: pendingGlobal[item.Name],
|
|
SessionLastUsedAt: latestString(latestRunTime(run), inbound.CreatedAt, outbound.CreatedAt),
|
|
State: state,
|
|
LatestInboundAt: inbound.CreatedAt,
|
|
LatestOutboundAt: outbound.CreatedAt,
|
|
LatestInboundPreview: previewText(inbound.BodyMarkdown),
|
|
LatestOutboundPreview: previewText(outbound.BodyMarkdown),
|
|
}
|
|
if isHuman {
|
|
agent.Category = "user"
|
|
} else if run != nil {
|
|
agent.CurrentDispatch = &dashboardWorkflowDispatchSummary{
|
|
Stage: string(run.Stage),
|
|
Mode: run.Mode,
|
|
StartedAt: run.StartedAt,
|
|
CompletedAt: run.CompletedAt,
|
|
Running: run.Status == workflow.RunStatusRunning,
|
|
ExitCode: run.ExitCode,
|
|
}
|
|
}
|
|
items = append(items, agent)
|
|
}
|
|
|
|
sort.Slice(items, func(i, j int) bool {
|
|
if items[i].SortOrder == items[j].SortOrder {
|
|
return items[i].Name < items[j].Name
|
|
}
|
|
return items[i].SortOrder < items[j].SortOrder
|
|
})
|
|
return items
|
|
}
|
|
|
|
func buildWorkflowLinks(messages []message.Record, pendingByRole map[string]int, runs []workflow.Run) []dashboardWorkflowLink {
|
|
type key struct {
|
|
from string
|
|
to string
|
|
}
|
|
grouped := make(map[key]dashboardWorkflowLink)
|
|
runningByRole := make(map[string]bool)
|
|
for _, item := range runs {
|
|
if item.Status == workflow.RunStatusRunning {
|
|
runningByRole[item.RoleName] = true
|
|
}
|
|
}
|
|
for _, item := range messages {
|
|
for _, recipient := range splitRecipients(item.ToExpr) {
|
|
k := key{from: item.FromRoleName, to: recipient}
|
|
link := grouped[k]
|
|
link.From = item.FromRoleName
|
|
link.To = recipient
|
|
link.Count++
|
|
link.LastMessageAt = latestString(link.LastMessageAt, item.CreatedAt)
|
|
if item.CreatedAt >= link.LastMessageAt {
|
|
link.LastStage = item.Stage
|
|
link.LastType = string(item.Type)
|
|
}
|
|
link.IsHot = pendingByRole[recipient] > 0 || runningByRole[recipient]
|
|
grouped[k] = link
|
|
}
|
|
}
|
|
items := make([]dashboardWorkflowLink, 0, len(grouped))
|
|
for _, item := range grouped {
|
|
items = append(items, item)
|
|
}
|
|
sort.Slice(items, func(i, j int) bool {
|
|
if items[i].LastMessageAt == items[j].LastMessageAt {
|
|
if items[i].From == items[j].From {
|
|
return items[i].To < items[j].To
|
|
}
|
|
return items[i].From < items[j].From
|
|
}
|
|
return items[i].LastMessageAt > items[j].LastMessageAt
|
|
})
|
|
return items
|
|
}
|
|
|
|
func buildWorkflowHumanTasks(tasks []humantask.Record, messageByID map[string]message.Record) []dashboardWorkflowHumanTask {
|
|
if len(tasks) == 0 {
|
|
return []dashboardWorkflowHumanTask{}
|
|
}
|
|
items := make([]dashboardWorkflowHumanTask, 0, len(tasks))
|
|
for _, item := range tasks {
|
|
prompt := messageByID[item.PromptMessageID]
|
|
items = append(items, dashboardWorkflowHumanTask{
|
|
ID: item.ID,
|
|
RoleName: item.RoleName,
|
|
Status: item.Status,
|
|
PromptMessageID: item.PromptMessageID,
|
|
PromptFrom: prompt.FromRoleName,
|
|
PromptStage: prompt.Stage,
|
|
PromptBody: prompt.BodyMarkdown,
|
|
AnsweredMessageID: item.AnsweredMessageID,
|
|
CreatedAt: item.CreatedAt,
|
|
UpdatedAt: item.UpdatedAt,
|
|
})
|
|
}
|
|
sort.Slice(items, func(i, j int) bool {
|
|
if items[i].UpdatedAt == items[j].UpdatedAt {
|
|
return items[i].ID > items[j].ID
|
|
}
|
|
return items[i].UpdatedAt > items[j].UpdatedAt
|
|
})
|
|
return items
|
|
}
|
|
|
|
func latestTopicTimeWithLanes(record topic.Record, messages []message.Record, runs []workflow.Run, lanes []lane.Record) string {
|
|
value := latestTopicTime(record, messages, runs)
|
|
return latestString(value, latestLaneTime(lanes))
|
|
}
|
|
|
|
func latestLaneTime(items []lane.Record) string {
|
|
latest := ""
|
|
for _, item := range items {
|
|
latest = latestString(latest, item.CompletedAt, item.StartedAt, item.UpdatedAt, item.CreatedAt)
|
|
}
|
|
return latest
|
|
}
|
|
|
|
func buildWorkflowLanes(items []lane.Record, laneSyncs []lanesync.Record) []dashboardWorkflowLane {
|
|
syncsByLane := make(map[string][]dashboardWorkflowLaneSync)
|
|
for _, item := range laneSyncs {
|
|
entry := dashboardWorkflowLaneSync{
|
|
UpstreamLaneID: item.UpstreamLaneID,
|
|
TaskID: item.TaskID,
|
|
UpstreamCommit: item.UpstreamCommit,
|
|
MergeCommit: item.MergeCommit,
|
|
Status: string(item.Status),
|
|
ErrorMessage: item.ErrorMessage,
|
|
CreatedAt: item.CreatedAt,
|
|
}
|
|
syncsByLane[item.DownstreamLaneID] = append(syncsByLane[item.DownstreamLaneID], entry)
|
|
}
|
|
out := make([]dashboardWorkflowLane, 0, len(items))
|
|
for _, item := range items {
|
|
syncHistory := syncsByLane[item.ID]
|
|
var lastSync *dashboardWorkflowLaneSync
|
|
if len(syncHistory) > 0 {
|
|
lastSync = &syncHistory[0]
|
|
}
|
|
out = append(out, dashboardWorkflowLane{
|
|
ID: item.ID,
|
|
Name: item.Name,
|
|
Slug: item.Slug,
|
|
Purpose: item.Purpose,
|
|
Status: string(item.Status),
|
|
BranchName: item.BranchName,
|
|
HeadCommit: item.HeadCommit,
|
|
WorktreePath: item.WorktreePath,
|
|
ContainerName: item.ContainerName,
|
|
RuntimeEndpoint: item.RuntimeEndpoint,
|
|
StartedAt: item.StartedAt,
|
|
CompletedAt: item.CompletedAt,
|
|
ErrorMessage: item.ErrorMessage,
|
|
LastSync: lastSync,
|
|
SyncHistory: syncHistory,
|
|
})
|
|
}
|
|
sort.Slice(out, func(i, j int) bool {
|
|
return out[i].Name < out[j].Name
|
|
})
|
|
return out
|
|
}
|
|
|
|
func (s *Service) buildWorkflowTasks(ctx context.Context, items []task.Record) ([]dashboardWorkflowTask, error) {
|
|
out := make([]dashboardWorkflowTask, 0, len(items))
|
|
for _, item := range items {
|
|
deps, err := s.repo.ListTaskDependencies(ctx, item.ID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
dependencies := make([]dashboardWorkflowTaskDependency, 0, len(deps))
|
|
for _, dep := range deps {
|
|
dependencies = append(dependencies, dashboardWorkflowTaskDependency{
|
|
DependsOnTaskID: dep.DependsOnTaskID,
|
|
})
|
|
}
|
|
out = append(out, dashboardWorkflowTask{
|
|
ID: item.ID,
|
|
LaneID: item.LaneID,
|
|
Title: item.Title,
|
|
Kind: string(item.Kind),
|
|
Deliverables: append([]string(nil), item.Deliverables...),
|
|
BatchKey: item.BatchKey,
|
|
Status: string(item.Status),
|
|
Priority: item.Priority,
|
|
TaskOrder: item.TaskOrder,
|
|
AcceptanceMarkdown: item.AcceptanceMarkdown,
|
|
BlockingReasonMarkdown: item.BlockingReasonMarkdown,
|
|
ResultSummaryMarkdown: item.ResultSummaryMarkdown,
|
|
Dependencies: dependencies,
|
|
})
|
|
}
|
|
sort.Slice(out, func(i, j int) bool {
|
|
if out[i].LaneID == out[j].LaneID {
|
|
if out[i].TaskOrder == out[j].TaskOrder {
|
|
return out[i].Title < out[j].Title
|
|
}
|
|
return out[i].TaskOrder < out[j].TaskOrder
|
|
}
|
|
return out[i].LaneID < out[j].LaneID
|
|
})
|
|
return out, nil
|
|
}
|
|
|
|
// mergePendingRoleCounts returns a new map holding, for every key present in
// left or right, the sum of the counts from both maps. Neither input is
// modified, nil inputs are treated as empty, and the result is never nil.
func mergePendingRoleCounts(left, right map[string]int) map[string]int {
	merged := make(map[string]int, len(left)+len(right))
	for _, source := range [2]map[string]int{left, right} {
		for name, count := range source {
			merged[name] += count
		}
	}
	return merged
}
|