store: migrations 0021+0022 + fleet_updates CRUD
This commit is contained in:
@@ -0,0 +1,258 @@
|
||||
package store
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
)
|
||||
|
||||
// ErrFleetUpdateRunning is the sentinel CreateFleetUpdate reports when a
// fleet update with status 'running' already exists. CreateFleetUpdate
// wraps it, so callers should match with errors.Is. The HTTP layer
// surfaces it as a 409 with a structured error code.
var ErrFleetUpdateRunning = errors.New("store: fleet update already running")
|
||||
|
||||
// CreateFleetUpdate inserts the parent row and one pending child per
|
||||
// hostID, in the order given (position = index). Returns
|
||||
// ErrFleetUpdateRunning if a fleet update is already in flight.
|
||||
func (st *Store) CreateFleetUpdate(ctx context.Context, fu FleetUpdate, hostIDs []string) error {
|
||||
if fu.ID == "" || fu.StartedByUserID == "" || fu.TargetVersion == "" {
|
||||
return errors.New("store: fleet update id, user_id, target_version required")
|
||||
}
|
||||
if fu.Status == "" {
|
||||
fu.Status = "running"
|
||||
}
|
||||
if fu.StartedAt.IsZero() {
|
||||
fu.StartedAt = time.Now().UTC()
|
||||
}
|
||||
tx, err := st.db.BeginTx(ctx, nil)
|
||||
if err != nil {
|
||||
return fmt.Errorf("store: begin: %w", err)
|
||||
}
|
||||
defer func() { _ = tx.Rollback() }()
|
||||
|
||||
var existing string
|
||||
if err := tx.QueryRowContext(ctx,
|
||||
`SELECT id FROM fleet_updates WHERE status = 'running' LIMIT 1`).
|
||||
Scan(&existing); err == nil {
|
||||
return fmt.Errorf("%w: %s", ErrFleetUpdateRunning, existing)
|
||||
} else if !errors.Is(err, sql.ErrNoRows) {
|
||||
return fmt.Errorf("store: check active fleet update: %w", err)
|
||||
}
|
||||
|
||||
if _, err := tx.ExecContext(ctx,
|
||||
`INSERT INTO fleet_updates (id, started_at, started_by_user_id, target_version, status)
|
||||
VALUES (?, ?, ?, ?, ?)`,
|
||||
fu.ID, fu.StartedAt.UTC().Format(time.RFC3339Nano), fu.StartedByUserID, fu.TargetVersion, fu.Status,
|
||||
); err != nil {
|
||||
return fmt.Errorf("store: insert fleet_updates: %w", err)
|
||||
}
|
||||
for i, hid := range hostIDs {
|
||||
if _, err := tx.ExecContext(ctx,
|
||||
`INSERT INTO fleet_update_hosts (fleet_update_id, host_id, position, status)
|
||||
VALUES (?, ?, ?, 'pending')`,
|
||||
fu.ID, hid, i,
|
||||
); err != nil {
|
||||
return fmt.Errorf("store: insert fleet_update_hosts: %w", err)
|
||||
}
|
||||
}
|
||||
return tx.Commit()
|
||||
}
|
||||
|
||||
// ActiveFleetUpdate returns the currently-running fleet update or nil.
|
||||
func (st *Store) ActiveFleetUpdate(ctx context.Context) (*FleetUpdate, error) {
|
||||
var fu FleetUpdate
|
||||
var startedAt string
|
||||
var current sql.NullString
|
||||
var halted sql.NullString
|
||||
var completedAt sql.NullString
|
||||
err := st.db.QueryRowContext(ctx,
|
||||
`SELECT id, started_at, started_by_user_id, target_version, status,
|
||||
current_host_id, halted_reason, completed_at
|
||||
FROM fleet_updates WHERE status = 'running' LIMIT 1`).
|
||||
Scan(&fu.ID, &startedAt, &fu.StartedByUserID, &fu.TargetVersion, &fu.Status,
|
||||
¤t, &halted, &completedAt)
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
return nil, nil
|
||||
}
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("store: active fleet update: %w", err)
|
||||
}
|
||||
fu.StartedAt, _ = time.Parse(time.RFC3339Nano, startedAt)
|
||||
fu.CurrentHostID = current.String
|
||||
fu.HaltedReason = halted.String
|
||||
if completedAt.Valid {
|
||||
t, _ := time.Parse(time.RFC3339Nano, completedAt.String)
|
||||
fu.CompletedAt = &t
|
||||
}
|
||||
return &fu, nil
|
||||
}
|
||||
|
||||
// GetFleetUpdate hydrates parent + ordered child rows. Returns
|
||||
// ErrNotFound on missing id.
|
||||
func (st *Store) GetFleetUpdate(ctx context.Context, id string) (*FleetUpdate, []FleetUpdateHost, error) {
|
||||
var fu FleetUpdate
|
||||
var startedAt string
|
||||
var current sql.NullString
|
||||
var halted sql.NullString
|
||||
var completedAt sql.NullString
|
||||
err := st.db.QueryRowContext(ctx,
|
||||
`SELECT id, started_at, started_by_user_id, target_version, status,
|
||||
current_host_id, halted_reason, completed_at
|
||||
FROM fleet_updates WHERE id = ?`, id).
|
||||
Scan(&fu.ID, &startedAt, &fu.StartedByUserID, &fu.TargetVersion, &fu.Status,
|
||||
¤t, &halted, &completedAt)
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
return nil, nil, ErrNotFound
|
||||
}
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("store: get fleet update: %w", err)
|
||||
}
|
||||
fu.StartedAt, _ = time.Parse(time.RFC3339Nano, startedAt)
|
||||
fu.CurrentHostID = current.String
|
||||
fu.HaltedReason = halted.String
|
||||
if completedAt.Valid {
|
||||
t, _ := time.Parse(time.RFC3339Nano, completedAt.String)
|
||||
fu.CompletedAt = &t
|
||||
}
|
||||
|
||||
rows, err := st.db.QueryContext(ctx,
|
||||
`SELECT host_id, position, status, COALESCE(job_id, ''), COALESCE(failed_reason, '')
|
||||
FROM fleet_update_hosts
|
||||
WHERE fleet_update_id = ?
|
||||
ORDER BY position`, id)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("store: list fleet hosts: %w", err)
|
||||
}
|
||||
defer func() { _ = rows.Close() }()
|
||||
out := []FleetUpdateHost{}
|
||||
for rows.Next() {
|
||||
fh := FleetUpdateHost{FleetUpdateID: id}
|
||||
if err := rows.Scan(&fh.HostID, &fh.Position, &fh.Status, &fh.JobID, &fh.FailedReason); err != nil {
|
||||
return nil, nil, fmt.Errorf("store: scan fleet host: %w", err)
|
||||
}
|
||||
out = append(out, fh)
|
||||
}
|
||||
return &fu, out, rows.Err()
|
||||
}
|
||||
|
||||
// ListPendingFleetUpdateHosts returns rows with status='pending' for
|
||||
// this fleet update, in position order. The worker calls this to
|
||||
// pick the next host to dispatch.
|
||||
func (st *Store) ListPendingFleetUpdateHosts(ctx context.Context, fuID string) ([]FleetUpdateHost, error) {
|
||||
rows, err := st.db.QueryContext(ctx,
|
||||
`SELECT host_id, position, status, COALESCE(job_id, ''), COALESCE(failed_reason, '')
|
||||
FROM fleet_update_hosts
|
||||
WHERE fleet_update_id = ? AND status = 'pending'
|
||||
ORDER BY position`, fuID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("store: list pending fleet hosts: %w", err)
|
||||
}
|
||||
defer func() { _ = rows.Close() }()
|
||||
out := []FleetUpdateHost{}
|
||||
for rows.Next() {
|
||||
fh := FleetUpdateHost{FleetUpdateID: fuID}
|
||||
if err := rows.Scan(&fh.HostID, &fh.Position, &fh.Status, &fh.JobID, &fh.FailedReason); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
out = append(out, fh)
|
||||
}
|
||||
return out, rows.Err()
|
||||
}
|
||||
|
||||
// SetFleetUpdateHostStatus moves one row through pending → running →
|
||||
// {succeeded, failed, skipped}. failedReason and jobID may be empty
|
||||
// (e.g. on succeeded). Empty values are stored as NULL so subsequent
|
||||
// reads round-trip cleanly via COALESCE.
|
||||
func (st *Store) SetFleetUpdateHostStatus(ctx context.Context, fuID, hostID, status, failedReason, jobID string) error {
|
||||
_, err := st.db.ExecContext(ctx,
|
||||
`UPDATE fleet_update_hosts
|
||||
SET status = ?, failed_reason = ?, job_id = COALESCE(?, job_id)
|
||||
WHERE fleet_update_id = ? AND host_id = ?`,
|
||||
status, nullableString(failedReason), nullableString(jobID),
|
||||
fuID, hostID,
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("store: set fleet host status: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// SetFleetUpdateCurrentHost stamps which host the worker is actively
|
||||
// waiting on. Pass empty string to clear.
|
||||
func (st *Store) SetFleetUpdateCurrentHost(ctx context.Context, fuID, hostID string) error {
|
||||
_, err := st.db.ExecContext(ctx,
|
||||
`UPDATE fleet_updates SET current_host_id = ? WHERE id = ?`,
|
||||
nullableString(hostID), fuID,
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("store: set fleet current host: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// HaltFleetUpdate flips status to 'halted', stamps the reason, and
|
||||
// clears current_host_id.
|
||||
func (st *Store) HaltFleetUpdate(ctx context.Context, fuID, reason string, when time.Time) error {
|
||||
_, err := st.db.ExecContext(ctx,
|
||||
`UPDATE fleet_updates
|
||||
SET status = 'halted', halted_reason = ?, current_host_id = NULL,
|
||||
completed_at = ?
|
||||
WHERE id = ? AND status = 'running'`,
|
||||
reason, when.UTC().Format(time.RFC3339Nano), fuID,
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("store: halt fleet update: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// CancelFleetUpdate flips status to 'cancelled'. Caller checks that
|
||||
// the row is still 'running' before calling.
|
||||
func (st *Store) CancelFleetUpdate(ctx context.Context, fuID string, when time.Time) error {
|
||||
_, err := st.db.ExecContext(ctx,
|
||||
`UPDATE fleet_updates
|
||||
SET status = 'cancelled', current_host_id = NULL, completed_at = ?
|
||||
WHERE id = ? AND status = 'running'`,
|
||||
when.UTC().Format(time.RFC3339Nano), fuID,
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("store: cancel fleet update: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// CompleteFleetUpdate flips status to 'completed' once every host has
|
||||
// reached a terminal state.
|
||||
func (st *Store) CompleteFleetUpdate(ctx context.Context, fuID string, when time.Time) error {
|
||||
_, err := st.db.ExecContext(ctx,
|
||||
`UPDATE fleet_updates
|
||||
SET status = 'completed', current_host_id = NULL, completed_at = ?
|
||||
WHERE id = ? AND status = 'running'`,
|
||||
when.UTC().Format(time.RFC3339Nano), fuID,
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("store: complete fleet update: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// RunningUpdateJobForHost returns the id of any in-flight (queued or
|
||||
// running) `update` job for hostID, or "" + nil if none. Used by the
|
||||
// host-update HTTP handler to refuse double-dispatch and by the
|
||||
// fleet worker to dedupe on retry.
|
||||
func (st *Store) RunningUpdateJobForHost(ctx context.Context, hostID string) (string, error) {
|
||||
var id string
|
||||
err := st.db.QueryRowContext(ctx,
|
||||
`SELECT id FROM jobs
|
||||
WHERE host_id = ? AND kind = 'update' AND status IN ('queued','running')
|
||||
ORDER BY created_at DESC LIMIT 1`, hostID).Scan(&id)
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
return "", nil
|
||||
}
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("store: running update job: %w", err)
|
||||
}
|
||||
return id, nil
|
||||
}
|
||||
@@ -0,0 +1,180 @@
|
||||
package store
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/oklog/ulid/v2"
|
||||
)
|
||||
|
||||
// ptrStr returns a pointer to a fresh copy of s, for filling optional
// *string fields in test fixtures.
func ptrStr(s string) *string {
	v := s
	return &v
}
|
||||
|
||||
func seedFleetUser(t *testing.T, s *Store) string {
|
||||
t.Helper()
|
||||
id := ulid.Make().String()
|
||||
if err := s.CreateUser(context.Background(), User{
|
||||
ID: id, Username: "u-" + id[:6], PasswordHash: "x", Role: RoleAdmin,
|
||||
}); err != nil {
|
||||
t.Fatalf("create user: %v", err)
|
||||
}
|
||||
return id
|
||||
}
|
||||
|
||||
func seedFleetHost(t *testing.T, s *Store, name string) string {
|
||||
t.Helper()
|
||||
id := ulid.Make().String()
|
||||
if err := s.CreateHost(context.Background(), Host{
|
||||
ID: id, Name: name, OS: "linux", Arch: "amd64",
|
||||
EnrolledAt: time.Now().UTC(),
|
||||
}, "tokenhash-"+id[:6], ""); err != nil {
|
||||
t.Fatalf("create host: %v", err)
|
||||
}
|
||||
return id
|
||||
}
|
||||
|
||||
func TestCreateFleetUpdate_RefusesIfRunning(t *testing.T) {
|
||||
t.Parallel()
|
||||
s := openTestStore(t)
|
||||
uid := seedFleetUser(t, s)
|
||||
h1 := seedFleetHost(t, s, "h1")
|
||||
|
||||
fu1 := FleetUpdate{ID: ulid.Make().String(), StartedByUserID: uid, TargetVersion: "v1"}
|
||||
if err := s.CreateFleetUpdate(context.Background(), fu1, []string{h1}); err != nil {
|
||||
t.Fatalf("create #1: %v", err)
|
||||
}
|
||||
fu2 := FleetUpdate{ID: ulid.Make().String(), StartedByUserID: uid, TargetVersion: "v2"}
|
||||
err := s.CreateFleetUpdate(context.Background(), fu2, []string{h1})
|
||||
if !errors.Is(err, ErrFleetUpdateRunning) {
|
||||
t.Fatalf("want ErrFleetUpdateRunning, got %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCreateFleetUpdate_HydrateRoundTrip(t *testing.T) {
|
||||
t.Parallel()
|
||||
s := openTestStore(t)
|
||||
uid := seedFleetUser(t, s)
|
||||
h1 := seedFleetHost(t, s, "h1")
|
||||
h2 := seedFleetHost(t, s, "h2")
|
||||
|
||||
fu := FleetUpdate{ID: ulid.Make().String(), StartedByUserID: uid, TargetVersion: "v1.2.3"}
|
||||
if err := s.CreateFleetUpdate(context.Background(), fu, []string{h1, h2}); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
got, hosts, err := s.GetFleetUpdate(context.Background(), fu.ID)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if got.Status != "running" || got.TargetVersion != "v1.2.3" {
|
||||
t.Fatalf("parent: %+v", got)
|
||||
}
|
||||
if len(hosts) != 2 || hosts[0].Position != 0 || hosts[1].Position != 1 {
|
||||
t.Fatalf("hosts: %+v", hosts)
|
||||
}
|
||||
if hosts[0].Status != "pending" || hosts[1].Status != "pending" {
|
||||
t.Fatalf("hosts status: %+v", hosts)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSetFleetUpdateHostStatus_ProgressesAndStoresJobID(t *testing.T) {
|
||||
t.Parallel()
|
||||
s := openTestStore(t)
|
||||
uid := seedFleetUser(t, s)
|
||||
h := seedFleetHost(t, s, "h1")
|
||||
fu := FleetUpdate{ID: ulid.Make().String(), StartedByUserID: uid, TargetVersion: "v1"}
|
||||
_ = s.CreateFleetUpdate(context.Background(), fu, []string{h})
|
||||
|
||||
jobID := ulid.Make().String()
|
||||
if err := s.CreateJob(context.Background(), Job{
|
||||
ID: jobID, HostID: h, Kind: "update",
|
||||
ActorKind: "user", ActorID: ptrStr(uid), CreatedAt: time.Now().UTC(),
|
||||
}); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if err := s.SetFleetUpdateHostStatus(context.Background(), fu.ID, h, "running", "", ""); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := s.SetFleetUpdateHostStatus(context.Background(), fu.ID, h, "succeeded", "", jobID); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
_, hs, _ := s.GetFleetUpdate(context.Background(), fu.ID)
|
||||
if hs[0].Status != "succeeded" || hs[0].JobID != jobID {
|
||||
t.Fatalf("after succeed: %+v", hs[0])
|
||||
}
|
||||
|
||||
pending, _ := s.ListPendingFleetUpdateHosts(context.Background(), fu.ID)
|
||||
if len(pending) != 0 {
|
||||
t.Fatalf("pending should be empty: %+v", pending)
|
||||
}
|
||||
}
|
||||
|
||||
func TestHaltAndCompleteFleetUpdate(t *testing.T) {
|
||||
t.Parallel()
|
||||
s := openTestStore(t)
|
||||
uid := seedFleetUser(t, s)
|
||||
h := seedFleetHost(t, s, "h1")
|
||||
|
||||
fu1 := FleetUpdate{ID: ulid.Make().String(), StartedByUserID: uid, TargetVersion: "v1"}
|
||||
_ = s.CreateFleetUpdate(context.Background(), fu1, []string{h})
|
||||
if err := s.HaltFleetUpdate(context.Background(), fu1.ID, "boom", time.Now().UTC()); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
got, _, _ := s.GetFleetUpdate(context.Background(), fu1.ID)
|
||||
if got.Status != "halted" || got.HaltedReason != "boom" {
|
||||
t.Fatalf("after halt: %+v", got)
|
||||
}
|
||||
if got.CompletedAt == nil {
|
||||
t.Fatal("halted must stamp completed_at")
|
||||
}
|
||||
if active, _ := s.ActiveFleetUpdate(context.Background()); active != nil {
|
||||
t.Fatalf("halted should clear active: %+v", active)
|
||||
}
|
||||
|
||||
// Now a fresh run can start.
|
||||
fu2 := FleetUpdate{ID: ulid.Make().String(), StartedByUserID: uid, TargetVersion: "v2"}
|
||||
if err := s.CreateFleetUpdate(context.Background(), fu2, []string{h}); err != nil {
|
||||
t.Fatalf("create after halt: %v", err)
|
||||
}
|
||||
if err := s.CompleteFleetUpdate(context.Background(), fu2.ID, time.Now().UTC()); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
got, _, _ = s.GetFleetUpdate(context.Background(), fu2.ID)
|
||||
if got.Status != "completed" {
|
||||
t.Fatalf("after complete: %+v", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRunningUpdateJobForHost(t *testing.T) {
|
||||
t.Parallel()
|
||||
s := openTestStore(t)
|
||||
h := seedFleetHost(t, s, "h1")
|
||||
|
||||
got, err := s.RunningUpdateJobForHost(context.Background(), h)
|
||||
if err != nil || got != "" {
|
||||
t.Fatalf("empty case: got=%q err=%v", got, err)
|
||||
}
|
||||
|
||||
jobID := ulid.Make().String()
|
||||
if err := s.CreateJob(context.Background(), Job{
|
||||
ID: jobID, HostID: h, Kind: "update",
|
||||
ActorKind: "user", ActorID: ptrStr("u-1"), CreatedAt: time.Now().UTC(),
|
||||
}); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
got, err = s.RunningUpdateJobForHost(context.Background(), h)
|
||||
if err != nil || got != jobID {
|
||||
t.Fatalf("queued case: got=%q err=%v", got, err)
|
||||
}
|
||||
|
||||
// Mark succeeded → no longer "in flight".
|
||||
if err := s.MarkJobFinished(context.Background(), jobID, "succeeded", 0, nil, "", time.Now().UTC()); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
got, err = s.RunningUpdateJobForHost(context.Background(), h)
|
||||
if err != nil || got != "" {
|
||||
t.Fatalf("after succeed: got=%q err=%v", got, err)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,57 @@
|
||||
-- 0021_jobs_update_kind.sql
--
-- Widens the jobs.kind CHECK constraint to accept 'update' so the agent
-- self-update flow (P6-01) can persist its job rows. SQLite cannot
-- ALTER a CHECK constraint in place, so the table is rebuilt.
--
-- Rebuild sequence (same safe pattern as 0012):
--   1. Copy job_logs into a temporary table BEFORE touching jobs.
--   2. Create jobs_new with the wider CHECK, copy every row across,
--      DROP jobs, then RENAME jobs_new TO jobs.
--   3. Put job_logs back from the temporary copy (cascade-trap
--      defence — see CLAUDE.md).
--
-- jobs_new mirrors the live schema *including* columns added after
-- 0012 (0015 added source_group_id). Any later migration that rebuilds
-- this table must mirror the latest column set in the same way.

-- Step 1: stash job_logs.
CREATE TEMPORARY TABLE _job_logs_backup AS
    SELECT job_id, seq, ts, stream, payload FROM job_logs;

-- Step 2: build the replacement table with 'update' added to kind.
CREATE TABLE jobs_new (
    id              TEXT PRIMARY KEY,
    host_id         TEXT NOT NULL REFERENCES hosts(id) ON DELETE CASCADE,
    kind            TEXT NOT NULL CHECK (kind IN
                        ('backup','init','forget','prune','check','unlock','restore','diff','update')),
    status          TEXT NOT NULL CHECK (status IN ('queued','running','succeeded','failed','cancelled')),
    scheduled_id    TEXT REFERENCES schedules(id) ON DELETE SET NULL,
    actor_kind      TEXT NOT NULL CHECK (actor_kind IN ('user','schedule','system')),
    actor_id        TEXT,
    started_at      TEXT,
    finished_at     TEXT,
    exit_code       INTEGER,
    stats           TEXT,
    error           TEXT,
    created_at      TEXT NOT NULL,
    source_group_id TEXT REFERENCES source_groups(id) ON DELETE SET NULL
);

-- Target columns are listed explicitly, in jobs_new's declaration
-- order, so the copy does not rely on positional matching.
INSERT INTO jobs_new
       (id, host_id, kind, status, scheduled_id, actor_kind, actor_id,
        started_at, finished_at, exit_code, stats, error, created_at,
        source_group_id)
SELECT id, host_id, kind, status, scheduled_id, actor_kind, actor_id,
       started_at, finished_at, exit_code, stats, error, created_at,
       source_group_id
  FROM jobs;

DROP TABLE jobs;
ALTER TABLE jobs_new RENAME TO jobs;

-- The old table's indexes went away with DROP TABLE; recreate them.
CREATE INDEX jobs_host_id         ON jobs(host_id);
CREATE INDEX jobs_status          ON jobs(status);
CREATE INDEX jobs_created_at      ON jobs(created_at);
CREATE INDEX jobs_source_group_id ON jobs(source_group_id);

-- Step 3 (defensive): restore job_logs from the temporary copy.
-- INSERT OR IGNORE makes a re-run harmless. Same shape as 0012's
-- safety net.
INSERT OR IGNORE INTO job_logs (job_id, seq, ts, stream, payload)
    SELECT job_id, seq, ts, stream, payload FROM _job_logs_backup;

DROP TABLE _job_logs_backup;
|
||||
@@ -0,0 +1,35 @@
|
||||
-- 0022_fleet_updates.sql
--
-- Schema for the rolling fleet-update worker (P6-02).
--
--   fleet_updates       one row per "update all" invocation.
--   fleet_update_hosts  one child row per host, so the worker can
--                       iterate in position order, report progress and
--                       record the per-host outcome.
--
-- Halt-on-fail semantics live in the worker
-- (internal/server/fleetupdate); this schema only captures state.

CREATE TABLE fleet_updates (
    id                 TEXT PRIMARY KEY,
    started_at         TEXT NOT NULL,
    started_by_user_id TEXT NOT NULL REFERENCES users(id),
    target_version     TEXT NOT NULL,
    status             TEXT NOT NULL CHECK (status IN
                           ('running','completed','halted','cancelled')),
    current_host_id    TEXT REFERENCES hosts(id),
    halted_reason      TEXT,
    completed_at       TEXT
);

-- Supports lookups by status.
CREATE INDEX fleet_updates_status ON fleet_updates(status);

CREATE TABLE fleet_update_hosts (
    fleet_update_id TEXT NOT NULL REFERENCES fleet_updates(id) ON DELETE CASCADE,
    host_id         TEXT NOT NULL REFERENCES hosts(id) ON DELETE CASCADE,
    position        INTEGER NOT NULL,
    status          TEXT NOT NULL CHECK (status IN
                        ('pending','running','succeeded','failed','skipped')),
    job_id          TEXT REFERENCES jobs(id) ON DELETE SET NULL,
    failed_reason   TEXT,
    PRIMARY KEY (fleet_update_id, host_id)
);

-- Supports reading one fleet update's hosts in position order.
CREATE INDEX fleet_update_hosts_position
    ON fleet_update_hosts(fleet_update_id, position);
|
||||
@@ -211,6 +211,33 @@ type PendingRun struct {
|
||||
LastError string
|
||||
}
|
||||
|
||||
// FleetUpdate is the parent record of one "update all" invocation.
//
// Status starts as "running" and ends as one of "completed", "halted"
// or "cancelled".
type FleetUpdate struct {
	// ID identifies the fleet update.
	ID string
	// StartedAt is when the run began.
	StartedAt time.Time
	// StartedByUserID is the id of the user who started the run.
	StartedByUserID string
	// TargetVersion is the version the hosts are being updated to.
	TargetVersion string
	// Status is the run's lifecycle state (see the type comment).
	Status string
	// CurrentHostID is the host the worker is actively waiting on; it is
	// empty outside an active dispatch.
	CurrentHostID string
	// HaltedReason explains why a halted run stopped; empty otherwise.
	HaltedReason string
	// CompletedAt is nil until the run has left the running state.
	CompletedAt *time.Time
}
|
||||
|
||||
// FleetUpdateHost is a single host's slot within a fleet update.
type FleetUpdateHost struct {
	// FleetUpdateID is the id of the owning FleetUpdate.
	FleetUpdateID string
	// HostID is the host this slot refers to.
	HostID string
	// Position is the slot's place in the iteration order.
	Position int
	// Status is the slot's current state.
	Status string
	// JobID is set once the worker has dispatched command.update for
	// this host; empty before that.
	JobID string
	// FailedReason is set on a failed/halted row; empty otherwise.
	FailedReason string
}
|
||||
|
||||
// EnrollmentToken is the issuer's view of a one-time token.
|
||||
type EnrollmentToken struct {
|
||||
Raw string
|
||||
|
||||
Reference in New Issue
Block a user