package store

import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"strings"
	"time"

	"github.com/oklog/ulid/v2"
)

// AlertFilter narrows ListAlerts. A zero-valued field applies no restriction.
type AlertFilter struct {
	Status   string // "open" | "acknowledged" | "resolved" | "all" | ""
	Severity string // "info" | "warning" | "critical" | ""
	HostID   string // empty = any host
	Search   string // substring match on message
	Limit    int    // 0 = no limit
}

// RaiseOrTouch implements the dedup + last_seen_at bump pattern. If
// an alert with (host_id, kind, dedup_key, resolved_at IS NULL)
// already exists, it touches last_seen_at + message and returns
// (id, false). Otherwise it inserts a fresh row and returns (id, true).
// Caller fires a notification only when didRaise=true.
//
// dedupKey is the source-group id for backup/forget/prune/check
// failures (so two failing groups on the same host produce two open
// alerts), and the empty string for one-per-host alerts like
// agent_offline / stale_schedule.
//
// The lookup and the write share one transaction. Every failure,
// including a failed commit, is returned wrapped with a "store: ..."
// prefix and an empty id.
func (s *Store) RaiseOrTouch(ctx context.Context, hostID, kind, dedupKey, severity, message string, when time.Time) (id string, didRaise bool, err error) {
	tx, err := s.db.BeginTx(ctx, nil)
	if err != nil {
		return "", false, fmt.Errorf("store: begin: %w", err)
	}
	// Rolling back after a successful Commit only yields sql.ErrTxDone, so the
	// result is deliberately ignored.
	defer func() { _ = tx.Rollback() }()

	whenStr := when.UTC().Format(time.RFC3339Nano)

	var existing string
	lookupErr := tx.QueryRowContext(ctx,
		`SELECT id FROM alerts
		 WHERE host_id = ? AND kind = ? AND dedup_key = ? AND resolved_at IS NULL
		 LIMIT 1`,
		hostID, kind, dedupKey).Scan(&existing)

	switch {
	case lookupErr == nil:
		// An open alert already exists: bump it instead of raising a duplicate.
		if _, touchErr := tx.ExecContext(ctx,
			`UPDATE alerts SET last_seen_at = ?, message = ?
			 WHERE id = ?`,
			whenStr, message, existing); touchErr != nil {
			return "", false, fmt.Errorf("store: touch alert: %w", touchErr)
		}
		if commitErr := tx.Commit(); commitErr != nil {
			return "", false, fmt.Errorf("store: commit touch: %w", commitErr)
		}
		return existing, false, nil
	case !errors.Is(lookupErr, sql.ErrNoRows):
		return "", false, fmt.Errorf("store: lookup alert: %w", lookupErr)
	}

	// No open alert for this key: insert a fresh row.
	id = ulid.Make().String()
	if _, insertErr := tx.ExecContext(ctx,
		`INSERT INTO alerts (id, host_id, kind, dedup_key, severity, message, created_at, last_seen_at)
		 VALUES (?, ?, ?, ?, ?, ?, ?, ?)`,
		id, hostID, kind, dedupKey, severity, message, whenStr, whenStr); insertErr != nil {
		return "", false, fmt.Errorf("store: insert alert: %w", insertErr)
	}
	if commitErr := tx.Commit(); commitErr != nil {
		return "", false, fmt.Errorf("store: commit insert: %w", commitErr)
	}

	// Best-effort projection refresh: the alert is already committed, so a
	// failure here must not fail the raise.
	_ = s.refreshHostOpenAlertCount(ctx, s.db, hostID)
	return id, true, nil
}

// refreshHostOpenAlertCount recomputes hosts.open_alert_count from the
// alerts table for one host. Self-healing: idempotent and survives
// out-of-order edits. Best-effort — errors are returned but callers
// generally discard them since the projection is non-critical.
//
// exec is anything offering ExecContext. An empty hostID is a no-op.
func (s *Store) refreshHostOpenAlertCount(
	ctx context.Context,
	exec interface {
		ExecContext(context.Context, string, ...any) (sql.Result, error)
	},
	hostID string,
) error {
	if hostID == "" {
		return nil
	}
	_, err := exec.ExecContext(ctx,
		`UPDATE hosts SET open_alert_count = (
			SELECT COUNT(*) FROM alerts
			WHERE host_id = ? AND resolved_at IS NULL
		 ) WHERE id = ?`,
		hostID, hostID)
	if err != nil {
		return fmt.Errorf("store: refresh open_alert_count: %w", err)
	}
	return nil
}

// Acknowledge sets acknowledged_at + acknowledged_by; does NOT set
// resolved_at. Idempotent — re-acknowledging just refreshes the timestamp.
//
// Only unresolved rows are updated. It returns ErrNotFound when the
// update matches no row, which covers both an unknown id and an
// already-resolved alert.
func (s *Store) Acknowledge(ctx context.Context, id, userID string, when time.Time) error {
	res, err := s.db.ExecContext(ctx,
		`UPDATE alerts SET acknowledged_at = ?, acknowledged_by = ?
		 WHERE id = ? AND resolved_at IS NULL`,
		when.UTC().Format(time.RFC3339Nano), userID, id)
	if err != nil {
		return fmt.Errorf("store: ack alert: %w", err)
	}
	// A failure to read the row count is a driver problem, not a missing alert,
	// so it must not be reported as ErrNotFound.
	affected, err := res.RowsAffected()
	if err != nil {
		return fmt.Errorf("store: ack alert: %w", err)
	}
	if affected == 0 {
		return ErrNotFound
	}
	return nil
}

// Resolve marks the alert resolved. Idempotent on already-resolved rows
// (no-op). An id that matches no row is likewise a no-op and returns nil.
func (s *Store) Resolve(ctx context.Context, id string, when time.Time) error {
	// The host id is only needed for the best-effort counter refresh below, so
	// a failed lookup (including "no such alert") is ignored and simply skips
	// the refresh.
	var hostID sql.NullString
	_ = s.db.QueryRowContext(ctx, `SELECT host_id FROM alerts WHERE id = ?`, id).Scan(&hostID)

	_, err := s.db.ExecContext(ctx,
		`UPDATE alerts SET resolved_at = ?
		 WHERE id = ? AND resolved_at IS NULL`,
		when.UTC().Format(time.RFC3339Nano), id)
	if err != nil {
		return fmt.Errorf("store: resolve alert: %w", err)
	}
	if hostID.Valid {
		// Best-effort projection refresh; the resolve itself already succeeded.
		_ = s.refreshHostOpenAlertCount(ctx, s.db, hostID.String)
	}
	return nil
}

// AutoResolve closes the open alert for (host_id, kind, dedup_key).
// Used by the engine when a rule's underlying condition clears (e.g.
// next backup succeeded for the same source group so backup_failed
// clears). Pass dedupKey="" for one-per-host alerts (agent_offline).
//
// Closes only the dedup-key-matching row, not every open alert of
// the same kind on the host — distinct source groups now have
// distinct rows and a recovery in one shouldn't auto-resolve the
// others.
func (s *Store) AutoResolve(ctx context.Context, hostID, kind, dedupKey string, when time.Time) error {
	_, err := s.db.ExecContext(ctx,
		`UPDATE alerts SET resolved_at = ?
		 WHERE host_id = ? AND kind = ? AND dedup_key = ? AND resolved_at IS NULL`,
		when.UTC().Format(time.RFC3339Nano), hostID, kind, dedupKey)
	if err != nil {
		return fmt.Errorf("store: auto-resolve: %w", err)
	}
	// Best-effort projection refresh; the resolve itself already succeeded.
	_ = s.refreshHostOpenAlertCount(ctx, s.db, hostID)
	return nil
}

// GetAlert reads one row.
func (s *Store) GetAlert(ctx context.Context, id string) (*Alert, error) { row := s.db.QueryRowContext(ctx, `SELECT id, host_id, kind, dedup_key, severity, message, created_at, last_seen_at, acknowledged_at, acknowledged_by, resolved_at FROM alerts WHERE id = ?`, id) return scanAlert(row.Scan) } // ListAlerts is the filtered list. Sort: open-first, then by created_at desc. func (s *Store) ListAlerts(ctx context.Context, f AlertFilter) ([]Alert, error) { q := `SELECT id, host_id, kind, dedup_key, severity, message, created_at, last_seen_at, acknowledged_at, acknowledged_by, resolved_at FROM alerts` conds := []string{} args := []any{} switch f.Status { case "open": conds = append(conds, "resolved_at IS NULL AND acknowledged_at IS NULL") case "acknowledged": conds = append(conds, "resolved_at IS NULL AND acknowledged_at IS NOT NULL") case "resolved": conds = append(conds, "resolved_at IS NOT NULL") case "all", "": // no-op } if f.Severity != "" { conds = append(conds, "severity = ?") args = append(args, f.Severity) } if f.HostID != "" { conds = append(conds, "host_id = ?") args = append(args, f.HostID) } if f.Search != "" { conds = append(conds, "message LIKE ?") args = append(args, "%"+f.Search+"%") } if len(conds) > 0 { q += " WHERE " + strings.Join(conds, " AND ") } q += ` ORDER BY (resolved_at IS NULL) DESC, created_at DESC` if f.Limit > 0 { q += ` LIMIT ?` args = append(args, f.Limit) } rows, err := s.db.QueryContext(ctx, q, args...) if err != nil { return nil, fmt.Errorf("store: list alerts: %w", err) } defer func() { _ = rows.Close() }() var out []Alert for rows.Next() { a, err := scanAlert(rows.Scan) if err != nil { return nil, err } out = append(out, *a) } return out, rows.Err() } // scanAlert centralises the column read so the GetAlert and // ListAlerts paths agree on column order. Pass row.Scan or rows.Scan. 
func scanAlert(scan func(...any) error) (*Alert, error) { var a Alert var hostID, lastSeen, ackedAt, ackedBy, resolvedAt sql.NullString var createdAt string if err := scan(&a.ID, &hostID, &a.Kind, &a.DedupKey, &a.Severity, &a.Message, &createdAt, &lastSeen, &ackedAt, &ackedBy, &resolvedAt); err != nil { if errors.Is(err, sql.ErrNoRows) { return nil, ErrNotFound } return nil, fmt.Errorf("store: scan alert: %w", err) } if hostID.Valid { v := hostID.String a.HostID = &v } t, err := time.Parse(time.RFC3339Nano, createdAt) if err != nil { return nil, fmt.Errorf("store: parse created_at: %w", err) } a.CreatedAt = t if lastSeen.Valid { t, _ := time.Parse(time.RFC3339Nano, lastSeen.String) a.LastSeenAt = &t } if ackedAt.Valid { t, _ := time.Parse(time.RFC3339Nano, ackedAt.String) a.AcknowledgedAt = &t } if ackedBy.Valid { v := ackedBy.String a.AcknowledgedBy = &v } if resolvedAt.Valid { t, _ := time.Parse(time.RFC3339Nano, resolvedAt.String) a.ResolvedAt = &t } return &a, nil }