209 lines
6.3 KiB
Go
209 lines
6.3 KiB
Go
package store
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"errors"
|
|
"fmt"
|
|
"time"
|
|
)
|
|
|
|
// NotificationChannel mirrors a row in notification_channels. The
|
|
// Config field is the AEAD-encrypted JSON blob; callers (in the
|
|
// notification package) decrypt before use.
|
|
type NotificationChannel struct {
|
|
ID string
|
|
Kind string // "webhook" | "ntfy" | "smtp"
|
|
Name string
|
|
Enabled bool
|
|
Config []byte // AEAD ciphertext; opaque at this layer
|
|
DefaultPriority *string
|
|
CreatedAt time.Time
|
|
UpdatedAt time.Time
|
|
LastFiredAt *time.Time
|
|
}
|
|
|
|
// NotificationLogEntry is one row in notification_log.
|
|
type NotificationLogEntry struct {
|
|
ID string
|
|
ChannelID string
|
|
AlertID *string
|
|
Event string // alert.raised | alert.acknowledged | alert.resolved | alert.test
|
|
OK bool
|
|
StatusCode *int
|
|
LatencyMS *int
|
|
Error *string
|
|
FiredAt time.Time
|
|
}
|
|
|
|
// CreateNotificationChannel inserts a new notification channel row.
|
|
func (s *Store) CreateNotificationChannel(ctx context.Context, ch NotificationChannel) error {
|
|
enabled := 0
|
|
if ch.Enabled {
|
|
enabled = 1
|
|
}
|
|
_, err := s.db.ExecContext(ctx,
|
|
`INSERT INTO notification_channels
|
|
(id, kind, name, enabled, config, default_priority, created_at, updated_at)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?)`,
|
|
ch.ID, ch.Kind, ch.Name, enabled, ch.Config,
|
|
nullable(ch.DefaultPriority),
|
|
ch.CreatedAt.UTC().Format(time.RFC3339Nano),
|
|
ch.UpdatedAt.UTC().Format(time.RFC3339Nano))
|
|
if err != nil {
|
|
return fmt.Errorf("store: create channel: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// UpdateNotificationChannel updates mutable fields on an existing channel row.
|
|
func (s *Store) UpdateNotificationChannel(ctx context.Context, ch NotificationChannel) error {
|
|
enabled := 0
|
|
if ch.Enabled {
|
|
enabled = 1
|
|
}
|
|
_, err := s.db.ExecContext(ctx,
|
|
`UPDATE notification_channels
|
|
SET kind = ?, name = ?, enabled = ?, config = ?,
|
|
default_priority = ?, updated_at = ?
|
|
WHERE id = ?`,
|
|
ch.Kind, ch.Name, enabled, ch.Config,
|
|
nullable(ch.DefaultPriority),
|
|
ch.UpdatedAt.UTC().Format(time.RFC3339Nano),
|
|
ch.ID)
|
|
if err != nil {
|
|
return fmt.Errorf("store: update channel: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// DeleteNotificationChannel removes a channel row; cascades to notification_log.
|
|
func (s *Store) DeleteNotificationChannel(ctx context.Context, id string) error {
|
|
_, err := s.db.ExecContext(ctx,
|
|
`DELETE FROM notification_channels WHERE id = ?`, id)
|
|
if err != nil {
|
|
return fmt.Errorf("store: delete channel: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// GetNotificationChannel returns one channel by primary key or ErrNotFound.
|
|
func (s *Store) GetNotificationChannel(ctx context.Context, id string) (*NotificationChannel, error) {
|
|
row := s.db.QueryRowContext(ctx,
|
|
`SELECT id, kind, name, enabled, config, default_priority,
|
|
created_at, updated_at, last_fired_at
|
|
FROM notification_channels WHERE id = ?`, id)
|
|
return scanChannel(row.Scan)
|
|
}
|
|
|
|
// ListNotificationChannels returns all channels ordered by created_at ascending.
|
|
func (s *Store) ListNotificationChannels(ctx context.Context) ([]NotificationChannel, error) {
|
|
rows, err := s.db.QueryContext(ctx,
|
|
`SELECT id, kind, name, enabled, config, default_priority,
|
|
created_at, updated_at, last_fired_at
|
|
FROM notification_channels ORDER BY created_at ASC`)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("store: list channels: %w", err)
|
|
}
|
|
defer func() { _ = rows.Close() }()
|
|
var out []NotificationChannel
|
|
for rows.Next() {
|
|
c, err := scanChannel(rows.Scan)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
out = append(out, *c)
|
|
}
|
|
return out, rows.Err()
|
|
}
|
|
|
|
// ListEnabledNotificationChannels returns only channels with enabled=1, ordered by created_at.
|
|
func (s *Store) ListEnabledNotificationChannels(ctx context.Context) ([]NotificationChannel, error) {
|
|
rows, err := s.db.QueryContext(ctx,
|
|
`SELECT id, kind, name, enabled, config, default_priority,
|
|
created_at, updated_at, last_fired_at
|
|
FROM notification_channels WHERE enabled = 1 ORDER BY created_at ASC`)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("store: list enabled: %w", err)
|
|
}
|
|
defer func() { _ = rows.Close() }()
|
|
var out []NotificationChannel
|
|
for rows.Next() {
|
|
c, err := scanChannel(rows.Scan)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
out = append(out, *c)
|
|
}
|
|
return out, rows.Err()
|
|
}
|
|
|
|
// AppendNotificationLog records a delivery attempt + bumps the
|
|
// channel's last_fired_at on success.
|
|
func (s *Store) AppendNotificationLog(ctx context.Context, e NotificationLogEntry) error {
|
|
tx, err := s.db.BeginTx(ctx, nil)
|
|
if err != nil {
|
|
return fmt.Errorf("store: begin: %w", err)
|
|
}
|
|
defer func() { _ = tx.Rollback() }()
|
|
|
|
ok := 0
|
|
if e.OK {
|
|
ok = 1
|
|
}
|
|
_, err = tx.ExecContext(ctx,
|
|
`INSERT INTO notification_log
|
|
(id, channel_id, alert_id, event, ok, status_code, latency_ms, error, fired_at)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`,
|
|
e.ID, e.ChannelID, nullable(e.AlertID), e.Event, ok,
|
|
nullableInt(e.StatusCode), nullableInt(e.LatencyMS),
|
|
nullable(e.Error),
|
|
e.FiredAt.UTC().Format(time.RFC3339Nano))
|
|
if err != nil {
|
|
return fmt.Errorf("store: append notification_log: %w", err)
|
|
}
|
|
|
|
if e.OK {
|
|
if _, err := tx.ExecContext(ctx,
|
|
`UPDATE notification_channels SET last_fired_at = ? WHERE id = ?`,
|
|
e.FiredAt.UTC().Format(time.RFC3339Nano), e.ChannelID); err != nil {
|
|
return fmt.Errorf("store: bump last_fired_at: %w", err)
|
|
}
|
|
}
|
|
return tx.Commit()
|
|
}
|
|
|
|
func scanChannel(scan func(...any) error) (*NotificationChannel, error) {
|
|
var c NotificationChannel
|
|
var enabled int
|
|
var defaultPri, lastFired sql.NullString
|
|
var createdAt, updatedAt string
|
|
if err := scan(&c.ID, &c.Kind, &c.Name, &enabled, &c.Config,
|
|
&defaultPri, &createdAt, &updatedAt, &lastFired); err != nil {
|
|
if errors.Is(err, sql.ErrNoRows) {
|
|
return nil, ErrNotFound
|
|
}
|
|
return nil, fmt.Errorf("store: scan channel: %w", err)
|
|
}
|
|
c.Enabled = enabled == 1
|
|
if defaultPri.Valid {
|
|
v := defaultPri.String
|
|
c.DefaultPriority = &v
|
|
}
|
|
t, err := time.Parse(time.RFC3339Nano, createdAt)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("store: parse created_at: %w", err)
|
|
}
|
|
c.CreatedAt = t
|
|
t, err = time.Parse(time.RFC3339Nano, updatedAt)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("store: parse updated_at: %w", err)
|
|
}
|
|
c.UpdatedAt = t
|
|
if lastFired.Valid {
|
|
t, _ := time.Parse(time.RFC3339Nano, lastFired.String)
|
|
c.LastFiredAt = &t
|
|
}
|
|
return &c, nil
|
|
}
|