From a45c80188407f9296666b04f7ea937a1a655f8d0 Mon Sep 17 00:00:00 2001 From: Steve Cliff Date: Mon, 4 May 2026 22:58:29 +0100 Subject: [PATCH] feat(alerts): per-source-group dedup so two failing backups produce two alerts MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Until now the open-alert key was (host_id, kind, resolved_at IS NULL). A host with two source groups both failing collapsed onto one backup_failed row — the second failure bumped last_seen_at and overwrote the message but never fanned out a new notification. Operators saw one alert that appeared to flap, not two distinct broken things. Schema changes (column-level ALTER, no rebuild): - 0015 jobs.source_group_id (FK → source_groups, ON DELETE SET NULL, index). Populated for per-group backup jobs in CreateJob. - 0016 alerts.dedup_key (NOT NULL DEFAULT ''). The old alerts_open partial index gets dropped and replaced with a UNIQUE partial index on (host_id, kind, dedup_key) WHERE resolved_at IS NULL — the index is now the actual dedup primitive. Plumbing: - RaiseOrTouch / AutoResolve / Alert struct gain dedup_key. - engine.JobFinishedEvent gains SourceGroupID; handleJobFinished passes it through for backup_failed only (forget/prune/check stay repo-scoped with key=''). - ws.handler reads SourceGroupID off the freshly-loaded job row. - dispatchJobWithPayload gains a *string sourceGroupID arg; the per-group Run-now path passes &g.ID, and the schedule.fire path sets SourceGroupID directly on its own CreateJob call. Test coverage: TestRaiseOrTouchDedupsPerSourceGroup proves two distinct groups produce two distinct open alerts and that resolving one does not auto-resolve the other. Dev tool: cmd/_fake_alert gains -dedup-key flag. 
--- internal/alert/engine.go | 30 +++++++---- internal/alert/rules.go | 22 ++++---- internal/server/http/jobs.go | 21 +++++--- internal/server/http/maintenance_dispatch.go | 6 +-- internal/server/http/repo_ops.go | 6 +-- internal/server/http/run_group.go | 2 +- internal/server/http/schedule_push.go | 14 ++--- internal/server/http/ui_alerts_test.go | 2 +- internal/server/ws/handler.go | 15 ++++-- internal/store/alerts.go | 49 ++++++++++------- internal/store/alerts_test.go | 50 ++++++++++++++---- internal/store/jobs.go | 52 ++++++++++++------- .../migrations/0015_jobs_source_group_id.sql | 16 ++++++ .../migrations/0016_alerts_dedup_key.sql | 23 ++++++++ internal/store/types.go | 1 + 15 files changed, 214 insertions(+), 95 deletions(-) create mode 100644 internal/store/migrations/0015_jobs_source_group_id.sql create mode 100644 internal/store/migrations/0016_alerts_dedup_key.sql diff --git a/internal/alert/engine.go b/internal/alert/engine.go index 2ef67db..4c2004a 100644 --- a/internal/alert/engine.go +++ b/internal/alert/engine.go @@ -26,11 +26,12 @@ import ( // the failed-X rules. Pushed via Engine.NotifyJobFinished from the // MarkJobFinished site. type JobFinishedEvent struct { - HostID string - JobID string - Kind string // backup | forget | prune | check | unlock | restore | diff - Status string // succeeded | failed | cancelled - When time.Time + HostID string + JobID string + Kind string // backup | forget | prune | check | unlock | restore | diff + Status string // succeeded | failed | cancelled + SourceGroupID string // source-group id; used as the alert dedup key for backup jobs only, empty otherwise + When time.Time } // Engine evaluates hardcoded alert rules and dispatches via notification.Hub. @@ -133,12 +134,21 @@ func (e *Engine) handleJobFinished(ctx context.Context, ev JobFinishedEvent) { default: return } + // dedupKey scopes the alert to a specific subject. For backups it's + // the source-group id (each group = its own restic run = its own + // failure surface). 
forget/prune/check are repo-scoped — leave the + // key empty so we get one alert per host per kind, matching the + // "is this repo healthy?" mental model. + dedupKey := "" + if ev.Kind == "backup" { + dedupKey = ev.SourceGroupID + } switch ev.Status { case "failed": - e.raiseAndNotify(ctx, ev.HostID, kind, severity, + e.raiseAndNotify(ctx, ev.HostID, kind, dedupKey, severity, fmt.Sprintf("%s job %s failed", ev.Kind, ev.JobID), ev.When) case "succeeded": - e.resolveAndNotify(ctx, ev.HostID, kind, ev.When) + e.resolveAndNotify(ctx, ev.HostID, kind, dedupKey, ev.When) } } @@ -157,14 +167,14 @@ func (e *Engine) handleHostOffline(ctx context.Context, hostID string) { if time.Since(*host.LastSeenAt) < e.agentOfflineFloor { return } - e.raiseAndNotify(ctx, hostID, KindAgentOffline, "warning", + e.raiseAndNotify(ctx, hostID, KindAgentOffline, "", "warning", fmt.Sprintf("Agent offline for %s (threshold %s)", roundDur(time.Since(*host.LastSeenAt)), e.agentOfflineFloor), time.Now().UTC()) } func (e *Engine) handleHostOnline(ctx context.Context, hostID string) { - e.resolveAndNotify(ctx, hostID, KindAgentOffline, time.Now().UTC()) + e.resolveAndNotify(ctx, hostID, KindAgentOffline, "", time.Now().UTC()) } // tick is the 60-second sweep. Responsibilities: @@ -186,7 +196,7 @@ func (e *Engine) tick(ctx context.Context, now time.Time) { continue } if now.Sub(*h.LastSeenAt) >= e.agentOfflineFloor { - e.raiseAndNotify(ctx, h.ID, KindAgentOffline, "warning", + e.raiseAndNotify(ctx, h.ID, KindAgentOffline, "", "warning", fmt.Sprintf("Agent offline for %s (threshold %s)", roundDur(now.Sub(*h.LastSeenAt)), e.agentOfflineFloor), now) } diff --git a/internal/alert/rules.go b/internal/alert/rules.go index e55cfe7..54e2015 100644 --- a/internal/alert/rules.go +++ b/internal/alert/rules.go @@ -42,10 +42,10 @@ const ( // deduplicates, and notification.Hub.Dispatch fires only on the first // raise (didRaise=true). 
Subsequent occurrences of the same open alert // are "touched" (last_seen_at bumped) without a second notification. -func (e *Engine) raiseAndNotify(ctx context.Context, hostID, kind, severity, message string, when time.Time) { - id, didRaise, err := e.store.RaiseOrTouch(ctx, hostID, kind, severity, message, when) +func (e *Engine) raiseAndNotify(ctx context.Context, hostID, kind, dedupKey, severity, message string, when time.Time) { + id, didRaise, err := e.store.RaiseOrTouch(ctx, hostID, kind, dedupKey, severity, message, when) if err != nil { - slog.Warn("alert: raise", "kind", kind, "host_id", hostID, "err", err) + slog.Warn("alert: raise", "kind", kind, "host_id", hostID, "dedup_key", dedupKey, "err", err) return } if !didRaise { @@ -122,11 +122,11 @@ func alertPayload(ctx context.Context, st *store.Store, ev notification.Event, a } } -// resolveAndNotify clears every open (or acknowledged) alert for -// (host_id, kind) via store.AutoResolve, then fires alert.resolved -// for each row that was actually open. Best-effort — errors are -// logged but do not propagate. -func (e *Engine) resolveAndNotify(ctx context.Context, hostID, kind string, when time.Time) { +// resolveAndNotify clears the open (or acknowledged) alert matching +// (host_id, kind, dedup_key) via store.AutoResolve, then fires +// alert.resolved for the row(s) actually closed. Best-effort — +// errors are logged but do not propagate. +func (e *Engine) resolveAndNotify(ctx context.Context, hostID, kind, dedupKey string, when time.Time) { open, err := e.store.ListAlerts(ctx, store.AlertFilter{ Status: "open", HostID: hostID, }) @@ -137,8 +137,8 @@ func (e *Engine) resolveAndNotify(ctx context.Context, hostID, kind string, when Status: "acknowledged", HostID: hostID, }) all := append(open, openAcked...) 
- if err := e.store.AutoResolve(ctx, hostID, kind, when); err != nil { - slog.Warn("alert: auto-resolve", "kind", kind, "host_id", hostID, "err", err) + if err := e.store.AutoResolve(ctx, hostID, kind, dedupKey, when); err != nil { + slog.Warn("alert: auto-resolve", "kind", kind, "host_id", hostID, "dedup_key", dedupKey, "err", err) return } host, _ := e.store.GetHost(ctx, hostID) @@ -147,7 +147,7 @@ func (e *Engine) resolveAndNotify(ctx context.Context, hostID, kind string, when hostName = host.Name } for _, a := range all { - if a.Kind != kind { + if a.Kind != kind || a.DedupKey != dedupKey { continue } go e.hub.Dispatch(ctx, notification.Payload{ diff --git a/internal/server/http/jobs.go b/internal/server/http/jobs.go index 8740abe..592c64e 100644 --- a/internal/server/http/jobs.go +++ b/internal/server/http/jobs.go @@ -65,7 +65,7 @@ func (s *Server) handleRunNow(w stdhttp.ResponseWriter, r *stdhttp.Request) { func (s *Server) dispatchJob(ctx context.Context, user *store.User, hostID string, kind api.JobKind, args []string, ) (res runNowResponse, status int, code, msg string) { - return s.dispatchJobWithPayload(ctx, user, hostID, kind, api.CommandRunPayload{ + return s.dispatchJobWithPayload(ctx, user, hostID, kind, nil, api.CommandRunPayload{ Kind: kind, Args: args, }) @@ -75,8 +75,12 @@ func (s *Server) dispatchJob(ctx context.Context, user *store.User, // fill in structured fields (Includes/Excludes/Tag/ForgetGroups/RequiresAdminCreds) // — used by the per-source-group Run-now path. JobID is filled in // here; callers leave it zero on the input payload. +// +// sourceGroupID is the dedup key the alert engine will key on for +// backup_failed. Pass non-nil for backups; nil for prune/check/unlock +// (those are repo-scoped and dedup at host_id only). 
func (s *Server) dispatchJobWithPayload(ctx context.Context, user *store.User, - hostID string, kind api.JobKind, payload api.CommandRunPayload, + hostID string, kind api.JobKind, sourceGroupID *string, payload api.CommandRunPayload, ) (res runNowResponse, status int, code, msg string) { if !validJobKind(kind) { return res, stdhttp.StatusBadRequest, "invalid_kind", @@ -100,12 +104,13 @@ func (s *Server) dispatchJobWithPayload(ctx context.Context, user *store.User, actorID = &user.ID } if err := s.deps.Store.CreateJob(ctx, store.Job{ - ID: jobID, - HostID: host.ID, - Kind: string(kind), - ActorKind: actor, - ActorID: actorID, - CreatedAt: now, + ID: jobID, + HostID: host.ID, + Kind: string(kind), + SourceGroupID: sourceGroupID, + ActorKind: actor, + ActorID: actorID, + CreatedAt: now, }); err != nil { return res, stdhttp.StatusInternalServerError, "internal", "" } diff --git a/internal/server/http/maintenance_dispatch.go b/internal/server/http/maintenance_dispatch.go index 598533f..2bb4b31 100644 --- a/internal/server/http/maintenance_dispatch.go +++ b/internal/server/http/maintenance_dispatch.go @@ -43,7 +43,7 @@ func (s *Server) DispatchMaintenance(ctx context.Context, decisions []maintenanc "host_id", d.HostID) continue } - _, _, code, msg := s.dispatchJobWithPayload(ctx, nil, d.HostID, api.JobForget, payload) + _, _, code, msg := s.dispatchJobWithPayload(ctx, nil, d.HostID, api.JobForget, nil, payload) if code != "" { slog.Warn("maintenance: forget dispatch failed", "host_id", d.HostID, "code", code, "msg", msg) @@ -65,14 +65,14 @@ func (s *Server) DispatchMaintenance(ctx context.Context, decisions []maintenanc continue } payload := api.CommandRunPayload{RequiresAdminCreds: true} - _, _, code, msg := s.dispatchJobWithPayload(ctx, nil, d.HostID, api.JobPrune, payload) + _, _, code, msg := s.dispatchJobWithPayload(ctx, nil, d.HostID, api.JobPrune, nil, payload) if code != "" { slog.Warn("maintenance: prune dispatch failed", "host_id", d.HostID, "code", code, 
"msg", msg) } case "check": payload := api.CommandRunPayload{Args: []string{strconv.Itoa(d.SubsetPct)}} - _, _, code, msg := s.dispatchJobWithPayload(ctx, nil, d.HostID, api.JobCheck, payload) + _, _, code, msg := s.dispatchJobWithPayload(ctx, nil, d.HostID, api.JobCheck, nil, payload) if code != "" { slog.Warn("maintenance: check dispatch failed", "host_id", d.HostID, "code", code, "msg", msg) diff --git a/internal/server/http/repo_ops.go b/internal/server/http/repo_ops.go index 00c8078..94e1fcb 100644 --- a/internal/server/http/repo_ops.go +++ b/internal/server/http/repo_ops.go @@ -52,7 +52,7 @@ func (s *Server) handleRunRepoPrune(w stdhttp.ResponseWriter, r *stdhttp.Request return } - res, status, code, msg := s.dispatchJobWithPayload(r.Context(), user, hostID, api.JobPrune, + res, status, code, msg := s.dispatchJobWithPayload(r.Context(), user, hostID, api.JobPrune, nil, api.CommandRunPayload{RequiresAdminCreds: true}) if code != "" { s.runOpError(w, r, status, code, msg) @@ -107,7 +107,7 @@ func (s *Server) handleRunRepoCheck(w stdhttp.ResponseWriter, r *stdhttp.Request // Non-numeric ?subset silently falls back to DB value. 
} - res, status, code, msg := s.dispatchJobWithPayload(r.Context(), user, hostID, api.JobCheck, + res, status, code, msg := s.dispatchJobWithPayload(r.Context(), user, hostID, api.JobCheck, nil, api.CommandRunPayload{Args: []string{strconv.Itoa(subset)}}) if code != "" { s.runOpError(w, r, status, code, msg) @@ -134,7 +134,7 @@ func (s *Server) handleRunRepoUnlock(w stdhttp.ResponseWriter, r *stdhttp.Reques return } - res, status, code, msg := s.dispatchJobWithPayload(r.Context(), user, hostID, api.JobUnlock, + res, status, code, msg := s.dispatchJobWithPayload(r.Context(), user, hostID, api.JobUnlock, nil, api.CommandRunPayload{}) if code != "" { s.runOpError(w, r, status, code, msg) diff --git a/internal/server/http/run_group.go b/internal/server/http/run_group.go index d47ecf3..0931bab 100644 --- a/internal/server/http/run_group.go +++ b/internal/server/http/run_group.go @@ -88,7 +88,7 @@ func (s *Server) handleRunSourceGroup(w stdhttp.ResponseWriter, r *stdhttp.Reque // Backup invocations don't consume RetentionPolicy — that lives on // forget. Sending the resolved set here would just be dead weight. 
- res, status, code, msg := s.dispatchJobWithPayload(r.Context(), user, hostID, api.JobBackup, + res, status, code, msg := s.dispatchJobWithPayload(r.Context(), user, hostID, api.JobBackup, &g.ID, api.CommandRunPayload{ Includes: g.Includes, Excludes: g.Excludes, diff --git a/internal/server/http/schedule_push.go b/internal/server/http/schedule_push.go index 3c453ad..96765d4 100644 --- a/internal/server/http/schedule_push.go +++ b/internal/server/http/schedule_push.go @@ -180,13 +180,15 @@ func (s *Server) dispatchBackupForGroupCore(ctx context.Context, conn *ws.Conn, jobID := ulid.Make().String() now := time.Now().UTC() scheduleRef := scheduleID + groupRef := g.ID if err := s.deps.Store.CreateJob(ctx, store.Job{ - ID: jobID, - HostID: hostID, - Kind: string(api.JobBackup), - ScheduledID: &scheduleRef, - ActorKind: "schedule", - CreatedAt: now, + ID: jobID, + HostID: hostID, + Kind: string(api.JobBackup), + ScheduledID: &scheduleRef, + SourceGroupID: &groupRef, + ActorKind: "schedule", + CreatedAt: now, }); err != nil { slog.Warn("schedule.fire: persist job", "host_id", hostID, "schedule_id", scheduleID, "group", g.Name, "err", err) diff --git a/internal/server/http/ui_alerts_test.go b/internal/server/http/ui_alerts_test.go index 633773f..2d8b611 100644 --- a/internal/server/http/ui_alerts_test.go +++ b/internal/server/http/ui_alerts_test.go @@ -17,7 +17,7 @@ func TestAPIAlertsListsOpen(t *testing.T) { srv, ts, st := rawTestServer(t) hostID, _ := enrolHostForWS(t, srv, st, "host-alerts") _, _, _ = st.RaiseOrTouch(context.Background(), hostID, - "backup_failed", "warning", "x", time.Now().UTC()) + "backup_failed", "", "warning", "x", time.Now().UTC()) cookie := loginAsAdmin(t, st) req, _ := stdhttp.NewRequest("GET", ts.URL+"/api/alerts?status=open", nil) diff --git a/internal/server/ws/handler.go b/internal/server/ws/handler.go index 4ef61ee..9550081 100644 --- a/internal/server/ws/handler.go +++ b/internal/server/ws/handler.go @@ -219,12 +219,17 @@ func 
dispatchAgentMessage(ctx context.Context, c *Conn, hostID string, env api.E } if deps.AlertEngine != nil { if job, err := deps.Store.GetJob(ctx, p.JobID); err == nil && job != nil { + groupID := "" + if job.SourceGroupID != nil { + groupID = *job.SourceGroupID + } deps.AlertEngine.NotifyJobFinished(alert.JobFinishedEvent{ - HostID: hostID, - JobID: p.JobID, - Kind: job.Kind, - Status: string(p.Status), - When: p.FinishedAt, + HostID: hostID, + JobID: p.JobID, + Kind: job.Kind, + Status: string(p.Status), + SourceGroupID: groupID, + When: p.FinishedAt, }) } } diff --git a/internal/store/alerts.go b/internal/store/alerts.go index ef9036f..b12d6fa 100644 --- a/internal/store/alerts.go +++ b/internal/store/alerts.go @@ -21,11 +21,16 @@ type AlertFilter struct { } // RaiseOrTouch implements the dedup + last_seen_at bump pattern. If -// an alert with (host_id, kind, resolved_at IS NULL) already exists, -// it touches last_seen_at + message and returns (id, false). Otherwise -// inserts a fresh row and returns (id, true). Caller fires a -// notification only when didRaise=true. -func (s *Store) RaiseOrTouch(ctx context.Context, hostID, kind, severity, message string, when time.Time) (id string, didRaise bool, err error) { +// an alert with (host_id, kind, dedup_key, resolved_at IS NULL) +// already exists, it touches last_seen_at + message and returns +// (id, false). Otherwise inserts a fresh row and returns (id, true). +// Caller fires a notification only when didRaise=true. +// +// dedupKey is the source-group id for backup failures (so two +// failing groups on the same host produce two open alerts), and the +// empty string for one-per-host-and-kind alerts: forget/prune/check +// failures, agent_offline and stale_schedule. 
+func (s *Store) RaiseOrTouch(ctx context.Context, hostID, kind, dedupKey, severity, message string, when time.Time) (id string, didRaise bool, err error) { tx, err := s.db.BeginTx(ctx, nil) if err != nil { return "", false, fmt.Errorf("store: begin: %w", err) @@ -33,8 +38,10 @@ func (s *Store) RaiseOrTouch(ctx context.Context, hostID, kind, severity, messag defer func() { _ = tx.Rollback() }() row := tx.QueryRowContext(ctx, - `SELECT id FROM alerts WHERE host_id = ? AND kind = ? AND resolved_at IS NULL LIMIT 1`, - hostID, kind) + `SELECT id FROM alerts + WHERE host_id = ? AND kind = ? AND dedup_key = ? AND resolved_at IS NULL + LIMIT 1`, + hostID, kind, dedupKey) var existing string switch err := row.Scan(&existing); { case err == nil: @@ -57,9 +64,9 @@ func (s *Store) RaiseOrTouch(ctx context.Context, hostID, kind, severity, messag id = ulid.Make().String() whenStr := when.UTC().Format(time.RFC3339Nano) _, err = tx.ExecContext(ctx, - `INSERT INTO alerts (id, host_id, kind, severity, message, created_at, last_seen_at) - VALUES (?, ?, ?, ?, ?, ?, ?)`, - id, hostID, kind, severity, message, whenStr, whenStr) + `INSERT INTO alerts (id, host_id, kind, dedup_key, severity, message, created_at, last_seen_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?)`, + id, hostID, kind, dedupKey, severity, message, whenStr, whenStr) if err != nil { return "", false, fmt.Errorf("store: insert alert: %w", err) } @@ -127,14 +134,20 @@ func (s *Store) Resolve(ctx context.Context, id string, when time.Time) error { return nil } -// AutoResolve closes every open alert for the (host_id, kind) pair. +// AutoResolve closes the open alert for (host_id, kind, dedup_key). // Used by the engine when a rule's underlying condition clears (e.g. -// next backup succeeded so backup_failed clears). -func (s *Store) AutoResolve(ctx context.Context, hostID, kind string, when time.Time) error { +// next backup succeeded for the same source group so backup_failed +// clears). 
Pass dedupKey="" for one-per-host alerts (agent_offline). +// +// Closes only the dedup-key-matching row, not every open alert of +// the same kind on the host — distinct source groups now have +// distinct rows and a recovery in one shouldn't auto-resolve the +// others. +func (s *Store) AutoResolve(ctx context.Context, hostID, kind, dedupKey string, when time.Time) error { _, err := s.db.ExecContext(ctx, `UPDATE alerts SET resolved_at = ? - WHERE host_id = ? AND kind = ? AND resolved_at IS NULL`, - when.UTC().Format(time.RFC3339Nano), hostID, kind) + WHERE host_id = ? AND kind = ? AND dedup_key = ? AND resolved_at IS NULL`, + when.UTC().Format(time.RFC3339Nano), hostID, kind, dedupKey) if err != nil { return fmt.Errorf("store: auto-resolve: %w", err) } @@ -145,7 +158,7 @@ func (s *Store) AutoResolve(ctx context.Context, hostID, kind string, when time. // GetAlert reads one row. func (s *Store) GetAlert(ctx context.Context, id string) (*Alert, error) { row := s.db.QueryRowContext(ctx, - `SELECT id, host_id, kind, severity, message, created_at, last_seen_at, + `SELECT id, host_id, kind, dedup_key, severity, message, created_at, last_seen_at, acknowledged_at, acknowledged_by, resolved_at FROM alerts WHERE id = ?`, id) return scanAlert(row.Scan) @@ -153,7 +166,7 @@ func (s *Store) GetAlert(ctx context.Context, id string) (*Alert, error) { // ListAlerts is the filtered list. Sort: open-first, then by created_at desc. 
func (s *Store) ListAlerts(ctx context.Context, f AlertFilter) ([]Alert, error) { - q := `SELECT id, host_id, kind, severity, message, created_at, last_seen_at, + q := `SELECT id, host_id, kind, dedup_key, severity, message, created_at, last_seen_at, acknowledged_at, acknowledged_by, resolved_at FROM alerts` conds := []string{} args := []any{} @@ -209,7 +222,7 @@ func scanAlert(scan func(...any) error) (*Alert, error) { var a Alert var hostID, lastSeen, ackedAt, ackedBy, resolvedAt sql.NullString var createdAt string - if err := scan(&a.ID, &hostID, &a.Kind, &a.Severity, &a.Message, + if err := scan(&a.ID, &hostID, &a.Kind, &a.DedupKey, &a.Severity, &a.Message, &createdAt, &lastSeen, &ackedAt, &ackedBy, &resolvedAt); err != nil { if errors.Is(err, sql.ErrNoRows) { return nil, ErrNotFound diff --git a/internal/store/alerts_test.go b/internal/store/alerts_test.go index 8771cb7..fb3e288 100644 --- a/internal/store/alerts_test.go +++ b/internal/store/alerts_test.go @@ -33,7 +33,7 @@ func TestRaiseOrTouchInsertsThenTouches(t *testing.T) { ctx := context.Background() t0 := time.Now().UTC() - id1, didRaise, err := st.RaiseOrTouch(ctx, hostID, "backup_failed", "warning", + id1, didRaise, err := st.RaiseOrTouch(ctx, hostID, "backup_failed", "", "warning", "Backup failed: 401", t0) if err != nil { t.Fatalf("first raise: %v", err) @@ -47,7 +47,7 @@ func TestRaiseOrTouchInsertsThenTouches(t *testing.T) { // Second call within the same open window should touch, not insert. 
t1 := t0.Add(60 * time.Second) - id2, didRaise2, err := st.RaiseOrTouch(ctx, hostID, "backup_failed", "warning", + id2, didRaise2, err := st.RaiseOrTouch(ctx, hostID, "backup_failed", "", "warning", "Backup failed: 401 (still)", t1) if err != nil { t.Fatalf("touch: %v", err) @@ -78,7 +78,7 @@ func TestResolveAndReRaiseStartsFreshAlert(t *testing.T) { ctx := context.Background() t0 := time.Now().UTC() - id1, _, err := st.RaiseOrTouch(ctx, hostID, "backup_failed", "warning", "first", t0) + id1, _, err := st.RaiseOrTouch(ctx, hostID, "backup_failed", "", "warning", "first", t0) if err != nil { t.Fatalf("raise: %v", err) } @@ -86,7 +86,7 @@ func TestResolveAndReRaiseStartsFreshAlert(t *testing.T) { t.Fatalf("resolve: %v", err) } - id2, didRaise, err := st.RaiseOrTouch(ctx, hostID, "backup_failed", "warning", "second", t0.Add(2*time.Minute)) + id2, didRaise, err := st.RaiseOrTouch(ctx, hostID, "backup_failed", "", "warning", "second", t0.Add(2*time.Minute)) if err != nil { t.Fatalf("re-raise: %v", err) } @@ -98,6 +98,38 @@ func TestResolveAndReRaiseStartsFreshAlert(t *testing.T) { } } +// Two source groups failing on the same host produce two distinct +// open alerts (not one collapsed). Pre-dedup-key, this would have +// touched the existing row and silently dropped the second failure. 
+func TestRaiseOrTouchDedupsPerSourceGroup(t *testing.T) { + t.Parallel() + st, hostID := newTestStoreWithHost(t) + ctx := context.Background() + t0 := time.Now().UTC() + + idA, didA, err := st.RaiseOrTouch(ctx, hostID, "backup_failed", "group-a", + "warning", "group A failed", t0) + if err != nil || !didA { + t.Fatalf("group A raise: id=%q didRaise=%v err=%v", idA, didA, err) + } + idB, didB, err := st.RaiseOrTouch(ctx, hostID, "backup_failed", "group-b", + "warning", "group B failed", t0.Add(time.Second)) + if err != nil || !didB { + t.Fatalf("group B raise: id=%q didRaise=%v err=%v", idB, didB, err) + } + if idA == idB { + t.Fatalf("expected distinct alert ids per source group, got %q twice", idA) + } + // Resolving group A must not auto-resolve group B. + if err := st.AutoResolve(ctx, hostID, "backup_failed", "group-a", t0.Add(2*time.Second)); err != nil { + t.Fatalf("auto-resolve A: %v", err) + } + gotB, _ := st.GetAlert(ctx, idB) + if gotB.ResolvedAt != nil { + t.Errorf("group B got auto-resolved by group A's recovery; resolved_at=%v", gotB.ResolvedAt) + } +} + func TestAcknowledgeKeepsAlertOpen(t *testing.T) { t.Parallel() st, hostID := newTestStoreWithHost(t) @@ -112,7 +144,7 @@ func TestAcknowledgeKeepsAlertOpen(t *testing.T) { t.Fatalf("create user: %v", err) } - id, _, err := st.RaiseOrTouch(ctx, hostID, "backup_failed", "warning", "m", time.Now().UTC()) + id, _, err := st.RaiseOrTouch(ctx, hostID, "backup_failed", "", "warning", "m", time.Now().UTC()) if err != nil { t.Fatalf("raise: %v", err) } @@ -140,8 +172,8 @@ func TestAutoResolveClearsOpenAlerts(t *testing.T) { ctx := context.Background() t0 := time.Now().UTC() - id, _, _ := st.RaiseOrTouch(ctx, hostID, "backup_failed", "warning", "m", t0) - if err := st.AutoResolve(ctx, hostID, "backup_failed", t0.Add(time.Minute)); err != nil { + id, _, _ := st.RaiseOrTouch(ctx, hostID, "backup_failed", "", "warning", "m", t0) + if err := st.AutoResolve(ctx, hostID, "backup_failed", "", t0.Add(time.Minute)); err 
!= nil { t.Fatalf("auto-resolve: %v", err) } got, _ := st.GetAlert(ctx, id) @@ -157,8 +189,8 @@ func TestListAlertsFilters(t *testing.T) { t0 := time.Now().UTC() // One open warning + one resolved info. - _, _, _ = st.RaiseOrTouch(ctx, hostID, "backup_failed", "warning", "open", t0) - id2, _, _ := st.RaiseOrTouch(ctx, hostID, "stale_schedule", "info", "done", t0) + _, _, _ = st.RaiseOrTouch(ctx, hostID, "backup_failed", "", "warning", "open", t0) + id2, _, _ := st.RaiseOrTouch(ctx, hostID, "stale_schedule", "", "info", "done", t0) _ = st.Resolve(ctx, id2, t0.Add(time.Minute)) open, err := st.ListAlerts(ctx, AlertFilter{Status: "open"}) diff --git a/internal/store/jobs.go b/internal/store/jobs.go index 633fcc6..f589c24 100644 --- a/internal/store/jobs.go +++ b/internal/store/jobs.go @@ -11,19 +11,20 @@ import ( // Job mirrors the jobs table. type Job struct { - ID string - HostID string - Kind string - Status string - ScheduledID *string - ActorKind string // user|schedule|system - ActorID *string - StartedAt *time.Time - FinishedAt *time.Time - ExitCode *int - Stats json.RawMessage - Error *string - CreatedAt time.Time + ID string + HostID string + Kind string + Status string + ScheduledID *string + SourceGroupID *string // populated for backup jobs; alert engine dedup key + ActorKind string // user|schedule|system + ActorID *string + StartedAt *time.Time + FinishedAt *time.Time + ExitCode *int + Stats json.RawMessage + Error *string + CreatedAt time.Time } // CreateJob inserts a queued job. The agent will mark it running @@ -32,10 +33,11 @@ type Job struct { // operator-driven run-now. 
func (s *Store) CreateJob(ctx context.Context, j Job) error { _, err := s.db.ExecContext(ctx, - `INSERT INTO jobs (id, host_id, kind, status, scheduled_id, actor_kind, actor_id, created_at) - VALUES (?, ?, ?, 'queued', ?, ?, ?, ?)`, + `INSERT INTO jobs (id, host_id, kind, status, scheduled_id, source_group_id, actor_kind, actor_id, created_at) + VALUES (?, ?, ?, 'queued', ?, ?, ?, ?, ?)`, j.ID, j.HostID, j.Kind, - nullable(j.ScheduledID), j.ActorKind, nullable(j.ActorID), + nullable(j.ScheduledID), nullable(j.SourceGroupID), + j.ActorKind, nullable(j.ActorID), j.CreatedAt.UTC().Format(time.RFC3339Nano)) if err != nil { return fmt.Errorf("store: create job: %w", err) @@ -139,12 +141,13 @@ func (s *Store) ListJobLogs(ctx context.Context, jobID string, afterSeq int64, l // GetJob returns a job row. func (s *Store) GetJob(ctx context.Context, id string) (*Job, error) { row := s.db.QueryRowContext(ctx, - `SELECT id, host_id, kind, status, scheduled_id, actor_kind, actor_id, + `SELECT id, host_id, kind, status, scheduled_id, source_group_id, actor_kind, actor_id, started_at, finished_at, exit_code, stats, error, created_at FROM jobs WHERE id = ?`, id) var ( j Job schedID sql.NullString + groupID sql.NullString actorID sql.NullString startedAt sql.NullString finishedAt sql.NullString @@ -153,7 +156,7 @@ func (s *Store) GetJob(ctx context.Context, id string) (*Job, error) { errMsg sql.NullString createdAt string ) - if err := row.Scan(&j.ID, &j.HostID, &j.Kind, &j.Status, &schedID, + if err := row.Scan(&j.ID, &j.HostID, &j.Kind, &j.Status, &schedID, &groupID, &j.ActorKind, &actorID, &startedAt, &finishedAt, &exitCode, &stats, &errMsg, &createdAt); err != nil { if errors.Is(err, sql.ErrNoRows) { @@ -165,6 +168,10 @@ func (s *Store) GetJob(ctx context.Context, id string) (*Job, error) { s := schedID.String j.ScheduledID = &s } + if groupID.Valid { + s := groupID.String + j.SourceGroupID = &s + } if actorID.Valid { s := actorID.String j.ActorID = &s @@ -201,7 +208,7 @@ func 
(s *Store) GetJob(ctx context.Context, id string) (*Job, error) { // would re-fire on the next tick while the first is still running. func (s *Store) LatestJobByKind(ctx context.Context, hostID, kind string) (*Job, error) { row := s.db.QueryRowContext(ctx, - `SELECT id, host_id, kind, status, scheduled_id, actor_kind, actor_id, + `SELECT id, host_id, kind, status, scheduled_id, source_group_id, actor_kind, actor_id, started_at, finished_at, exit_code, stats, error, created_at FROM jobs WHERE host_id = ? AND kind = ? @@ -210,6 +217,7 @@ func (s *Store) LatestJobByKind(ctx context.Context, hostID, kind string) (*Job, var ( j Job schedID sql.NullString + groupID sql.NullString actorID sql.NullString startedAt sql.NullString finishedAt sql.NullString @@ -218,7 +226,7 @@ func (s *Store) LatestJobByKind(ctx context.Context, hostID, kind string) (*Job, errMsg sql.NullString createdAt string ) - if err := row.Scan(&j.ID, &j.HostID, &j.Kind, &j.Status, &schedID, + if err := row.Scan(&j.ID, &j.HostID, &j.Kind, &j.Status, &schedID, &groupID, &j.ActorKind, &actorID, &startedAt, &finishedAt, &exitCode, &stats, &errMsg, &createdAt); err != nil { if errors.Is(err, sql.ErrNoRows) { @@ -230,6 +238,10 @@ func (s *Store) LatestJobByKind(ctx context.Context, hostID, kind string) (*Job, s := schedID.String j.ScheduledID = &s } + if groupID.Valid { + s := groupID.String + j.SourceGroupID = &s + } if actorID.Valid { s := actorID.String j.ActorID = &s diff --git a/internal/store/migrations/0015_jobs_source_group_id.sql b/internal/store/migrations/0015_jobs_source_group_id.sql new file mode 100644 index 0000000..045745b --- /dev/null +++ b/internal/store/migrations/0015_jobs_source_group_id.sql @@ -0,0 +1,16 @@ +-- 0015_jobs_source_group_id.sql +-- +-- Add source_group_id to jobs so the alert engine can dedup +-- backup failures per source group rather than collapsing +-- every failed backup on a host onto one open backup_failed +-- alert. 
Backup jobs dispatched for a source group have it set (each group is its own +-- restic invocation); forget/prune/check are repo-scoped and leave +-- it NULL. +-- +-- Column-level ALTER is safe under foreign_keys=ON (CLAUDE.md). The +-- existing rebuild pattern in 0012 should not be repeated here. + +ALTER TABLE jobs ADD COLUMN source_group_id TEXT + REFERENCES source_groups(id) ON DELETE SET NULL; + +CREATE INDEX jobs_source_group_id ON jobs(source_group_id); diff --git a/internal/store/migrations/0016_alerts_dedup_key.sql b/internal/store/migrations/0016_alerts_dedup_key.sql new file mode 100644 index 0000000..f2909a5 --- /dev/null +++ b/internal/store/migrations/0016_alerts_dedup_key.sql @@ -0,0 +1,23 @@ +-- 0016_alerts_dedup_key.sql +-- +-- Widen the open-alert uniqueness key from (host_id, kind) to +-- (host_id, kind, dedup_key) so two distinct failing source groups +-- on the same host produce two open alerts instead of collapsing +-- onto one. dedup_key is the source_group_id for backup +-- failures and the empty string for forget/prune/check failures, +-- agent_offline and stale_schedule (one open alert per host+kind). +-- +-- The original alerts_open partial index keyed on host_id only, a +-- coarse "is this host happy?" lookup; we replace it with a partial +-- unique index that the dedup logic relies on. NOT NULL DEFAULT '' +-- backfills existing rows; the index build fails on duplicate open rows. +-- +-- Column-level ALTER is safe under foreign_keys=ON. + +ALTER TABLE alerts ADD COLUMN dedup_key TEXT NOT NULL DEFAULT ''; + +DROP INDEX IF EXISTS alerts_open; + +CREATE UNIQUE INDEX alerts_open_unique + ON alerts(host_id, kind, dedup_key) + WHERE resolved_at IS NULL; diff --git a/internal/store/types.go b/internal/store/types.go index 63ecf77..762bee7 100644 --- a/internal/store/types.go +++ b/internal/store/types.go @@ -198,6 +198,7 @@ type Alert struct { ID string HostID *string Kind string + DedupKey string // empty for one-per-host alerts; source-group id otherwise Severity string Message string CreatedAt time.Time