// Package fleetupdate drives a rolling, sequential agent self-update
// over a list of hosts. Each Start call spawns one worker goroutine;
// the store layer gates this so that at most one fleet update is
// running at a time.
package fleetupdate

import (
	"context"
	"errors"
	"fmt"
	"log/slog"
	"time"

	"github.com/oklog/ulid/v2"

	"gitea.dcglab.co.uk/steve/restic-manager/internal/store"
)

// Hub is the slim "is this host connected?" surface. The worker
// consults it before dispatching an update to a host.
type Hub interface {
	// Connected reports whether hostID is connected.
	Connected(hostID string) bool
}

// Dispatcher sends one command.update envelope. The implementer also
// creates the jobs row, writes audit, and registers with the update
// watcher. Pre-checks are the dispatcher's responsibility.
//
// The worker treats a non-nil err or a non-empty code as a failed
// dispatch and turns it into the host slot's failure reason; the
// returned jobID is recorded against the slot in either case.
type Dispatcher interface {
	DispatchUpdate(ctx context.Context, hostID string, actorUserID string) (jobID string, code string, err error)
}

// AlertRaiser is the slim view of the alert engine's host-less raise
// path. The worker uses it to emit fleet_update_halted when a fleet
// update halts. A Worker may be built with a nil AlertRaiser, in which
// case halts are recorded without raising an alert.
type AlertRaiser interface {
	RaiseFleetUpdateHalted(ctx context.Context, fleetUpdateID, reason string, when time.Time)
}

// Worker is the long-lived fleet-update orchestrator. There is at most
// one *running* fleet update at a time (enforced by the store).
type Worker struct {
	store  *store.Store
	hub    Hub
	disp   Dispatcher
	alerts AlertRaiser // may be nil: halts are then recorded without an alert

	// targetVersion is the version a roll expects agents to report once
	// they have updated.
	// NOTE(review): this is per-roll state on a shared, long-lived
	// Worker, and a goroutine still winding down after Cancel can overlap
	// a newer roll. Prefer the targetVersion value passed to Start.
	targetVersion string

	// pollPeriod controls the cadence at which the worker re-reads the
	// host row to check for the version transition. Exposed for tests.
	pollPeriod time.Duration

	// hostTimeout bounds how long the worker waits for one host to
	// reach the target version before halting.
	hostTimeout time.Duration
}

// NewWorker builds an idle worker; nothing runs until Start is called.
// The target version is supplied per Start call rather than here.
// pollPeriod and hostTimeout receive defaults that tests may override.
func NewWorker(st *store.Store, hub Hub, disp Dispatcher, alerts AlertRaiser) *Worker { return &Worker{ store: st, hub: hub, disp: disp, alerts: alerts, pollPeriod: 1 * time.Second, hostTimeout: 95 * time.Second, } } // Start creates the parent + child rows, then spawns the per-host // worker goroutine. Returns the new fleet_update_id on success. // store.ErrFleetUpdateRunning bubbles up unchanged. func (w *Worker) Start(ctx context.Context, userID, targetVersion string, hostIDs []string) (string, error) { if userID == "" || targetVersion == "" { return "", errors.New("fleetupdate: userID and targetVersion required") } if len(hostIDs) == 0 { return "", errors.New("fleetupdate: at least one host required") } fuID := ulid.Make().String() now := time.Now().UTC() if err := w.store.CreateFleetUpdate(ctx, store.FleetUpdate{ ID: fuID, StartedAt: now, StartedByUserID: userID, TargetVersion: targetVersion, Status: "running", }, hostIDs); err != nil { return "", err } // The goroutine outlives the request that started it; carry a // detached context so an HTTP-handler ctx cancel doesn't abort // the long roll. bg := context.WithoutCancel(ctx) go w.run(bg, fuID, userID, targetVersion) return fuID, nil } // Cancel marks the fleet update cancelled. The running goroutine // observes the new status on its next pre-check and exits without // dispatching further hosts. The currently-dispatched job is left to // finish on its own — cancelling agent-side is out of scope for v1. func (w *Worker) Cancel(ctx context.Context, fuID string) error { return w.store.CancelFleetUpdate(ctx, fuID, time.Now().UTC()) } // run is the per-host loop. Halts on first failure; emits one alert // on transition. func (w *Worker) run(ctx context.Context, fuID, userID, targetVersion string) { w.targetVersion = targetVersion for { // Check the parent row's status — picks up Cancel. 
fu, err := w.store.ActiveFleetUpdate(ctx) if err != nil { slog.Warn("fleetupdate: read active", "fu_id", fuID, "err", err) return } if fu == nil || fu.ID != fuID { // Cancelled, halted, or completed externally. Done. return } pending, err := w.store.ListPendingFleetUpdateHosts(ctx, fuID) if err != nil { slog.Warn("fleetupdate: list pending", "fu_id", fuID, "err", err) return } if len(pending) == 0 { now := time.Now().UTC() if err := w.store.CompleteFleetUpdate(ctx, fuID, now); err != nil { slog.Warn("fleetupdate: complete", "fu_id", fuID, "err", err) } return } next := pending[0] w.processHost(ctx, fuID, userID, next) } } // processHost handles one host slot. Marks it skipped, succeeded, or // failed (and halts the fleet on failure). func (w *Worker) processHost(ctx context.Context, fuID, userID string, slot store.FleetUpdateHost) { hostID := slot.HostID _ = w.store.SetFleetUpdateCurrentHost(ctx, fuID, hostID) // Pre-flight: re-read the host. The dispatch path repeats most of // these checks but doing them up-front lets us emit the right // per-host status (skipped vs failed) without consuming a job row. host, err := w.store.GetHost(ctx, hostID) if err != nil || host == nil { _ = w.store.SetFleetUpdateHostStatus(ctx, fuID, hostID, "skipped", "host not found", "") return } if host.AgentVersion != "" && host.AgentVersion == w.targetVersion { _ = w.store.SetFleetUpdateHostStatus(ctx, fuID, hostID, "skipped", "already at target version", "") return } if !w.hub.Connected(hostID) { reason := fmt.Sprintf("host went offline: %s", hostID) _ = w.store.SetFleetUpdateHostStatus(ctx, fuID, hostID, "failed", reason, "") w.halt(ctx, fuID, reason) return } // Dispatch. 
_ = w.store.SetFleetUpdateHostStatus(ctx, fuID, hostID, "running", "", "") jobID, code, err := w.disp.DispatchUpdate(ctx, hostID, userID) if err != nil || code != "" { reason := dispatchErrorReason(code, err) _ = w.store.SetFleetUpdateHostStatus(ctx, fuID, hostID, "failed", reason, jobID) w.halt(ctx, fuID, reason) return } // Poll until the host's recorded agent_version matches target, or // timeout. deadline := time.Now().Add(w.hostTimeout) for time.Now().Before(deadline) { // Honour cancellation between polls. fu, err := w.store.ActiveFleetUpdate(ctx) if err == nil && (fu == nil || fu.ID != fuID) { // Cancelled mid-host; leave the slot in 'running' for the // admin to inspect. No further dispatches. return } time.Sleep(w.pollPeriod) h, err := w.store.GetHost(ctx, hostID) if err == nil && h != nil && h.AgentVersion == w.targetVersion { if err := w.store.SetFleetUpdateHostStatus(ctx, fuID, hostID, "succeeded", "", jobID); err != nil { slog.Warn("fleetupdate: set succeeded", "fu_id", fuID, "host_id", hostID, "err", err) } return } } reason := fmt.Sprintf("timeout waiting for %s to reach %s", hostID, w.targetVersion) _ = w.store.SetFleetUpdateHostStatus(ctx, fuID, hostID, "failed", reason, jobID) w.halt(ctx, fuID, reason) } func (w *Worker) halt(ctx context.Context, fuID, reason string) { now := time.Now().UTC() if err := w.store.HaltFleetUpdate(ctx, fuID, reason, now); err != nil { slog.Warn("fleetupdate: halt", "fu_id", fuID, "err", err) } if w.alerts != nil { w.alerts.RaiseFleetUpdateHalted(ctx, fuID, reason, now) } } func dispatchErrorReason(code string, err error) string { if code != "" { return "dispatch failed: " + code } if err != nil { return err.Error() } return "dispatch failed" }