151 lines
4.3 KiB
Go
151 lines
4.3 KiB
Go
package lanematerialize
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"sort"
|
|
"strings"
|
|
|
|
"inbox/internal/app/lanegit"
|
|
"inbox/internal/base/timeutil"
|
|
"inbox/internal/domain/lane"
|
|
"inbox/internal/domain/lanesync"
|
|
)
|
|
|
|
type SyncRecorder interface {
|
|
CreateLaneSync(ctx context.Context, value lanesync.Record) (lanesync.Record, error)
|
|
}
|
|
|
|
type Upstream struct {
|
|
TaskID string
|
|
Lane lane.Record
|
|
}
|
|
|
|
type Service struct {
|
|
recorder SyncRecorder
|
|
runner lanegit.Runner
|
|
clock timeutil.Clock
|
|
}
|
|
|
|
func NewService(recorder SyncRecorder, runner lanegit.Runner, clock timeutil.Clock) *Service {
|
|
if runner == nil {
|
|
runner = lanegit.ExecRunner{}
|
|
}
|
|
if clock == nil {
|
|
clock = timeutil.SystemClock{}
|
|
}
|
|
return &Service{recorder: recorder, runner: runner, clock: clock}
|
|
}
|
|
|
|
func (s *Service) Materialize(ctx context.Context, downstream lane.Record, taskID string, upstreams []Upstream) error {
|
|
if len(upstreams) == 0 {
|
|
return nil
|
|
}
|
|
worktree := strings.TrimSpace(downstream.WorktreePath)
|
|
if worktree == "" {
|
|
return fmt.Errorf("lane %s has empty worktree path", downstream.ID)
|
|
}
|
|
clean, err := s.isClean(ctx, worktree)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if !clean {
|
|
return fmt.Errorf("lane %s worktree has uncommitted changes before materialization", downstream.ID)
|
|
}
|
|
|
|
sort.SliceStable(upstreams, func(i, j int) bool {
|
|
if upstreams[i].Lane.ID == upstreams[j].Lane.ID {
|
|
return upstreams[i].TaskID < upstreams[j].TaskID
|
|
}
|
|
return upstreams[i].Lane.ID < upstreams[j].Lane.ID
|
|
})
|
|
|
|
for _, upstream := range upstreams {
|
|
if upstream.Lane.ID == downstream.ID {
|
|
continue
|
|
}
|
|
commit := strings.TrimSpace(upstream.Lane.HeadCommit)
|
|
if commit == "" {
|
|
err := fmt.Errorf("upstream lane %s has no head_commit", upstream.Lane.ID)
|
|
s.record(ctx, downstream, upstream, "", lanesync.StatusFailed, err.Error())
|
|
return err
|
|
}
|
|
|
|
alreadyMerged, err := s.isAncestor(ctx, worktree, commit)
|
|
if err != nil {
|
|
s.record(ctx, downstream, upstream, "", lanesync.StatusFailed, err.Error())
|
|
return err
|
|
}
|
|
if alreadyMerged {
|
|
s.record(ctx, downstream, upstream, commit, lanesync.StatusSkipped, "")
|
|
continue
|
|
}
|
|
|
|
if _, err := lanegit.Run(ctx, s.runner, worktree, nil, "merge", "--no-ff", "--no-edit", commit); err != nil {
|
|
_ = s.abortMerge(ctx, worktree)
|
|
s.record(ctx, downstream, upstream, "", lanesync.StatusFailed, err.Error())
|
|
return err
|
|
}
|
|
mergedHead, err := s.headCommit(ctx, worktree)
|
|
if err != nil {
|
|
s.record(ctx, downstream, upstream, "", lanesync.StatusFailed, err.Error())
|
|
return err
|
|
}
|
|
s.record(ctx, downstream, upstream, mergedHead, lanesync.StatusApplied, "")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *Service) record(ctx context.Context, downstream lane.Record, upstream Upstream, mergeCommit string, status lanesync.Status, errorMessage string) {
|
|
if s.recorder == nil {
|
|
return
|
|
}
|
|
_, _ = s.recorder.CreateLaneSync(ctx, lanesync.Record{
|
|
WorkspaceID: downstream.WorkspaceID,
|
|
TopicID: downstream.TopicID,
|
|
DownstreamLaneID: downstream.ID,
|
|
UpstreamLaneID: upstream.Lane.ID,
|
|
TaskID: upstream.TaskID,
|
|
UpstreamCommit: strings.TrimSpace(upstream.Lane.HeadCommit),
|
|
MergeCommit: strings.TrimSpace(mergeCommit),
|
|
Status: status,
|
|
ErrorMessage: strings.TrimSpace(errorMessage),
|
|
})
|
|
}
|
|
|
|
func (s *Service) abortMerge(ctx context.Context, worktree string) error {
|
|
_, err := lanegit.Run(ctx, s.runner, worktree, nil, "merge", "--abort")
|
|
return err
|
|
}
|
|
|
|
func (s *Service) headCommit(ctx context.Context, worktree string) (string, error) {
|
|
out, err := lanegit.Run(ctx, s.runner, worktree, nil, "rev-parse", "HEAD")
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
return strings.TrimSpace(out), nil
|
|
}
|
|
|
|
func (s *Service) isClean(ctx context.Context, worktree string) (bool, error) {
|
|
out, err := lanegit.Run(ctx, s.runner, worktree, nil, "status", "--porcelain")
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
return strings.TrimSpace(out) == "", nil
|
|
}
|
|
|
|
func (s *Service) isAncestor(ctx context.Context, worktree, commit string) (bool, error) {
|
|
head, err := s.headCommit(ctx, worktree)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
_, err = s.runner.Run(ctx, worktree, nil, "git", "merge-base", "--is-ancestor", commit, head)
|
|
if err == nil {
|
|
return true, nil
|
|
}
|
|
if lanegit.IsExitCode(err, 1) {
|
|
return false, nil
|
|
}
|
|
return false, fmt.Errorf("git merge-base --is-ancestor %s %s: %w", commit, head, err)
|
|
}
|