Files
restic-manager/internal/store/notification_channels.go
T

209 lines
6.3 KiB
Go

package store
import (
"context"
"database/sql"
"errors"
"fmt"
"time"
)
// NotificationChannel mirrors a row in notification_channels. The
// Config field is the AEAD-encrypted JSON blob; callers (in the
// notification package) decrypt before use.
type NotificationChannel struct {
ID string
Kind string // "webhook" | "ntfy" | "smtp"
Name string
Enabled bool
Config []byte // AEAD ciphertext; opaque at this layer
DefaultPriority *string
CreatedAt time.Time
UpdatedAt time.Time
LastFiredAt *time.Time
}
// NotificationLogEntry is one row in notification_log.
type NotificationLogEntry struct {
ID string
ChannelID string
AlertID *string
Event string // alert.raised | alert.acknowledged | alert.resolved | alert.test
OK bool
StatusCode *int
LatencyMS *int
Error *string
FiredAt time.Time
}
// CreateNotificationChannel inserts a new notification channel row.
func (s *Store) CreateNotificationChannel(ctx context.Context, ch NotificationChannel) error {
enabled := 0
if ch.Enabled {
enabled = 1
}
_, err := s.db.ExecContext(ctx,
`INSERT INTO notification_channels
(id, kind, name, enabled, config, default_priority, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)`,
ch.ID, ch.Kind, ch.Name, enabled, ch.Config,
nullable(ch.DefaultPriority),
ch.CreatedAt.UTC().Format(time.RFC3339Nano),
ch.UpdatedAt.UTC().Format(time.RFC3339Nano))
if err != nil {
return fmt.Errorf("store: create channel: %w", err)
}
return nil
}
// UpdateNotificationChannel updates mutable fields on an existing channel row.
func (s *Store) UpdateNotificationChannel(ctx context.Context, ch NotificationChannel) error {
enabled := 0
if ch.Enabled {
enabled = 1
}
_, err := s.db.ExecContext(ctx,
`UPDATE notification_channels
SET kind = ?, name = ?, enabled = ?, config = ?,
default_priority = ?, updated_at = ?
WHERE id = ?`,
ch.Kind, ch.Name, enabled, ch.Config,
nullable(ch.DefaultPriority),
ch.UpdatedAt.UTC().Format(time.RFC3339Nano),
ch.ID)
if err != nil {
return fmt.Errorf("store: update channel: %w", err)
}
return nil
}
// DeleteNotificationChannel removes a channel row; cascades to notification_log.
func (s *Store) DeleteNotificationChannel(ctx context.Context, id string) error {
_, err := s.db.ExecContext(ctx,
`DELETE FROM notification_channels WHERE id = ?`, id)
if err != nil {
return fmt.Errorf("store: delete channel: %w", err)
}
return nil
}
// GetNotificationChannel returns one channel by primary key or ErrNotFound.
func (s *Store) GetNotificationChannel(ctx context.Context, id string) (*NotificationChannel, error) {
row := s.db.QueryRowContext(ctx,
`SELECT id, kind, name, enabled, config, default_priority,
created_at, updated_at, last_fired_at
FROM notification_channels WHERE id = ?`, id)
return scanChannel(row.Scan)
}
// ListNotificationChannels returns all channels ordered by created_at ascending.
func (s *Store) ListNotificationChannels(ctx context.Context) ([]NotificationChannel, error) {
rows, err := s.db.QueryContext(ctx,
`SELECT id, kind, name, enabled, config, default_priority,
created_at, updated_at, last_fired_at
FROM notification_channels ORDER BY created_at ASC`)
if err != nil {
return nil, fmt.Errorf("store: list channels: %w", err)
}
defer func() { _ = rows.Close() }()
var out []NotificationChannel
for rows.Next() {
c, err := scanChannel(rows.Scan)
if err != nil {
return nil, err
}
out = append(out, *c)
}
return out, rows.Err()
}
// ListEnabledNotificationChannels returns only channels with enabled=1, ordered by created_at.
func (s *Store) ListEnabledNotificationChannels(ctx context.Context) ([]NotificationChannel, error) {
rows, err := s.db.QueryContext(ctx,
`SELECT id, kind, name, enabled, config, default_priority,
created_at, updated_at, last_fired_at
FROM notification_channels WHERE enabled = 1 ORDER BY created_at ASC`)
if err != nil {
return nil, fmt.Errorf("store: list enabled: %w", err)
}
defer func() { _ = rows.Close() }()
var out []NotificationChannel
for rows.Next() {
c, err := scanChannel(rows.Scan)
if err != nil {
return nil, err
}
out = append(out, *c)
}
return out, rows.Err()
}
// AppendNotificationLog records a delivery attempt + bumps the
// channel's last_fired_at on success.
func (s *Store) AppendNotificationLog(ctx context.Context, e NotificationLogEntry) error {
tx, err := s.db.BeginTx(ctx, nil)
if err != nil {
return fmt.Errorf("store: begin: %w", err)
}
defer func() { _ = tx.Rollback() }()
ok := 0
if e.OK {
ok = 1
}
_, err = tx.ExecContext(ctx,
`INSERT INTO notification_log
(id, channel_id, alert_id, event, ok, status_code, latency_ms, error, fired_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`,
e.ID, e.ChannelID, nullable(e.AlertID), e.Event, ok,
nullableInt(e.StatusCode), nullableInt(e.LatencyMS),
nullable(e.Error),
e.FiredAt.UTC().Format(time.RFC3339Nano))
if err != nil {
return fmt.Errorf("store: append notification_log: %w", err)
}
if e.OK {
if _, err := tx.ExecContext(ctx,
`UPDATE notification_channels SET last_fired_at = ? WHERE id = ?`,
e.FiredAt.UTC().Format(time.RFC3339Nano), e.ChannelID); err != nil {
return fmt.Errorf("store: bump last_fired_at: %w", err)
}
}
return tx.Commit()
}
func scanChannel(scan func(...any) error) (*NotificationChannel, error) {
var c NotificationChannel
var enabled int
var defaultPri, lastFired sql.NullString
var createdAt, updatedAt string
if err := scan(&c.ID, &c.Kind, &c.Name, &enabled, &c.Config,
&defaultPri, &createdAt, &updatedAt, &lastFired); err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil, ErrNotFound
}
return nil, fmt.Errorf("store: scan channel: %w", err)
}
c.Enabled = enabled == 1
if defaultPri.Valid {
v := defaultPri.String
c.DefaultPriority = &v
}
t, err := time.Parse(time.RFC3339Nano, createdAt)
if err != nil {
return nil, fmt.Errorf("store: parse created_at: %w", err)
}
c.CreatedAt = t
t, err = time.Parse(time.RFC3339Nano, updatedAt)
if err != nil {
return nil, fmt.Errorf("store: parse updated_at: %w", err)
}
c.UpdatedAt = t
if lastFired.Valid {
t, _ := time.Parse(time.RFC3339Nano, lastFired.String)
c.LastFiredAt = &t
}
return &c, nil
}