414 lines
12 KiB
Go
414 lines
12 KiB
Go
package sqlite
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"encoding/json"
|
|
"fmt"
|
|
|
|
"inbox/internal/domain/task"
|
|
)
|
|
|
|
func (s *Store) CreateTask(ctx context.Context, value task.Record, dependencies []task.Dependency) (task.Record, error) {
|
|
value = task.NormalizeRecord(value)
|
|
if err := value.Validate(); err != nil {
|
|
return task.Record{}, err
|
|
}
|
|
tx, err := s.db.BeginTx(ctx, nil)
|
|
if err != nil {
|
|
return task.Record{}, fmt.Errorf("begin create task: %w", err)
|
|
}
|
|
defer tx.Rollback()
|
|
|
|
if value.ID == "" {
|
|
id, err := s.newID("task")
|
|
if err != nil {
|
|
return task.Record{}, err
|
|
}
|
|
value.ID = id
|
|
}
|
|
now := s.now()
|
|
value.CreatedAt = coalesceString(value.CreatedAt, now)
|
|
value.UpdatedAt = now
|
|
for idx := range dependencies {
|
|
dependencies[idx].TaskID = value.ID
|
|
if err := dependencies[idx].Validate(); err != nil {
|
|
return task.Record{}, err
|
|
}
|
|
}
|
|
if _, err := tx.ExecContext(ctx, `
|
|
INSERT INTO tasks(
|
|
id, workspace_id, topic_id, lane_id, title, body_markdown, acceptance_markdown, task_kind, deliverables_json, batch_key, status, priority,
|
|
task_order, created_by_role_name, blocking_reason_markdown, result_summary_markdown, assigned_run_id,
|
|
created_at, updated_at, started_at, completed_at
|
|
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
|
`,
|
|
value.ID,
|
|
value.WorkspaceID,
|
|
value.TopicID,
|
|
value.LaneID,
|
|
value.Title,
|
|
value.BodyMarkdown,
|
|
value.AcceptanceMarkdown,
|
|
string(value.Kind),
|
|
mustMarshalStringSlice(value.Deliverables),
|
|
value.BatchKey,
|
|
string(value.Status),
|
|
value.Priority,
|
|
value.TaskOrder,
|
|
value.CreatedByRoleName,
|
|
value.BlockingReasonMarkdown,
|
|
value.ResultSummaryMarkdown,
|
|
nullableString(value.AssignedRunID),
|
|
value.CreatedAt,
|
|
value.UpdatedAt,
|
|
nullableString(value.StartedAt),
|
|
nullableString(value.CompletedAt),
|
|
); err != nil {
|
|
return task.Record{}, fmt.Errorf("insert task: %w", err)
|
|
}
|
|
|
|
for _, dep := range dependencies {
|
|
if _, err := tx.ExecContext(ctx, `
|
|
INSERT INTO task_dependencies(task_id, depends_on_task_id)
|
|
VALUES(?, ?)
|
|
`, value.ID, dep.DependsOnTaskID); err != nil {
|
|
return task.Record{}, fmt.Errorf("insert task dependency: %w", err)
|
|
}
|
|
}
|
|
if err := tx.Commit(); err != nil {
|
|
return task.Record{}, fmt.Errorf("commit create task: %w", err)
|
|
}
|
|
return value, nil
|
|
}
|
|
|
|
func (s *Store) GetTask(ctx context.Context, taskID string) (task.Record, error) {
|
|
row := s.db.QueryRowContext(ctx, `
|
|
SELECT id, workspace_id, topic_id, lane_id, title, body_markdown, acceptance_markdown, task_kind, deliverables_json, batch_key, status, priority,
|
|
task_order, created_by_role_name, blocking_reason_markdown, result_summary_markdown, assigned_run_id,
|
|
created_at, updated_at, started_at, completed_at
|
|
FROM tasks
|
|
WHERE id = ?
|
|
`, taskID)
|
|
return scanTask(row)
|
|
}
|
|
|
|
func (s *Store) ListTasksByTopic(ctx context.Context, topicID string) ([]task.Record, error) {
|
|
rows, err := s.db.QueryContext(ctx, `
|
|
SELECT id, workspace_id, topic_id, lane_id, title, body_markdown, acceptance_markdown, task_kind, deliverables_json, batch_key, status, priority,
|
|
task_order, created_by_role_name, blocking_reason_markdown, result_summary_markdown, assigned_run_id,
|
|
created_at, updated_at, started_at, completed_at
|
|
FROM tasks
|
|
WHERE topic_id = ?
|
|
ORDER BY priority DESC, task_order, created_at, id
|
|
`, topicID)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("list tasks by topic: %w", err)
|
|
}
|
|
defer rows.Close()
|
|
|
|
var out []task.Record
|
|
for rows.Next() {
|
|
item, err := scanTask(rows)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
out = append(out, item)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, fmt.Errorf("iterate tasks by topic: %w", err)
|
|
}
|
|
return out, nil
|
|
}
|
|
|
|
func (s *Store) ListTasksByLane(ctx context.Context, laneID string) ([]task.Record, error) {
|
|
rows, err := s.db.QueryContext(ctx, `
|
|
SELECT id, workspace_id, topic_id, lane_id, title, body_markdown, acceptance_markdown, task_kind, deliverables_json, batch_key, status, priority,
|
|
task_order, created_by_role_name, blocking_reason_markdown, result_summary_markdown, assigned_run_id,
|
|
created_at, updated_at, started_at, completed_at
|
|
FROM tasks
|
|
WHERE lane_id = ?
|
|
ORDER BY task_order, priority DESC, created_at, id
|
|
`, laneID)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("list tasks by lane: %w", err)
|
|
}
|
|
defer rows.Close()
|
|
|
|
var out []task.Record
|
|
for rows.Next() {
|
|
item, err := scanTask(rows)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
out = append(out, item)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, fmt.Errorf("iterate tasks by lane: %w", err)
|
|
}
|
|
return out, nil
|
|
}
|
|
|
|
func (s *Store) UpdateTask(ctx context.Context, value task.Record) (task.Record, error) {
|
|
return s.UpdateTaskWithDependencies(ctx, value, nil)
|
|
}
|
|
|
|
func (s *Store) UpdateTaskWithDependencies(ctx context.Context, value task.Record, dependencies *[]task.Dependency) (task.Record, error) {
|
|
if value.ID == "" {
|
|
return task.Record{}, fmt.Errorf("task id is required")
|
|
}
|
|
tx, err := s.db.BeginTx(ctx, nil)
|
|
if err != nil {
|
|
return task.Record{}, fmt.Errorf("begin update task: %w", err)
|
|
}
|
|
defer tx.Rollback()
|
|
|
|
before, err := getTaskTx(ctx, tx, value.ID)
|
|
if err != nil {
|
|
return task.Record{}, err
|
|
}
|
|
value.CreatedAt = before.CreatedAt
|
|
value.UpdatedAt = s.now()
|
|
value = task.NormalizeRecord(value)
|
|
if err := value.Validate(); err != nil {
|
|
return task.Record{}, err
|
|
}
|
|
if err := updateTaskTx(ctx, tx, value); err != nil {
|
|
return task.Record{}, err
|
|
}
|
|
if dependencies != nil {
|
|
if err := replaceTaskDependenciesTx(ctx, tx, value.ID, *dependencies); err != nil {
|
|
return task.Record{}, err
|
|
}
|
|
}
|
|
if err := tx.Commit(); err != nil {
|
|
return task.Record{}, fmt.Errorf("commit update task: %w", err)
|
|
}
|
|
return s.GetTask(ctx, value.ID)
|
|
}
|
|
|
|
func (s *Store) ReplaceTaskDependencies(ctx context.Context, taskID string, dependencies []task.Dependency) error {
|
|
tx, err := s.db.BeginTx(ctx, nil)
|
|
if err != nil {
|
|
return fmt.Errorf("begin replace task dependencies: %w", err)
|
|
}
|
|
defer tx.Rollback()
|
|
|
|
if err := replaceTaskDependenciesTx(ctx, tx, taskID, dependencies); err != nil {
|
|
return err
|
|
}
|
|
if err := tx.Commit(); err != nil {
|
|
return fmt.Errorf("commit replace task dependencies: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *Store) ListTaskDependencies(ctx context.Context, taskID string) ([]task.Dependency, error) {
|
|
rows, err := s.db.QueryContext(ctx, `
|
|
SELECT task_id, depends_on_task_id
|
|
FROM task_dependencies
|
|
WHERE task_id = ?
|
|
ORDER BY depends_on_task_id
|
|
`, taskID)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("list task dependencies: %w", err)
|
|
}
|
|
defer rows.Close()
|
|
|
|
var out []task.Dependency
|
|
for rows.Next() {
|
|
var item task.Dependency
|
|
if err := rows.Scan(&item.TaskID, &item.DependsOnTaskID); err != nil {
|
|
return nil, err
|
|
}
|
|
out = append(out, item)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, fmt.Errorf("iterate task dependencies: %w", err)
|
|
}
|
|
return out, nil
|
|
}
|
|
|
|
func (s *Store) AppendTaskEvent(ctx context.Context, value task.Event) (task.Event, error) {
|
|
if err := value.Validate(); err != nil {
|
|
return task.Event{}, err
|
|
}
|
|
if value.ID == "" {
|
|
id, err := s.newID("task-event")
|
|
if err != nil {
|
|
return task.Event{}, err
|
|
}
|
|
value.ID = id
|
|
}
|
|
value.CreatedAt = coalesceString(value.CreatedAt, s.now())
|
|
if _, err := s.db.ExecContext(ctx, `
|
|
INSERT INTO task_events(id, task_id, event_type, body_markdown, created_by_role_name, created_at)
|
|
VALUES(?, ?, ?, ?, ?, ?)
|
|
`, value.ID, value.TaskID, value.EventType, value.BodyMarkdown, value.CreatedByRoleName, value.CreatedAt); err != nil {
|
|
return task.Event{}, fmt.Errorf("append task event: %w", err)
|
|
}
|
|
return value, nil
|
|
}
|
|
|
|
func (s *Store) ListTaskEvents(ctx context.Context, taskID string) ([]task.Event, error) {
|
|
rows, err := s.db.QueryContext(ctx, `
|
|
SELECT id, task_id, event_type, body_markdown, created_by_role_name, created_at
|
|
FROM task_events
|
|
WHERE task_id = ?
|
|
ORDER BY created_at, id
|
|
`, taskID)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("list task events: %w", err)
|
|
}
|
|
defer rows.Close()
|
|
|
|
var out []task.Event
|
|
for rows.Next() {
|
|
var item task.Event
|
|
if err := rows.Scan(&item.ID, &item.TaskID, &item.EventType, &item.BodyMarkdown, &item.CreatedByRoleName, &item.CreatedAt); err != nil {
|
|
return nil, err
|
|
}
|
|
out = append(out, item)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, fmt.Errorf("iterate task events: %w", err)
|
|
}
|
|
return out, nil
|
|
}
|
|
|
|
func getTaskTx(ctx context.Context, tx *sql.Tx, taskID string) (task.Record, error) {
|
|
row := tx.QueryRowContext(ctx, `
|
|
SELECT id, workspace_id, topic_id, lane_id, title, body_markdown, acceptance_markdown, task_kind, deliverables_json, batch_key, status, priority,
|
|
task_order, created_by_role_name, blocking_reason_markdown, result_summary_markdown, assigned_run_id,
|
|
created_at, updated_at, started_at, completed_at
|
|
FROM tasks
|
|
WHERE id = ?
|
|
`, taskID)
|
|
return scanTask(row)
|
|
}
|
|
|
|
func updateTaskTx(ctx context.Context, tx *sql.Tx, value task.Record) error {
|
|
value = task.NormalizeRecord(value)
|
|
if _, err := tx.ExecContext(ctx, `
|
|
UPDATE tasks
|
|
SET workspace_id = ?, topic_id = ?, lane_id = ?, title = ?, body_markdown = ?, acceptance_markdown = ?,
|
|
task_kind = ?, deliverables_json = ?, batch_key = ?, status = ?, priority = ?, task_order = ?, created_by_role_name = ?, blocking_reason_markdown = ?,
|
|
result_summary_markdown = ?, assigned_run_id = ?, updated_at = ?, started_at = ?, completed_at = ?
|
|
WHERE id = ?
|
|
`,
|
|
value.WorkspaceID,
|
|
value.TopicID,
|
|
value.LaneID,
|
|
value.Title,
|
|
value.BodyMarkdown,
|
|
value.AcceptanceMarkdown,
|
|
string(value.Kind),
|
|
mustMarshalStringSlice(value.Deliverables),
|
|
value.BatchKey,
|
|
string(value.Status),
|
|
value.Priority,
|
|
value.TaskOrder,
|
|
value.CreatedByRoleName,
|
|
value.BlockingReasonMarkdown,
|
|
value.ResultSummaryMarkdown,
|
|
nullableString(value.AssignedRunID),
|
|
value.UpdatedAt,
|
|
nullableString(value.StartedAt),
|
|
nullableString(value.CompletedAt),
|
|
value.ID,
|
|
); err != nil {
|
|
return fmt.Errorf("update task: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func replaceTaskDependenciesTx(ctx context.Context, tx *sql.Tx, taskID string, dependencies []task.Dependency) error {
|
|
for _, dep := range dependencies {
|
|
if err := dep.Validate(); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
if _, err := tx.ExecContext(ctx, `DELETE FROM task_dependencies WHERE task_id = ?`, taskID); err != nil {
|
|
return fmt.Errorf("delete task dependencies: %w", err)
|
|
}
|
|
for _, dep := range dependencies {
|
|
if _, err := tx.ExecContext(ctx, `
|
|
INSERT INTO task_dependencies(task_id, depends_on_task_id)
|
|
VALUES(?, ?)
|
|
`, taskID, dep.DependsOnTaskID); err != nil {
|
|
return fmt.Errorf("insert task dependency: %w", err)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func scanTask(s scanner) (task.Record, error) {
|
|
var item task.Record
|
|
var kind string
|
|
var deliverablesJSON string
|
|
var status string
|
|
var assignedRunID sql.NullString
|
|
var startedAt sql.NullString
|
|
var completedAt sql.NullString
|
|
if err := s.Scan(
|
|
&item.ID,
|
|
&item.WorkspaceID,
|
|
&item.TopicID,
|
|
&item.LaneID,
|
|
&item.Title,
|
|
&item.BodyMarkdown,
|
|
&item.AcceptanceMarkdown,
|
|
&kind,
|
|
&deliverablesJSON,
|
|
&item.BatchKey,
|
|
&status,
|
|
&item.Priority,
|
|
&item.TaskOrder,
|
|
&item.CreatedByRoleName,
|
|
&item.BlockingReasonMarkdown,
|
|
&item.ResultSummaryMarkdown,
|
|
&assignedRunID,
|
|
&item.CreatedAt,
|
|
&item.UpdatedAt,
|
|
&startedAt,
|
|
&completedAt,
|
|
); err != nil {
|
|
return task.Record{}, err
|
|
}
|
|
item.Kind = task.Kind(kind)
|
|
item.Deliverables = mustUnmarshalStringSlice(deliverablesJSON)
|
|
item.Status = task.Status(status)
|
|
if assignedRunID.Valid {
|
|
item.AssignedRunID = assignedRunID.String
|
|
}
|
|
if startedAt.Valid {
|
|
item.StartedAt = startedAt.String
|
|
}
|
|
if completedAt.Valid {
|
|
item.CompletedAt = completedAt.String
|
|
}
|
|
return item, nil
|
|
}
|
|
|
|
func mustMarshalStringSlice(items []string) string {
|
|
if len(items) == 0 {
|
|
return "[]"
|
|
}
|
|
data, err := json.Marshal(items)
|
|
if err != nil {
|
|
return "[]"
|
|
}
|
|
return string(data)
|
|
}
|
|
|
|
func mustUnmarshalStringSlice(raw string) []string {
|
|
if raw == "" {
|
|
return []string{}
|
|
}
|
|
var items []string
|
|
if err := json.Unmarshal([]byte(raw), &items); err != nil {
|
|
return []string{}
|
|
}
|
|
return items
|
|
}
|