alert: engine skeleton + event channels
This commit is contained in:
@@ -0,0 +1,134 @@
|
|||||||
|
// Package alert evaluates the hardcoded rule set and persists alert
// raises, acknowledgements and resolutions. Three event sources feed it:
//   - JobFinishedEvent — pushed via Engine.NotifyJobFinished when a job
//     lands a terminal state (the existing MarkJobFinished site)
//   - host offline / online notifications — host IDs pushed via
//     Engine.NotifyHostOffline / Engine.NotifyHostOnline by the offline
//     sweeper and by the ws hello handler
//   - 60s ticker (internal) — drives stale-schedule + auto-resolve
//
// All output goes through store.RaiseOrTouch / Acknowledge / Resolve
// and the notification.Hub. The engine is one goroutine started at
// boot; hot paths hand events to it with non-blocking sends, so they
// never stall on the engine.
|
||||||
|
package alert
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"log/slog"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"gitea.dcglab.co.uk/steve/restic-manager/internal/notification"
|
||||||
|
"gitea.dcglab.co.uk/steve/restic-manager/internal/store"
|
||||||
|
)
|
||||||
|
|
||||||
|
// JobFinishedEvent is the payload handed to the engine when a job
// finishes, carrying what the failed-X rules need to evaluate it. It is
// pushed through Engine.NotifyJobFinished from the MarkJobFinished
// site.
type JobFinishedEvent struct {
	// HostID and JobID name the host and the job the event is about.
	HostID, JobID string
	// Kind is one of: backup | forget | prune | check | unlock | restore | diff.
	Kind string
	// Status is one of: succeeded | failed | cancelled.
	Status string
	// When is the timestamp attached to the event (presumably the
	// moment the job finished — confirm at the call site).
	When time.Time
}
|
||||||
|
|
||||||
|
// Engine evaluates hardcoded alert rules and dispatches via notification.Hub.
|
||||||
|
type Engine struct {
|
||||||
|
store *store.Store
|
||||||
|
hub *notification.Hub
|
||||||
|
|
||||||
|
jobs chan JobFinishedEvent
|
||||||
|
hostDown chan string // host_id
|
||||||
|
hostUp chan string
|
||||||
|
|
||||||
|
// agentOfflineFloor is the duration a host must be offline before
|
||||||
|
// we raise. Configurable for tests; default 15m.
|
||||||
|
agentOfflineFloor time.Duration
|
||||||
|
tickPeriod time.Duration
|
||||||
|
|
||||||
|
closeOnce sync.Once
|
||||||
|
done chan struct{}
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewEngine builds the engine. agentOfflineFloor + tickPeriod default
|
||||||
|
// to 15min and 60s respectively when zero.
|
||||||
|
func NewEngine(st *store.Store, hub *notification.Hub) *Engine {
|
||||||
|
return &Engine{
|
||||||
|
store: st,
|
||||||
|
hub: hub,
|
||||||
|
jobs: make(chan JobFinishedEvent, 32),
|
||||||
|
hostDown: make(chan string, 32),
|
||||||
|
hostUp: make(chan string, 32),
|
||||||
|
agentOfflineFloor: 15 * time.Minute,
|
||||||
|
tickPeriod: 60 * time.Second,
|
||||||
|
done: make(chan struct{}),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Run drives the event loop. Returns when ctx is done. Blocks; call in
|
||||||
|
// its own goroutine.
|
||||||
|
func (e *Engine) Run(ctx context.Context) {
|
||||||
|
t := time.NewTicker(e.tickPeriod)
|
||||||
|
defer t.Stop()
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
e.closeOnce.Do(func() { close(e.done) })
|
||||||
|
return
|
||||||
|
case ev := <-e.jobs:
|
||||||
|
e.handleJobFinished(ctx, ev)
|
||||||
|
case hostID := <-e.hostDown:
|
||||||
|
e.handleHostOffline(ctx, hostID)
|
||||||
|
case hostID := <-e.hostUp:
|
||||||
|
e.handleHostOnline(ctx, hostID)
|
||||||
|
case now := <-t.C:
|
||||||
|
e.tick(ctx, now)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// NotifyJobFinished is the hot-path hook called from MarkJobFinished's
|
||||||
|
// caller (ws.handler.dispatchAgentMessage). Non-blocking: drops on a
|
||||||
|
// full channel with a slog warning.
|
||||||
|
func (e *Engine) NotifyJobFinished(ev JobFinishedEvent) {
|
||||||
|
select {
|
||||||
|
case e.jobs <- ev:
|
||||||
|
default:
|
||||||
|
slog.Warn("alert: jobs channel full; dropping event", "kind", ev.Kind, "host_id", ev.HostID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// NotifyHostOffline notifies the engine that a host is offline.
|
||||||
|
func (e *Engine) NotifyHostOffline(hostID string) {
|
||||||
|
select {
|
||||||
|
case e.hostDown <- hostID:
|
||||||
|
default:
|
||||||
|
slog.Warn("alert: hostDown channel full; dropping", "host_id", hostID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// NotifyHostOnline notifies the engine that a host is online.
|
||||||
|
func (e *Engine) NotifyHostOnline(hostID string) {
|
||||||
|
select {
|
||||||
|
case e.hostUp <- hostID:
|
||||||
|
default:
|
||||||
|
slog.Warn("alert: hostUp channel full; dropping", "host_id", hostID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Placeholder method stubs for C2 implementation
|
||||||
|
func (e *Engine) handleJobFinished(ctx context.Context, ev JobFinishedEvent) {
|
||||||
|
// Implemented in C2
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *Engine) handleHostOffline(ctx context.Context, hostID string) {
|
||||||
|
// Implemented in C2
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *Engine) handleHostOnline(ctx context.Context, hostID string) {
|
||||||
|
// Implemented in C2
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *Engine) tick(ctx context.Context, now time.Time) {
|
||||||
|
// Implemented in C2
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user