feat(alerts): per-source-group dedup so two failing backups produce two alerts

Until now the open-alert key was (host_id, kind, resolved_at IS NULL).
A host with two source groups both failing collapsed onto one
backup_failed row — second failure bumped last_seen_at and
overwrote the message but never re-fan-out. Operators saw one
alert that appeared to flap, not two distinct broken things.

Schema changes (column-level ALTER, no rebuild):

- 0015 jobs.source_group_id (FK → source_groups, ON DELETE SET NULL,
  index). Populated for backup jobs in CreateJob.
- 0016 alerts.dedup_key (NOT NULL DEFAULT ''). The old alerts_open
  partial index gets dropped and replaced with a UNIQUE partial
  index on (host_id, kind, dedup_key) WHERE resolved_at IS NULL —
  the index is now the actual dedup primitive.

Plumbing:

- RaiseOrTouch / AutoResolve / Alert struct gain dedup_key.
- engine.JobFinishedEvent gains SourceGroupID; handleJobFinished
  passes it through for backup_failed only (forget/prune/check stay
  repo-scoped with key='').
- ws.handler reads SourceGroupID off the freshly-loaded job row.
- dispatchJobWithPayload gains a *string sourceGroupID arg; the
  per-group Run-now path and schedule.fire path pass &g.ID.

Test coverage: TestRaiseOrTouchDedupsPerSourceGroup proves two
distinct groups produce two distinct open alerts and that resolving
one does not auto-resolve the other.

Dev tool: cmd/_fake_alert gains -dedup-key flag.
This commit is contained in:
2026-05-04 22:58:29 +01:00
parent 9d7a714102
commit 350be3f19d
15 changed files with 214 additions and 95 deletions
+31 -18
View File
@@ -21,11 +21,16 @@ type AlertFilter struct {
}
// RaiseOrTouch implements the dedup + last_seen_at bump pattern. If
// an alert with (host_id, kind, resolved_at IS NULL) already exists,
// it touches last_seen_at + message and returns (id, false). Otherwise
// inserts a fresh row and returns (id, true). Caller fires a
// notification only when didRaise=true.
func (s *Store) RaiseOrTouch(ctx context.Context, hostID, kind, severity, message string, when time.Time) (id string, didRaise bool, err error) {
// an alert with (host_id, kind, dedup_key, resolved_at IS NULL)
// already exists, it touches last_seen_at + message and returns
// (id, false). Otherwise inserts a fresh row and returns (id, true).
// Caller fires a notification only when didRaise=true.
//
// dedupKey is the source-group id for backup/forget/prune/check
// failures (so two failing groups on the same host produce two open
// alerts), and the empty string for one-per-host alerts like
// agent_offline / stale_schedule.
func (s *Store) RaiseOrTouch(ctx context.Context, hostID, kind, dedupKey, severity, message string, when time.Time) (id string, didRaise bool, err error) {
tx, err := s.db.BeginTx(ctx, nil)
if err != nil {
return "", false, fmt.Errorf("store: begin: %w", err)
@@ -33,8 +38,10 @@ func (s *Store) RaiseOrTouch(ctx context.Context, hostID, kind, severity, messag
defer func() { _ = tx.Rollback() }()
row := tx.QueryRowContext(ctx,
`SELECT id FROM alerts WHERE host_id = ? AND kind = ? AND resolved_at IS NULL LIMIT 1`,
hostID, kind)
`SELECT id FROM alerts
WHERE host_id = ? AND kind = ? AND dedup_key = ? AND resolved_at IS NULL
LIMIT 1`,
hostID, kind, dedupKey)
var existing string
switch err := row.Scan(&existing); {
case err == nil:
@@ -57,9 +64,9 @@ func (s *Store) RaiseOrTouch(ctx context.Context, hostID, kind, severity, messag
id = ulid.Make().String()
whenStr := when.UTC().Format(time.RFC3339Nano)
_, err = tx.ExecContext(ctx,
`INSERT INTO alerts (id, host_id, kind, severity, message, created_at, last_seen_at)
VALUES (?, ?, ?, ?, ?, ?, ?)`,
id, hostID, kind, severity, message, whenStr, whenStr)
`INSERT INTO alerts (id, host_id, kind, dedup_key, severity, message, created_at, last_seen_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)`,
id, hostID, kind, dedupKey, severity, message, whenStr, whenStr)
if err != nil {
return "", false, fmt.Errorf("store: insert alert: %w", err)
}
@@ -127,14 +134,20 @@ func (s *Store) Resolve(ctx context.Context, id string, when time.Time) error {
return nil
}
// AutoResolve closes every open alert for the (host_id, kind) pair.
// AutoResolve closes the open alert for (host_id, kind, dedup_key).
// Used by the engine when a rule's underlying condition clears (e.g.
// next backup succeeded so backup_failed clears).
func (s *Store) AutoResolve(ctx context.Context, hostID, kind string, when time.Time) error {
// next backup succeeded for the same source group so backup_failed
// clears). Pass dedupKey="" for one-per-host alerts (agent_offline).
//
// Closes only the dedup-key-matching row, not every open alert of
// the same kind on the host — distinct source groups now have
// distinct rows and a recovery in one shouldn't auto-resolve the
// others.
func (s *Store) AutoResolve(ctx context.Context, hostID, kind, dedupKey string, when time.Time) error {
_, err := s.db.ExecContext(ctx,
`UPDATE alerts SET resolved_at = ?
WHERE host_id = ? AND kind = ? AND resolved_at IS NULL`,
when.UTC().Format(time.RFC3339Nano), hostID, kind)
WHERE host_id = ? AND kind = ? AND dedup_key = ? AND resolved_at IS NULL`,
when.UTC().Format(time.RFC3339Nano), hostID, kind, dedupKey)
if err != nil {
return fmt.Errorf("store: auto-resolve: %w", err)
}
@@ -145,7 +158,7 @@ func (s *Store) AutoResolve(ctx context.Context, hostID, kind string, when time.
// GetAlert reads one row.
func (s *Store) GetAlert(ctx context.Context, id string) (*Alert, error) {
row := s.db.QueryRowContext(ctx,
`SELECT id, host_id, kind, severity, message, created_at, last_seen_at,
`SELECT id, host_id, kind, dedup_key, severity, message, created_at, last_seen_at,
acknowledged_at, acknowledged_by, resolved_at
FROM alerts WHERE id = ?`, id)
return scanAlert(row.Scan)
@@ -153,7 +166,7 @@ func (s *Store) GetAlert(ctx context.Context, id string) (*Alert, error) {
// ListAlerts is the filtered list. Sort: open-first, then by created_at desc.
func (s *Store) ListAlerts(ctx context.Context, f AlertFilter) ([]Alert, error) {
q := `SELECT id, host_id, kind, severity, message, created_at, last_seen_at,
q := `SELECT id, host_id, kind, dedup_key, severity, message, created_at, last_seen_at,
acknowledged_at, acknowledged_by, resolved_at FROM alerts`
conds := []string{}
args := []any{}
@@ -209,7 +222,7 @@ func scanAlert(scan func(...any) error) (*Alert, error) {
var a Alert
var hostID, lastSeen, ackedAt, ackedBy, resolvedAt sql.NullString
var createdAt string
if err := scan(&a.ID, &hostID, &a.Kind, &a.Severity, &a.Message,
if err := scan(&a.ID, &hostID, &a.Kind, &a.DedupKey, &a.Severity, &a.Message,
&createdAt, &lastSeen, &ackedAt, &ackedBy, &resolvedAt); err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil, ErrNotFound
+41 -9
View File
@@ -33,7 +33,7 @@ func TestRaiseOrTouchInsertsThenTouches(t *testing.T) {
ctx := context.Background()
t0 := time.Now().UTC()
id1, didRaise, err := st.RaiseOrTouch(ctx, hostID, "backup_failed", "warning",
id1, didRaise, err := st.RaiseOrTouch(ctx, hostID, "backup_failed", "", "warning",
"Backup failed: 401", t0)
if err != nil {
t.Fatalf("first raise: %v", err)
@@ -47,7 +47,7 @@ func TestRaiseOrTouchInsertsThenTouches(t *testing.T) {
// Second call within the same open window should touch, not insert.
t1 := t0.Add(60 * time.Second)
id2, didRaise2, err := st.RaiseOrTouch(ctx, hostID, "backup_failed", "warning",
id2, didRaise2, err := st.RaiseOrTouch(ctx, hostID, "backup_failed", "", "warning",
"Backup failed: 401 (still)", t1)
if err != nil {
t.Fatalf("touch: %v", err)
@@ -78,7 +78,7 @@ func TestResolveAndReRaiseStartsFreshAlert(t *testing.T) {
ctx := context.Background()
t0 := time.Now().UTC()
id1, _, err := st.RaiseOrTouch(ctx, hostID, "backup_failed", "warning", "first", t0)
id1, _, err := st.RaiseOrTouch(ctx, hostID, "backup_failed", "", "warning", "first", t0)
if err != nil {
t.Fatalf("raise: %v", err)
}
@@ -86,7 +86,7 @@ func TestResolveAndReRaiseStartsFreshAlert(t *testing.T) {
t.Fatalf("resolve: %v", err)
}
id2, didRaise, err := st.RaiseOrTouch(ctx, hostID, "backup_failed", "warning", "second", t0.Add(2*time.Minute))
id2, didRaise, err := st.RaiseOrTouch(ctx, hostID, "backup_failed", "", "warning", "second", t0.Add(2*time.Minute))
if err != nil {
t.Fatalf("re-raise: %v", err)
}
@@ -98,6 +98,38 @@ func TestResolveAndReRaiseStartsFreshAlert(t *testing.T) {
}
}
// Two source groups failing on the same host produce two distinct
// open alerts (not one collapsed). Pre-dedup-key, this would have
// touched the existing row and silently dropped the second failure.
func TestRaiseOrTouchDedupsPerSourceGroup(t *testing.T) {
t.Parallel()
st, hostID := newTestStoreWithHost(t)
ctx := context.Background()
t0 := time.Now().UTC()
idA, didA, err := st.RaiseOrTouch(ctx, hostID, "backup_failed", "group-a",
"warning", "group A failed", t0)
if err != nil || !didA {
t.Fatalf("group A raise: id=%q didRaise=%v err=%v", idA, didA, err)
}
idB, didB, err := st.RaiseOrTouch(ctx, hostID, "backup_failed", "group-b",
"warning", "group B failed", t0.Add(time.Second))
if err != nil || !didB {
t.Fatalf("group B raise: id=%q didRaise=%v err=%v", idB, didB, err)
}
if idA == idB {
t.Fatalf("expected distinct alert ids per source group, got %q twice", idA)
}
// Resolving group A must not auto-resolve group B.
if err := st.AutoResolve(ctx, hostID, "backup_failed", "group-a", t0.Add(2*time.Second)); err != nil {
t.Fatalf("auto-resolve A: %v", err)
}
gotB, _ := st.GetAlert(ctx, idB)
if gotB.ResolvedAt != nil {
t.Errorf("group B got auto-resolved by group A's recovery; resolved_at=%v", gotB.ResolvedAt)
}
}
func TestAcknowledgeKeepsAlertOpen(t *testing.T) {
t.Parallel()
st, hostID := newTestStoreWithHost(t)
@@ -112,7 +144,7 @@ func TestAcknowledgeKeepsAlertOpen(t *testing.T) {
t.Fatalf("create user: %v", err)
}
id, _, err := st.RaiseOrTouch(ctx, hostID, "backup_failed", "warning", "m", time.Now().UTC())
id, _, err := st.RaiseOrTouch(ctx, hostID, "backup_failed", "", "warning", "m", time.Now().UTC())
if err != nil {
t.Fatalf("raise: %v", err)
}
@@ -140,8 +172,8 @@ func TestAutoResolveClearsOpenAlerts(t *testing.T) {
ctx := context.Background()
t0 := time.Now().UTC()
id, _, _ := st.RaiseOrTouch(ctx, hostID, "backup_failed", "warning", "m", t0)
if err := st.AutoResolve(ctx, hostID, "backup_failed", t0.Add(time.Minute)); err != nil {
id, _, _ := st.RaiseOrTouch(ctx, hostID, "backup_failed", "", "warning", "m", t0)
if err := st.AutoResolve(ctx, hostID, "backup_failed", "", t0.Add(time.Minute)); err != nil {
t.Fatalf("auto-resolve: %v", err)
}
got, _ := st.GetAlert(ctx, id)
@@ -157,8 +189,8 @@ func TestListAlertsFilters(t *testing.T) {
t0 := time.Now().UTC()
// One open warning + one resolved info.
_, _, _ = st.RaiseOrTouch(ctx, hostID, "backup_failed", "warning", "open", t0)
id2, _, _ := st.RaiseOrTouch(ctx, hostID, "stale_schedule", "info", "done", t0)
_, _, _ = st.RaiseOrTouch(ctx, hostID, "backup_failed", "", "warning", "open", t0)
id2, _, _ := st.RaiseOrTouch(ctx, hostID, "stale_schedule", "", "info", "done", t0)
_ = st.Resolve(ctx, id2, t0.Add(time.Minute))
open, err := st.ListAlerts(ctx, AlertFilter{Status: "open"})
+32 -20
View File
@@ -11,19 +11,20 @@ import (
// Job mirrors the jobs table.
type Job struct {
ID string
HostID string
Kind string
Status string
ScheduledID *string
ActorKind string // user|schedule|system
ActorID *string
StartedAt *time.Time
FinishedAt *time.Time
ExitCode *int
Stats json.RawMessage
Error *string
CreatedAt time.Time
ID string
HostID string
Kind string
Status string
ScheduledID *string
SourceGroupID *string // populated for backup jobs; alert engine dedup key
ActorKind string // user|schedule|system
ActorID *string
StartedAt *time.Time
FinishedAt *time.Time
ExitCode *int
Stats json.RawMessage
Error *string
CreatedAt time.Time
}
// CreateJob inserts a queued job. The agent will mark it running
@@ -32,10 +33,11 @@ type Job struct {
// operator-driven run-now.
func (s *Store) CreateJob(ctx context.Context, j Job) error {
_, err := s.db.ExecContext(ctx,
`INSERT INTO jobs (id, host_id, kind, status, scheduled_id, actor_kind, actor_id, created_at)
VALUES (?, ?, ?, 'queued', ?, ?, ?, ?)`,
`INSERT INTO jobs (id, host_id, kind, status, scheduled_id, source_group_id, actor_kind, actor_id, created_at)
VALUES (?, ?, ?, 'queued', ?, ?, ?, ?, ?)`,
j.ID, j.HostID, j.Kind,
nullable(j.ScheduledID), j.ActorKind, nullable(j.ActorID),
nullable(j.ScheduledID), nullable(j.SourceGroupID),
j.ActorKind, nullable(j.ActorID),
j.CreatedAt.UTC().Format(time.RFC3339Nano))
if err != nil {
return fmt.Errorf("store: create job: %w", err)
@@ -139,12 +141,13 @@ func (s *Store) ListJobLogs(ctx context.Context, jobID string, afterSeq int64, l
// GetJob returns a job row.
func (s *Store) GetJob(ctx context.Context, id string) (*Job, error) {
row := s.db.QueryRowContext(ctx,
`SELECT id, host_id, kind, status, scheduled_id, actor_kind, actor_id,
`SELECT id, host_id, kind, status, scheduled_id, source_group_id, actor_kind, actor_id,
started_at, finished_at, exit_code, stats, error, created_at
FROM jobs WHERE id = ?`, id)
var (
j Job
schedID sql.NullString
groupID sql.NullString
actorID sql.NullString
startedAt sql.NullString
finishedAt sql.NullString
@@ -153,7 +156,7 @@ func (s *Store) GetJob(ctx context.Context, id string) (*Job, error) {
errMsg sql.NullString
createdAt string
)
if err := row.Scan(&j.ID, &j.HostID, &j.Kind, &j.Status, &schedID,
if err := row.Scan(&j.ID, &j.HostID, &j.Kind, &j.Status, &schedID, &groupID,
&j.ActorKind, &actorID, &startedAt, &finishedAt,
&exitCode, &stats, &errMsg, &createdAt); err != nil {
if errors.Is(err, sql.ErrNoRows) {
@@ -165,6 +168,10 @@ func (s *Store) GetJob(ctx context.Context, id string) (*Job, error) {
s := schedID.String
j.ScheduledID = &s
}
if groupID.Valid {
s := groupID.String
j.SourceGroupID = &s
}
if actorID.Valid {
s := actorID.String
j.ActorID = &s
@@ -201,7 +208,7 @@ func (s *Store) GetJob(ctx context.Context, id string) (*Job, error) {
// would re-fire on the next tick while the first is still running.
func (s *Store) LatestJobByKind(ctx context.Context, hostID, kind string) (*Job, error) {
row := s.db.QueryRowContext(ctx,
`SELECT id, host_id, kind, status, scheduled_id, actor_kind, actor_id,
`SELECT id, host_id, kind, status, scheduled_id, source_group_id, actor_kind, actor_id,
started_at, finished_at, exit_code, stats, error, created_at
FROM jobs
WHERE host_id = ? AND kind = ?
@@ -210,6 +217,7 @@ func (s *Store) LatestJobByKind(ctx context.Context, hostID, kind string) (*Job,
var (
j Job
schedID sql.NullString
groupID sql.NullString
actorID sql.NullString
startedAt sql.NullString
finishedAt sql.NullString
@@ -218,7 +226,7 @@ func (s *Store) LatestJobByKind(ctx context.Context, hostID, kind string) (*Job,
errMsg sql.NullString
createdAt string
)
if err := row.Scan(&j.ID, &j.HostID, &j.Kind, &j.Status, &schedID,
if err := row.Scan(&j.ID, &j.HostID, &j.Kind, &j.Status, &schedID, &groupID,
&j.ActorKind, &actorID, &startedAt, &finishedAt,
&exitCode, &stats, &errMsg, &createdAt); err != nil {
if errors.Is(err, sql.ErrNoRows) {
@@ -230,6 +238,10 @@ func (s *Store) LatestJobByKind(ctx context.Context, hostID, kind string) (*Job,
s := schedID.String
j.ScheduledID = &s
}
if groupID.Valid {
s := groupID.String
j.SourceGroupID = &s
}
if actorID.Valid {
s := actorID.String
j.ActorID = &s
@@ -0,0 +1,16 @@
-- 0015_jobs_source_group_id.sql
--
-- Add source_group_id to jobs so the alert engine can dedup
-- backup/forget/prune/check failures per source group rather than
-- collapsing every failed thing on a host onto one open alert per
-- kind. Backup jobs always have one set (each group is its own
-- restic invocation); forget/prune/check are repo-scoped and leave
-- it NULL.
--
-- Column-level ALTER is safe under foreign_keys=ON (CLAUDE.md). The
-- existing rebuild pattern in 0012 should not be repeated here.
ALTER TABLE jobs ADD COLUMN source_group_id TEXT
REFERENCES source_groups(id) ON DELETE SET NULL;
CREATE INDEX jobs_source_group_id ON jobs(source_group_id);
@@ -0,0 +1,23 @@
-- 0016_alerts_dedup_key.sql
--
-- Widen the open-alert uniqueness key from (host_id, kind) to
-- (host_id, kind, dedup_key) so two distinct failing source groups
-- on the same host produce two open alerts instead of collapsing
-- onto one. dedup_key is the source_group_id for
-- backup/forget/prune/check failures and the empty string for
-- agent_offline / stale_schedule (one-per-host alerts).
--
-- The original alerts_open partial index keyed on host_id only.
-- That was a coarse "is this host happy?" lookup; we replace it
-- with a proper partial unique index that the dedup logic relies
-- on. NOT NULL DEFAULT '' so existing rows backfill cleanly.
--
-- Column-level ALTER is safe under foreign_keys=ON.
ALTER TABLE alerts ADD COLUMN dedup_key TEXT NOT NULL DEFAULT '';
DROP INDEX IF EXISTS alerts_open;
CREATE UNIQUE INDEX alerts_open_unique
ON alerts(host_id, kind, dedup_key)
WHERE resolved_at IS NULL;
+1
View File
@@ -198,6 +198,7 @@ type Alert struct {
ID string
HostID *string
Kind string
DedupKey string // empty for one-per-host alerts; source-group id otherwise
Severity string
Message string
CreatedAt time.Time