6fd2a2ff77
- alert: update_failed (per-host, dedup=hostID) + fleet_update_halted
(system-scoped, host_id NULL via new RaiseOrTouchSystem helper).
- ws: UpdateWatcher tracks in-flight command.update dispatches and
reconciles them against incoming hello envelopes — success path
marks the job succeeded and auto-resolves the alert; 90s timeout
marks the job failed and raises update_failed.
- http: POST /api/hosts/{id}/update (admin-only JSON) + the HTMX
/hosts/{id}/update form variant. Pre-checks: host exists, online,
agent_version != current, no running update job. Refactored core
into Server.DispatchHostUpdate so the fleet worker can share it
without going through HTTP.
- fleetupdate: rolling worker iterating through host slots, halting
on first failure and raising fleet_update_halted. Polling-based
version-match (re-read hosts.agent_version every 1s up to 95s) —
no extra plumbing into the WS hello path. At-most-one-running is
enforced at the store layer (ErrFleetUpdateRunning).
- cmd/server: wire UpdateWatcher and FleetWorker into the main
goroutine; the worker uses a small serverDispatcher adapter that
delegates back into Server.DispatchHostUpdate.
Tests: watcher (success/timeout/mismatch/late-hello), HTTP endpoint
(happy + four pre-check branches + RBAC), worker (two-host happy,
timeout-halt, host-offline-halt, already-at-target skip, cancel
mid-run, double-Start guard).
222 lines
7.3 KiB
Go
222 lines
7.3 KiB
Go
// Package fleetupdate drives a rolling, sequential agent self-update
|
|
// over a list of hosts. One worker goroutine per Start() call (gated
|
|
// at the store layer to at-most-one-running-fleet-update).
|
|
package fleetupdate
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"log/slog"
|
|
"time"
|
|
|
|
"github.com/oklog/ulid/v2"
|
|
|
|
"gitea.dcglab.co.uk/steve/restic-manager/internal/store"
|
|
)
|
|
|
|
// Hub is the minimal connectivity view the worker needs: it answers
// whether a given host is currently connected.
type Hub interface {
	// Connected reports whether the host identified by hostID is
	// connected right now.
	Connected(hostID string) bool
}
|
|
|
|
// Dispatcher sends a single command.update envelope to one host. Its
// implementation is also expected to create the jobs row, write the
// audit entry, and register the dispatch with the update watcher.
// Pre-checks belong to the dispatcher; the worker simply reports
// whatever failure it returns.
type Dispatcher interface {
	// DispatchUpdate dispatches an update to hostID on behalf of
	// actorUserID. The worker treats a non-nil err or a non-empty code
	// as a failed dispatch.
	DispatchUpdate(ctx context.Context, hostID, actorUserID string) (jobID, code string, err error)
}
|
|
|
|
// AlertRaiser is the narrow slice of the alert engine the worker uses:
// its host-less raise path, through which fleet_update_halted is
// emitted when a fleet update halts.
type AlertRaiser interface {
	// RaiseFleetUpdateHalted raises the halted alert for the given fleet
	// update with a human-readable reason and the halt timestamp.
	RaiseFleetUpdateHalted(ctx context.Context, fleetUpdateID string, reason string, when time.Time)
}
|
|
|
|
// Worker is the long-lived fleet-update orchestrator. There is at most
|
|
// one *running* fleet update at a time (enforced by the store).
|
|
type Worker struct {
|
|
store *store.Store
|
|
hub Hub
|
|
disp Dispatcher
|
|
alerts AlertRaiser
|
|
|
|
// targetVersion is the version every dispatched agent is expected
|
|
// to come back with. Captured at Start time to avoid drift.
|
|
targetVersion string
|
|
|
|
// pollPeriod controls the cadence at which the worker re-reads the
|
|
// host row to check for the version transition. Exposed for tests.
|
|
pollPeriod time.Duration
|
|
// hostTimeout bounds how long the worker waits for one host to
|
|
// reach the target version before halting.
|
|
hostTimeout time.Duration
|
|
}
|
|
|
|
// NewWorker builds an unstarted worker. targetVersion is set on each
|
|
// Start call; the values here are defaults.
|
|
func NewWorker(st *store.Store, hub Hub, disp Dispatcher, alerts AlertRaiser) *Worker {
|
|
return &Worker{
|
|
store: st,
|
|
hub: hub,
|
|
disp: disp,
|
|
alerts: alerts,
|
|
pollPeriod: 1 * time.Second,
|
|
hostTimeout: 95 * time.Second,
|
|
}
|
|
}
|
|
|
|
// Start creates the parent + child rows, then spawns the per-host
|
|
// worker goroutine. Returns the new fleet_update_id on success.
|
|
// store.ErrFleetUpdateRunning bubbles up unchanged.
|
|
func (w *Worker) Start(ctx context.Context, userID, targetVersion string, hostIDs []string) (string, error) {
|
|
if userID == "" || targetVersion == "" {
|
|
return "", errors.New("fleetupdate: userID and targetVersion required")
|
|
}
|
|
if len(hostIDs) == 0 {
|
|
return "", errors.New("fleetupdate: at least one host required")
|
|
}
|
|
fuID := ulid.Make().String()
|
|
now := time.Now().UTC()
|
|
if err := w.store.CreateFleetUpdate(ctx, store.FleetUpdate{
|
|
ID: fuID,
|
|
StartedAt: now,
|
|
StartedByUserID: userID,
|
|
TargetVersion: targetVersion,
|
|
Status: "running",
|
|
}, hostIDs); err != nil {
|
|
return "", err
|
|
}
|
|
|
|
// The goroutine outlives the request that started it; carry a
|
|
// detached context so an HTTP-handler ctx cancel doesn't abort
|
|
// the long roll.
|
|
bg := context.WithoutCancel(ctx)
|
|
go w.run(bg, fuID, userID, targetVersion)
|
|
return fuID, nil
|
|
}
|
|
|
|
// Cancel marks the fleet update cancelled. The running goroutine
|
|
// observes the new status on its next pre-check and exits without
|
|
// dispatching further hosts. The currently-dispatched job is left to
|
|
// finish on its own — cancelling agent-side is out of scope for v1.
|
|
func (w *Worker) Cancel(ctx context.Context, fuID string) error {
|
|
return w.store.CancelFleetUpdate(ctx, fuID, time.Now().UTC())
|
|
}
|
|
|
|
// run is the per-host loop. Halts on first failure; emits one alert
|
|
// on transition.
|
|
func (w *Worker) run(ctx context.Context, fuID, userID, targetVersion string) {
|
|
w.targetVersion = targetVersion
|
|
|
|
for {
|
|
// Check the parent row's status — picks up Cancel.
|
|
fu, err := w.store.ActiveFleetUpdate(ctx)
|
|
if err != nil {
|
|
slog.Warn("fleetupdate: read active", "fu_id", fuID, "err", err)
|
|
return
|
|
}
|
|
if fu == nil || fu.ID != fuID {
|
|
// Cancelled, halted, or completed externally. Done.
|
|
return
|
|
}
|
|
|
|
pending, err := w.store.ListPendingFleetUpdateHosts(ctx, fuID)
|
|
if err != nil {
|
|
slog.Warn("fleetupdate: list pending", "fu_id", fuID, "err", err)
|
|
return
|
|
}
|
|
if len(pending) == 0 {
|
|
now := time.Now().UTC()
|
|
if err := w.store.CompleteFleetUpdate(ctx, fuID, now); err != nil {
|
|
slog.Warn("fleetupdate: complete", "fu_id", fuID, "err", err)
|
|
}
|
|
return
|
|
}
|
|
|
|
next := pending[0]
|
|
w.processHost(ctx, fuID, userID, next)
|
|
}
|
|
}
|
|
|
|
// processHost handles one host slot. Marks it skipped, succeeded, or
|
|
// failed (and halts the fleet on failure).
|
|
func (w *Worker) processHost(ctx context.Context, fuID, userID string, slot store.FleetUpdateHost) {
|
|
hostID := slot.HostID
|
|
_ = w.store.SetFleetUpdateCurrentHost(ctx, fuID, hostID)
|
|
|
|
// Pre-flight: re-read the host. The dispatch path repeats most of
|
|
// these checks but doing them up-front lets us emit the right
|
|
// per-host status (skipped vs failed) without consuming a job row.
|
|
host, err := w.store.GetHost(ctx, hostID)
|
|
if err != nil || host == nil {
|
|
_ = w.store.SetFleetUpdateHostStatus(ctx, fuID, hostID, "skipped", "host not found", "")
|
|
return
|
|
}
|
|
if host.AgentVersion != "" && host.AgentVersion == w.targetVersion {
|
|
_ = w.store.SetFleetUpdateHostStatus(ctx, fuID, hostID, "skipped", "already at target version", "")
|
|
return
|
|
}
|
|
if !w.hub.Connected(hostID) {
|
|
reason := fmt.Sprintf("host went offline: %s", hostID)
|
|
_ = w.store.SetFleetUpdateHostStatus(ctx, fuID, hostID, "failed", reason, "")
|
|
w.halt(ctx, fuID, reason)
|
|
return
|
|
}
|
|
|
|
// Dispatch.
|
|
_ = w.store.SetFleetUpdateHostStatus(ctx, fuID, hostID, "running", "", "")
|
|
jobID, code, err := w.disp.DispatchUpdate(ctx, hostID, userID)
|
|
if err != nil || code != "" {
|
|
reason := dispatchErrorReason(code, err)
|
|
_ = w.store.SetFleetUpdateHostStatus(ctx, fuID, hostID, "failed", reason, jobID)
|
|
w.halt(ctx, fuID, reason)
|
|
return
|
|
}
|
|
|
|
// Poll until the host's recorded agent_version matches target, or
|
|
// timeout.
|
|
deadline := time.Now().Add(w.hostTimeout)
|
|
for time.Now().Before(deadline) {
|
|
// Honour cancellation between polls.
|
|
fu, err := w.store.ActiveFleetUpdate(ctx)
|
|
if err == nil && (fu == nil || fu.ID != fuID) {
|
|
// Cancelled mid-host; leave the slot in 'running' for the
|
|
// admin to inspect. No further dispatches.
|
|
return
|
|
}
|
|
time.Sleep(w.pollPeriod)
|
|
h, err := w.store.GetHost(ctx, hostID)
|
|
if err == nil && h != nil && h.AgentVersion == w.targetVersion {
|
|
if err := w.store.SetFleetUpdateHostStatus(ctx, fuID, hostID, "succeeded", "", jobID); err != nil {
|
|
slog.Warn("fleetupdate: set succeeded", "fu_id", fuID, "host_id", hostID, "err", err)
|
|
}
|
|
return
|
|
}
|
|
}
|
|
reason := fmt.Sprintf("timeout waiting for %s to reach %s", hostID, w.targetVersion)
|
|
_ = w.store.SetFleetUpdateHostStatus(ctx, fuID, hostID, "failed", reason, jobID)
|
|
w.halt(ctx, fuID, reason)
|
|
}
|
|
|
|
func (w *Worker) halt(ctx context.Context, fuID, reason string) {
|
|
now := time.Now().UTC()
|
|
if err := w.store.HaltFleetUpdate(ctx, fuID, reason, now); err != nil {
|
|
slog.Warn("fleetupdate: halt", "fu_id", fuID, "err", err)
|
|
}
|
|
if w.alerts != nil {
|
|
w.alerts.RaiseFleetUpdateHalted(ctx, fuID, reason, now)
|
|
}
|
|
}
|
|
|
|
// dispatchErrorReason turns a dispatcher failure into the reason string
// stored on the host slot. A non-empty code takes precedence and yields
// "dispatch failed: <code>"; otherwise a non-nil err contributes its
// message verbatim; with neither, the generic "dispatch failed" is
// returned.
func dispatchErrorReason(code string, err error) string {
	switch {
	case code != "":
		return "dispatch failed: " + code
	case err != nil:
		return err.Error()
	default:
		return "dispatch failed"
	}
}
|