store: migrations 0021+0022 + fleet_updates CRUD

This commit is contained in:
2026-05-06 21:47:54 +01:00
parent 74cf24c28b
commit d413896302
5 changed files with 557 additions and 0 deletions
+258
View File
@@ -0,0 +1,258 @@
package store
import (
"context"
"database/sql"
"errors"
"fmt"
"time"
)
// ErrFleetUpdateRunning is returned by CreateFleetUpdate if another
// fleet update is already in 'running' state. The HTTP layer surfaces
// this as a 409 with a structured error code.
//
// CreateFleetUpdate wraps this sentinel (appending the id of the
// running row), so callers must match it with errors.Is rather than ==.
var ErrFleetUpdateRunning = errors.New("store: fleet update already running")
// CreateFleetUpdate inserts the parent fleet_updates row and one
// 'pending' fleet_update_hosts row per entry in hostIDs, in the order
// given (position = index), inside a single transaction.
//
// fu.ID, fu.StartedByUserID and fu.TargetVersion are required. An empty
// fu.Status defaults to "running" and a zero fu.StartedAt defaults to
// the current UTC time.
//
// If a row with status 'running' already exists, nothing is written and
// the returned error wraps ErrFleetUpdateRunning (match it with
// errors.Is); the message carries the id of the running row.
func (st *Store) CreateFleetUpdate(ctx context.Context, fu FleetUpdate, hostIDs []string) error {
	if fu.ID == "" || fu.StartedByUserID == "" || fu.TargetVersion == "" {
		return errors.New("store: fleet update id, user_id, target_version required")
	}
	if fu.Status == "" {
		fu.Status = "running"
	}
	if fu.StartedAt.IsZero() {
		fu.StartedAt = time.Now().UTC()
	}

	tx, err := st.db.BeginTx(ctx, nil)
	if err != nil {
		return fmt.Errorf("store: begin: %w", err)
	}
	// Safety net for every early return below. After a successful Commit
	// the Rollback result carries no information, so it is dropped.
	defer func() { _ = tx.Rollback() }()

	// Refuse to start while another run is in flight.
	var existing string
	err = tx.QueryRowContext(ctx,
		`SELECT id FROM fleet_updates WHERE status = 'running' LIMIT 1`).
		Scan(&existing)
	switch {
	case err == nil:
		return fmt.Errorf("%w: %s", ErrFleetUpdateRunning, existing)
	case !errors.Is(err, sql.ErrNoRows):
		return fmt.Errorf("store: check active fleet update: %w", err)
	}

	if _, err := tx.ExecContext(ctx,
		`INSERT INTO fleet_updates (id, started_at, started_by_user_id, target_version, status)
VALUES (?, ?, ?, ?, ?)`,
		fu.ID, fu.StartedAt.UTC().Format(time.RFC3339Nano), fu.StartedByUserID, fu.TargetVersion, fu.Status,
	); err != nil {
		return fmt.Errorf("store: insert fleet_updates: %w", err)
	}

	// One child row per host; the slice index becomes the dispatch order.
	for i, hid := range hostIDs {
		if _, err := tx.ExecContext(ctx,
			`INSERT INTO fleet_update_hosts (fleet_update_id, host_id, position, status)
VALUES (?, ?, ?, 'pending')`,
			fu.ID, hid, i,
		); err != nil {
			return fmt.Errorf("store: insert fleet_update_hosts: %w", err)
		}
	}

	// Wrap the commit failure like every other error path so callers
	// always get "store: ..." context.
	if err := tx.Commit(); err != nil {
		return fmt.Errorf("store: commit fleet update: %w", err)
	}
	return nil
}
// ActiveFleetUpdate looks up the fleet update whose status is
// 'running'. It returns (nil, nil) when no such row exists.
//
// NULL current_host_id and halted_reason come back as empty strings; a
// NULL completed_at leaves CompletedAt nil.
func (st *Store) ActiveFleetUpdate(ctx context.Context) (*FleetUpdate, error) {
	const query = `SELECT id, started_at, started_by_user_id, target_version, status,
current_host_id, halted_reason, completed_at
FROM fleet_updates WHERE status = 'running' LIMIT 1`

	var (
		rec          FleetUpdate
		startedRaw   string
		curHost      sql.NullString
		reason       sql.NullString
		completedRaw sql.NullString
	)
	row := st.db.QueryRowContext(ctx, query)
	switch err := row.Scan(&rec.ID, &startedRaw, &rec.StartedByUserID, &rec.TargetVersion, &rec.Status,
		&curHost, &reason, &completedRaw); {
	case errors.Is(err, sql.ErrNoRows):
		return nil, nil
	case err != nil:
		return nil, fmt.Errorf("store: active fleet update: %w", err)
	}

	// Timestamps are parsed best-effort: a malformed value yields the
	// zero time instead of an error.
	rec.StartedAt, _ = time.Parse(time.RFC3339Nano, startedRaw)
	rec.CurrentHostID = curHost.String
	rec.HaltedReason = reason.String
	if completedRaw.Valid {
		done, _ := time.Parse(time.RFC3339Nano, completedRaw.String)
		rec.CompletedAt = &done
	}
	return &rec, nil
}
// GetFleetUpdate loads the fleet_updates row with the given id together
// with its fleet_update_hosts rows ordered by position.
//
// It returns ErrNotFound when no parent row has that id. On any error
// both returned values are nil. The host slice is non-nil (possibly
// empty) on success. NULL text columns are surfaced as empty strings
// and a NULL completed_at as a nil CompletedAt.
//
// The parent and child rows are read with two separate statements, not
// inside one transaction.
func (st *Store) GetFleetUpdate(ctx context.Context, id string) (*FleetUpdate, []FleetUpdateHost, error) {
	var fu FleetUpdate
	var startedAt string
	var current sql.NullString
	var halted sql.NullString
	var completedAt sql.NullString
	err := st.db.QueryRowContext(ctx,
		`SELECT id, started_at, started_by_user_id, target_version, status,
current_host_id, halted_reason, completed_at
FROM fleet_updates WHERE id = ?`, id).
		Scan(&fu.ID, &startedAt, &fu.StartedByUserID, &fu.TargetVersion, &fu.Status,
			&current, &halted, &completedAt)
	if errors.Is(err, sql.ErrNoRows) {
		return nil, nil, ErrNotFound
	}
	if err != nil {
		return nil, nil, fmt.Errorf("store: get fleet update: %w", err)
	}

	// Timestamps are parsed best-effort: a malformed value yields the
	// zero time instead of failing the whole read.
	fu.StartedAt, _ = time.Parse(time.RFC3339Nano, startedAt)
	fu.CurrentHostID = current.String
	fu.HaltedReason = halted.String
	if completedAt.Valid {
		t, _ := time.Parse(time.RFC3339Nano, completedAt.String)
		fu.CompletedAt = &t
	}

	rows, err := st.db.QueryContext(ctx,
		`SELECT host_id, position, status, COALESCE(job_id, ''), COALESCE(failed_reason, '')
FROM fleet_update_hosts
WHERE fleet_update_id = ?
ORDER BY position`, id)
	if err != nil {
		return nil, nil, fmt.Errorf("store: list fleet hosts: %w", err)
	}
	defer func() { _ = rows.Close() }()

	out := []FleetUpdateHost{}
	for rows.Next() {
		fh := FleetUpdateHost{FleetUpdateID: id}
		if err := rows.Scan(&fh.HostID, &fh.Position, &fh.Status, &fh.JobID, &fh.FailedReason); err != nil {
			return nil, nil, fmt.Errorf("store: scan fleet host: %w", err)
		}
		out = append(out, fh)
	}
	// An iteration error means the host list may be truncated; report it
	// with context and do not hand back partial results.
	if err := rows.Err(); err != nil {
		return nil, nil, fmt.Errorf("store: iterate fleet hosts: %w", err)
	}
	return &fu, out, nil
}
// ListPendingFleetUpdateHosts returns the fleet_update_hosts rows with
// status='pending' for fuID, in position order. The worker calls this
// to pick the next host to dispatch.
//
// The returned slice is non-nil (possibly empty) on success and nil on
// error. NULL job_id / failed_reason are surfaced as empty strings.
func (st *Store) ListPendingFleetUpdateHosts(ctx context.Context, fuID string) ([]FleetUpdateHost, error) {
	rows, err := st.db.QueryContext(ctx,
		`SELECT host_id, position, status, COALESCE(job_id, ''), COALESCE(failed_reason, '')
FROM fleet_update_hosts
WHERE fleet_update_id = ? AND status = 'pending'
ORDER BY position`, fuID)
	if err != nil {
		return nil, fmt.Errorf("store: list pending fleet hosts: %w", err)
	}
	defer func() { _ = rows.Close() }()

	out := []FleetUpdateHost{}
	for rows.Next() {
		fh := FleetUpdateHost{FleetUpdateID: fuID}
		if err := rows.Scan(&fh.HostID, &fh.Position, &fh.Status, &fh.JobID, &fh.FailedReason); err != nil {
			return nil, fmt.Errorf("store: scan pending fleet host: %w", err)
		}
		out = append(out, fh)
	}
	// An iteration error means the list may be truncated; report it with
	// context and do not hand back partial results.
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("store: iterate pending fleet hosts: %w", err)
	}
	return out, nil
}
// SetFleetUpdateHostStatus updates the (fuID, hostID) row as it moves
// through pending → running → {succeeded, failed, skipped}.
//
// failed_reason is overwritten on every call. job_id is written through
// COALESCE(?, job_id), so a NULL argument leaves the stored job_id
// untouched. Both arguments pass through nullableString.
// NOTE(review): nullableString is presumed to map "" to NULL — confirm.
// Under that assumption an empty failedReason clears the column, while
// an empty jobID keeps whatever was recorded earlier.
//
// No rows-affected check is made: an update that matches no row still
// returns nil.
func (st *Store) SetFleetUpdateHostStatus(ctx context.Context, fuID, hostID, status, failedReason, jobID string) error {
	const stmt = `UPDATE fleet_update_hosts
SET status = ?, failed_reason = ?, job_id = COALESCE(?, job_id)
WHERE fleet_update_id = ? AND host_id = ?`

	reasonArg, jobArg := nullableString(failedReason), nullableString(jobID)
	if _, err := st.db.ExecContext(ctx, stmt, status, reasonArg, jobArg, fuID, hostID); err != nil {
		return fmt.Errorf("store: set fleet host status: %w", err)
	}
	return nil
}
// SetFleetUpdateCurrentHost records on the fleet update fuID which host
// the worker is actively waiting on. Pass an empty hostID to clear it.
//
// No rows-affected check is made: an unknown fuID still returns nil.
func (st *Store) SetFleetUpdateCurrentHost(ctx context.Context, fuID, hostID string) error {
	const stmt = `UPDATE fleet_updates SET current_host_id = ? WHERE id = ?`

	hostArg := nullableString(hostID)
	if _, err := st.db.ExecContext(ctx, stmt, hostArg, fuID); err != nil {
		return fmt.Errorf("store: set fleet current host: %w", err)
	}
	return nil
}
// HaltFleetUpdate marks the fleet update fuID as 'halted': it stores
// reason in halted_reason, clears current_host_id and stamps
// completed_at with when (UTC, RFC3339Nano).
//
// Only a row that is still 'running' is touched. If the row is missing
// or already in another state the statement matches nothing and nil is
// returned.
func (st *Store) HaltFleetUpdate(ctx context.Context, fuID, reason string, when time.Time) error {
	const stmt = `UPDATE fleet_updates
SET status = 'halted', halted_reason = ?, current_host_id = NULL,
completed_at = ?
WHERE id = ? AND status = 'running'`

	stamp := when.UTC().Format(time.RFC3339Nano)
	if _, err := st.db.ExecContext(ctx, stmt, reason, stamp, fuID); err != nil {
		return fmt.Errorf("store: halt fleet update: %w", err)
	}
	return nil
}
// CancelFleetUpdate marks the fleet update fuID as 'cancelled', clears
// current_host_id and stamps completed_at with when (UTC, RFC3339Nano).
//
// The caller is expected to have checked that the row is still
// 'running'. The statement is also guarded by status = 'running', so a
// row in any other state is left untouched and nil is returned.
func (st *Store) CancelFleetUpdate(ctx context.Context, fuID string, when time.Time) error {
	const stmt = `UPDATE fleet_updates
SET status = 'cancelled', current_host_id = NULL, completed_at = ?
WHERE id = ? AND status = 'running'`

	stamp := when.UTC().Format(time.RFC3339Nano)
	if _, err := st.db.ExecContext(ctx, stmt, stamp, fuID); err != nil {
		return fmt.Errorf("store: cancel fleet update: %w", err)
	}
	return nil
}
// CompleteFleetUpdate marks the fleet update fuID as 'completed' once
// every host has reached a terminal state. It clears current_host_id
// and stamps completed_at with when (UTC, RFC3339Nano).
//
// Only a row that is still 'running' is touched; otherwise the
// statement matches nothing and nil is returned. This method does not
// itself verify that every host is terminal.
func (st *Store) CompleteFleetUpdate(ctx context.Context, fuID string, when time.Time) error {
	const stmt = `UPDATE fleet_updates
SET status = 'completed', current_host_id = NULL, completed_at = ?
WHERE id = ? AND status = 'running'`

	stamp := when.UTC().Format(time.RFC3339Nano)
	if _, err := st.db.ExecContext(ctx, stmt, stamp, fuID); err != nil {
		return fmt.Errorf("store: complete fleet update: %w", err)
	}
	return nil
}
// RunningUpdateJobForHost reports the id of the most recently created
// in-flight (status 'queued' or 'running') job of kind 'update' for
// hostID. It returns ("", nil) when there is none.
//
// Used by the host-update HTTP handler to refuse double-dispatch and by
// the fleet worker to dedupe on retry.
func (st *Store) RunningUpdateJobForHost(ctx context.Context, hostID string) (string, error) {
	const query = `SELECT id FROM jobs
WHERE host_id = ? AND kind = 'update' AND status IN ('queued','running')
ORDER BY created_at DESC LIMIT 1`

	var jobID string
	switch err := st.db.QueryRowContext(ctx, query, hostID).Scan(&jobID); {
	case errors.Is(err, sql.ErrNoRows):
		return "", nil
	case err != nil:
		return "", fmt.Errorf("store: running update job: %w", err)
	}
	return jobID, nil
}
+180
View File
@@ -0,0 +1,180 @@
package store
import (
"context"
"errors"
"testing"
"time"
"github.com/oklog/ulid/v2"
)
// ptrStr returns a pointer to a fresh string holding s. It exists so
// tests can fill *string fields from a literal.
func ptrStr(s string) *string {
	p := new(string)
	*p = s
	return p
}
// seedFleetUser creates a user with a fresh ULID as its id and Role set
// to RoleAdmin, and returns that id. The test is aborted if the insert
// fails.
func seedFleetUser(t *testing.T, s *Store) string {
	t.Helper()
	userID := ulid.Make().String()
	u := User{
		ID:           userID,
		Username:     "u-" + userID[:6],
		PasswordHash: "x",
		Role:         RoleAdmin,
	}
	if err := s.CreateUser(context.Background(), u); err != nil {
		t.Fatalf("create user: %v", err)
	}
	return userID
}
// seedFleetHost creates a linux/amd64 host called name with a fresh
// ULID as its id and returns that id. The test is aborted if the insert
// fails.
func seedFleetHost(t *testing.T, s *Store, name string) string {
	t.Helper()
	hostID := ulid.Make().String()
	h := Host{
		ID:         hostID,
		Name:       name,
		OS:         "linux",
		Arch:       "amd64",
		EnrolledAt: time.Now().UTC(),
	}
	if err := s.CreateHost(context.Background(), h, "tokenhash-"+hostID[:6], ""); err != nil {
		t.Fatalf("create host: %v", err)
	}
	return hostID
}
// TestCreateFleetUpdate_RefusesIfRunning checks that a second
// CreateFleetUpdate is rejected with ErrFleetUpdateRunning while the
// first one is still in its default 'running' state.
func TestCreateFleetUpdate_RefusesIfRunning(t *testing.T) {
	t.Parallel()
	ctx := context.Background()
	s := openTestStore(t)
	userID := seedFleetUser(t, s)
	hostID := seedFleetHost(t, s, "h1")
	targets := []string{hostID}

	first := FleetUpdate{ID: ulid.Make().String(), StartedByUserID: userID, TargetVersion: "v1"}
	if err := s.CreateFleetUpdate(ctx, first, targets); err != nil {
		t.Fatalf("create #1: %v", err)
	}

	second := FleetUpdate{ID: ulid.Make().String(), StartedByUserID: userID, TargetVersion: "v2"}
	if err := s.CreateFleetUpdate(ctx, second, targets); !errors.Is(err, ErrFleetUpdateRunning) {
		t.Fatalf("want ErrFleetUpdateRunning, got %v", err)
	}
}
// TestCreateFleetUpdate_HydrateRoundTrip creates a fleet update over two
// hosts and checks that GetFleetUpdate returns the defaulted parent
// status plus both children in position order, all still 'pending'.
func TestCreateFleetUpdate_HydrateRoundTrip(t *testing.T) {
	t.Parallel()
	ctx := context.Background()
	s := openTestStore(t)
	userID := seedFleetUser(t, s)
	first := seedFleetHost(t, s, "h1")
	second := seedFleetHost(t, s, "h2")

	fu := FleetUpdate{ID: ulid.Make().String(), StartedByUserID: userID, TargetVersion: "v1.2.3"}
	if err := s.CreateFleetUpdate(ctx, fu, []string{first, second}); err != nil {
		t.Fatal(err)
	}

	parent, children, err := s.GetFleetUpdate(ctx, fu.ID)
	if err != nil {
		t.Fatal(err)
	}
	if !(parent.Status == "running" && parent.TargetVersion == "v1.2.3") {
		t.Fatalf("parent: %+v", parent)
	}

	// Positions must equal the index each host had in the input slice.
	if len(children) != 2 {
		t.Fatalf("hosts: %+v", children)
	}
	for i := range children {
		if children[i].Position != i {
			t.Fatalf("hosts: %+v", children)
		}
	}
	for i := range children {
		if children[i].Status != "pending" {
			t.Fatalf("hosts status: %+v", children)
		}
	}
}
// TestSetFleetUpdateHostStatus_ProgressesAndStoresJobID walks one host
// pending → running → succeeded and checks that the job id supplied on
// the final transition is stored and that the host no longer appears
// in the pending list.
func TestSetFleetUpdateHostStatus_ProgressesAndStoresJobID(t *testing.T) {
	t.Parallel()
	ctx := context.Background()
	s := openTestStore(t)
	uid := seedFleetUser(t, s)
	h := seedFleetHost(t, s, "h1")

	fu := FleetUpdate{ID: ulid.Make().String(), StartedByUserID: uid, TargetVersion: "v1"}
	// A failed setup must stop the test here; otherwise the later
	// assertions fail (or panic) for an unrelated reason.
	if err := s.CreateFleetUpdate(ctx, fu, []string{h}); err != nil {
		t.Fatalf("create fleet update: %v", err)
	}

	jobID := ulid.Make().String()
	if err := s.CreateJob(ctx, Job{
		ID: jobID, HostID: h, Kind: "update",
		ActorKind: "user", ActorID: ptrStr(uid), CreatedAt: time.Now().UTC(),
	}); err != nil {
		t.Fatal(err)
	}

	if err := s.SetFleetUpdateHostStatus(ctx, fu.ID, h, "running", "", ""); err != nil {
		t.Fatal(err)
	}
	if err := s.SetFleetUpdateHostStatus(ctx, fu.ID, h, "succeeded", "", jobID); err != nil {
		t.Fatal(err)
	}

	_, hs, err := s.GetFleetUpdate(ctx, fu.ID)
	if err != nil {
		t.Fatalf("get fleet update: %v", err)
	}
	// Guard the index so a missing row is reported, not a panic.
	if len(hs) != 1 {
		t.Fatalf("want 1 host row, got %+v", hs)
	}
	if hs[0].Status != "succeeded" || hs[0].JobID != jobID {
		t.Fatalf("after succeed: %+v", hs[0])
	}

	pending, err := s.ListPendingFleetUpdateHosts(ctx, fu.ID)
	if err != nil {
		t.Fatalf("list pending: %v", err)
	}
	if len(pending) != 0 {
		t.Fatalf("pending should be empty: %+v", pending)
	}
}
// TestHaltAndCompleteFleetUpdate checks that halting a run records the
// reason and completed_at, frees the single 'running' slot so a new run
// can start, and that the new run can then be completed.
func TestHaltAndCompleteFleetUpdate(t *testing.T) {
	t.Parallel()
	ctx := context.Background()
	s := openTestStore(t)
	uid := seedFleetUser(t, s)
	h := seedFleetHost(t, s, "h1")

	fu1 := FleetUpdate{ID: ulid.Make().String(), StartedByUserID: uid, TargetVersion: "v1"}
	// A failed setup must stop the test here; otherwise the later reads
	// return a nil *FleetUpdate and the assertions panic.
	if err := s.CreateFleetUpdate(ctx, fu1, []string{h}); err != nil {
		t.Fatalf("create fleet update: %v", err)
	}
	if err := s.HaltFleetUpdate(ctx, fu1.ID, "boom", time.Now().UTC()); err != nil {
		t.Fatal(err)
	}

	got, _, err := s.GetFleetUpdate(ctx, fu1.ID)
	if err != nil {
		t.Fatalf("get after halt: %v", err)
	}
	if got.Status != "halted" || got.HaltedReason != "boom" {
		t.Fatalf("after halt: %+v", got)
	}
	if got.CompletedAt == nil {
		t.Fatal("halted must stamp completed_at")
	}

	active, err := s.ActiveFleetUpdate(ctx)
	if err != nil {
		t.Fatalf("active after halt: %v", err)
	}
	if active != nil {
		t.Fatalf("halted should clear active: %+v", active)
	}

	// Now a fresh run can start.
	fu2 := FleetUpdate{ID: ulid.Make().String(), StartedByUserID: uid, TargetVersion: "v2"}
	if err := s.CreateFleetUpdate(ctx, fu2, []string{h}); err != nil {
		t.Fatalf("create after halt: %v", err)
	}
	if err := s.CompleteFleetUpdate(ctx, fu2.ID, time.Now().UTC()); err != nil {
		t.Fatal(err)
	}

	got, _, err = s.GetFleetUpdate(ctx, fu2.ID)
	if err != nil {
		t.Fatalf("get after complete: %v", err)
	}
	if got.Status != "completed" {
		t.Fatalf("after complete: %+v", got)
	}
}
// TestRunningUpdateJobForHost covers the three states the lookup must
// distinguish: no job at all, a freshly created update job, and that
// same job after it has been marked succeeded.
func TestRunningUpdateJobForHost(t *testing.T) {
	t.Parallel()
	ctx := context.Background()
	s := openTestStore(t)
	hostID := seedFleetHost(t, s, "h1")

	found, err := s.RunningUpdateJobForHost(ctx, hostID)
	if err != nil || found != "" {
		t.Fatalf("empty case: got=%q err=%v", found, err)
	}

	jobID := ulid.Make().String()
	job := Job{
		ID:        jobID,
		HostID:    hostID,
		Kind:      "update",
		ActorKind: "user",
		ActorID:   ptrStr("u-1"),
		CreatedAt: time.Now().UTC(),
	}
	if err := s.CreateJob(ctx, job); err != nil {
		t.Fatal(err)
	}
	found, err = s.RunningUpdateJobForHost(ctx, hostID)
	if err != nil || found != jobID {
		t.Fatalf("queued case: got=%q err=%v", found, err)
	}

	// Mark succeeded → no longer "in flight".
	if err := s.MarkJobFinished(ctx, jobID, "succeeded", 0, nil, "", time.Now().UTC()); err != nil {
		t.Fatal(err)
	}
	found, err = s.RunningUpdateJobForHost(ctx, hostID)
	if err != nil || found != "" {
		t.Fatalf("after succeed: got=%q err=%v", found, err)
	}
}
@@ -0,0 +1,57 @@
-- 0021_jobs_update_kind.sql
--
-- Add 'update' to the jobs.kind CHECK constraint so the agent
-- self-update flow (P6-01) can persist its job rows. SQLite can't
-- ALTER a CHECK in place, so we rebuild the table.
--
-- Same safe rebuild pattern as 0012:
--   1. Stash job_logs into a temp table BEFORE rebuilding jobs.
--   2. Create jobs_new with the wider CHECK; copy data; DROP jobs;
--      RENAME jobs_new TO jobs.
--   3. Restore job_logs (cascade-trap defence — see CLAUDE.md).
--
-- jobs_new mirrors the live schema *including* post-0012 column
-- additions (0015 added source_group_id). When adding a new
-- migration that touches this table, mirror the latest column set.

-- Step 1: snapshot every job_logs row before jobs is touched.
CREATE TEMPORARY TABLE _job_logs_backup AS
SELECT job_id, seq, ts, stream, payload FROM job_logs;

-- Step 2a: the replacement table. The kind CHECK now also admits
-- 'update'; per the header, the rest is meant to mirror the live schema.
CREATE TABLE jobs_new (
id TEXT PRIMARY KEY,
host_id TEXT NOT NULL REFERENCES hosts(id) ON DELETE CASCADE,
kind TEXT NOT NULL CHECK (kind IN
('backup','init','forget','prune','check','unlock','restore','diff','update')),
status TEXT NOT NULL CHECK (status IN ('queued','running','succeeded','failed','cancelled')),
scheduled_id TEXT REFERENCES schedules(id) ON DELETE SET NULL,
actor_kind TEXT NOT NULL CHECK (actor_kind IN ('user','schedule','system')),
actor_id TEXT,
started_at TEXT,
finished_at TEXT,
exit_code INTEGER,
stats TEXT,
error TEXT,
created_at TEXT NOT NULL,
source_group_id TEXT REFERENCES source_groups(id) ON DELETE SET NULL
);

-- Step 2b: copy every existing row. The INSERT has no column list, so
-- the SELECT columns must stay in the same order as the jobs_new
-- definition above.
INSERT INTO jobs_new
SELECT id, host_id, kind, status, scheduled_id, actor_kind, actor_id,
started_at, finished_at, exit_code, stats, error, created_at,
source_group_id
FROM jobs;

-- Step 2c: swap the rebuilt table in under the original name.
DROP TABLE jobs;
ALTER TABLE jobs_new RENAME TO jobs;

-- Indexes on the rebuilt table.
-- NOTE(review): presumably these recreate the indexes that went away
-- with the old jobs table — confirm the list against earlier migrations.
CREATE INDEX jobs_host_id ON jobs(host_id);
CREATE INDEX jobs_status ON jobs(status);
CREATE INDEX jobs_created_at ON jobs(created_at);
CREATE INDEX jobs_source_group_id ON jobs(source_group_id);

-- Step 3. Defensive: restore job_logs from the temp backup. INSERT OR
-- IGNORE so a re-run is harmless. Same shape as 0012's safety net.
INSERT OR IGNORE INTO job_logs (job_id, seq, ts, stream, payload)
SELECT job_id, seq, ts, stream, payload FROM _job_logs_backup;
DROP TABLE _job_logs_backup;
@@ -0,0 +1,35 @@
-- 0022_fleet_updates.sql
--
-- Tables backing the rolling fleet-update worker (P6-02). One row in
-- fleet_updates per "update all" invocation, a child row per host so
-- the worker can iterate in position order, report progress, and
-- record per-host outcome. Halt-on-fail semantics live in the worker
-- (internal/server/fleetupdate); this schema just captures state.

-- Parent row. Timestamps are TEXT. current_host_id, halted_reason and
-- completed_at are nullable.
-- NOTE(review): started_by_user_id and current_host_id declare no
-- ON DELETE action, unlike the child table below — confirm that is the
-- intended behaviour when a referenced user or host is deleted.
CREATE TABLE fleet_updates (
id TEXT PRIMARY KEY,
started_at TEXT NOT NULL,
started_by_user_id TEXT NOT NULL REFERENCES users(id),
target_version TEXT NOT NULL,
status TEXT NOT NULL CHECK (status IN
('running','completed','halted','cancelled')),
current_host_id TEXT REFERENCES hosts(id),
halted_reason TEXT,
completed_at TEXT
);

-- Supports lookups by status, e.g. WHERE status = 'running'.
CREATE INDEX fleet_updates_status ON fleet_updates(status);

-- One row per (fleet update, host); the composite primary key means a
-- host can appear at most once in a given fleet update. Rows are removed
-- with their parent fleet update or host; deleting the referenced job
-- only nulls job_id.
CREATE TABLE fleet_update_hosts (
fleet_update_id TEXT NOT NULL REFERENCES fleet_updates(id) ON DELETE CASCADE,
host_id TEXT NOT NULL REFERENCES hosts(id) ON DELETE CASCADE,
position INTEGER NOT NULL,
status TEXT NOT NULL CHECK (status IN
('pending','running','succeeded','failed','skipped')),
job_id TEXT REFERENCES jobs(id) ON DELETE SET NULL,
failed_reason TEXT,
PRIMARY KEY (fleet_update_id, host_id)
);

-- Supports reading one fleet update's hosts in dispatch order
-- (WHERE fleet_update_id = ? ORDER BY position).
CREATE INDEX fleet_update_hosts_position
ON fleet_update_hosts(fleet_update_id, position);
+27
View File
@@ -211,6 +211,33 @@ type PendingRun struct {
LastError string LastError string
} }
// FleetUpdate captures one "update all" invocation. Status moves
// running → one of {completed, halted, cancelled}. CurrentHostID
// tracks the host the worker is actively waiting on; cleared (empty)
// outside an active dispatch.
type FleetUpdate struct {
// ID is the primary key of the fleet_updates row. Required on create.
ID string
// StartedAt is persisted as UTC RFC3339Nano text. CreateFleetUpdate
// fills in the current time when it is zero.
StartedAt time.Time
// StartedByUserID references users(id). Required on create.
StartedByUserID string
// TargetVersion is the version the fleet update targets. Required on
// create.
TargetVersion string
// Status is one of "running", "completed", "halted", "cancelled".
// CreateFleetUpdate defaults an empty value to "running".
Status string
// CurrentHostID is empty when the column is NULL.
CurrentHostID string
// HaltedReason is set by HaltFleetUpdate; empty when the column is NULL.
HaltedReason string
// CompletedAt is nil while the column is NULL; it is stamped when the
// run is halted, cancelled or completed.
CompletedAt *time.Time
}
// FleetUpdateHost is one host's slot in a fleet update. Position is
// the iteration order. JobID is set once the worker has dispatched
// command.update for this host; FailedReason on a failed/halted row.
type FleetUpdateHost struct {
// FleetUpdateID and HostID together form the row's primary key.
FleetUpdateID string
HostID string
// Position is the host's index in the hostIDs slice passed to
// CreateFleetUpdate.
Position int
// Status is one of "pending", "running", "succeeded", "failed",
// "skipped". Rows are created as "pending".
Status string
// JobID is empty when the job_id column is NULL.
JobID string
// FailedReason is empty when the failed_reason column is NULL.
FailedReason string
}
// EnrollmentToken is the issuer's view of a one-time token. // EnrollmentToken is the issuer's view of a one-time token.
type EnrollmentToken struct { type EnrollmentToken struct {
Raw string Raw string