264 lines
8.2 KiB
Go
264 lines
8.2 KiB
Go
package dashboard
|
|
|
|
import (
|
|
"context"
|
|
"sort"
|
|
"strings"
|
|
|
|
"inbox/internal/domain/message"
|
|
"inbox/internal/domain/topic"
|
|
"inbox/internal/domain/workflow"
|
|
"inbox/internal/domain/workspace"
|
|
)
|
|
|
|
func (s *Service) Messages(ctx context.Context, ws workspace.Workspace) (dashboardMessagesResponse, error) {
|
|
topics, err := s.repo.ListTopics(ctx, ws.ID)
|
|
if err != nil {
|
|
return dashboardMessagesResponse{}, err
|
|
}
|
|
topicByID := make(map[string]topic.Record, len(topics))
|
|
for _, item := range topics {
|
|
topicByID[item.ID] = item
|
|
}
|
|
messages, err := s.repo.ListMessagesByWorkspace(ctx, ws.ID)
|
|
if err != nil {
|
|
return dashboardMessagesResponse{}, err
|
|
}
|
|
items := make([]dashboardMessageItem, 0, len(messages))
|
|
for _, item := range messages {
|
|
items = append(items, dashboardMessageItemFor(item, topicByID[item.TopicID]))
|
|
}
|
|
return dashboardMessagesResponse{Messages: items}, nil
|
|
}
|
|
|
|
func (s *Service) Topics(ctx context.Context, ws workspace.Workspace) (dashboardTopicsResponse, error) {
|
|
snapshot, err := s.loadWorkspaceTopicSnapshot(ctx, ws.ID, topic.SpaceWorkflow)
|
|
if err != nil {
|
|
return dashboardTopicsResponse{}, err
|
|
}
|
|
items := make([]dashboardTopicInfo, 0, len(snapshot.topics))
|
|
for _, item := range snapshot.topics {
|
|
stages := topicStages(item, snapshot.messagesByTopic[item.ID], snapshot.runsByTopic[item.ID])
|
|
items = append(items, dashboardTopicInfo{
|
|
Name: item.Slug,
|
|
MessageCount: len(snapshot.messagesByTopic[item.ID]),
|
|
Stages: stages,
|
|
LatestStage: latestTopicStage(item, snapshot.messagesByTopic[item.ID], snapshot.runsByTopic[item.ID]),
|
|
})
|
|
}
|
|
sort.Slice(items, func(i, j int) bool {
|
|
return items[i].Name < items[j].Name
|
|
})
|
|
return dashboardTopicsResponse{Topics: items}, nil
|
|
}
|
|
|
|
func (s *Service) TopicRecords(ctx context.Context, ws workspace.Workspace, spaceFilter string) (dashboardTopicRecordsResponse, error) {
|
|
items, err := s.repo.ListTopics(ctx, ws.ID)
|
|
if err != nil {
|
|
return dashboardTopicRecordsResponse{}, err
|
|
}
|
|
records := make([]dashboardTopicRecord, 0, len(items))
|
|
for _, item := range items {
|
|
if spaceFilter != "" && string(item.Space) != spaceFilter {
|
|
continue
|
|
}
|
|
records = append(records, dashboardTopicRecord{
|
|
Name: item.Slug,
|
|
Space: string(item.Space),
|
|
Status: item.Status,
|
|
CreatedAt: item.CreatedAt,
|
|
UpdatedAt: item.UpdatedAt,
|
|
Description: item.Summary,
|
|
})
|
|
}
|
|
return dashboardTopicRecordsResponse{Records: records}, nil
|
|
}
|
|
|
|
func (s *Service) SpaceTopics(ctx context.Context, ws workspace.Workspace, space topic.Space) (dashboardSpaceTopicsResponse, error) {
|
|
snapshot, err := s.loadWorkspaceTopicSnapshot(ctx, ws.ID, space)
|
|
if err != nil {
|
|
return dashboardSpaceTopicsResponse{}, err
|
|
}
|
|
latestMessageByTopic := make(map[string]string, len(snapshot.topics))
|
|
items := make([]dashboardSpaceTopic, 0, len(snapshot.topics))
|
|
for _, item := range snapshot.topics {
|
|
topicMessages := snapshot.messagesByTopic[item.ID]
|
|
latestMessageByTopic[item.Slug] = latestMessageTime(topicMessages)
|
|
items = append(items, dashboardSpaceTopic{
|
|
Topic: item.Slug,
|
|
MessageCount: len(topicMessages),
|
|
LastFile: latestMessageID(topicMessages),
|
|
Status: item.Status,
|
|
Description: item.Summary,
|
|
})
|
|
}
|
|
sort.Slice(items, func(i, j int) bool {
|
|
leftTime := latestMessageByTopic[items[i].Topic]
|
|
rightTime := latestMessageByTopic[items[j].Topic]
|
|
if leftTime == rightTime {
|
|
return items[i].Topic < items[j].Topic
|
|
}
|
|
return leftTime > rightTime
|
|
})
|
|
return dashboardSpaceTopicsResponse{Topics: items}, nil
|
|
}
|
|
|
|
func (s *Service) SpaceMessages(ctx context.Context, ws workspace.Workspace, space topic.Space, topicSlug string) (dashboardMessagesResponse, error) {
|
|
record, err := s.repo.GetTopicBySlugOrTitle(ctx, ws.ID, topicSlug, space)
|
|
if err != nil {
|
|
return dashboardMessagesResponse{}, err
|
|
}
|
|
messages, err := s.repo.ListMessagesByTopic(ctx, record.ID)
|
|
if err != nil {
|
|
return dashboardMessagesResponse{}, err
|
|
}
|
|
items := make([]dashboardMessageItem, 0, len(messages))
|
|
for _, item := range messages {
|
|
items = append(items, dashboardMessageItemFor(item, record))
|
|
}
|
|
return dashboardMessagesResponse{Messages: items}, nil
|
|
}
|
|
|
|
func (s *Service) Dispatch(ctx context.Context, ws workspace.Workspace) (dashboardDispatchLogsResponse, error) {
|
|
topics, err := s.repo.ListTopics(ctx, ws.ID)
|
|
if err != nil {
|
|
return dashboardDispatchLogsResponse{}, err
|
|
}
|
|
topicByID := make(map[string]topic.Record, len(topics))
|
|
for _, item := range topics {
|
|
topicByID[item.ID] = item
|
|
}
|
|
messages, err := s.repo.ListMessagesByWorkspace(ctx, ws.ID)
|
|
if err != nil {
|
|
return dashboardDispatchLogsResponse{}, err
|
|
}
|
|
messageByID := make(map[string]message.Record, len(messages))
|
|
for _, item := range messages {
|
|
messageByID[item.ID] = item
|
|
}
|
|
runs, err := s.repo.ListWorkflowRunsByWorkspace(ctx, ws.ID)
|
|
if err != nil {
|
|
return dashboardDispatchLogsResponse{}, err
|
|
}
|
|
items := make([]dashboardDispatchLog, 0, len(runs))
|
|
for _, run := range runs {
|
|
topicSlug := topicByID[run.TopicID].Slug
|
|
reply := ""
|
|
if run.ReplyMessageID != "" {
|
|
reply = strings.TrimSpace(messageByID[run.ReplyMessageID].BodyMarkdown)
|
|
}
|
|
items = append(items, dashboardDispatchLog{
|
|
Role: run.RoleName,
|
|
InboxFile: coalesce(run.RequestMessageID, run.ID),
|
|
Stage: string(run.Stage),
|
|
Topic: topicSlug,
|
|
Mode: run.Mode,
|
|
StartedAt: run.StartedAt,
|
|
CompletedAt: run.CompletedAt,
|
|
ExitCode: run.ExitCode,
|
|
Reply: reply,
|
|
ErrorMessage: run.ErrorMessage,
|
|
Running: run.Status == workflow.RunStatusRunning,
|
|
})
|
|
}
|
|
return dashboardDispatchLogsResponse{Logs: items}, nil
|
|
}
|
|
|
|
func (s *Service) DispatchLive(ctx context.Context, ws workspace.Workspace, topicSlug, roleName string, afterSeq int) (dashboardDispatchLiveResponse, error) {
|
|
record, err := s.repo.GetTopicBySlugOrTitle(ctx, ws.ID, topicSlug)
|
|
if err != nil {
|
|
return dashboardDispatchLiveResponse{}, err
|
|
}
|
|
runs, err := s.repo.ListWorkflowRunsByTopic(ctx, record.ID)
|
|
if err != nil {
|
|
return dashboardDispatchLiveResponse{}, err
|
|
}
|
|
var selected *workflow.Run
|
|
for _, run := range runs {
|
|
if run.RoleName != roleName {
|
|
continue
|
|
}
|
|
runCopy := run
|
|
selected = &runCopy
|
|
break
|
|
}
|
|
if selected == nil {
|
|
return dashboardDispatchLiveResponse{
|
|
Entries: []dashboardDispatchLiveEntry{},
|
|
Offset: afterSeq,
|
|
}, nil
|
|
}
|
|
logs, err := s.repo.ListWorkflowRunLogs(ctx, selected.ID, afterSeq)
|
|
if err != nil {
|
|
return dashboardDispatchLiveResponse{}, err
|
|
}
|
|
entries := make([]dashboardDispatchLiveEntry, 0, len(logs))
|
|
offset := afterSeq
|
|
for _, item := range logs {
|
|
entries = append(entries, dashboardDispatchLiveEntry{
|
|
Text: item.Content,
|
|
Timestamp: item.CreatedAt,
|
|
})
|
|
offset = item.Seq
|
|
}
|
|
return dashboardDispatchLiveResponse{
|
|
Entries: entries,
|
|
Offset: offset,
|
|
}, nil
|
|
}
|
|
|
|
func (s *Service) Roles(ctx context.Context, ws *workspace.Workspace) (dashboardRolesResponse, error) {
|
|
roles, err := s.repo.ListRoles(ctx)
|
|
if err != nil {
|
|
return dashboardRolesResponse{}, err
|
|
}
|
|
var (
|
|
messages []message.Record
|
|
runs []workflow.Run
|
|
)
|
|
pendingByRole := make(map[string]int)
|
|
if ws != nil {
|
|
messages, err = s.repo.ListMessagesByWorkspace(ctx, ws.ID)
|
|
if err != nil {
|
|
return dashboardRolesResponse{}, err
|
|
}
|
|
runs, err = s.repo.ListWorkflowRunsByWorkspace(ctx, ws.ID)
|
|
if err != nil {
|
|
return dashboardRolesResponse{}, err
|
|
}
|
|
pending, err := s.repo.ListPendingDeliveriesByWorkspace(ctx, ws.ID)
|
|
if err != nil {
|
|
return dashboardRolesResponse{}, err
|
|
}
|
|
for _, item := range pending {
|
|
pendingByRole[item.RoleName] += item.Count
|
|
}
|
|
}
|
|
|
|
items := make([]dashboardRoleInfo, 0, len(roles))
|
|
for _, item := range roles {
|
|
if !item.IsEnabled {
|
|
continue
|
|
}
|
|
lastRun := latestRunForRole(runs, item.Name)
|
|
lastMessage := latestMessageForRole(messages, item.Name)
|
|
var session *dashboardSessionInfo
|
|
if lastRun != nil {
|
|
session = &dashboardSessionInfo{
|
|
Role: item.Name,
|
|
CreatedAt: lastRun.StartedAt,
|
|
LastUsedAt: latestString(lastRun.CompletedAt, lastRun.StartedAt, lastMessage.CreatedAt),
|
|
LastMessage: previewText(lastMessage.BodyMarkdown),
|
|
}
|
|
}
|
|
items = append(items, dashboardRoleInfo{
|
|
Name: item.Name,
|
|
Description: item.Description,
|
|
SortOrder: item.SortOrder,
|
|
Pending: pendingByRole[item.Name],
|
|
Session: session,
|
|
})
|
|
}
|
|
return dashboardRolesResponse{Roles: items}, nil
|
|
}
|