store: alerts CRUD with dedup + last_seen_at bump
This commit is contained in:
@@ -0,0 +1,216 @@
|
|||||||
|
package store
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"database/sql"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/oklog/ulid/v2"
|
||||||
|
)
|
||||||
|
|
||||||
|
// AlertFilter narrows ListAlerts.
|
||||||
|
type AlertFilter struct {
|
||||||
|
Status string // "open" | "acknowledged" | "resolved" | "all" | ""
|
||||||
|
Severity string // "info" | "warning" | "critical" | ""
|
||||||
|
HostID string // empty = any host
|
||||||
|
Search string // substring match on message
|
||||||
|
Limit int // 0 = no limit
|
||||||
|
}
|
||||||
|
|
||||||
|
// RaiseOrTouch implements the dedup + last_seen_at bump pattern. If
|
||||||
|
// an alert with (host_id, kind, resolved_at IS NULL) already exists,
|
||||||
|
// it touches last_seen_at + message and returns (id, false). Otherwise
|
||||||
|
// inserts a fresh row and returns (id, true). Caller fires a
|
||||||
|
// notification only when didRaise=true.
|
||||||
|
func (s *Store) RaiseOrTouch(ctx context.Context, hostID, kind, severity, message string, when time.Time) (id string, didRaise bool, err error) {
|
||||||
|
tx, err := s.db.BeginTx(ctx, nil)
|
||||||
|
if err != nil {
|
||||||
|
return "", false, fmt.Errorf("store: begin: %w", err)
|
||||||
|
}
|
||||||
|
defer func() { _ = tx.Rollback() }()
|
||||||
|
|
||||||
|
row := tx.QueryRowContext(ctx,
|
||||||
|
`SELECT id FROM alerts WHERE host_id = ? AND kind = ? AND resolved_at IS NULL LIMIT 1`,
|
||||||
|
hostID, kind)
|
||||||
|
var existing string
|
||||||
|
switch err := row.Scan(&existing); {
|
||||||
|
case err == nil:
|
||||||
|
_, uerr := tx.ExecContext(ctx,
|
||||||
|
`UPDATE alerts SET last_seen_at = ?, message = ? WHERE id = ?`,
|
||||||
|
when.UTC().Format(time.RFC3339Nano), message, existing)
|
||||||
|
if uerr != nil {
|
||||||
|
return "", false, fmt.Errorf("store: touch alert: %w", uerr)
|
||||||
|
}
|
||||||
|
if err := tx.Commit(); err != nil {
|
||||||
|
return "", false, err
|
||||||
|
}
|
||||||
|
return existing, false, nil
|
||||||
|
case errors.Is(err, sql.ErrNoRows):
|
||||||
|
// fall through to insert
|
||||||
|
default:
|
||||||
|
return "", false, fmt.Errorf("store: lookup alert: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
id = ulid.Make().String()
|
||||||
|
whenStr := when.UTC().Format(time.RFC3339Nano)
|
||||||
|
_, err = tx.ExecContext(ctx,
|
||||||
|
`INSERT INTO alerts (id, host_id, kind, severity, message, created_at, last_seen_at)
|
||||||
|
VALUES (?, ?, ?, ?, ?, ?, ?)`,
|
||||||
|
id, hostID, kind, severity, message, whenStr, whenStr)
|
||||||
|
if err != nil {
|
||||||
|
return "", false, fmt.Errorf("store: insert alert: %w", err)
|
||||||
|
}
|
||||||
|
if err := tx.Commit(); err != nil {
|
||||||
|
return "", false, err
|
||||||
|
}
|
||||||
|
return id, true, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Acknowledge sets acknowledged_at + acknowledged_by; does NOT set
|
||||||
|
// resolved_at. Idempotent — re-acknowledging just refreshes the timestamp.
|
||||||
|
func (s *Store) Acknowledge(ctx context.Context, id, userID string, when time.Time) error {
|
||||||
|
res, err := s.db.ExecContext(ctx,
|
||||||
|
`UPDATE alerts SET acknowledged_at = ?, acknowledged_by = ?
|
||||||
|
WHERE id = ? AND resolved_at IS NULL`,
|
||||||
|
when.UTC().Format(time.RFC3339Nano), userID, id)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("store: ack alert: %w", err)
|
||||||
|
}
|
||||||
|
n, _ := res.RowsAffected()
|
||||||
|
if n == 0 {
|
||||||
|
return ErrNotFound
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Resolve marks the alert resolved. Idempotent on already-resolved rows
|
||||||
|
// (no-op).
|
||||||
|
func (s *Store) Resolve(ctx context.Context, id string, when time.Time) error {
|
||||||
|
_, err := s.db.ExecContext(ctx,
|
||||||
|
`UPDATE alerts SET resolved_at = ?
|
||||||
|
WHERE id = ? AND resolved_at IS NULL`,
|
||||||
|
when.UTC().Format(time.RFC3339Nano), id)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("store: resolve alert: %w", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// AutoResolve closes every open alert for the (host_id, kind) pair.
|
||||||
|
// Used by the engine when a rule's underlying condition clears (e.g.
|
||||||
|
// next backup succeeded so backup_failed clears).
|
||||||
|
func (s *Store) AutoResolve(ctx context.Context, hostID, kind string, when time.Time) error {
|
||||||
|
_, err := s.db.ExecContext(ctx,
|
||||||
|
`UPDATE alerts SET resolved_at = ?
|
||||||
|
WHERE host_id = ? AND kind = ? AND resolved_at IS NULL`,
|
||||||
|
when.UTC().Format(time.RFC3339Nano), hostID, kind)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("store: auto-resolve: %w", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetAlert reads one row.
|
||||||
|
func (s *Store) GetAlert(ctx context.Context, id string) (*Alert, error) {
|
||||||
|
row := s.db.QueryRowContext(ctx,
|
||||||
|
`SELECT id, host_id, kind, severity, message, created_at, last_seen_at,
|
||||||
|
acknowledged_at, acknowledged_by, resolved_at
|
||||||
|
FROM alerts WHERE id = ?`, id)
|
||||||
|
return scanAlert(row.Scan)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ListAlerts is the filtered list. Sort: open-first, then by created_at desc.
|
||||||
|
func (s *Store) ListAlerts(ctx context.Context, f AlertFilter) ([]Alert, error) {
|
||||||
|
q := `SELECT id, host_id, kind, severity, message, created_at, last_seen_at,
|
||||||
|
acknowledged_at, acknowledged_by, resolved_at FROM alerts`
|
||||||
|
conds := []string{}
|
||||||
|
args := []any{}
|
||||||
|
switch f.Status {
|
||||||
|
case "open":
|
||||||
|
conds = append(conds, "resolved_at IS NULL AND acknowledged_at IS NULL")
|
||||||
|
case "acknowledged":
|
||||||
|
conds = append(conds, "resolved_at IS NULL AND acknowledged_at IS NOT NULL")
|
||||||
|
case "resolved":
|
||||||
|
conds = append(conds, "resolved_at IS NOT NULL")
|
||||||
|
case "all", "":
|
||||||
|
// no-op
|
||||||
|
}
|
||||||
|
if f.Severity != "" {
|
||||||
|
conds = append(conds, "severity = ?")
|
||||||
|
args = append(args, f.Severity)
|
||||||
|
}
|
||||||
|
if f.HostID != "" {
|
||||||
|
conds = append(conds, "host_id = ?")
|
||||||
|
args = append(args, f.HostID)
|
||||||
|
}
|
||||||
|
if f.Search != "" {
|
||||||
|
conds = append(conds, "message LIKE ?")
|
||||||
|
args = append(args, "%"+f.Search+"%")
|
||||||
|
}
|
||||||
|
if len(conds) > 0 {
|
||||||
|
q += " WHERE " + strings.Join(conds, " AND ")
|
||||||
|
}
|
||||||
|
q += ` ORDER BY (resolved_at IS NULL) DESC, created_at DESC`
|
||||||
|
if f.Limit > 0 {
|
||||||
|
q += ` LIMIT ?`
|
||||||
|
args = append(args, f.Limit)
|
||||||
|
}
|
||||||
|
rows, err := s.db.QueryContext(ctx, q, args...)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("store: list alerts: %w", err)
|
||||||
|
}
|
||||||
|
defer func() { _ = rows.Close() }()
|
||||||
|
var out []Alert
|
||||||
|
for rows.Next() {
|
||||||
|
a, err := scanAlert(rows.Scan)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
out = append(out, *a)
|
||||||
|
}
|
||||||
|
return out, rows.Err()
|
||||||
|
}
|
||||||
|
|
||||||
|
// scanAlert centralises the column read so the GetAlert and
|
||||||
|
// ListAlerts paths agree on column order. Pass row.Scan or rows.Scan.
|
||||||
|
func scanAlert(scan func(...any) error) (*Alert, error) {
|
||||||
|
var a Alert
|
||||||
|
var hostID, lastSeen, ackedAt, ackedBy, resolvedAt sql.NullString
|
||||||
|
var createdAt string
|
||||||
|
if err := scan(&a.ID, &hostID, &a.Kind, &a.Severity, &a.Message,
|
||||||
|
&createdAt, &lastSeen, &ackedAt, &ackedBy, &resolvedAt); err != nil {
|
||||||
|
if errors.Is(err, sql.ErrNoRows) {
|
||||||
|
return nil, ErrNotFound
|
||||||
|
}
|
||||||
|
return nil, fmt.Errorf("store: scan alert: %w", err)
|
||||||
|
}
|
||||||
|
if hostID.Valid {
|
||||||
|
v := hostID.String
|
||||||
|
a.HostID = &v
|
||||||
|
}
|
||||||
|
t, err := time.Parse(time.RFC3339Nano, createdAt)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("store: parse created_at: %w", err)
|
||||||
|
}
|
||||||
|
a.CreatedAt = t
|
||||||
|
if lastSeen.Valid {
|
||||||
|
t, _ := time.Parse(time.RFC3339Nano, lastSeen.String)
|
||||||
|
a.LastSeenAt = &t
|
||||||
|
}
|
||||||
|
if ackedAt.Valid {
|
||||||
|
t, _ := time.Parse(time.RFC3339Nano, ackedAt.String)
|
||||||
|
a.AcknowledgedAt = &t
|
||||||
|
}
|
||||||
|
if ackedBy.Valid {
|
||||||
|
v := ackedBy.String
|
||||||
|
a.AcknowledgedBy = &v
|
||||||
|
}
|
||||||
|
if resolvedAt.Valid {
|
||||||
|
t, _ := time.Parse(time.RFC3339Nano, resolvedAt.String)
|
||||||
|
a.ResolvedAt = &t
|
||||||
|
}
|
||||||
|
return &a, nil
|
||||||
|
}
|
||||||
@@ -0,0 +1,179 @@
|
|||||||
|
package store
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"path/filepath"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/oklog/ulid/v2"
|
||||||
|
)
|
||||||
|
|
||||||
|
func newTestStoreWithHost(t *testing.T) (*Store, string) {
|
||||||
|
t.Helper()
|
||||||
|
dir := t.TempDir()
|
||||||
|
st, err := Open(context.Background(), filepath.Join(dir, "rm.db"))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("open: %v", err)
|
||||||
|
}
|
||||||
|
t.Cleanup(func() { _ = st.Close() })
|
||||||
|
hostID := ulid.Make().String()
|
||||||
|
if err := st.CreateHost(context.Background(), Host{
|
||||||
|
ID: hostID, Name: "h", OS: "linux", Arch: "amd64",
|
||||||
|
EnrolledAt: time.Now().UTC(),
|
||||||
|
}, "deadbeef", ""); err != nil {
|
||||||
|
t.Fatalf("create host: %v", err)
|
||||||
|
}
|
||||||
|
return st, hostID
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRaiseOrTouchInsertsThenTouches(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
st, hostID := newTestStoreWithHost(t)
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
t0 := time.Now().UTC()
|
||||||
|
id1, didRaise, err := st.RaiseOrTouch(ctx, hostID, "backup_failed", "warning",
|
||||||
|
"Backup failed: 401", t0)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("first raise: %v", err)
|
||||||
|
}
|
||||||
|
if !didRaise {
|
||||||
|
t.Fatalf("first call must didRaise=true")
|
||||||
|
}
|
||||||
|
if id1 == "" {
|
||||||
|
t.Fatalf("expected non-empty id")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Second call within the same open window should touch, not insert.
|
||||||
|
t1 := t0.Add(60 * time.Second)
|
||||||
|
id2, didRaise2, err := st.RaiseOrTouch(ctx, hostID, "backup_failed", "warning",
|
||||||
|
"Backup failed: 401 (still)", t1)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("touch: %v", err)
|
||||||
|
}
|
||||||
|
if didRaise2 {
|
||||||
|
t.Fatalf("second call must didRaise=false")
|
||||||
|
}
|
||||||
|
if id2 != id1 {
|
||||||
|
t.Fatalf("touch returned a different id: got %q want %q", id2, id1)
|
||||||
|
}
|
||||||
|
|
||||||
|
// last_seen_at and message must be updated.
|
||||||
|
got, err := st.GetAlert(ctx, id1)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("get: %v", err)
|
||||||
|
}
|
||||||
|
if got.LastSeenAt == nil || !got.LastSeenAt.Equal(t1) {
|
||||||
|
t.Errorf("last_seen_at: got %v want %v", got.LastSeenAt, t1)
|
||||||
|
}
|
||||||
|
if got.Message != "Backup failed: 401 (still)" {
|
||||||
|
t.Errorf("message not refreshed: %q", got.Message)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestResolveAndReRaiseStartsFreshAlert(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
st, hostID := newTestStoreWithHost(t)
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
t0 := time.Now().UTC()
|
||||||
|
id1, _, err := st.RaiseOrTouch(ctx, hostID, "backup_failed", "warning", "first", t0)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("raise: %v", err)
|
||||||
|
}
|
||||||
|
if err := st.Resolve(ctx, id1, t0.Add(time.Minute)); err != nil {
|
||||||
|
t.Fatalf("resolve: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
id2, didRaise, err := st.RaiseOrTouch(ctx, hostID, "backup_failed", "warning", "second", t0.Add(2*time.Minute))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("re-raise: %v", err)
|
||||||
|
}
|
||||||
|
if !didRaise {
|
||||||
|
t.Fatalf("post-resolve raise must didRaise=true")
|
||||||
|
}
|
||||||
|
if id2 == id1 {
|
||||||
|
t.Fatalf("re-raise reused the resolved id; want a fresh row")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestAcknowledgeKeepsAlertOpen(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
st, hostID := newTestStoreWithHost(t)
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
// Create a real user so the acknowledged_by FK is satisfied.
|
||||||
|
userID := ulid.Make().String()
|
||||||
|
if err := st.CreateUser(ctx, User{
|
||||||
|
ID: userID, Username: "ackuser", PasswordHash: "x",
|
||||||
|
Role: RoleOperator, CreatedAt: time.Now().UTC(),
|
||||||
|
}); err != nil {
|
||||||
|
t.Fatalf("create user: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
id, _, err := st.RaiseOrTouch(ctx, hostID, "backup_failed", "warning", "m", time.Now().UTC())
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("raise: %v", err)
|
||||||
|
}
|
||||||
|
if err := st.Acknowledge(ctx, id, userID, time.Now().UTC()); err != nil {
|
||||||
|
t.Fatalf("ack: %v", err)
|
||||||
|
}
|
||||||
|
got, err := st.GetAlert(ctx, id)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("get: %v", err)
|
||||||
|
}
|
||||||
|
if got.AcknowledgedAt == nil {
|
||||||
|
t.Errorf("acknowledged_at not set")
|
||||||
|
}
|
||||||
|
if got.AcknowledgedBy == nil || *got.AcknowledgedBy != userID {
|
||||||
|
t.Errorf("acknowledged_by: got %v want %q", got.AcknowledgedBy, userID)
|
||||||
|
}
|
||||||
|
if got.ResolvedAt != nil {
|
||||||
|
t.Errorf("ack must not set resolved_at; got %v", got.ResolvedAt)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestAutoResolveClearsOpenAlerts(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
st, hostID := newTestStoreWithHost(t)
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
t0 := time.Now().UTC()
|
||||||
|
id, _, _ := st.RaiseOrTouch(ctx, hostID, "backup_failed", "warning", "m", t0)
|
||||||
|
if err := st.AutoResolve(ctx, hostID, "backup_failed", t0.Add(time.Minute)); err != nil {
|
||||||
|
t.Fatalf("auto-resolve: %v", err)
|
||||||
|
}
|
||||||
|
got, _ := st.GetAlert(ctx, id)
|
||||||
|
if got.ResolvedAt == nil {
|
||||||
|
t.Errorf("expected resolved_at set")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestListAlertsFilters(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
st, hostID := newTestStoreWithHost(t)
|
||||||
|
ctx := context.Background()
|
||||||
|
t0 := time.Now().UTC()
|
||||||
|
|
||||||
|
// One open warning + one resolved info.
|
||||||
|
_, _, _ = st.RaiseOrTouch(ctx, hostID, "backup_failed", "warning", "open", t0)
|
||||||
|
id2, _, _ := st.RaiseOrTouch(ctx, hostID, "stale_schedule", "info", "done", t0)
|
||||||
|
_ = st.Resolve(ctx, id2, t0.Add(time.Minute))
|
||||||
|
|
||||||
|
open, err := st.ListAlerts(ctx, AlertFilter{Status: "open"})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("list open: %v", err)
|
||||||
|
}
|
||||||
|
if len(open) != 1 || open[0].Severity != "warning" {
|
||||||
|
t.Errorf("open filter: got %+v", open)
|
||||||
|
}
|
||||||
|
|
||||||
|
all, err := st.ListAlerts(ctx, AlertFilter{Status: "all"})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("list all: %v", err)
|
||||||
|
}
|
||||||
|
if len(all) != 2 {
|
||||||
|
t.Errorf("all filter: got %d, want 2", len(all))
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -193,6 +193,20 @@ type EnrollmentToken struct {
|
|||||||
ExpiresAt time.Time
|
ExpiresAt time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Alert mirrors the alerts table.
|
||||||
|
type Alert struct {
|
||||||
|
ID string
|
||||||
|
HostID *string
|
||||||
|
Kind string
|
||||||
|
Severity string
|
||||||
|
Message string
|
||||||
|
CreatedAt time.Time
|
||||||
|
LastSeenAt *time.Time
|
||||||
|
AcknowledgedAt *time.Time
|
||||||
|
AcknowledgedBy *string
|
||||||
|
ResolvedAt *time.Time
|
||||||
|
}
|
||||||
|
|
||||||
// AuditEntry mirrors the audit_log table.
|
// AuditEntry mirrors the audit_log table.
|
||||||
type AuditEntry struct {
|
type AuditEntry struct {
|
||||||
ID string
|
ID string
|
||||||
|
|||||||
Reference in New Issue
Block a user