Files

414 lines
12 KiB
Go

package sqlite
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"inbox/internal/domain/task"
)
// CreateTask writes a new task row and its dependency rows inside one
// transaction and returns the record that was written.
//
// The record is normalized and validated before the transaction is opened.
// When value.ID is empty an ID is obtained from s.newID("task"). CreatedAt is
// passed through coalesceString with the current store time as the second
// argument (presumably a fallback for an empty value), and UpdatedAt is always
// overwritten with the current store time.
//
// Each element of dependencies has its TaskID overwritten with the task's ID
// and is then validated. The write happens through the slice, so the caller's
// backing array is modified. Any failure returns a zero task.Record and leaves
// the transaction to be rolled back by the deferred call.
func (s *Store) CreateTask(ctx context.Context, value task.Record, dependencies []task.Dependency) (task.Record, error) {
	const insertTaskSQL = `
INSERT INTO tasks(
id, workspace_id, topic_id, lane_id, title, body_markdown, acceptance_markdown, task_kind, deliverables_json, batch_key, status, priority,
task_order, created_by_role_name, blocking_reason_markdown, result_summary_markdown, assigned_run_id,
created_at, updated_at, started_at, completed_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`
	const insertDependencySQL = `
INSERT INTO task_dependencies(task_id, depends_on_task_id)
VALUES(?, ?)
`

	value = task.NormalizeRecord(value)
	if err := value.Validate(); err != nil {
		return task.Record{}, err
	}

	tx, err := s.db.BeginTx(ctx, nil)
	if err != nil {
		return task.Record{}, fmt.Errorf("begin create task: %w", err)
	}
	// Covers every early return below; once Commit has succeeded the
	// transaction is finished and this call has nothing left to undo.
	defer tx.Rollback()

	if value.ID == "" {
		generated, genErr := s.newID("task")
		if genErr != nil {
			return task.Record{}, genErr
		}
		value.ID = generated
	}

	timestamp := s.now()
	value.CreatedAt = coalesceString(value.CreatedAt, timestamp)
	value.UpdatedAt = timestamp

	// Stamp and validate all dependencies before the first INSERT is issued.
	for i := range dependencies {
		dependencies[i].TaskID = value.ID
		if depErr := dependencies[i].Validate(); depErr != nil {
			return task.Record{}, depErr
		}
	}

	_, err = tx.ExecContext(ctx, insertTaskSQL,
		value.ID,
		value.WorkspaceID,
		value.TopicID,
		value.LaneID,
		value.Title,
		value.BodyMarkdown,
		value.AcceptanceMarkdown,
		string(value.Kind),
		mustMarshalStringSlice(value.Deliverables),
		value.BatchKey,
		string(value.Status),
		value.Priority,
		value.TaskOrder,
		value.CreatedByRoleName,
		value.BlockingReasonMarkdown,
		value.ResultSummaryMarkdown,
		nullableString(value.AssignedRunID),
		value.CreatedAt,
		value.UpdatedAt,
		nullableString(value.StartedAt),
		nullableString(value.CompletedAt),
	)
	if err != nil {
		return task.Record{}, fmt.Errorf("insert task: %w", err)
	}

	for _, dep := range dependencies {
		_, err = tx.ExecContext(ctx, insertDependencySQL, value.ID, dep.DependsOnTaskID)
		if err != nil {
			return task.Record{}, fmt.Errorf("insert task dependency: %w", err)
		}
	}

	err = tx.Commit()
	if err != nil {
		return task.Record{}, fmt.Errorf("commit create task: %w", err)
	}
	return value, nil
}
// GetTask loads the task whose id equals taskID.
//
// The single-row query result is handed to scanTask, and whatever scanTask
// returns (record or error) is returned unchanged.
func (s *Store) GetTask(ctx context.Context, taskID string) (task.Record, error) {
	const selectTaskByID = `
SELECT id, workspace_id, topic_id, lane_id, title, body_markdown, acceptance_markdown, task_kind, deliverables_json, batch_key, status, priority,
task_order, created_by_role_name, blocking_reason_markdown, result_summary_markdown, assigned_run_id,
created_at, updated_at, started_at, completed_at
FROM tasks
WHERE id = ?
`
	return scanTask(s.db.QueryRowContext(ctx, selectTaskByID, taskID))
}
// ListTasksByTopic returns the tasks whose topic_id equals topicID, ordered by
// priority descending, then task_order, created_at and id.
//
// The returned slice is nil when no rows match. Query, row-decoding and
// iteration failures are each wrapped with a message naming the step, and the
// underlying error stays reachable through errors.Is / errors.As.
func (s *Store) ListTasksByTopic(ctx context.Context, topicID string) ([]task.Record, error) {
	rows, err := s.db.QueryContext(ctx, `
SELECT id, workspace_id, topic_id, lane_id, title, body_markdown, acceptance_markdown, task_kind, deliverables_json, batch_key, status, priority,
task_order, created_by_role_name, blocking_reason_markdown, result_summary_markdown, assigned_run_id,
created_at, updated_at, started_at, completed_at
FROM tasks
WHERE topic_id = ?
ORDER BY priority DESC, task_order, created_at, id
`, topicID)
	if err != nil {
		return nil, fmt.Errorf("list tasks by topic: %w", err)
	}
	defer rows.Close()

	var out []task.Record
	for rows.Next() {
		item, err := scanTask(rows)
		if err != nil {
			// Wrapped like the sibling error paths so callers can tell which
			// step of the listing failed.
			return nil, fmt.Errorf("scan task by topic: %w", err)
		}
		out = append(out, item)
	}
	// rows.Next returns false both at end-of-data and on error; rows.Err
	// distinguishes the two.
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("iterate tasks by topic: %w", err)
	}
	return out, nil
}
// ListTasksByLane returns the tasks whose lane_id equals laneID, ordered by
// task_order, then priority descending, created_at and id.
//
// The returned slice is nil when no rows match. Query, row-decoding and
// iteration failures are each wrapped with a message naming the step, and the
// underlying error stays reachable through errors.Is / errors.As.
func (s *Store) ListTasksByLane(ctx context.Context, laneID string) ([]task.Record, error) {
	rows, err := s.db.QueryContext(ctx, `
SELECT id, workspace_id, topic_id, lane_id, title, body_markdown, acceptance_markdown, task_kind, deliverables_json, batch_key, status, priority,
task_order, created_by_role_name, blocking_reason_markdown, result_summary_markdown, assigned_run_id,
created_at, updated_at, started_at, completed_at
FROM tasks
WHERE lane_id = ?
ORDER BY task_order, priority DESC, created_at, id
`, laneID)
	if err != nil {
		return nil, fmt.Errorf("list tasks by lane: %w", err)
	}
	defer rows.Close()

	var out []task.Record
	for rows.Next() {
		item, err := scanTask(rows)
		if err != nil {
			// Wrapped like the sibling error paths so callers can tell which
			// step of the listing failed.
			return nil, fmt.Errorf("scan task by lane: %w", err)
		}
		out = append(out, item)
	}
	// rows.Next returns false both at end-of-data and on error; rows.Err
	// distinguishes the two.
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("iterate tasks by lane: %w", err)
	}
	return out, nil
}
// UpdateTask updates a task record without touching its dependency rows.
//
// It delegates to UpdateTaskWithDependencies with a nil dependency pointer,
// which that method treats as "leave dependencies as they are".
func (s *Store) UpdateTask(ctx context.Context, value task.Record) (task.Record, error) {
	updated, err := s.UpdateTaskWithDependencies(ctx, value, nil)
	return updated, err
}
// UpdateTaskWithDependencies overwrites the stored task identified by value.ID
// and, when dependencies is non-nil, replaces the task's dependency rows with
// *dependencies. Everything happens in one transaction.
//
// The existing row is loaded first; its CreatedAt is carried over so callers
// cannot change it, and UpdatedAt is set to the current store time. The record
// is then normalized and validated before being written.
//
// The returned record is read back inside the same transaction, before the
// commit. That way the result reflects exactly what this call wrote, and a
// failure of the read-back rolls the update back instead of reporting an error
// for a change that was already persisted.
//
// A nil dependencies pointer leaves existing dependency rows untouched; a
// pointer to an empty slice removes them all.
func (s *Store) UpdateTaskWithDependencies(ctx context.Context, value task.Record, dependencies *[]task.Dependency) (task.Record, error) {
	if value.ID == "" {
		return task.Record{}, fmt.Errorf("task id is required")
	}

	tx, err := s.db.BeginTx(ctx, nil)
	if err != nil {
		return task.Record{}, fmt.Errorf("begin update task: %w", err)
	}
	// Covers every early return below; after a successful Commit the
	// transaction is finished and this call has nothing left to undo.
	defer tx.Rollback()

	before, err := getTaskTx(ctx, tx, value.ID)
	if err != nil {
		return task.Record{}, err
	}
	value.CreatedAt = before.CreatedAt
	value.UpdatedAt = s.now()
	value = task.NormalizeRecord(value)
	if err := value.Validate(); err != nil {
		return task.Record{}, err
	}

	if err := updateTaskTx(ctx, tx, value); err != nil {
		return task.Record{}, err
	}
	if dependencies != nil {
		if err := replaceTaskDependenciesTx(ctx, tx, value.ID, *dependencies); err != nil {
			return task.Record{}, err
		}
	}

	// Read the row back while still inside the transaction so the result is
	// consistent with the write and cannot fail after the commit has landed.
	stored, err := getTaskTx(ctx, tx, value.ID)
	if err != nil {
		return task.Record{}, err
	}

	if err := tx.Commit(); err != nil {
		return task.Record{}, fmt.Errorf("commit update task: %w", err)
	}
	return stored, nil
}
// ReplaceTaskDependencies swaps the dependency rows of taskID for the given
// set, running replaceTaskDependenciesTx inside its own transaction.
//
// Errors from the helper are returned as-is; begin and commit failures are
// wrapped with context.
func (s *Store) ReplaceTaskDependencies(ctx context.Context, taskID string, dependencies []task.Dependency) error {
	tx, err := s.db.BeginTx(ctx, nil)
	if err != nil {
		return fmt.Errorf("begin replace task dependencies: %w", err)
	}
	// Undoes the partial work on any early return below.
	defer tx.Rollback()

	err = replaceTaskDependenciesTx(ctx, tx, taskID, dependencies)
	if err != nil {
		return err
	}

	err = tx.Commit()
	if err != nil {
		return fmt.Errorf("commit replace task dependencies: %w", err)
	}
	return nil
}
// ListTaskDependencies returns the dependency rows recorded for taskID,
// ordered by depends_on_task_id.
//
// The returned slice is nil when the task has no dependency rows. Query,
// row-decoding and iteration failures are each wrapped with a message naming
// the step, and the underlying error stays reachable through errors.Is /
// errors.As.
func (s *Store) ListTaskDependencies(ctx context.Context, taskID string) ([]task.Dependency, error) {
	rows, err := s.db.QueryContext(ctx, `
SELECT task_id, depends_on_task_id
FROM task_dependencies
WHERE task_id = ?
ORDER BY depends_on_task_id
`, taskID)
	if err != nil {
		return nil, fmt.Errorf("list task dependencies: %w", err)
	}
	defer rows.Close()

	var out []task.Dependency
	for rows.Next() {
		var item task.Dependency
		if err := rows.Scan(&item.TaskID, &item.DependsOnTaskID); err != nil {
			// Wrapped like the sibling error paths so the failing step is
			// identifiable from the message.
			return nil, fmt.Errorf("scan task dependency: %w", err)
		}
		out = append(out, item)
	}
	// rows.Next returns false both at end-of-data and on error; rows.Err
	// distinguishes the two.
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("iterate task dependencies: %w", err)
	}
	return out, nil
}
// AppendTaskEvent inserts one row into task_events and returns the event as
// written.
//
// The event is validated first. An empty ID is replaced by one obtained from
// s.newID("task-event"). CreatedAt is passed through coalesceString with the
// current store time as the second argument (presumably a fallback for an
// empty value). The insert runs directly on the database handle, not inside an
// explicit transaction.
func (s *Store) AppendTaskEvent(ctx context.Context, value task.Event) (task.Event, error) {
	const insertEventSQL = `
INSERT INTO task_events(id, task_id, event_type, body_markdown, created_by_role_name, created_at)
VALUES(?, ?, ?, ?, ?, ?)
`

	if err := value.Validate(); err != nil {
		return task.Event{}, err
	}

	if value.ID == "" {
		generated, genErr := s.newID("task-event")
		if genErr != nil {
			return task.Event{}, genErr
		}
		value.ID = generated
	}
	value.CreatedAt = coalesceString(value.CreatedAt, s.now())

	_, err := s.db.ExecContext(ctx, insertEventSQL,
		value.ID,
		value.TaskID,
		value.EventType,
		value.BodyMarkdown,
		value.CreatedByRoleName,
		value.CreatedAt,
	)
	if err != nil {
		return task.Event{}, fmt.Errorf("append task event: %w", err)
	}
	return value, nil
}
// ListTaskEvents returns the events recorded for taskID, ordered by created_at
// and then id.
//
// The returned slice is nil when the task has no events. Query, row-decoding
// and iteration failures are each wrapped with a message naming the step, and
// the underlying error stays reachable through errors.Is / errors.As.
func (s *Store) ListTaskEvents(ctx context.Context, taskID string) ([]task.Event, error) {
	rows, err := s.db.QueryContext(ctx, `
SELECT id, task_id, event_type, body_markdown, created_by_role_name, created_at
FROM task_events
WHERE task_id = ?
ORDER BY created_at, id
`, taskID)
	if err != nil {
		return nil, fmt.Errorf("list task events: %w", err)
	}
	defer rows.Close()

	var out []task.Event
	for rows.Next() {
		var item task.Event
		if err := rows.Scan(&item.ID, &item.TaskID, &item.EventType, &item.BodyMarkdown, &item.CreatedByRoleName, &item.CreatedAt); err != nil {
			// Wrapped like the sibling error paths so the failing step is
			// identifiable from the message.
			return nil, fmt.Errorf("scan task event: %w", err)
		}
		out = append(out, item)
	}
	// rows.Next returns false both at end-of-data and on error; rows.Err
	// distinguishes the two.
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("iterate task events: %w", err)
	}
	return out, nil
}
// getTaskTx loads the task whose id equals taskID using the supplied
// transaction, so the read sees that transaction's own uncommitted writes.
//
// The row is decoded by scanTask and its result is returned unchanged.
func getTaskTx(ctx context.Context, tx *sql.Tx, taskID string) (task.Record, error) {
	const selectTaskByID = `
SELECT id, workspace_id, topic_id, lane_id, title, body_markdown, acceptance_markdown, task_kind, deliverables_json, batch_key, status, priority,
task_order, created_by_role_name, blocking_reason_markdown, result_summary_markdown, assigned_run_id,
created_at, updated_at, started_at, completed_at
FROM tasks
WHERE id = ?
`
	return scanTask(tx.QueryRowContext(ctx, selectTaskByID, taskID))
}
// updateTaskTx writes every mutable column of value to the tasks row whose id
// equals value.ID, using the supplied transaction.
//
// The record is normalized again here before being written. created_at is not
// part of the UPDATE. The number of affected rows is not inspected, so a
// missing row is not reported as an error by this function.
func updateTaskTx(ctx context.Context, tx *sql.Tx, value task.Record) error {
	const updateTaskSQL = `
UPDATE tasks
SET workspace_id = ?, topic_id = ?, lane_id = ?, title = ?, body_markdown = ?, acceptance_markdown = ?,
task_kind = ?, deliverables_json = ?, batch_key = ?, status = ?, priority = ?, task_order = ?, created_by_role_name = ?, blocking_reason_markdown = ?,
result_summary_markdown = ?, assigned_run_id = ?, updated_at = ?, started_at = ?, completed_at = ?
WHERE id = ?
`

	rec := task.NormalizeRecord(value)

	// Bind arguments follow the column order of the SET clause, with the id
	// for the WHERE clause last.
	_, err := tx.ExecContext(ctx, updateTaskSQL,
		rec.WorkspaceID,
		rec.TopicID,
		rec.LaneID,
		rec.Title,
		rec.BodyMarkdown,
		rec.AcceptanceMarkdown,
		string(rec.Kind),
		mustMarshalStringSlice(rec.Deliverables),
		rec.BatchKey,
		string(rec.Status),
		rec.Priority,
		rec.TaskOrder,
		rec.CreatedByRoleName,
		rec.BlockingReasonMarkdown,
		rec.ResultSummaryMarkdown,
		nullableString(rec.AssignedRunID),
		rec.UpdatedAt,
		nullableString(rec.StartedAt),
		nullableString(rec.CompletedAt),
		rec.ID,
	)
	if err != nil {
		return fmt.Errorf("update task: %w", err)
	}
	return nil
}
// replaceTaskDependenciesTx replaces the dependency rows of taskID with the
// given set, using the supplied transaction: all existing rows for the task
// are deleted, then one row is inserted per entry of dependencies.
//
// Every entry is validated before any statement runs, so an invalid entry
// returns before the DELETE is issued. Rows are always inserted under the
// taskID argument, so each entry is validated with its TaskID set to that same
// value, matching what CreateTask does before it validates. The assignment is
// made on the loop's copy; the caller's slice is not modified.
// NOTE(review): the effect depends on what task.Dependency.Validate checks,
// which is not visible here — confirm.
func replaceTaskDependenciesTx(ctx context.Context, tx *sql.Tx, taskID string, dependencies []task.Dependency) error {
	for _, dep := range dependencies {
		// Validate the (taskID, DependsOnTaskID) pair that will actually be
		// stored rather than whatever TaskID the caller left on the entry.
		dep.TaskID = taskID
		if err := dep.Validate(); err != nil {
			return err
		}
	}

	if _, err := tx.ExecContext(ctx, `DELETE FROM task_dependencies WHERE task_id = ?`, taskID); err != nil {
		return fmt.Errorf("delete task dependencies: %w", err)
	}

	for _, dep := range dependencies {
		if _, err := tx.ExecContext(ctx, `
INSERT INTO task_dependencies(task_id, depends_on_task_id)
VALUES(?, ?)
`, taskID, dep.DependsOnTaskID); err != nil {
			return fmt.Errorf("insert task dependency: %w", err)
		}
	}
	return nil
}
// scanTask decodes one tasks row from s into a task.Record.
//
// The columns must arrive in the order used by the SELECT statements in this
// file. Kind and Status are read as strings and converted to their typed
// forms, deliverables_json is decoded with mustUnmarshalStringSlice, and the
// nullable columns (assigned_run_id, started_at, completed_at) are left as
// empty strings when the stored value is NULL. A Scan error is returned
// unwrapped together with a zero record.
func scanTask(s scanner) (task.Record, error) {
	var (
		rec             task.Record
		kindText        string
		deliverablesRaw string
		statusText      string
		runID           sql.NullString
		started         sql.NullString
		completed       sql.NullString
	)

	err := s.Scan(
		&rec.ID,
		&rec.WorkspaceID,
		&rec.TopicID,
		&rec.LaneID,
		&rec.Title,
		&rec.BodyMarkdown,
		&rec.AcceptanceMarkdown,
		&kindText,
		&deliverablesRaw,
		&rec.BatchKey,
		&statusText,
		&rec.Priority,
		&rec.TaskOrder,
		&rec.CreatedByRoleName,
		&rec.BlockingReasonMarkdown,
		&rec.ResultSummaryMarkdown,
		&runID,
		&rec.CreatedAt,
		&rec.UpdatedAt,
		&started,
		&completed,
	)
	if err != nil {
		return task.Record{}, err
	}

	// copyIfSet writes the column value into dst only when it was not NULL,
	// leaving the field at its zero value otherwise.
	copyIfSet := func(dst *string, src sql.NullString) {
		if src.Valid {
			*dst = src.String
		}
	}

	rec.Kind = task.Kind(kindText)
	rec.Deliverables = mustUnmarshalStringSlice(deliverablesRaw)
	rec.Status = task.Status(statusText)
	copyIfSet(&rec.AssignedRunID, runID)
	copyIfSet(&rec.StartedAt, started)
	copyIfSet(&rec.CompletedAt, completed)
	return rec, nil
}
// mustMarshalStringSlice encodes items as a JSON array of strings.
//
// A nil or empty slice is encoded as "[]" rather than "null". If encoding
// fails, "[]" is returned as well; despite the "must" prefix the function
// never panics.
func mustMarshalStringSlice(items []string) string {
	const emptyArray = "[]"

	if len(items) > 0 {
		if encoded, err := json.Marshal(items); err == nil {
			return string(encoded)
		}
	}
	return emptyArray
}
// mustUnmarshalStringSlice decodes raw as a JSON array of strings and always
// returns a non-nil slice.
//
// Empty input, malformed JSON, JSON of the wrong shape, and the JSON literal
// null all produce an empty slice. Despite the "must" prefix the function
// never panics; decode failures are deliberately swallowed so that a bad
// stored value degrades to "no items".
func mustUnmarshalStringSlice(raw string) []string {
	if raw == "" {
		return []string{}
	}

	var items []string
	// JSON null decodes without error but leaves items nil; treat it like the
	// other "nothing usable" cases so callers never see a nil slice.
	if err := json.Unmarshal([]byte(raw), &items); err != nil || items == nil {
		return []string{}
	}
	return items
}