From c37954aa3fd65cff7f2100dca6972ce182cc0c53 Mon Sep 17 00:00:00 2001 From: Steve Cliff Date: Wed, 6 May 2026 21:47:54 +0100 Subject: [PATCH] store: migrations 0021+0022 + fleet_updates CRUD --- internal/store/fleet_updates.go | 258 ++++++++++++++++++ internal/store/fleet_updates_test.go | 180 ++++++++++++ .../migrations/0021_jobs_update_kind.sql | 57 ++++ .../store/migrations/0022_fleet_updates.sql | 35 +++ internal/store/types.go | 27 ++ 5 files changed, 557 insertions(+) create mode 100644 internal/store/fleet_updates.go create mode 100644 internal/store/fleet_updates_test.go create mode 100644 internal/store/migrations/0021_jobs_update_kind.sql create mode 100644 internal/store/migrations/0022_fleet_updates.sql diff --git a/internal/store/fleet_updates.go b/internal/store/fleet_updates.go new file mode 100644 index 0000000..ae9fec2 --- /dev/null +++ b/internal/store/fleet_updates.go @@ -0,0 +1,258 @@ +package store + +import ( + "context" + "database/sql" + "errors" + "fmt" + "time" +) + +// ErrFleetUpdateRunning is returned by CreateFleetUpdate if another +// fleet update is already in 'running' state. The HTTP layer surfaces +// this as a 409 with a structured error code. +var ErrFleetUpdateRunning = errors.New("store: fleet update already running") + +// CreateFleetUpdate inserts the parent row and one pending child per +// hostID, in the order given (position = index). Returns +// ErrFleetUpdateRunning if a fleet update is already in flight. +func (st *Store) CreateFleetUpdate(ctx context.Context, fu FleetUpdate, hostIDs []string) error { + if fu.ID == "" || fu.StartedByUserID == "" || fu.TargetVersion == "" { + return errors.New("store: fleet update id, user_id, target_version required") + } + if fu.Status == "" { + fu.Status = "running" + } + if fu.StartedAt.IsZero() { + fu.StartedAt = time.Now().UTC() + } + tx, err := st.db.BeginTx(ctx, nil) + if err != nil { + return fmt.Errorf("store: begin: %w", err) + } + defer func() { _ = tx.Rollback() }() + + var existing string + if err := tx.QueryRowContext(ctx, + `SELECT id FROM fleet_updates WHERE status = 'running' LIMIT 1`). + Scan(&existing); err == nil { + return fmt.Errorf("%w: %s", ErrFleetUpdateRunning, existing) + } else if !errors.Is(err, sql.ErrNoRows) { + return fmt.Errorf("store: check active fleet update: %w", err) + } + + if _, err := tx.ExecContext(ctx, + `INSERT INTO fleet_updates (id, started_at, started_by_user_id, target_version, status) + VALUES (?, ?, ?, ?, ?)`, + fu.ID, fu.StartedAt.UTC().Format(time.RFC3339Nano), fu.StartedByUserID, fu.TargetVersion, fu.Status, + ); err != nil { + return fmt.Errorf("store: insert fleet_updates: %w", err) + } + for i, hid := range hostIDs { + if _, err := tx.ExecContext(ctx, + `INSERT INTO fleet_update_hosts (fleet_update_id, host_id, position, status) + VALUES (?, ?, ?, 'pending')`, + fu.ID, hid, i, + ); err != nil { + return fmt.Errorf("store: insert fleet_update_hosts: %w", err) + } + } + return tx.Commit() +} + +// ActiveFleetUpdate returns the currently-running fleet update or nil. +func (st *Store) ActiveFleetUpdate(ctx context.Context) (*FleetUpdate, error) { + var fu FleetUpdate + var startedAt string + var current sql.NullString + var halted sql.NullString + var completedAt sql.NullString + err := st.db.QueryRowContext(ctx, + `SELECT id, started_at, started_by_user_id, target_version, status, + current_host_id, halted_reason, completed_at + FROM fleet_updates WHERE status = 'running' LIMIT 1`). + Scan(&fu.ID, &startedAt, &fu.StartedByUserID, &fu.TargetVersion, &fu.Status, + ¤t, &halted, &completedAt) + if errors.Is(err, sql.ErrNoRows) { + return nil, nil + } + if err != nil { + return nil, fmt.Errorf("store: active fleet update: %w", err) + } + fu.StartedAt, _ = time.Parse(time.RFC3339Nano, startedAt) + fu.CurrentHostID = current.String + fu.HaltedReason = halted.String + if completedAt.Valid { + t, _ := time.Parse(time.RFC3339Nano, completedAt.String) + fu.CompletedAt = &t + } + return &fu, nil +} + +// GetFleetUpdate hydrates parent + ordered child rows. Returns +// ErrNotFound on missing id. +func (st *Store) GetFleetUpdate(ctx context.Context, id string) (*FleetUpdate, []FleetUpdateHost, error) { + var fu FleetUpdate + var startedAt string + var current sql.NullString + var halted sql.NullString + var completedAt sql.NullString + err := st.db.QueryRowContext(ctx, + `SELECT id, started_at, started_by_user_id, target_version, status, + current_host_id, halted_reason, completed_at + FROM fleet_updates WHERE id = ?`, id). + Scan(&fu.ID, &startedAt, &fu.StartedByUserID, &fu.TargetVersion, &fu.Status, + ¤t, &halted, &completedAt) + if errors.Is(err, sql.ErrNoRows) { + return nil, nil, ErrNotFound + } + if err != nil { + return nil, nil, fmt.Errorf("store: get fleet update: %w", err) + } + fu.StartedAt, _ = time.Parse(time.RFC3339Nano, startedAt) + fu.CurrentHostID = current.String + fu.HaltedReason = halted.String + if completedAt.Valid { + t, _ := time.Parse(time.RFC3339Nano, completedAt.String) + fu.CompletedAt = &t + } + + rows, err := st.db.QueryContext(ctx, + `SELECT host_id, position, status, COALESCE(job_id, ''), COALESCE(failed_reason, '') + FROM fleet_update_hosts + WHERE fleet_update_id = ? + ORDER BY position`, id) + if err != nil { + return nil, nil, fmt.Errorf("store: list fleet hosts: %w", err) + } + defer func() { _ = rows.Close() }() + out := []FleetUpdateHost{} + for rows.Next() { + fh := FleetUpdateHost{FleetUpdateID: id} + if err := rows.Scan(&fh.HostID, &fh.Position, &fh.Status, &fh.JobID, &fh.FailedReason); err != nil { + return nil, nil, fmt.Errorf("store: scan fleet host: %w", err) + } + out = append(out, fh) + } + return &fu, out, rows.Err() +} + +// ListPendingFleetUpdateHosts returns rows with status='pending' for +// this fleet update, in position order. The worker calls this to +// pick the next host to dispatch. +func (st *Store) ListPendingFleetUpdateHosts(ctx context.Context, fuID string) ([]FleetUpdateHost, error) { + rows, err := st.db.QueryContext(ctx, + `SELECT host_id, position, status, COALESCE(job_id, ''), COALESCE(failed_reason, '') + FROM fleet_update_hosts + WHERE fleet_update_id = ? AND status = 'pending' + ORDER BY position`, fuID) + if err != nil { + return nil, fmt.Errorf("store: list pending fleet hosts: %w", err) + } + defer func() { _ = rows.Close() }() + out := []FleetUpdateHost{} + for rows.Next() { + fh := FleetUpdateHost{FleetUpdateID: fuID} + if err := rows.Scan(&fh.HostID, &fh.Position, &fh.Status, &fh.JobID, &fh.FailedReason); err != nil { + return nil, err + } + out = append(out, fh) + } + return out, rows.Err() +} + +// SetFleetUpdateHostStatus moves one row through pending → running → +// {succeeded, failed, skipped}. failedReason and jobID may be empty +// (e.g. on succeeded). Empty values are stored as NULL so subsequent +// reads round-trip cleanly via COALESCE. +func (st *Store) SetFleetUpdateHostStatus(ctx context.Context, fuID, hostID, status, failedReason, jobID string) error { + _, err := st.db.ExecContext(ctx, + `UPDATE fleet_update_hosts + SET status = ?, failed_reason = ?, job_id = COALESCE(?, job_id) + WHERE fleet_update_id = ? AND host_id = ?`, + status, nullableString(failedReason), nullableString(jobID), + fuID, hostID, + ) + if err != nil { + return fmt.Errorf("store: set fleet host status: %w", err) + } + return nil +} + +// SetFleetUpdateCurrentHost stamps which host the worker is actively +// waiting on. Pass empty string to clear. +func (st *Store) SetFleetUpdateCurrentHost(ctx context.Context, fuID, hostID string) error { + _, err := st.db.ExecContext(ctx, + `UPDATE fleet_updates SET current_host_id = ? WHERE id = ?`, + nullableString(hostID), fuID, + ) + if err != nil { + return fmt.Errorf("store: set fleet current host: %w", err) + } + return nil +} + +// HaltFleetUpdate flips status to 'halted', stamps the reason, and +// clears current_host_id. +func (st *Store) HaltFleetUpdate(ctx context.Context, fuID, reason string, when time.Time) error { + _, err := st.db.ExecContext(ctx, + `UPDATE fleet_updates + SET status = 'halted', halted_reason = ?, current_host_id = NULL, + completed_at = ? + WHERE id = ? AND status = 'running'`, + reason, when.UTC().Format(time.RFC3339Nano), fuID, + ) + if err != nil { + return fmt.Errorf("store: halt fleet update: %w", err) + } + return nil +} + +// CancelFleetUpdate flips status to 'cancelled'. Caller checks that +// the row is still 'running' before calling. +func (st *Store) CancelFleetUpdate(ctx context.Context, fuID string, when time.Time) error { + _, err := st.db.ExecContext(ctx, + `UPDATE fleet_updates + SET status = 'cancelled', current_host_id = NULL, completed_at = ? + WHERE id = ? AND status = 'running'`, + when.UTC().Format(time.RFC3339Nano), fuID, + ) + if err != nil { + return fmt.Errorf("store: cancel fleet update: %w", err) + } + return nil +} + +// CompleteFleetUpdate flips status to 'completed' once every host has +// reached a terminal state. +func (st *Store) CompleteFleetUpdate(ctx context.Context, fuID string, when time.Time) error { + _, err := st.db.ExecContext(ctx, + `UPDATE fleet_updates + SET status = 'completed', current_host_id = NULL, completed_at = ? + WHERE id = ? AND status = 'running'`, + when.UTC().Format(time.RFC3339Nano), fuID, + ) + if err != nil { + return fmt.Errorf("store: complete fleet update: %w", err) + } + return nil +} + +// RunningUpdateJobForHost returns the id of any in-flight (queued or +// running) `update` job for hostID, or "" + nil if none. Used by the +// host-update HTTP handler to refuse double-dispatch and by the +// fleet worker to dedupe on retry. +func (st *Store) RunningUpdateJobForHost(ctx context.Context, hostID string) (string, error) { + var id string + err := st.db.QueryRowContext(ctx, + `SELECT id FROM jobs + WHERE host_id = ? AND kind = 'update' AND status IN ('queued','running') + ORDER BY created_at DESC LIMIT 1`, hostID).Scan(&id) + if errors.Is(err, sql.ErrNoRows) { + return "", nil + } + if err != nil { + return "", fmt.Errorf("store: running update job: %w", err) + } + return id, nil +} diff --git a/internal/store/fleet_updates_test.go b/internal/store/fleet_updates_test.go new file mode 100644 index 0000000..9942411 --- /dev/null +++ b/internal/store/fleet_updates_test.go @@ -0,0 +1,180 @@ +package store + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/oklog/ulid/v2" +) + +func ptrStr(s string) *string { return &s } + +func seedFleetUser(t *testing.T, s *Store) string { + t.Helper() + id := ulid.Make().String() + if err := s.CreateUser(context.Background(), User{ + ID: id, Username: "u-" + id[:6], PasswordHash: "x", Role: RoleAdmin, + }); err != nil { + t.Fatalf("create user: %v", err) + } + return id +} + +func seedFleetHost(t *testing.T, s *Store, name string) string { + t.Helper() + id := ulid.Make().String() + if err := s.CreateHost(context.Background(), Host{ + ID: id, Name: name, OS: "linux", Arch: "amd64", + EnrolledAt: time.Now().UTC(), + }, "tokenhash-"+id[:6], ""); err != nil { + t.Fatalf("create host: %v", err) + } + return id +} + +func TestCreateFleetUpdate_RefusesIfRunning(t *testing.T) { + t.Parallel() + s := openTestStore(t) + uid := seedFleetUser(t, s) + h1 := seedFleetHost(t, s, "h1") + + fu1 := FleetUpdate{ID: ulid.Make().String(), StartedByUserID: uid, TargetVersion: "v1"} + if err := s.CreateFleetUpdate(context.Background(), fu1, []string{h1}); err != nil { + t.Fatalf("create #1: %v", err) + } + fu2 := FleetUpdate{ID: ulid.Make().String(), StartedByUserID: uid, TargetVersion: "v2"} + err := s.CreateFleetUpdate(context.Background(), fu2, []string{h1}) + if !errors.Is(err, ErrFleetUpdateRunning) { + t.Fatalf("want ErrFleetUpdateRunning, got %v", err) + } +} + +func TestCreateFleetUpdate_HydrateRoundTrip(t *testing.T) { + t.Parallel() + s := openTestStore(t) + uid := seedFleetUser(t, s) + h1 := seedFleetHost(t, s, "h1") + h2 := seedFleetHost(t, s, "h2") + + fu := FleetUpdate{ID: ulid.Make().String(), StartedByUserID: uid, TargetVersion: "v1.2.3"} + if err := s.CreateFleetUpdate(context.Background(), fu, []string{h1, h2}); err != nil { + t.Fatal(err) + } + + got, hosts, err := s.GetFleetUpdate(context.Background(), fu.ID) + if err != nil { + t.Fatal(err) + } + if got.Status != "running" || got.TargetVersion != "v1.2.3" { + t.Fatalf("parent: %+v", got) + } + if len(hosts) != 2 || hosts[0].Position != 0 || hosts[1].Position != 1 { + t.Fatalf("hosts: %+v", hosts) + } + if hosts[0].Status != "pending" || hosts[1].Status != "pending" { + t.Fatalf("hosts status: %+v", hosts) + } +} + +func TestSetFleetUpdateHostStatus_ProgressesAndStoresJobID(t *testing.T) { + t.Parallel() + s := openTestStore(t) + uid := seedFleetUser(t, s) + h := seedFleetHost(t, s, "h1") + fu := FleetUpdate{ID: ulid.Make().String(), StartedByUserID: uid, TargetVersion: "v1"} + _ = s.CreateFleetUpdate(context.Background(), fu, []string{h}) + + jobID := ulid.Make().String() + if err := s.CreateJob(context.Background(), Job{ + ID: jobID, HostID: h, Kind: "update", + ActorKind: "user", ActorID: ptrStr(uid), CreatedAt: time.Now().UTC(), + }); err != nil { + t.Fatal(err) + } + + if err := s.SetFleetUpdateHostStatus(context.Background(), fu.ID, h, "running", "", ""); err != nil { + t.Fatal(err) + } + if err := s.SetFleetUpdateHostStatus(context.Background(), fu.ID, h, "succeeded", "", jobID); err != nil { + t.Fatal(err) + } + _, hs, _ := s.GetFleetUpdate(context.Background(), fu.ID) + if hs[0].Status != "succeeded" || hs[0].JobID != jobID { + t.Fatalf("after succeed: %+v", hs[0]) + } + + pending, _ := s.ListPendingFleetUpdateHosts(context.Background(), fu.ID) + if len(pending) != 0 { + t.Fatalf("pending should be empty: %+v", pending) + } +} + +func TestHaltAndCompleteFleetUpdate(t *testing.T) { + t.Parallel() + s := openTestStore(t) + uid := seedFleetUser(t, s) + h := seedFleetHost(t, s, "h1") + + fu1 := FleetUpdate{ID: ulid.Make().String(), StartedByUserID: uid, TargetVersion: "v1"} + _ = s.CreateFleetUpdate(context.Background(), fu1, []string{h}) + if err := s.HaltFleetUpdate(context.Background(), fu1.ID, "boom", time.Now().UTC()); err != nil { + t.Fatal(err) + } + got, _, _ := s.GetFleetUpdate(context.Background(), fu1.ID) + if got.Status != "halted" || got.HaltedReason != "boom" { + t.Fatalf("after halt: %+v", got) + } + if got.CompletedAt == nil { + t.Fatal("halted must stamp completed_at") + } + if active, _ := s.ActiveFleetUpdate(context.Background()); active != nil { + t.Fatalf("halted should clear active: %+v", active) + } + + // Now a fresh run can start. + fu2 := FleetUpdate{ID: ulid.Make().String(), StartedByUserID: uid, TargetVersion: "v2"} + if err := s.CreateFleetUpdate(context.Background(), fu2, []string{h}); err != nil { + t.Fatalf("create after halt: %v", err) + } + if err := s.CompleteFleetUpdate(context.Background(), fu2.ID, time.Now().UTC()); err != nil { + t.Fatal(err) + } + got, _, _ = s.GetFleetUpdate(context.Background(), fu2.ID) + if got.Status != "completed" { + t.Fatalf("after complete: %+v", got) + } +} + +func TestRunningUpdateJobForHost(t *testing.T) { + t.Parallel() + s := openTestStore(t) + h := seedFleetHost(t, s, "h1") + + got, err := s.RunningUpdateJobForHost(context.Background(), h) + if err != nil || got != "" { + t.Fatalf("empty case: got=%q err=%v", got, err) + } + + jobID := ulid.Make().String() + if err := s.CreateJob(context.Background(), Job{ + ID: jobID, HostID: h, Kind: "update", + ActorKind: "user", ActorID: ptrStr("u-1"), CreatedAt: time.Now().UTC(), + }); err != nil { + t.Fatal(err) + } + got, err = s.RunningUpdateJobForHost(context.Background(), h) + if err != nil || got != jobID { + t.Fatalf("queued case: got=%q err=%v", got, err) + } + + // Mark succeeded → no longer "in flight". + if err := s.MarkJobFinished(context.Background(), jobID, "succeeded", 0, nil, "", time.Now().UTC()); err != nil { + t.Fatal(err) + } + got, err = s.RunningUpdateJobForHost(context.Background(), h) + if err != nil || got != "" { + t.Fatalf("after succeed: got=%q err=%v", got, err) + } +} diff --git a/internal/store/migrations/0021_jobs_update_kind.sql b/internal/store/migrations/0021_jobs_update_kind.sql new file mode 100644 index 0000000..241bdc0 --- /dev/null +++ b/internal/store/migrations/0021_jobs_update_kind.sql @@ -0,0 +1,57 @@ +-- 0021_jobs_update_kind.sql +-- +-- Add 'update' to the jobs.kind CHECK constraint so the agent +-- self-update flow (P6-01) can persist its job rows. SQLite can't +-- ALTER a CHECK in place, so we rebuild the table. +-- +-- Same safe rebuild pattern as 0012: +-- 1. Stash job_logs into a temp table BEFORE rebuilding jobs. +-- 2. Create jobs_new with the wider CHECK; copy data; DROP jobs; +-- RENAME jobs_new TO jobs. +-- 3. Restore job_logs (cascade-trap defence — see CLAUDE.md). +-- +-- jobs_new mirrors the live schema *including* post-0012 column +-- additions (0015 added source_group_id). When adding a new +-- migration that touches this table, mirror the latest column set. + +CREATE TEMPORARY TABLE _job_logs_backup AS + SELECT job_id, seq, ts, stream, payload FROM job_logs; + +CREATE TABLE jobs_new ( + id TEXT PRIMARY KEY, + host_id TEXT NOT NULL REFERENCES hosts(id) ON DELETE CASCADE, + kind TEXT NOT NULL CHECK (kind IN + ('backup','init','forget','prune','check','unlock','restore','diff','update')), + status TEXT NOT NULL CHECK (status IN ('queued','running','succeeded','failed','cancelled')), + scheduled_id TEXT REFERENCES schedules(id) ON DELETE SET NULL, + actor_kind TEXT NOT NULL CHECK (actor_kind IN ('user','schedule','system')), + actor_id TEXT, + started_at TEXT, + finished_at TEXT, + exit_code INTEGER, + stats TEXT, + error TEXT, + created_at TEXT NOT NULL, + source_group_id TEXT REFERENCES source_groups(id) ON DELETE SET NULL +); + +INSERT INTO jobs_new + SELECT id, host_id, kind, status, scheduled_id, actor_kind, actor_id, + started_at, finished_at, exit_code, stats, error, created_at, + source_group_id + FROM jobs; + +DROP TABLE jobs; +ALTER TABLE jobs_new RENAME TO jobs; + +CREATE INDEX jobs_host_id ON jobs(host_id); +CREATE INDEX jobs_status ON jobs(status); +CREATE INDEX jobs_created_at ON jobs(created_at); +CREATE INDEX jobs_source_group_id ON jobs(source_group_id); + +-- Defensive: restore job_logs from the temp backup. INSERT OR IGNORE +-- so a re-run is harmless. Same shape as 0012's safety net. +INSERT OR IGNORE INTO job_logs (job_id, seq, ts, stream, payload) + SELECT job_id, seq, ts, stream, payload FROM _job_logs_backup; + +DROP TABLE _job_logs_backup; diff --git a/internal/store/migrations/0022_fleet_updates.sql b/internal/store/migrations/0022_fleet_updates.sql new file mode 100644 index 0000000..a57242f --- /dev/null +++ b/internal/store/migrations/0022_fleet_updates.sql @@ -0,0 +1,35 @@ +-- 0022_fleet_updates.sql +-- +-- Tables backing the rolling fleet-update worker (P6-02). One row in +-- fleet_updates per "update all" invocation, a child row per host so +-- the worker can iterate in position order, report progress, and +-- record per-host outcome. Halt-on-fail semantics live in the worker +-- (internal/server/fleetupdate); this schema just captures state. + +CREATE TABLE fleet_updates ( + id TEXT PRIMARY KEY, + started_at TEXT NOT NULL, + started_by_user_id TEXT NOT NULL REFERENCES users(id), + target_version TEXT NOT NULL, + status TEXT NOT NULL CHECK (status IN + ('running','completed','halted','cancelled')), + current_host_id TEXT REFERENCES hosts(id), + halted_reason TEXT, + completed_at TEXT +); + +CREATE INDEX fleet_updates_status ON fleet_updates(status); + +CREATE TABLE fleet_update_hosts ( + fleet_update_id TEXT NOT NULL REFERENCES fleet_updates(id) ON DELETE CASCADE, + host_id TEXT NOT NULL REFERENCES hosts(id) ON DELETE CASCADE, + position INTEGER NOT NULL, + status TEXT NOT NULL CHECK (status IN + ('pending','running','succeeded','failed','skipped')), + job_id TEXT REFERENCES jobs(id) ON DELETE SET NULL, + failed_reason TEXT, + PRIMARY KEY (fleet_update_id, host_id) +); + +CREATE INDEX fleet_update_hosts_position + ON fleet_update_hosts(fleet_update_id, position); diff --git a/internal/store/types.go b/internal/store/types.go index 5b16294..cc60e48 100644 --- a/internal/store/types.go +++ b/internal/store/types.go @@ -211,6 +211,33 @@ type PendingRun struct { LastError string } +// FleetUpdate captures one "update all" invocation. Status moves +// running → one of {completed, halted, cancelled}. CurrentHostID +// tracks the host the worker is actively waiting on; cleared (empty) +// outside an active dispatch. +type FleetUpdate struct { + ID string + StartedAt time.Time + StartedByUserID string + TargetVersion string + Status string + CurrentHostID string + HaltedReason string + CompletedAt *time.Time +} + +// FleetUpdateHost is one host's slot in a fleet update. Position is +// the iteration order. JobID is set once the worker has dispatched +// command.update for this host; FailedReason on a failed/halted row. +type FleetUpdateHost struct { + FleetUpdateID string + HostID string + Position int + Status string + JobID string + FailedReason string +} + // EnrollmentToken is the issuer's view of a one-time token. type EnrollmentToken struct { Raw string