Files
restic-manager/internal/alert/engine.go
T

280 lines
9.3 KiB
Go

// Package alert evaluates the hardcoded rule set and persists raises
// / acknowledges / resolves. Three event sources feed it:
// - JobFinishedEvent — pushed when a job lands a terminal state
// (the existing MarkJobFinished site)
// - HostOfflineEvent / HostOnlineEvent — pushed by the offline
// sweeper and by the ws hello handler
// - 60s ticker (internal) — drives stale-schedule + auto-resolve
//
// All output goes through store.RaiseOrTouch / Acknowledge / Resolve
// and the notification.Hub. The engine is one goroutine started at
// boot; non-blocking sends from hot paths.
package alert
import (
"context"
"fmt"
"log/slog"
"sync"
"time"
"gitea.dcglab.co.uk/steve/restic-manager/internal/notification"
"gitea.dcglab.co.uk/steve/restic-manager/internal/store"
)
// staleBackupThreshold is how long an intermittent host may go without
// a successful backup before we raise a stale_schedule alert. Global
// constant for v1 (may become per-host later). Only intermittent hosts
// are evaluated — always-on hosts' stale_schedule stays a no-op.
const staleBackupThreshold = 7 * 24 * time.Hour
// JobFinishedEvent carries everything the engine needs to evaluate
// the failed-X rules. Pushed via Engine.NotifyJobFinished from the
// MarkJobFinished site.
type JobFinishedEvent struct {
HostID string
JobID string
Kind string // backup | forget | prune | check | unlock | restore | diff
Status string // succeeded | failed | cancelled
SourceGroupID string // dedup key for backup/forget/prune/check; empty otherwise
When time.Time
}
// Engine evaluates hardcoded alert rules and dispatches via notification.Hub.
type Engine struct {
store *store.Store
hub *notification.Hub
jobs chan JobFinishedEvent
hostDown chan string // host_id
hostUp chan string
// agentOfflineFloor is the duration a host must be offline before
// we raise. Configurable for tests; default 15m.
agentOfflineFloor time.Duration
tickPeriod time.Duration
closeOnce sync.Once
done chan struct{}
}
// NewEngine builds the engine. agentOfflineFloor + tickPeriod default
// to 15min and 60s respectively when zero.
func NewEngine(st *store.Store, hub *notification.Hub) *Engine {
return &Engine{
store: st,
hub: hub,
jobs: make(chan JobFinishedEvent, 32),
hostDown: make(chan string, 32),
hostUp: make(chan string, 32),
agentOfflineFloor: 15 * time.Minute,
tickPeriod: 60 * time.Second,
done: make(chan struct{}),
}
}
// Run drives the event loop. Returns when ctx is done. Blocks; call in
// its own goroutine.
func (e *Engine) Run(ctx context.Context) {
t := time.NewTicker(e.tickPeriod)
defer t.Stop()
for {
select {
case <-ctx.Done():
e.closeOnce.Do(func() { close(e.done) })
return
case ev := <-e.jobs:
e.handleJobFinished(ctx, ev)
case hostID := <-e.hostDown:
e.handleHostOffline(ctx, hostID)
case hostID := <-e.hostUp:
e.handleHostOnline(ctx, hostID)
case now := <-t.C:
e.tick(ctx, now)
}
}
}
// NotifyJobFinished is the hot-path hook called from MarkJobFinished's
// caller (ws.handler.dispatchAgentMessage). Non-blocking: drops on a
// full channel with a slog warning.
func (e *Engine) NotifyJobFinished(ev JobFinishedEvent) {
select {
case e.jobs <- ev:
default:
slog.Warn("alert: jobs channel full; dropping event", "kind", ev.Kind, "host_id", ev.HostID)
}
}
// NotifyHostOffline notifies the engine that a host is offline.
func (e *Engine) NotifyHostOffline(hostID string) {
select {
case e.hostDown <- hostID:
default:
slog.Warn("alert: hostDown channel full; dropping", "host_id", hostID)
}
}
// NotifyHostOnline notifies the engine that a host is online.
func (e *Engine) NotifyHostOnline(hostID string) {
select {
case e.hostUp <- hostID:
default:
slog.Warn("alert: hostUp channel full; dropping", "host_id", hostID)
}
}
func (e *Engine) handleJobFinished(ctx context.Context, ev JobFinishedEvent) {
// Determine which kind/severity pair this job maps to. Jobs not
// listed here (init, unlock, restore, diff) produce no alerts in v1.
var kind, severity string
switch ev.Kind {
case "backup":
kind, severity = KindBackupFailed, "warning"
case "forget":
kind, severity = KindForgetFailed, "warning"
case "prune":
kind, severity = KindPruneFailed, "warning"
case "check":
kind, severity = KindCheckFailed, "critical"
default:
return
}
// dedupKey scopes the alert to a specific subject. For backups it's
// the source-group id (each group = its own restic run = its own
// failure surface). forget/prune/check are repo-scoped — leave the
// key empty so we get one alert per host per kind, matching the
// "is this repo healthy?" mental model.
dedupKey := ""
if ev.Kind == "backup" {
dedupKey = ev.SourceGroupID
}
switch ev.Status {
case "failed":
e.raiseAndNotify(ctx, ev.HostID, kind, dedupKey, severity,
fmt.Sprintf("%s job %s failed", ev.Kind, ev.JobID), ev.When)
case "succeeded":
e.resolveAndNotify(ctx, ev.HostID, kind, dedupKey, ev.When)
if ev.Kind == "backup" {
// A fresh backup clears staleness for intermittent hosts.
e.resolveAndNotify(ctx, ev.HostID, KindStaleSchedule, "", ev.When)
}
}
}
func (e *Engine) handleHostOffline(ctx context.Context, hostID string) {
host, err := e.store.GetHost(ctx, hostID)
if err != nil {
return
}
// Intermittent hosts (laptops) legitimately disappear — never raise
// agent_offline for them. The stale_schedule sweep in tick() is the
// only staleness signal for these hosts.
if !host.AlwaysOn {
return
}
// Apply the 15-min floor — raise only when last_seen_at is older
// than agentOfflineFloor. A nil last_seen_at (host enrolled but
// never connected) is treated as "now" so we don't raise
// immediately on enrolment.
if host.LastSeenAt == nil {
return
}
if time.Since(*host.LastSeenAt) < e.agentOfflineFloor {
return
}
e.raiseAndNotify(ctx, hostID, KindAgentOffline, "", "warning",
fmt.Sprintf("Agent offline for %s (threshold %s)",
roundDur(time.Since(*host.LastSeenAt)), e.agentOfflineFloor),
time.Now().UTC())
}
func (e *Engine) handleHostOnline(ctx context.Context, hostID string) {
e.resolveAndNotify(ctx, hostID, KindAgentOffline, "", time.Now().UTC())
}
// tick is the 60-second sweep. Responsibilities:
// 1. Re-evaluate agent_offline for every offline host that may have
// crossed the floor between explicit events.
// 2. Stale-schedule detection — declared in the spec but intentionally
// left as a no-op in v1. The precise "expected to have fired but
// didn't" trigger requires a store helper that lands in a later
// task. The KindStaleSchedule constant is exported so UI code can
// reference the tag string today.
func (e *Engine) tick(ctx context.Context, now time.Time) {
// User-management cleanup piggy-backed here for now. Setup tokens
// have a 1h expiry; the alert engine tick is the cheapest existing
// 60s loop. If more housekeeping queries appear, extract a
// dedicated maintenance loop.
if _, err := e.store.CleanupExpiredSetupTokens(ctx, now); err != nil {
slog.Warn("alert: cleanup expired setup tokens", "err", err)
}
if _, err := e.store.CleanupExpiredOIDCState(ctx, now.Add(-5*time.Minute)); err != nil {
slog.Warn("alert: cleanup expired oidc state", "err", err)
}
hosts, err := e.store.ListHosts(ctx)
if err != nil {
slog.Warn("alert: tick list hosts", "err", err)
return
}
for _, h := range hosts {
// Intermittent hosts: suppress agent_offline entirely; instead
// raise stale_schedule when they have gone too long with no
// successful backup AND they have at least one enabled schedule
// to be measured against. A nil LastBackupAt (never backed up)
// has no baseline — onboarding/repo_status covers that case.
if !h.AlwaysOn {
if h.LastBackupAt == nil {
continue
}
if now.Sub(*h.LastBackupAt) < staleBackupThreshold {
continue
}
hasEnabled, err := e.hostHasEnabledSchedule(ctx, h.ID)
if err != nil || !hasEnabled {
continue
}
e.raiseAndNotify(ctx, h.ID, KindStaleSchedule, "", "warning",
fmt.Sprintf("No backup in %s (threshold %s)",
roundDur(now.Sub(*h.LastBackupAt)), staleBackupThreshold), now)
continue
}
// Always-on hosts: existing agent_offline re-evaluation.
if h.Status != "offline" || h.LastSeenAt == nil {
continue
}
if now.Sub(*h.LastSeenAt) >= e.agentOfflineFloor {
e.raiseAndNotify(ctx, h.ID, KindAgentOffline, "", "warning",
fmt.Sprintf("Agent offline for %s (threshold %s)",
roundDur(now.Sub(*h.LastSeenAt)), e.agentOfflineFloor), now)
}
}
}
// roundDur returns a human-readable duration string, rounding to the
// nearest minute. Durations under a minute are reported as "less than
// a minute".
func roundDur(d time.Duration) string {
if d < time.Minute {
return "less than a minute"
}
return d.Round(time.Minute).String()
}
// hostHasEnabledSchedule reports whether the host has at least one
// enabled backup schedule — the precondition for a stale_schedule
// alert (no schedule = no backup expectation to measure against).
func (e *Engine) hostHasEnabledSchedule(ctx context.Context, hostID string) (bool, error) {
schedules, err := e.store.ListSchedulesByHost(ctx, hostID)
if err != nil {
return false, err
}
for _, sc := range schedules {
if sc.Enabled {
return true, nil
}
}
return false, nil
}