p6-01/02: agent self-update + fleet update server cluster

- alert: update_failed (per-host, dedup=hostID) + fleet_update_halted
  (system-scoped, host_id NULL via new RaiseOrTouchSystem helper).
- ws: UpdateWatcher tracks in-flight command.update dispatches and
  reconciles them against incoming hello envelopes — success path
  marks the job succeeded and auto-resolves the alert; 90s timeout
  marks the job failed and raises update_failed.
- http: POST /api/hosts/{id}/update (admin-only JSON) + the HTMX
  /hosts/{id}/update form variant. Pre-checks: host exists, online,
  agent_version != current, no running update job. Refactored core
  into Server.dispatchHostUpdate so the fleet worker can share it
  without going through HTTP.
- fleetupdate: rolling worker iterating through host slots, halting
  on first failure and raising fleet_update_halted. Polling-based
  version-match (re-read hosts.agent_version every 1s up to 95s) —
  no extra plumbing into the WS hello path. At-most-one-running is
  enforced at the store layer (ErrFleetUpdateRunning).
- cmd/server: wire UpdateWatcher and FleetWorker into the main
  goroutine; the worker uses a small serverDispatcher adapter that
  delegates back into Server.DispatchHostUpdate.

Tests: watcher (success/timeout/mismatch/late-hello), HTTP endpoint
(happy + four pre-check branches + RBAC), worker (two-host happy,
timeout-halt, host-offline-halt, already-at-target skip, cancel
mid-run, double-Start guard).
This commit is contained in:
2026-05-06 22:03:50 +01:00
parent d413896302
commit 6fd2a2ff77
11 changed files with 1540 additions and 2 deletions
+221
View File
@@ -0,0 +1,221 @@
// Package fleetupdate drives a rolling, sequential agent self-update
// over a list of hosts. One worker goroutine per Start() call (gated
// at the store layer to at-most-one-running-fleet-update).
package fleetupdate
import (
"context"
"errors"
"fmt"
"log/slog"
"time"
"github.com/oklog/ulid/v2"
"gitea.dcglab.co.uk/steve/restic-manager/internal/store"
)
// Hub is the slim "is this host connected?" surface the worker needs
// from the connection layer.
type Hub interface {
// Connected reports whether the host identified by hostID is
// currently connected. The worker treats a false result as "host
// went offline" and halts the fleet update.
Connected(hostID string) bool
}
// Dispatcher delivers a single command.update envelope to one host.
// Implementations are also expected to create the jobs row, write the
// audit entry, and register the dispatch with the update watcher.
// Pre-checks belong to the dispatcher: the worker does no
// interpretation of a failure beyond recording it and halting.
type Dispatcher interface {
	// DispatchUpdate asks hostID to self-update on behalf of
	// actorUserID. The worker treats either a non-nil err or a
	// non-empty code as a failed dispatch; jobID is recorded against
	// the host slot in both the success and the failure case.
	DispatchUpdate(ctx context.Context, hostID, actorUserID string) (jobID, code string, err error)
}
// AlertRaiser is the slim view of the alert engine's host-less raise
// path. The worker uses it to emit fleet_update_halted when it halts a
// fleet update.
type AlertRaiser interface {
// RaiseFleetUpdateHalted reports that the fleet update identified by
// fleetUpdateID was halted for the given reason at time when. It
// returns nothing, so the worker cannot observe a failure to raise.
RaiseFleetUpdateHalted(ctx context.Context, fleetUpdateID, reason string, when time.Time)
}
// Worker is the long-lived fleet-update orchestrator. There is at most
// one *running* fleet update at a time (enforced by the store).
type Worker struct {
// store persists the fleet update, its per-host slots, and is where
// host rows are re-read while polling for the version transition.
store *store.Store
// hub is consulted just before dispatch to see whether the host is
// still connected.
hub Hub
// disp sends the actual update command to a host.
disp Dispatcher
// alerts receives fleet_update_halted; a nil value disables alerting.
alerts AlertRaiser
// targetVersion is the version every dispatched agent is expected
// to come back with. It is assigned by run, on the goroutine spawned
// by Start, from the value passed to Start.
// NOTE(review): the write is not synchronised; it is only safe while
// at most one run goroutine is alive — confirm against the store's
// at-most-one-running guarantee.
targetVersion string
// pollPeriod controls the cadence at which the worker re-reads the
// host row to check for the version transition. Exposed for tests.
pollPeriod time.Duration
// hostTimeout bounds how long the worker waits for one host to
// reach the target version before halting.
hostTimeout time.Duration
}
// NewWorker returns a worker that has not started any fleet update
// yet. The poll period (1s) and per-host timeout (95s) set here are
// defaults; the target version is left empty until a run begins.
func NewWorker(st *store.Store, hub Hub, disp Dispatcher, alerts AlertRaiser) *Worker {
	w := new(Worker)
	w.store, w.hub = st, hub
	w.disp, w.alerts = disp, alerts
	w.pollPeriod = time.Second
	w.hostTimeout = 95 * time.Second
	return w
}
// Start validates its arguments, persists the parent fleet-update row
// together with one child row per host, and then launches the
// goroutine that walks the hosts one at a time. It returns the ID of
// the new fleet update. Errors from the store — including
// store.ErrFleetUpdateRunning — are returned unchanged.
func (w *Worker) Start(ctx context.Context, userID, targetVersion string, hostIDs []string) (string, error) {
	switch {
	case userID == "" || targetVersion == "":
		return "", errors.New("fleetupdate: userID and targetVersion required")
	case len(hostIDs) == 0:
		return "", errors.New("fleetupdate: at least one host required")
	}

	parent := store.FleetUpdate{
		ID:              ulid.Make().String(),
		StartedAt:       time.Now().UTC(),
		StartedByUserID: userID,
		TargetVersion:   targetVersion,
		Status:          "running",
	}
	if err := w.store.CreateFleetUpdate(ctx, parent, hostIDs); err != nil {
		return "", err
	}

	// The roll keeps going long after the caller (typically an HTTP
	// handler) has returned, so hand the goroutine a context that keeps
	// the caller's values but ignores its cancellation.
	go w.run(context.WithoutCancel(ctx), parent.ID, userID, targetVersion)
	return parent.ID, nil
}
// Cancel records the fleet update as cancelled in the store, stamped
// with the current UTC time. The worker goroutine notices the change
// the next time it checks the active fleet update and stops without
// dispatching to any further host. A job that has already been
// dispatched is left to run to completion; cancelling on the agent
// side is out of scope for v1.
func (w *Worker) Cancel(ctx context.Context, fuID string) error {
	cancelledAt := time.Now().UTC()
	return w.store.CancelFleetUpdate(ctx, fuID, cancelledAt)
}
// run is the sequential per-host loop for one fleet update. On every
// iteration it:
//
//  1. re-reads the active fleet update and stops if it is no longer
//     fuID (which is how Cancel, a halt, or completion is observed);
//  2. lists the hosts still pending and marks the fleet update
//     completed when there are none;
//  3. hands the first pending host to processHost.
//
// processHost is expected to move the slot out of the pending state.
// If the same host comes back as the first pending slot immediately
// after being processed, the status write did not stick; the loop then
// halts instead of spinning and re-dispatching to that host forever.
func (w *Worker) run(ctx context.Context, fuID, userID, targetVersion string) {
	// processHost reads the target from the worker rather than taking
	// it as an argument, so publish it before the first host.
	// NOTE(review): this is an unsynchronised write to a shared field;
	// it is only safe while at most one run goroutine is alive.
	w.targetVersion = targetVersion

	var (
		lastHostID string
		processed  bool
	)
	for {
		// Check the parent row's status — picks up Cancel.
		fu, err := w.store.ActiveFleetUpdate(ctx)
		if err != nil {
			slog.Warn("fleetupdate: read active", "fu_id", fuID, "err", err)
			return
		}
		if fu == nil || fu.ID != fuID {
			// Cancelled, halted, or completed externally. Done.
			return
		}

		pending, err := w.store.ListPendingFleetUpdateHosts(ctx, fuID)
		if err != nil {
			slog.Warn("fleetupdate: list pending", "fu_id", fuID, "err", err)
			return
		}
		if len(pending) == 0 {
			if err := w.store.CompleteFleetUpdate(ctx, fuID, time.Now().UTC()); err != nil {
				slog.Warn("fleetupdate: complete", "fu_id", fuID, "err", err)
			}
			return
		}

		next := pending[0]
		if processed && next.HostID == lastHostID {
			// No progress: the slot we just handled is still pending.
			reason := fmt.Sprintf("no progress: host %s is still pending after being processed", next.HostID)
			slog.Warn("fleetupdate: stalled", "fu_id", fuID, "host_id", next.HostID)
			w.halt(ctx, fuID, reason)
			return
		}
		lastHostID, processed = next.HostID, true
		w.processHost(ctx, fuID, userID, next)
	}
}
// processHost handles one host slot of fleet update fuID. The slot
// ends up in one of these states:
//
//   - skipped: the host row could not be read, or the host already
//     reports the target version;
//   - failed: the host is not connected, the dispatch was rejected, or
//     the host did not report the target version within hostTimeout —
//     each of these also halts the whole fleet update;
//   - succeeded: the host's recorded agent version reached the target;
//   - running (left as-is): the fleet update was cancelled, or ctx was
//     cancelled, while waiting for the host.
//
// Store write errors are best-effort: they are logged, never returned.
func (w *Worker) processHost(ctx context.Context, fuID, userID string, slot store.FleetUpdateHost) {
	hostID := slot.HostID
	// Read the target once so every comparison made for this host uses
	// the same value.
	target := w.targetVersion

	// warnOnErr surfaces store failures that would otherwise vanish;
	// the roll itself carries on regardless.
	warnOnErr := func(op string, err error) {
		if err != nil {
			slog.Warn("fleetupdate: "+op, "fu_id", fuID, "host_id", hostID, "err", err)
		}
	}
	// cancelled reports whether fuID is definitely no longer the active
	// fleet update. A read error is treated as "unknown, keep going".
	cancelled := func() bool {
		fu, err := w.store.ActiveFleetUpdate(ctx)
		return err == nil && (fu == nil || fu.ID != fuID)
	}

	warnOnErr("set current host", w.store.SetFleetUpdateCurrentHost(ctx, fuID, hostID))

	// Pre-flight: re-read the host. The dispatch path repeats most of
	// these checks but doing them up-front lets us emit the right
	// per-host status (skipped vs failed) without consuming a job row.
	host, err := w.store.GetHost(ctx, hostID)
	if err != nil || host == nil {
		warnOnErr("get host", err)
		warnOnErr("set host status", w.store.SetFleetUpdateHostStatus(ctx, fuID, hostID, "skipped", "host not found", ""))
		return
	}
	if host.AgentVersion != "" && host.AgentVersion == target {
		warnOnErr("set host status", w.store.SetFleetUpdateHostStatus(ctx, fuID, hostID, "skipped", "already at target version", ""))
		return
	}
	if !w.hub.Connected(hostID) {
		reason := fmt.Sprintf("host went offline: %s", hostID)
		warnOnErr("set host status", w.store.SetFleetUpdateHostStatus(ctx, fuID, hostID, "failed", reason, ""))
		w.halt(ctx, fuID, reason)
		return
	}

	// Dispatch.
	warnOnErr("set host status", w.store.SetFleetUpdateHostStatus(ctx, fuID, hostID, "running", "", ""))
	jobID, code, err := w.disp.DispatchUpdate(ctx, hostID, userID)
	if err != nil || code != "" {
		reason := dispatchErrorReason(code, err)
		warnOnErr("set host status", w.store.SetFleetUpdateHostStatus(ctx, fuID, hostID, "failed", reason, jobID))
		w.halt(ctx, fuID, reason)
		return
	}

	// Poll until the host's recorded agent_version matches target, or
	// timeout.
	deadline := time.Now().Add(w.hostTimeout)
	for time.Now().Before(deadline) {
		// Honour cancellation between polls. The slot stays 'running'
		// for the admin to inspect; no further dispatches happen.
		if cancelled() {
			return
		}
		select {
		case <-ctx.Done():
			return
		case <-time.After(w.pollPeriod):
		}
		h, err := w.store.GetHost(ctx, hostID)
		if err == nil && h != nil && h.AgentVersion == target {
			if err := w.store.SetFleetUpdateHostStatus(ctx, fuID, hostID, "succeeded", "", jobID); err != nil {
				slog.Warn("fleetupdate: set succeeded", "fu_id", fuID, "host_id", hostID, "err", err)
			}
			return
		}
	}

	// A Cancel that landed during the final poll interval must not be
	// reported as a timeout: that would halt an already-cancelled
	// fleet update and raise a spurious alert.
	if cancelled() {
		return
	}
	reason := fmt.Sprintf("timeout waiting for %s to reach %s", hostID, target)
	warnOnErr("set host status", w.store.SetFleetUpdateHostStatus(ctx, fuID, hostID, "failed", reason, jobID))
	w.halt(ctx, fuID, reason)
}
// halt records fuID as halted with the given reason and then, when an
// alert raiser is configured, raises fleet_update_halted with the same
// timestamp. A store failure is logged and does not suppress the alert.
func (w *Worker) halt(ctx context.Context, fuID, reason string) {
	haltedAt := time.Now().UTC()

	err := w.store.HaltFleetUpdate(ctx, fuID, reason, haltedAt)
	if err != nil {
		slog.Warn("fleetupdate: halt", "fu_id", fuID, "err", err)
	}

	if w.alerts == nil {
		return
	}
	w.alerts.RaiseFleetUpdateHalted(ctx, fuID, reason, haltedAt)
}
// dispatchErrorReason turns a failed dispatch into the human-readable
// reason stored on the host slot. A non-empty code takes precedence
// over err; with neither present a generic message is returned.
func dispatchErrorReason(code string, err error) string {
	switch {
	case code != "":
		return "dispatch failed: " + code
	case err != nil:
		return err.Error()
	default:
		return "dispatch failed"
	}
}
+344
View File
@@ -0,0 +1,344 @@
package fleetupdate
import (
"context"
"errors"
"path/filepath"
"sync"
"testing"
"time"
"github.com/oklog/ulid/v2"
"gitea.dcglab.co.uk/steve/restic-manager/internal/api"
"gitea.dcglab.co.uk/steve/restic-manager/internal/store"
)
// fakeHub is a test double for Hub backed by a fixed map of host ID to
// connection state. Hosts missing from the map count as offline.
type fakeHub struct {
	mu     sync.Mutex
	online map[string]bool
}

// Connected returns the recorded connection state for hostID.
func (f *fakeHub) Connected(hostID string) bool {
	f.mu.Lock()
	up := f.online[hostID]
	f.mu.Unlock()
	return up
}
// fakeDispatcher is a test double for Dispatcher. It records every
// host it is asked to update and, unless told to fail, simulates the
// agent coming back on the target version by updating the host row in
// the store after an optional delay.
type fakeDispatcher struct {
	mu    sync.Mutex
	calls []string // host IDs, in dispatch order

	// st and target drive the simulated version transition: after a
	// successful dispatch the host's agent_version is set to target in
	// st so the worker observes it while polling.
	st     *store.Store
	target string

	delayMS    int               // wait before the simulated transition
	failOnHost map[string]string // host -> error code to return instead
}

// DispatchUpdate records the call, then either returns the configured
// failure code for hostID or creates a job row and schedules the
// simulated hello carrying the target version.
func (f *fakeDispatcher) DispatchUpdate(ctx context.Context, hostID, _ string) (string, string, error) {
	f.mu.Lock()
	f.calls = append(f.calls, hostID)
	code, rejected := f.failOnHost[hostID]
	db, version := f.st, f.target
	wait := time.Duration(f.delayMS) * time.Millisecond
	f.mu.Unlock()

	if rejected {
		return "", code, nil
	}

	id := ulid.Make().String()
	if db == nil {
		return id, "", nil
	}
	// Best-effort: the tests only need the row to exist if it can.
	_ = db.CreateJob(context.Background(), store.Job{
		ID: id, HostID: hostID, Kind: "update",
		ActorKind: "user", CreatedAt: time.Now().UTC(),
	})
	if version == "" {
		return id, "", nil
	}

	go func() {
		if wait > 0 {
			time.Sleep(wait)
		}
		_ = db.MarkHostHello(context.Background(), hostID, version, "0.17", api.CurrentProtocolVersion, time.Now().UTC())
	}()
	return id, "", nil
}
// recAlert is a test double for AlertRaiser that remembers the reason
// of every halt alert it receives, in order.
type recAlert struct {
	mu      sync.Mutex
	reasons []string
}

// RaiseFleetUpdateHalted appends reason to the recorded list; the
// other arguments are ignored.
func (r *recAlert) RaiseFleetUpdateHalted(_ context.Context, _ string, reason string, _ time.Time) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.reasons = append(r.reasons, reason)
}
// openStore opens a fresh store backed by a database file inside the
// test's temporary directory and closes it when the test finishes.
func openStore(t *testing.T) *store.Store {
	t.Helper()

	dbPath := filepath.Join(t.TempDir(), "rm.db")
	st, err := store.Open(context.Background(), dbPath)
	if err != nil {
		t.Fatalf("open: %v", err)
	}
	t.Cleanup(func() {
		_ = st.Close() // nothing useful to do with a close error here
	})
	return st
}
// mustCreateAdmin inserts an admin user with a unique username derived
// from its ID and returns that ID, failing the test on any error.
func mustCreateAdmin(t *testing.T, st *store.Store) string {
	t.Helper()

	id := ulid.Make().String()
	admin := store.User{
		ID:           id,
		Username:     "u-" + id[:6],
		PasswordHash: "x",
		Role:         store.RoleAdmin,
		CreatedAt:    time.Now().UTC(),
	}
	err := st.CreateUser(context.Background(), admin)
	if err != nil {
		t.Fatalf("user: %v", err)
	}
	return id
}
// mustCreateHost inserts a linux/amd64 host called name and returns
// its ID. When version is non-empty the host is additionally marked as
// having said hello with that agent version. Any error fails the test.
func mustCreateHost(t *testing.T, st *store.Store, name, version string) string {
	t.Helper()
	ctx := context.Background()

	id := ulid.Make().String()
	row := store.Host{
		ID: id, Name: name, OS: "linux", Arch: "amd64",
		EnrolledAt: time.Now().UTC(),
	}
	if err := st.CreateHost(ctx, row, "deadbeef-"+id, ""); err != nil {
		t.Fatalf("host: %v", err)
	}
	if version == "" {
		return id
	}

	err := st.MarkHostHello(ctx, id, version, "0.17", api.CurrentProtocolVersion, time.Now().UTC())
	if err != nil {
		t.Fatalf("hello: %v", err)
	}
	return id
}
// waitForStatus polls the fleet update every 20ms until its status
// equals want and returns the row it read. If that has not happened by
// the time timeout elapses, the test is failed.
func waitForStatus(t *testing.T, st *store.Store, fuID, want string, timeout time.Duration) *store.FleetUpdate {
	t.Helper()

	for stop := time.Now().Add(timeout); time.Now().Before(stop); time.Sleep(20 * time.Millisecond) {
		got, _, err := st.GetFleetUpdate(context.Background(), fuID)
		if err != nil || got == nil || got.Status != want {
			continue
		}
		return got
	}

	t.Fatalf("status never reached %q", want)
	return nil
}
// TestWorkerTwoHostsBothSucceed rolls two online hosts from v0 to v2
// and expects the fleet update to complete with both slots succeeded
// and no halt alert raised.
func TestWorkerTwoHostsBothSucceed(t *testing.T) {
	st := openStore(t)
	uid := mustCreateAdmin(t, st)
	h1 := mustCreateHost(t, st, "h1", "v0")
	h2 := mustCreateHost(t, st, "h2", "v0")
	hub := &fakeHub{online: map[string]bool{h1: true, h2: true}}
	disp := &fakeDispatcher{st: st, target: "v2", delayMS: 30}
	alerts := &recAlert{}
	w := NewWorker(st, hub, disp, alerts)
	w.pollPeriod = 20 * time.Millisecond
	w.hostTimeout = 2 * time.Second

	fuID, err := w.Start(context.Background(), uid, "v2", []string{h1, h2})
	if err != nil {
		t.Fatalf("start: %v", err)
	}
	waitForStatus(t, st, fuID, "completed", 5*time.Second)

	_, hosts, err := st.GetFleetUpdate(context.Background(), fuID)
	if err != nil {
		t.Fatalf("get fleet update: %v", err)
	}
	// Without this check an empty result would pass the loop below
	// without asserting anything.
	if len(hosts) != 2 {
		t.Fatalf("got %d host rows want 2", len(hosts))
	}
	for _, h := range hosts {
		if h.Status != "succeeded" {
			t.Errorf("host %s status %q want succeeded", h.HostID, h.Status)
		}
	}

	// reasons is written from the worker goroutine; read it under the
	// same lock the recorder uses.
	alerts.mu.Lock()
	defer alerts.mu.Unlock()
	if len(alerts.reasons) != 0 {
		t.Errorf("unexpected halt alert: %v", alerts.reasons)
	}
}
// TestWorkerSecondHostTimesOutHalts rolls three hosts where the second
// accepts the dispatch but never reports the target version. The fleet
// update must halt with h1 succeeded, h2 failed, h3 still pending, and
// exactly one halt alert raised.
func TestWorkerSecondHostTimesOutHalts(t *testing.T) {
	st := openStore(t)
	uid := mustCreateAdmin(t, st)
	h1 := mustCreateHost(t, st, "h1", "v0")
	h2 := mustCreateHost(t, st, "h2", "v0")
	h3 := mustCreateHost(t, st, "h3", "v0")
	hub := &fakeHub{online: map[string]bool{h1: true, h2: true, h3: true}}

	// h1 dispatches normally (transitions to v2). h2's dispatch returns
	// success but the host never transitions, so the worker times out.
	disp := &perHostDispatcher{st: st, target: "v2", noTransition: map[string]bool{h2: true}}
	alerts := &recAlert{}
	w := NewWorker(st, hub, disp, alerts)
	w.pollPeriod = 20 * time.Millisecond
	w.hostTimeout = 200 * time.Millisecond

	fuID, err := w.Start(context.Background(), uid, "v2", []string{h1, h2, h3})
	if err != nil {
		t.Fatalf("start: %v", err)
	}
	waitForStatus(t, st, fuID, "halted", 3*time.Second)

	_, hosts, err := st.GetFleetUpdate(context.Background(), fuID)
	if err != nil {
		t.Fatalf("get fleet update: %v", err)
	}
	gotStatus := make(map[string]string, len(hosts))
	for _, h := range hosts {
		gotStatus[h.HostID] = h.Status
	}
	if gotStatus[h1] != "succeeded" {
		t.Errorf("h1: %q", gotStatus[h1])
	}
	if gotStatus[h2] != "failed" {
		t.Errorf("h2: %q", gotStatus[h2])
	}
	if gotStatus[h3] != "pending" {
		t.Errorf("h3: %q", gotStatus[h3])
	}

	// The worker persists the halted status before it raises the alert,
	// so the status can become visible a moment before the alert is
	// recorded. Give the alert a short grace period instead of asserting
	// on it immediately.
	alertDeadline := time.Now().Add(time.Second)
	for {
		alerts.mu.Lock()
		n := len(alerts.reasons)
		alerts.mu.Unlock()
		if n == 1 {
			break
		}
		if time.Now().After(alertDeadline) {
			alerts.mu.Lock()
			t.Errorf("alert reasons: %v", alerts.reasons)
			alerts.mu.Unlock()
			break
		}
		time.Sleep(10 * time.Millisecond)
	}
}
// perHostDispatcher is a Dispatcher test double whose dispatches always
// succeed but which can withhold the simulated version transition for
// chosen hosts, letting a test provoke the worker's per-host timeout.
type perHostDispatcher struct {
	mu   sync.Mutex
	base *fakeDispatcher // not consulted by DispatchUpdate

	st           *store.Store
	target       string
	noTransition map[string]bool // hosts that never reach target
}

// DispatchUpdate creates a job row for hostID and, unless the host is
// listed in noTransition, marks it as having said hello on the target
// version 20ms later.
func (p *perHostDispatcher) DispatchUpdate(_ context.Context, hostID, _ string) (string, string, error) {
	p.mu.Lock()
	suppress := p.noTransition[hostID]
	p.mu.Unlock()

	id := ulid.Make().String()
	job := store.Job{
		ID: id, HostID: hostID, Kind: "update",
		ActorKind: "user", CreatedAt: time.Now().UTC(),
	}
	// Best-effort: the tests only need the row to exist if it can.
	_ = p.st.CreateJob(context.Background(), job)
	if suppress {
		return id, "", nil
	}

	go func() {
		time.Sleep(20 * time.Millisecond)
		_ = p.st.MarkHostHello(context.Background(), hostID, p.target, "0.17", api.CurrentProtocolVersion, time.Now().UTC())
	}()
	return id, "", nil
}
// TestWorkerHostOfflineHalts starts a roll whose first host is not
// connected. The fleet update must halt with that host failed and the
// second host left pending.
func TestWorkerHostOfflineHalts(t *testing.T) {
	st := openStore(t)
	uid := mustCreateAdmin(t, st)
	h1 := mustCreateHost(t, st, "h1", "v0")
	h2 := mustCreateHost(t, st, "h2", "v0")
	hub := &fakeHub{online: map[string]bool{h1: false, h2: true}}
	disp := &fakeDispatcher{st: st, target: "v2"}
	alerts := &recAlert{}
	w := NewWorker(st, hub, disp, alerts)
	w.pollPeriod = 20 * time.Millisecond
	w.hostTimeout = 500 * time.Millisecond

	fuID, err := w.Start(context.Background(), uid, "v2", []string{h1, h2})
	if err != nil {
		t.Fatalf("start: %v", err)
	}
	waitForStatus(t, st, fuID, "halted", 2*time.Second)

	_, hosts, err := st.GetFleetUpdate(context.Background(), fuID)
	if err != nil {
		t.Fatalf("get fleet update: %v", err)
	}
	// Look rows up by host ID: indexing hosts[0]/hosts[1] would panic on
	// a short result and would depend on the store's row order.
	got := make(map[string]string, len(hosts))
	for _, h := range hosts {
		got[h.HostID] = h.Status
	}
	if got[h1] != "failed" {
		t.Errorf("h1 status: %q", got[h1])
	}
	if got[h2] != "pending" {
		t.Errorf("h2 status: %q", got[h2])
	}
}
// TestWorkerAlreadyAtTargetSkipped rolls two hosts where the first is
// already on the target version. That host must be skipped and the
// other updated, with the fleet update completing.
func TestWorkerAlreadyAtTargetSkipped(t *testing.T) {
	ctx := context.Background()
	st := openStore(t)
	admin := mustCreateAdmin(t, st)
	current := mustCreateHost(t, st, "h1", "v2")
	stale := mustCreateHost(t, st, "h2", "v0")

	w := NewWorker(
		st,
		&fakeHub{online: map[string]bool{current: true, stale: true}},
		&fakeDispatcher{st: st, target: "v2", delayMS: 20},
		&recAlert{},
	)
	w.pollPeriod = 20 * time.Millisecond
	w.hostTimeout = 2 * time.Second

	fuID, err := w.Start(ctx, admin, "v2", []string{current, stale})
	if err != nil {
		t.Fatalf("start: %v", err)
	}
	waitForStatus(t, st, fuID, "completed", 4*time.Second)

	expected := map[string]string{current: "skipped", stale: "succeeded"}
	_, rows, _ := st.GetFleetUpdate(ctx, fuID)
	for _, row := range rows {
		if exp := expected[row.HostID]; row.Status != exp {
			t.Errorf("host %s: got %q want %q", row.HostID, row.Status, exp)
		}
	}
}
// TestWorkerCancelMidRun cancels a two-host roll while the worker is
// still waiting on the first host and checks that the fleet update
// ends up cancelled without the second host ever being dispatched.
func TestWorkerCancelMidRun(t *testing.T) {
	ctx := context.Background()
	st := openStore(t)
	admin := mustCreateAdmin(t, st)
	first := mustCreateHost(t, st, "h1", "v0")
	second := mustCreateHost(t, st, "h2", "v0")
	hub := &fakeHub{online: map[string]bool{first: true, second: true}}

	// The first host's transition is held back for 500ms, which leaves
	// room to cancel before it lands; the second host must stay
	// untouched.
	disp := &fakeDispatcher{st: st, target: "v2", delayMS: 500}
	w := NewWorker(st, hub, disp, &recAlert{})
	w.pollPeriod = 50 * time.Millisecond
	w.hostTimeout = 5 * time.Second

	fuID, err := w.Start(ctx, admin, "v2", []string{first, second})
	if err != nil {
		t.Fatalf("start: %v", err)
	}

	// Let the worker get as far as dispatching the first host.
	time.Sleep(100 * time.Millisecond)
	if err := w.Cancel(ctx, fuID); err != nil {
		t.Fatalf("cancel: %v", err)
	}
	waitForStatus(t, st, fuID, "cancelled", 2*time.Second)

	// The second host should never have been dispatched.
	disp.mu.Lock()
	defer disp.mu.Unlock()
	for _, dispatched := range disp.calls {
		if dispatched != second {
			continue
		}
		t.Errorf("h2 dispatched after cancel")
	}
}
// TestWorkerStartWhileActiveErrors checks the double-Start guard: while
// one fleet update is still running, a second Start must fail with
// store.ErrFleetUpdateRunning.
func TestWorkerStartWhileActiveErrors(t *testing.T) {
	ctx := context.Background()
	st := openStore(t)
	admin := mustCreateAdmin(t, st)
	first := mustCreateHost(t, st, "h1", "v0")
	second := mustCreateHost(t, st, "h2", "v0")

	// A 5s transition delay keeps the first roll running for the whole
	// test.
	w := NewWorker(
		st,
		&fakeHub{online: map[string]bool{first: true, second: true}},
		&fakeDispatcher{st: st, target: "v2", delayMS: 5_000},
		&recAlert{},
	)
	w.pollPeriod = 50 * time.Millisecond
	w.hostTimeout = 2 * time.Second

	_, err := w.Start(ctx, admin, "v2", []string{first})
	if err != nil {
		t.Fatalf("first start: %v", err)
	}

	_, err = w.Start(ctx, admin, "v2", []string{second})
	if !errors.Is(err, store.ErrFleetUpdateRunning) {
		t.Fatalf("err: %v want ErrFleetUpdateRunning", err)
	}
}