108 lines
2.3 KiB
Go
108 lines
2.3 KiB
Go
package store
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"path/filepath"
|
|
"testing"
|
|
"time"
|
|
|
|
dbpkg "ai-workflow-skill/internal/db"
|
|
)
|
|
|
|
func TestClaimThreadReturnsLeaseConflictAfterBusyWrite(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
|
|
defer cancel()
|
|
|
|
dbPath := filepath.Join(t.TempDir(), "coord.db")
|
|
|
|
sqlDB, err := dbpkg.Open(ctx, dbPath)
|
|
if err != nil {
|
|
t.Fatalf("open base db: %v", err)
|
|
}
|
|
defer sqlDB.Close()
|
|
|
|
if err := dbpkg.ApplyMigrations(ctx, sqlDB); err != nil {
|
|
t.Fatalf("apply migrations: %v", err)
|
|
}
|
|
|
|
baseStore := NewInboxStore(sqlDB)
|
|
thread, _, err := baseStore.Send(ctx, SendInput{
|
|
FromAgent: "leader",
|
|
ToAgent: "worker-a",
|
|
Subject: "race claim",
|
|
Summary: "race claim",
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("seed thread: %v", err)
|
|
}
|
|
|
|
lockerDB, err := dbpkg.Open(ctx, dbPath)
|
|
if err != nil {
|
|
t.Fatalf("open locker db: %v", err)
|
|
}
|
|
defer lockerDB.Close()
|
|
|
|
lockTx, err := lockerDB.BeginTx(ctx, nil)
|
|
if err != nil {
|
|
t.Fatalf("begin locker tx: %v", err)
|
|
}
|
|
|
|
now := nowUTC()
|
|
if _, err := lockTx.ExecContext(
|
|
ctx,
|
|
`INSERT INTO leases (
|
|
thread_id, agent_id, lease_token, claimed_at, expires_at, released_at
|
|
) VALUES (?, ?, ?, ?, ?, NULL)`,
|
|
thread.ThreadID,
|
|
"worker-a",
|
|
"lease_locked",
|
|
formatTime(now),
|
|
formatTime(now.Add(5*time.Minute)),
|
|
); err != nil {
|
|
t.Fatalf("seed active lease in tx: %v", err)
|
|
}
|
|
|
|
if _, err := lockTx.ExecContext(
|
|
ctx,
|
|
`UPDATE threads
|
|
SET status = ?, assigned_to = ?, latest_message_id = ?, updated_at = ?
|
|
WHERE thread_id = ?`,
|
|
"claimed",
|
|
"worker-a",
|
|
"msg_locked",
|
|
formatTime(now),
|
|
thread.ThreadID,
|
|
); err != nil {
|
|
t.Fatalf("seed claimed thread in tx: %v", err)
|
|
}
|
|
|
|
commitDone := make(chan error, 1)
|
|
go func() {
|
|
time.Sleep(100 * time.Millisecond)
|
|
commitDone <- lockTx.Commit()
|
|
}()
|
|
|
|
claimDB, err := dbpkg.Open(ctx, dbPath)
|
|
if err != nil {
|
|
t.Fatalf("open claim db: %v", err)
|
|
}
|
|
defer claimDB.Close()
|
|
|
|
claimStore := NewInboxStore(claimDB)
|
|
_, err = claimStore.ClaimThread(ctx, ClaimInput{
|
|
ThreadID: thread.ThreadID,
|
|
Agent: "worker-b",
|
|
LeaseSeconds: 300,
|
|
})
|
|
if !errors.Is(err, ErrLeaseConflict) {
|
|
t.Fatalf("expected lease conflict after busy retry, got %v", err)
|
|
}
|
|
|
|
if err := <-commitDone; err != nil {
|
|
t.Fatalf("commit locker tx: %v", err)
|
|
}
|
|
}
|