Files

121 lines
3.5 KiB
Go

package workflowrun
import (
"context"
"fmt"
"inbox/internal/app/runtimeconfig"
"inbox/internal/base/timeutil"
"inbox/internal/domain/workflow"
)
// Repository is the persistence interface the Service in this package depends
// on. It covers workflow runs and the log entries attached to them; the
// concrete storage backend is not visible from this file.
type Repository interface {
// CreateWorkflowRun stores a new run and returns the run reported by the store.
CreateWorkflowRun(ctx context.Context, value workflow.Run) (workflow.Run, error)
// GetWorkflowRun looks up a single run by its ID.
GetWorkflowRun(ctx context.Context, runID string) (workflow.Run, error)
// UpdateWorkflowRun writes value back and returns the run reported by the store.
UpdateWorkflowRun(ctx context.Context, value workflow.Run) (workflow.Run, error)
// ListWorkflowRunsByTopic returns the runs associated with topicID.
ListWorkflowRunsByTopic(ctx context.Context, topicID string) ([]workflow.Run, error)
// ListWorkflowRunLogs returns log entries for runID.
// NOTE(review): afterSeq presumably restricts the result to entries with a
// sequence number greater than afterSeq — confirm against the implementation.
ListWorkflowRunLogs(ctx context.Context, runID string, afterSeq int) ([]workflow.RunLog, error)
// AppendWorkflowRunLog stores a log entry and returns the entry reported by the store.
AppendWorkflowRunLog(ctx context.Context, value workflow.RunLog) (workflow.RunLog, error)
}
// RuntimeResolver resolves the runtime configuration for a role. Service.Start
// calls it and stores a JSON snapshot of the result on the run being created.
type RuntimeResolver interface {
// ResolveRole returns the resolved runtime configuration for roleName within
// the workspace identified by workspaceID.
ResolveRole(ctx context.Context, workspaceID, roleName string) (runtimeconfig.ResolvedRole, error)
}
// Service implements the workflow-run operations of this package on top of a
// Repository, a RuntimeResolver and a clock. Construct it with NewService.
type Service struct {
// repo persists runs and run logs.
repo Repository
// resolver supplies the runtime configuration snapshotted in Start.
resolver RuntimeResolver
// clock provides the current time used to stamp CompletedAt in Update.
clock timeutil.Clock
}
// Patch describes a partial update to a workflow run, applied by Service.Patch.
// Every field is optional: a nil pointer leaves the corresponding field of the
// stored run unchanged, a non-nil pointer overwrites it with the pointed-to value.
type Patch struct {
// Status, when set, replaces the run's status.
Status *workflow.RunStatus
// ReplyMessageID, when set, replaces the run's ReplyMessageID.
ReplyMessageID *string
// ExitCode, when set, replaces the run's ExitCode.
ExitCode *int
// CompletedAt, when set, replaces the run's CompletedAt string.
// NOTE(review): Service.Update writes RFC 3339 timestamps into this field;
// callers presumably should use the same format — confirm.
CompletedAt *string
// ErrorMessage, when set, replaces the run's ErrorMessage.
ErrorMessage *string
// CommandJSON, when set, replaces the run's CommandJSON.
CommandJSON *string
}
// NewService builds a Service from its dependencies.
//
// repo and resolver are stored as given. A nil clock is replaced with
// timeutil.SystemClock{} so callers that do not care about time injection can
// simply pass nil.
func NewService(repo Repository, resolver RuntimeResolver, clock timeutil.Clock) *Service {
	svc := &Service{repo: repo, resolver: resolver, clock: clock}
	if svc.clock == nil {
		// Fall back to the system clock when none was injected.
		svc.clock = timeutil.SystemClock{}
	}
	return svc
}
// Start creates a new workflow run.
//
// It resolves the runtime configuration for run.RoleName in run.WorkspaceID,
// stores a JSON snapshot of that configuration in run.ConfigSnapshotJSON, and
// defaults an empty run.Status to workflow.RunStatusRunning before handing the
// run to the repository.
//
// Errors from resolving or snapshotting the configuration are wrapped with the
// role name (using %w, so errors.Is / errors.As still see the cause). Errors
// from the repository are returned as-is. On any error before the repository
// call, the zero workflow.Run is returned.
func (s *Service) Start(ctx context.Context, run workflow.Run) (workflow.Run, error) {
	resolved, err := s.resolver.ResolveRole(ctx, run.WorkspaceID, run.RoleName)
	if err != nil {
		return workflow.Run{}, fmt.Errorf("resolve runtime config for %s: %w", run.RoleName, err)
	}

	snapshot, err := resolved.SnapshotJSON()
	if err != nil {
		// Wrap like the resolve failure above so the caller can tell which
		// step failed and for which role.
		return workflow.Run{}, fmt.Errorf("snapshot runtime config for %s: %w", run.RoleName, err)
	}
	run.ConfigSnapshotJSON = snapshot

	// A caller-provided status is respected; only an unset one is defaulted.
	if run.Status == "" {
		run.Status = workflow.RunStatusRunning
	}
	return s.repo.CreateWorkflowRun(ctx, run)
}
// ListByTopic returns the workflow runs the repository reports for topicID.
// The repository's result and error are passed through unchanged.
func (s *Service) ListByTopic(ctx context.Context, topicID string) ([]workflow.Run, error) {
	runs, err := s.repo.ListWorkflowRunsByTopic(ctx, topicID)
	return runs, err
}
// Get loads the workflow run identified by runID from the repository.
// The repository's result and error are passed through unchanged.
func (s *Service) Get(ctx context.Context, runID string) (workflow.Run, error) {
	run, err := s.repo.GetWorkflowRun(ctx, runID)
	return run, err
}
// Update persists value through the repository.
//
// Before saving, a run whose status is anything other than
// workflow.RunStatusRunning and whose CompletedAt is still empty gets
// CompletedAt stamped with the service clock's current time, formatted by
// timeutil.FormatRFC3339. An already-set CompletedAt is never overwritten.
func (s *Service) Update(ctx context.Context, value workflow.Run) (workflow.Run, error) {
	notRunning := value.Status != workflow.RunStatusRunning
	missingCompletion := value.CompletedAt == ""
	if notRunning && missingCompletion {
		now := s.clock.Now()
		value.CompletedAt = timeutil.FormatRFC3339(now)
	}
	return s.repo.UpdateWorkflowRun(ctx, value)
}
// Patch applies a partial update to the run identified by runID.
//
// The run is loaded from the repository, every non-nil field of patch is
// copied onto it (nil fields leave the stored value untouched), and the result
// is saved via Update — so Update's CompletedAt stamping applies here too.
// If the run cannot be loaded, the zero workflow.Run and the lookup error are
// returned.
func (s *Service) Patch(ctx context.Context, runID string, patch Patch) (workflow.Run, error) {
	run, err := s.repo.GetWorkflowRun(ctx, runID)
	if err != nil {
		return workflow.Run{}, err
	}

	// Overlay only the fields the caller actually provided.
	if v := patch.Status; v != nil {
		run.Status = *v
	}
	if v := patch.ReplyMessageID; v != nil {
		run.ReplyMessageID = *v
	}
	if v := patch.ExitCode; v != nil {
		run.ExitCode = *v
	}
	if v := patch.CompletedAt; v != nil {
		run.CompletedAt = *v
	}
	if v := patch.ErrorMessage; v != nil {
		run.ErrorMessage = *v
	}
	if v := patch.CommandJSON; v != nil {
		run.CommandJSON = *v
	}

	return s.Update(ctx, run)
}
// ListLogs returns log entries for the run identified by runID.
//
// The run is looked up first; if that lookup fails, its error is returned with
// a nil slice and the logs are not queried. Otherwise runID and afterSeq are
// forwarded unchanged to the repository's log listing.
func (s *Service) ListLogs(ctx context.Context, runID string, afterSeq int) ([]workflow.RunLog, error) {
	_, err := s.repo.GetWorkflowRun(ctx, runID)
	if err != nil {
		return nil, err
	}
	return s.repo.ListWorkflowRunLogs(ctx, runID, afterSeq)
}
// AppendLog stores a log entry for the run named by value.RunID.
//
// The run is looked up first; if that lookup fails, its error is returned with
// the zero workflow.RunLog and nothing is appended. Otherwise value is handed
// to the repository and its result is returned unchanged.
func (s *Service) AppendLog(ctx context.Context, value workflow.RunLog) (workflow.RunLog, error) {
	_, err := s.repo.GetWorkflowRun(ctx, value.RunID)
	if err != nil {
		return workflow.RunLog{}, err
	}
	return s.repo.AppendWorkflowRunLog(ctx, value)
}