package dashboard import ( "context" "database/sql" "sort" "strings" "inbox/internal/domain/humantask" "inbox/internal/domain/lane" "inbox/internal/domain/lanesync" "inbox/internal/domain/message" "inbox/internal/domain/role" "inbox/internal/domain/task" "inbox/internal/domain/topic" "inbox/internal/domain/workflow" "inbox/internal/domain/workspace" ) func (s *Service) WorkflowBoard(ctx context.Context, ws workspace.Workspace, activeSlug string) (dashboardWorkflowBoardResponse, error) { snapshot, err := s.loadWorkflowBoardSnapshot(ctx, ws.ID) if err != nil { return dashboardWorkflowBoardResponse{}, err } summaries := make([]dashboardWorkflowTopicSummary, 0, len(snapshot.topics)) for _, item := range snapshot.topics { topicMessages := snapshot.messagesByTopic[item.ID] topicRuns := snapshot.runsByTopic[item.ID] summaries = append(summaries, dashboardWorkflowTopicSummary{ Name: item.Slug, Status: item.Status, MessageCount: len(topicMessages), LatestStage: latestTopicStage(item, topicMessages, topicRuns), LatestTime: latestTopicTimeWithLanes(item, topicMessages, topicRuns, snapshot.lanesByTopic[item.ID]), RunningRoles: runningRolesForRuns(topicRuns), WaitingRoles: pendingRolesForTopic( mergePendingRoleCounts(snapshot.pendingByTopicRole[item.ID], snapshot.pendingHumanByTopicRole[item.ID]), snapshot.roles, ), }) } sort.Slice(summaries, func(i, j int) bool { if summaries[i].LatestTime == summaries[j].LatestTime { return summaries[i].Name < summaries[j].Name } return summaries[i].LatestTime > summaries[j].LatestTime }) if activeSlug == "" && len(summaries) > 0 { activeSlug = summaries[0].Name } response := dashboardWorkflowBoardResponse{ Topics: summaries, Board: nil, } if activeSlug == "" { return response, nil } record, err := s.repo.GetTopicBySlugOrTitle(ctx, ws.ID, activeSlug, topic.SpaceWorkflow) if err != nil { return dashboardWorkflowBoardResponse{}, err } response.ActiveTopic = record.Slug board, err := s.buildWorkflowBoard( ctx, record, snapshot.roles, snapshot.lanesByTopic[record.ID], snapshot.messagesByTopic[record.ID], snapshot.runsByTopic[record.ID], snapshot.pendingByTopicRole[record.ID], snapshot.pendingByRole, snapshot.pendingHumanByTopicRole[record.ID], snapshot.pendingHumanByRole, snapshot.pendingHumanByTopic[record.ID], ) if err != nil { return dashboardWorkflowBoardResponse{}, err } response.Board = board return response, nil } func (s *Service) buildWorkflowBoard( ctx context.Context, record topic.Record, roles []role.Definition, lanes []lane.Record, messages []message.Record, runs []workflow.Run, pendingByRole map[string]int, pendingGlobal map[string]int, pendingHumanByRole map[string]int, pendingHumanGlobal map[string]int, humanTasks []humantask.Record, ) (*dashboardWorkflowBoard, error) { tasks, err := s.repo.ListTasksByTopic(ctx, record.ID) if err != nil { return nil, err } laneSyncs, err := s.repo.ListLaneSyncsByTopic(ctx, record.ID) if err != nil { return nil, err } latestPlan, planErr := s.repo.GetLatestTaskGraphVersionByTopic(ctx, record.ID) if planErr != nil && planErr != sql.ErrNoRows { return nil, planErr } events := make([]workflowEventEnvelope, 0, len(messages)+len(runs)) messageByID := make(map[string]message.Record, len(messages)) for _, item := range messages { messageByID[item.ID] = item events = append(events, workflowEventEnvelope{ Key: item.ID, Timestamp: item.CreatedAt, Event: dashboardWorkflowEvent{ Kind: "message", ID: item.ID, Timestamp: item.CreatedAt, From: item.FromRoleName, To: item.ToExpr, Stage: item.Stage, Type: string(item.Type), Body: item.BodyMarkdown, ReplyTo: item.ReplyToMessageID, }, }) } for _, item := range runs { timestamp := latestString(item.CompletedAt, item.StartedAt) reply := "" if item.ReplyMessageID != "" { reply = strings.TrimSpace(messageByID[item.ReplyMessageID].BodyMarkdown) } events = append(events, workflowEventEnvelope{ Key: item.ID, Timestamp: timestamp, Event: dashboardWorkflowEvent{ Kind: "dispatch", ID: item.ID, Timestamp: timestamp, Role: item.RoleName, Stage: string(item.Stage), Mode: item.Mode, Running: item.Status == workflow.RunStatusRunning, StartedAt: item.StartedAt, CompletedAt: item.CompletedAt, ExitCode: item.ExitCode, Reply: reply, ErrorMessage: item.ErrorMessage, }, }) } sort.Slice(events, func(i, j int) bool { if events[i].Timestamp == events[j].Timestamp { return events[i].Key < events[j].Key } return events[i].Timestamp < events[j].Timestamp }) combinedPendingByRole := mergePendingRoleCounts(pendingByRole, pendingHumanByRole) combinedPendingGlobal := mergePendingRoleCounts(pendingGlobal, pendingHumanGlobal) links := buildWorkflowLinks(messages, combinedPendingByRole, runs) agents := buildWorkflowAgents(roles, messages, runs, combinedPendingByRole, combinedPendingGlobal) summary := dashboardWorkflowSummary{ActiveRoles: []string{}} for _, agent := range agents { switch agent.State { case "running": summary.RunningCount++ summary.ActiveRoles = append(summary.ActiveRoles, agent.Name) case "queued", "recent": if agent.State == "queued" { summary.WaitingCount++ } summary.ActiveRoles = append(summary.ActiveRoles, agent.Name) } summary.LastEventAt = latestString(summary.LastEventAt, agent.SessionLastUsedAt) } summary.LastEventAt = latestString(summary.LastEventAt, record.UpdatedAt) summary.LastEventAt = latestString(summary.LastEventAt, latestLaneTime(lanes)) taskItems, err := s.buildWorkflowTasks(ctx, tasks) if err != nil { return nil, err } laneItems := buildWorkflowLanes(lanes, laneSyncs) for _, item := range laneItems { switch item.Status { case string(lane.StatusRunning): summary.RunningCount++ case string(lane.StatusReady), string(lane.StatusBlocked): summary.WaitingCount++ } } payloadEvents := make([]dashboardWorkflowEvent, 0, len(events)) for _, item := range events { payloadEvents = append(payloadEvents, item.Event) } payloadHumanTasks := buildWorkflowHumanTasks(humanTasks, messageByID) var planPayload *dashboardWorkflowPlan if planErr == nil { planPayload = &dashboardWorkflowPlan{ Version: latestPlan.Version, Status: string(latestPlan.Status), SummaryMarkdown: latestPlan.PlanSummaryMarkdown, CreatedAt: latestPlan.CreatedAt, ConfirmedAt: latestPlan.ConfirmedAt, CreatedByRoleName: latestPlan.CreatedByRoleName, SupersedesVersionID: latestPlan.SupersedesGraphVersionID, } } return &dashboardWorkflowBoard{ Topic: dashboardWorkflowTopicDetail{ Name: record.Slug, LatestStage: latestTopicStage(record, messages, runs), MessageCount: len(messages), CreatedAt: record.CreatedAt, UpdatedAt: record.UpdatedAt, Status: record.Status, }, Plan: planPayload, Summary: summary, Agents: agents, Lanes: laneItems, Tasks: taskItems, Links: links, Events: payloadEvents, PendingHumanTasks: payloadHumanTasks, }, nil } type workflowEventEnvelope struct { Key string Timestamp string Event dashboardWorkflowEvent } func buildWorkflowAgents( roles []role.Definition, messages []message.Record, runs []workflow.Run, pendingByRole map[string]int, pendingGlobal map[string]int, ) []dashboardWorkflowAgent { items := make([]dashboardWorkflowAgent, 0, len(roles)+1) for _, item := range roles { inbound := latestInboundMessage(messages, item.Name) outbound := latestOutboundMessage(messages, item.Name) run := latestRunForRole(runs, item.Name) isHuman := item.ExecutorKind == role.ExecutorKindHuman if !item.IsEnabled && !isHuman { continue } if isHuman && inbound.ID == "" && outbound.ID == "" && pendingByRole[item.Name] == 0 { continue } state := "idle" if !isHuman && run != nil && run.Status == workflow.RunStatusRunning { state = "running" } else if pendingByRole[item.Name] > 0 { state = "queued" } else if run != nil || inbound.ID != "" || outbound.ID != "" { state = "recent" } agent := dashboardWorkflowAgent{ Name: item.Name, Category: "workflow", SortOrder: item.SortOrder, Description: item.Description, PendingGlobal: pendingGlobal[item.Name], SessionLastUsedAt: latestString(latestRunTime(run), inbound.CreatedAt, outbound.CreatedAt), State: state, LatestInboundAt: inbound.CreatedAt, LatestOutboundAt: outbound.CreatedAt, LatestInboundPreview: previewText(inbound.BodyMarkdown), LatestOutboundPreview: previewText(outbound.BodyMarkdown), } if isHuman { agent.Category = "user" } else if run != nil { agent.CurrentDispatch = &dashboardWorkflowDispatchSummary{ Stage: string(run.Stage), Mode: run.Mode, StartedAt: run.StartedAt, CompletedAt: run.CompletedAt, Running: run.Status == workflow.RunStatusRunning, ExitCode: run.ExitCode, } } items = append(items, agent) } sort.Slice(items, func(i, j int) bool { if items[i].SortOrder == items[j].SortOrder { return items[i].Name < items[j].Name } return items[i].SortOrder < items[j].SortOrder }) return items } func buildWorkflowLinks(messages []message.Record, pendingByRole map[string]int, runs []workflow.Run) []dashboardWorkflowLink { type key struct { from string to string } grouped := make(map[key]dashboardWorkflowLink) runningByRole := make(map[string]bool) for _, item := range runs { if item.Status == workflow.RunStatusRunning { runningByRole[item.RoleName] = true } } for _, item := range messages { for _, recipient := range splitRecipients(item.ToExpr) { k := key{from: item.FromRoleName, to: recipient} link := grouped[k] link.From = item.FromRoleName link.To = recipient link.Count++ link.LastMessageAt = latestString(link.LastMessageAt, item.CreatedAt) if item.CreatedAt >= link.LastMessageAt { link.LastStage = item.Stage link.LastType = string(item.Type) } link.IsHot = pendingByRole[recipient] > 0 || runningByRole[recipient] grouped[k] = link } } items := make([]dashboardWorkflowLink, 0, len(grouped)) for _, item := range grouped { items = append(items, item) } sort.Slice(items, func(i, j int) bool { if items[i].LastMessageAt == items[j].LastMessageAt { if items[i].From == items[j].From { return items[i].To < items[j].To } return items[i].From < items[j].From } return items[i].LastMessageAt > items[j].LastMessageAt }) return items } func buildWorkflowHumanTasks(tasks []humantask.Record, messageByID map[string]message.Record) []dashboardWorkflowHumanTask { if len(tasks) == 0 { return []dashboardWorkflowHumanTask{} } items := make([]dashboardWorkflowHumanTask, 0, len(tasks)) for _, item := range tasks { prompt := messageByID[item.PromptMessageID] items = append(items, dashboardWorkflowHumanTask{ ID: item.ID, RoleName: item.RoleName, Status: item.Status, PromptMessageID: item.PromptMessageID, PromptFrom: prompt.FromRoleName, PromptStage: prompt.Stage, PromptBody: prompt.BodyMarkdown, AnsweredMessageID: item.AnsweredMessageID, CreatedAt: item.CreatedAt, UpdatedAt: item.UpdatedAt, }) } sort.Slice(items, func(i, j int) bool { if items[i].UpdatedAt == items[j].UpdatedAt { return items[i].ID > items[j].ID } return items[i].UpdatedAt > items[j].UpdatedAt }) return items } func latestTopicTimeWithLanes(record topic.Record, messages []message.Record, runs []workflow.Run, lanes []lane.Record) string { value := latestTopicTime(record, messages, runs) return latestString(value, latestLaneTime(lanes)) } func latestLaneTime(items []lane.Record) string { latest := "" for _, item := range items { latest = latestString(latest, item.CompletedAt, item.StartedAt, item.UpdatedAt, item.CreatedAt) } return latest } func buildWorkflowLanes(items []lane.Record, laneSyncs []lanesync.Record) []dashboardWorkflowLane { syncsByLane := make(map[string][]dashboardWorkflowLaneSync) for _, item := range laneSyncs { entry := dashboardWorkflowLaneSync{ UpstreamLaneID: item.UpstreamLaneID, TaskID: item.TaskID, UpstreamCommit: item.UpstreamCommit, MergeCommit: item.MergeCommit, Status: string(item.Status), ErrorMessage: item.ErrorMessage, CreatedAt: item.CreatedAt, } syncsByLane[item.DownstreamLaneID] = append(syncsByLane[item.DownstreamLaneID], entry) } out := make([]dashboardWorkflowLane, 0, len(items)) for _, item := range items { syncHistory := syncsByLane[item.ID] var lastSync *dashboardWorkflowLaneSync if len(syncHistory) > 0 { lastSync = &syncHistory[0] } out = append(out, dashboardWorkflowLane{ ID: item.ID, Name: item.Name, Slug: item.Slug, Purpose: item.Purpose, Status: string(item.Status), BranchName: item.BranchName, HeadCommit: item.HeadCommit, WorktreePath: item.WorktreePath, ContainerName: item.ContainerName, RuntimeEndpoint: item.RuntimeEndpoint, StartedAt: item.StartedAt, CompletedAt: item.CompletedAt, ErrorMessage: item.ErrorMessage, LastSync: lastSync, SyncHistory: syncHistory, }) } sort.Slice(out, func(i, j int) bool { return out[i].Name < out[j].Name }) return out } func (s *Service) buildWorkflowTasks(ctx context.Context, items []task.Record) ([]dashboardWorkflowTask, error) { out := make([]dashboardWorkflowTask, 0, len(items)) for _, item := range items { deps, err := s.repo.ListTaskDependencies(ctx, item.ID) if err != nil { return nil, err } dependencies := make([]dashboardWorkflowTaskDependency, 0, len(deps)) for _, dep := range deps { dependencies = append(dependencies, dashboardWorkflowTaskDependency{ DependsOnTaskID: dep.DependsOnTaskID, }) } out = append(out, dashboardWorkflowTask{ ID: item.ID, LaneID: item.LaneID, Title: item.Title, Kind: string(item.Kind), Deliverables: append([]string(nil), item.Deliverables...), BatchKey: item.BatchKey, Status: string(item.Status), Priority: item.Priority, TaskOrder: item.TaskOrder, AcceptanceMarkdown: item.AcceptanceMarkdown, BlockingReasonMarkdown: item.BlockingReasonMarkdown, ResultSummaryMarkdown: item.ResultSummaryMarkdown, Dependencies: dependencies, }) } sort.Slice(out, func(i, j int) bool { if out[i].LaneID == out[j].LaneID { if out[i].TaskOrder == out[j].TaskOrder { return out[i].Title < out[j].Title } return out[i].TaskOrder < out[j].TaskOrder } return out[i].LaneID < out[j].LaneID }) return out, nil } func mergePendingRoleCounts(left, right map[string]int) map[string]int { if len(left) == 0 && len(right) == 0 { return map[string]int{} } out := make(map[string]int, len(left)+len(right)) for key, value := range left { out[key] += value } for key, value := range right { out[key] += value } return out }