From b5a0aa466721b415f7e80bb162b261e6087a2047 Mon Sep 17 00:00:00 2001 From: Steve Cliff Date: Mon, 4 May 2026 19:24:17 +0100 Subject: [PATCH] store: alerts CRUD with dedup + last_seen_at bump --- internal/store/alerts.go | 216 ++++++++++++++++++++++++++++++++++ internal/store/alerts_test.go | 179 ++++++++++++++++++++++++++++ internal/store/types.go | 14 +++ 3 files changed, 409 insertions(+) create mode 100644 internal/store/alerts.go create mode 100644 internal/store/alerts_test.go diff --git a/internal/store/alerts.go b/internal/store/alerts.go new file mode 100644 index 0000000..42fb2d1 --- /dev/null +++ b/internal/store/alerts.go @@ -0,0 +1,216 @@ +package store + +import ( + "context" + "database/sql" + "errors" + "fmt" + "strings" + "time" + + "github.com/oklog/ulid/v2" +) + +// AlertFilter narrows ListAlerts. +type AlertFilter struct { + Status string // "open" | "acknowledged" | "resolved" | "all" | "" + Severity string // "info" | "warning" | "critical" | "" + HostID string // empty = any host + Search string // substring match on message + Limit int // 0 = no limit +} + +// RaiseOrTouch implements the dedup + last_seen_at bump pattern. If +// an alert with (host_id, kind, resolved_at IS NULL) already exists, +// it touches last_seen_at + message and returns (id, false). Otherwise +// inserts a fresh row and returns (id, true). Caller fires a +// notification only when didRaise=true. +func (s *Store) RaiseOrTouch(ctx context.Context, hostID, kind, severity, message string, when time.Time) (id string, didRaise bool, err error) { + tx, err := s.db.BeginTx(ctx, nil) + if err != nil { + return "", false, fmt.Errorf("store: begin: %w", err) + } + defer func() { _ = tx.Rollback() }() + + row := tx.QueryRowContext(ctx, + `SELECT id FROM alerts WHERE host_id = ? AND kind = ? AND resolved_at IS NULL LIMIT 1`, + hostID, kind) + var existing string + switch err := row.Scan(&existing); { + case err == nil: + _, uerr := tx.ExecContext(ctx, + `UPDATE alerts SET last_seen_at = ?, message = ? WHERE id = ?`, + when.UTC().Format(time.RFC3339Nano), message, existing) + if uerr != nil { + return "", false, fmt.Errorf("store: touch alert: %w", uerr) + } + if err := tx.Commit(); err != nil { + return "", false, err + } + return existing, false, nil + case errors.Is(err, sql.ErrNoRows): + // fall through to insert + default: + return "", false, fmt.Errorf("store: lookup alert: %w", err) + } + + id = ulid.Make().String() + whenStr := when.UTC().Format(time.RFC3339Nano) + _, err = tx.ExecContext(ctx, + `INSERT INTO alerts (id, host_id, kind, severity, message, created_at, last_seen_at) + VALUES (?, ?, ?, ?, ?, ?, ?)`, + id, hostID, kind, severity, message, whenStr, whenStr) + if err != nil { + return "", false, fmt.Errorf("store: insert alert: %w", err) + } + if err := tx.Commit(); err != nil { + return "", false, err + } + return id, true, nil +} + +// Acknowledge sets acknowledged_at + acknowledged_by; does NOT set +// resolved_at. Idempotent — re-acknowledging just refreshes the timestamp. +func (s *Store) Acknowledge(ctx context.Context, id, userID string, when time.Time) error { + res, err := s.db.ExecContext(ctx, + `UPDATE alerts SET acknowledged_at = ?, acknowledged_by = ? + WHERE id = ? AND resolved_at IS NULL`, + when.UTC().Format(time.RFC3339Nano), userID, id) + if err != nil { + return fmt.Errorf("store: ack alert: %w", err) + } + n, _ := res.RowsAffected() + if n == 0 { + return ErrNotFound + } + return nil +} + +// Resolve marks the alert resolved. Idempotent on already-resolved rows +// (no-op). +func (s *Store) Resolve(ctx context.Context, id string, when time.Time) error { + _, err := s.db.ExecContext(ctx, + `UPDATE alerts SET resolved_at = ? + WHERE id = ? AND resolved_at IS NULL`, + when.UTC().Format(time.RFC3339Nano), id) + if err != nil { + return fmt.Errorf("store: resolve alert: %w", err) + } + return nil +} + +// AutoResolve closes every open alert for the (host_id, kind) pair. +// Used by the engine when a rule's underlying condition clears (e.g. +// next backup succeeded so backup_failed clears). +func (s *Store) AutoResolve(ctx context.Context, hostID, kind string, when time.Time) error { + _, err := s.db.ExecContext(ctx, + `UPDATE alerts SET resolved_at = ? + WHERE host_id = ? AND kind = ? AND resolved_at IS NULL`, + when.UTC().Format(time.RFC3339Nano), hostID, kind) + if err != nil { + return fmt.Errorf("store: auto-resolve: %w", err) + } + return nil +} + +// GetAlert reads one row. +func (s *Store) GetAlert(ctx context.Context, id string) (*Alert, error) { + row := s.db.QueryRowContext(ctx, + `SELECT id, host_id, kind, severity, message, created_at, last_seen_at, + acknowledged_at, acknowledged_by, resolved_at + FROM alerts WHERE id = ?`, id) + return scanAlert(row.Scan) +} + +// ListAlerts is the filtered list. Sort: open-first, then by created_at desc. +func (s *Store) ListAlerts(ctx context.Context, f AlertFilter) ([]Alert, error) { + q := `SELECT id, host_id, kind, severity, message, created_at, last_seen_at, + acknowledged_at, acknowledged_by, resolved_at FROM alerts` + conds := []string{} + args := []any{} + switch f.Status { + case "open": + conds = append(conds, "resolved_at IS NULL AND acknowledged_at IS NULL") + case "acknowledged": + conds = append(conds, "resolved_at IS NULL AND acknowledged_at IS NOT NULL") + case "resolved": + conds = append(conds, "resolved_at IS NOT NULL") + case "all", "": + // no-op + } + if f.Severity != "" { + conds = append(conds, "severity = ?") + args = append(args, f.Severity) + } + if f.HostID != "" { + conds = append(conds, "host_id = ?") + args = append(args, f.HostID) + } + if f.Search != "" { + conds = append(conds, "message LIKE ?") + args = append(args, "%"+f.Search+"%") + } + if len(conds) > 0 { + q += " WHERE " + strings.Join(conds, " AND ") + } + q += ` ORDER BY (resolved_at IS NULL) DESC, created_at DESC` + if f.Limit > 0 { + q += ` LIMIT ?` + args = append(args, f.Limit) + } + rows, err := s.db.QueryContext(ctx, q, args...) + if err != nil { + return nil, fmt.Errorf("store: list alerts: %w", err) + } + defer func() { _ = rows.Close() }() + var out []Alert + for rows.Next() { + a, err := scanAlert(rows.Scan) + if err != nil { + return nil, err + } + out = append(out, *a) + } + return out, rows.Err() +} + +// scanAlert centralises the column read so the GetAlert and +// ListAlerts paths agree on column order. Pass row.Scan or rows.Scan. +func scanAlert(scan func(...any) error) (*Alert, error) { + var a Alert + var hostID, lastSeen, ackedAt, ackedBy, resolvedAt sql.NullString + var createdAt string + if err := scan(&a.ID, &hostID, &a.Kind, &a.Severity, &a.Message, + &createdAt, &lastSeen, &ackedAt, &ackedBy, &resolvedAt); err != nil { + if errors.Is(err, sql.ErrNoRows) { + return nil, ErrNotFound + } + return nil, fmt.Errorf("store: scan alert: %w", err) + } + if hostID.Valid { + v := hostID.String + a.HostID = &v + } + t, err := time.Parse(time.RFC3339Nano, createdAt) + if err != nil { + return nil, fmt.Errorf("store: parse created_at: %w", err) + } + a.CreatedAt = t + if lastSeen.Valid { + t, _ := time.Parse(time.RFC3339Nano, lastSeen.String) + a.LastSeenAt = &t + } + if ackedAt.Valid { + t, _ := time.Parse(time.RFC3339Nano, ackedAt.String) + a.AcknowledgedAt = &t + } + if ackedBy.Valid { + v := ackedBy.String + a.AcknowledgedBy = &v + } + if resolvedAt.Valid { + t, _ := time.Parse(time.RFC3339Nano, resolvedAt.String) + a.ResolvedAt = &t + } + return &a, nil +} diff --git a/internal/store/alerts_test.go b/internal/store/alerts_test.go new file mode 100644 index 0000000..8771cb7 --- /dev/null +++ b/internal/store/alerts_test.go @@ -0,0 +1,179 @@ +package store + +import ( + "context" + "path/filepath" + "testing" + "time" + + "github.com/oklog/ulid/v2" +) + +func newTestStoreWithHost(t *testing.T) (*Store, string) { + t.Helper() + dir := t.TempDir() + st, err := Open(context.Background(), filepath.Join(dir, "rm.db")) + if err != nil { + t.Fatalf("open: %v", err) + } + t.Cleanup(func() { _ = st.Close() }) + hostID := ulid.Make().String() + if err := st.CreateHost(context.Background(), Host{ + ID: hostID, Name: "h", OS: "linux", Arch: "amd64", + EnrolledAt: time.Now().UTC(), + }, "deadbeef", ""); err != nil { + t.Fatalf("create host: %v", err) + } + return st, hostID +} + +func TestRaiseOrTouchInsertsThenTouches(t *testing.T) { + t.Parallel() + st, hostID := newTestStoreWithHost(t) + ctx := context.Background() + + t0 := time.Now().UTC() + id1, didRaise, err := st.RaiseOrTouch(ctx, hostID, "backup_failed", "warning", + "Backup failed: 401", t0) + if err != nil { + t.Fatalf("first raise: %v", err) + } + if !didRaise { + t.Fatalf("first call must didRaise=true") + } + if id1 == "" { + t.Fatalf("expected non-empty id") + } + + // Second call within the same open window should touch, not insert. + t1 := t0.Add(60 * time.Second) + id2, didRaise2, err := st.RaiseOrTouch(ctx, hostID, "backup_failed", "warning", + "Backup failed: 401 (still)", t1) + if err != nil { + t.Fatalf("touch: %v", err) + } + if didRaise2 { + t.Fatalf("second call must didRaise=false") + } + if id2 != id1 { + t.Fatalf("touch returned a different id: got %q want %q", id2, id1) + } + + // last_seen_at and message must be updated. + got, err := st.GetAlert(ctx, id1) + if err != nil { + t.Fatalf("get: %v", err) + } + if got.LastSeenAt == nil || !got.LastSeenAt.Equal(t1) { + t.Errorf("last_seen_at: got %v want %v", got.LastSeenAt, t1) + } + if got.Message != "Backup failed: 401 (still)" { + t.Errorf("message not refreshed: %q", got.Message) + } +} + +func TestResolveAndReRaiseStartsFreshAlert(t *testing.T) { + t.Parallel() + st, hostID := newTestStoreWithHost(t) + ctx := context.Background() + + t0 := time.Now().UTC() + id1, _, err := st.RaiseOrTouch(ctx, hostID, "backup_failed", "warning", "first", t0) + if err != nil { + t.Fatalf("raise: %v", err) + } + if err := st.Resolve(ctx, id1, t0.Add(time.Minute)); err != nil { + t.Fatalf("resolve: %v", err) + } + + id2, didRaise, err := st.RaiseOrTouch(ctx, hostID, "backup_failed", "warning", "second", t0.Add(2*time.Minute)) + if err != nil { + t.Fatalf("re-raise: %v", err) + } + if !didRaise { + t.Fatalf("post-resolve raise must didRaise=true") + } + if id2 == id1 { + t.Fatalf("re-raise reused the resolved id; want a fresh row") + } +} + +func TestAcknowledgeKeepsAlertOpen(t *testing.T) { + t.Parallel() + st, hostID := newTestStoreWithHost(t) + ctx := context.Background() + + // Create a real user so the acknowledged_by FK is satisfied. + userID := ulid.Make().String() + if err := st.CreateUser(ctx, User{ + ID: userID, Username: "ackuser", PasswordHash: "x", + Role: RoleOperator, CreatedAt: time.Now().UTC(), + }); err != nil { + t.Fatalf("create user: %v", err) + } + + id, _, err := st.RaiseOrTouch(ctx, hostID, "backup_failed", "warning", "m", time.Now().UTC()) + if err != nil { + t.Fatalf("raise: %v", err) + } + if err := st.Acknowledge(ctx, id, userID, time.Now().UTC()); err != nil { + t.Fatalf("ack: %v", err) + } + got, err := st.GetAlert(ctx, id) + if err != nil { + t.Fatalf("get: %v", err) + } + if got.AcknowledgedAt == nil { + t.Errorf("acknowledged_at not set") + } + if got.AcknowledgedBy == nil || *got.AcknowledgedBy != userID { + t.Errorf("acknowledged_by: got %v want %q", got.AcknowledgedBy, userID) + } + if got.ResolvedAt != nil { + t.Errorf("ack must not set resolved_at; got %v", got.ResolvedAt) + } +} + +func TestAutoResolveClearsOpenAlerts(t *testing.T) { + t.Parallel() + st, hostID := newTestStoreWithHost(t) + ctx := context.Background() + + t0 := time.Now().UTC() + id, _, _ := st.RaiseOrTouch(ctx, hostID, "backup_failed", "warning", "m", t0) + if err := st.AutoResolve(ctx, hostID, "backup_failed", t0.Add(time.Minute)); err != nil { + t.Fatalf("auto-resolve: %v", err) + } + got, _ := st.GetAlert(ctx, id) + if got.ResolvedAt == nil { + t.Errorf("expected resolved_at set") + } +} + +func TestListAlertsFilters(t *testing.T) { + t.Parallel() + st, hostID := newTestStoreWithHost(t) + ctx := context.Background() + t0 := time.Now().UTC() + + // One open warning + one resolved info. + _, _, _ = st.RaiseOrTouch(ctx, hostID, "backup_failed", "warning", "open", t0) + id2, _, _ := st.RaiseOrTouch(ctx, hostID, "stale_schedule", "info", "done", t0) + _ = st.Resolve(ctx, id2, t0.Add(time.Minute)) + + open, err := st.ListAlerts(ctx, AlertFilter{Status: "open"}) + if err != nil { + t.Fatalf("list open: %v", err) + } + if len(open) != 1 || open[0].Severity != "warning" { + t.Errorf("open filter: got %+v", open) + } + + all, err := st.ListAlerts(ctx, AlertFilter{Status: "all"}) + if err != nil { + t.Fatalf("list all: %v", err) + } + if len(all) != 2 { + t.Errorf("all filter: got %d, want 2", len(all)) + } +} diff --git a/internal/store/types.go b/internal/store/types.go index c42bc89..63ecf77 100644 --- a/internal/store/types.go +++ b/internal/store/types.go @@ -193,6 +193,20 @@ type EnrollmentToken struct { ExpiresAt time.Time } +// Alert mirrors the alerts table. +type Alert struct { + ID string + HostID *string + Kind string + Severity string + Message string + CreatedAt time.Time + LastSeenAt *time.Time + AcknowledgedAt *time.Time + AcknowledgedBy *string + ResolvedAt *time.Time +} + // AuditEntry mirrors the audit_log table. type AuditEntry struct { ID string