store: notification_channels CRUD + AppendNotificationLog
This commit is contained in:
@@ -0,0 +1,208 @@
|
|||||||
|
package store
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"database/sql"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// NotificationChannel mirrors a row in notification_channels. The
|
||||||
|
// Config field is the AEAD-encrypted JSON blob; callers (in the
|
||||||
|
// notification package) decrypt before use.
|
||||||
|
type NotificationChannel struct {
|
||||||
|
ID string
|
||||||
|
Kind string // "webhook" | "ntfy" | "smtp"
|
||||||
|
Name string
|
||||||
|
Enabled bool
|
||||||
|
Config []byte // AEAD ciphertext; opaque at this layer
|
||||||
|
DefaultPriority *string
|
||||||
|
CreatedAt time.Time
|
||||||
|
UpdatedAt time.Time
|
||||||
|
LastFiredAt *time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
// NotificationLogEntry is one row in notification_log.
|
||||||
|
type NotificationLogEntry struct {
|
||||||
|
ID string
|
||||||
|
ChannelID string
|
||||||
|
AlertID *string
|
||||||
|
Event string // alert.raised | alert.acknowledged | alert.resolved | alert.test
|
||||||
|
OK bool
|
||||||
|
StatusCode *int
|
||||||
|
LatencyMS *int
|
||||||
|
Error *string
|
||||||
|
FiredAt time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
// CreateNotificationChannel inserts a new notification channel row.
|
||||||
|
func (s *Store) CreateNotificationChannel(ctx context.Context, ch NotificationChannel) error {
|
||||||
|
enabled := 0
|
||||||
|
if ch.Enabled {
|
||||||
|
enabled = 1
|
||||||
|
}
|
||||||
|
_, err := s.db.ExecContext(ctx,
|
||||||
|
`INSERT INTO notification_channels
|
||||||
|
(id, kind, name, enabled, config, default_priority, created_at, updated_at)
|
||||||
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?)`,
|
||||||
|
ch.ID, ch.Kind, ch.Name, enabled, ch.Config,
|
||||||
|
nullable(ch.DefaultPriority),
|
||||||
|
ch.CreatedAt.UTC().Format(time.RFC3339Nano),
|
||||||
|
ch.UpdatedAt.UTC().Format(time.RFC3339Nano))
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("store: create channel: %w", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// UpdateNotificationChannel updates mutable fields on an existing channel row.
|
||||||
|
func (s *Store) UpdateNotificationChannel(ctx context.Context, ch NotificationChannel) error {
|
||||||
|
enabled := 0
|
||||||
|
if ch.Enabled {
|
||||||
|
enabled = 1
|
||||||
|
}
|
||||||
|
_, err := s.db.ExecContext(ctx,
|
||||||
|
`UPDATE notification_channels
|
||||||
|
SET kind = ?, name = ?, enabled = ?, config = ?,
|
||||||
|
default_priority = ?, updated_at = ?
|
||||||
|
WHERE id = ?`,
|
||||||
|
ch.Kind, ch.Name, enabled, ch.Config,
|
||||||
|
nullable(ch.DefaultPriority),
|
||||||
|
ch.UpdatedAt.UTC().Format(time.RFC3339Nano),
|
||||||
|
ch.ID)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("store: update channel: %w", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// DeleteNotificationChannel removes a channel row; cascades to notification_log.
|
||||||
|
func (s *Store) DeleteNotificationChannel(ctx context.Context, id string) error {
|
||||||
|
_, err := s.db.ExecContext(ctx,
|
||||||
|
`DELETE FROM notification_channels WHERE id = ?`, id)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("store: delete channel: %w", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetNotificationChannel returns one channel by primary key or ErrNotFound.
|
||||||
|
func (s *Store) GetNotificationChannel(ctx context.Context, id string) (*NotificationChannel, error) {
|
||||||
|
row := s.db.QueryRowContext(ctx,
|
||||||
|
`SELECT id, kind, name, enabled, config, default_priority,
|
||||||
|
created_at, updated_at, last_fired_at
|
||||||
|
FROM notification_channels WHERE id = ?`, id)
|
||||||
|
return scanChannel(row.Scan)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ListNotificationChannels returns all channels ordered by created_at ascending.
|
||||||
|
func (s *Store) ListNotificationChannels(ctx context.Context) ([]NotificationChannel, error) {
|
||||||
|
rows, err := s.db.QueryContext(ctx,
|
||||||
|
`SELECT id, kind, name, enabled, config, default_priority,
|
||||||
|
created_at, updated_at, last_fired_at
|
||||||
|
FROM notification_channels ORDER BY created_at ASC`)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("store: list channels: %w", err)
|
||||||
|
}
|
||||||
|
defer func() { _ = rows.Close() }()
|
||||||
|
var out []NotificationChannel
|
||||||
|
for rows.Next() {
|
||||||
|
c, err := scanChannel(rows.Scan)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
out = append(out, *c)
|
||||||
|
}
|
||||||
|
return out, rows.Err()
|
||||||
|
}
|
||||||
|
|
||||||
|
// ListEnabledNotificationChannels returns only channels with enabled=1, ordered by created_at.
|
||||||
|
func (s *Store) ListEnabledNotificationChannels(ctx context.Context) ([]NotificationChannel, error) {
|
||||||
|
rows, err := s.db.QueryContext(ctx,
|
||||||
|
`SELECT id, kind, name, enabled, config, default_priority,
|
||||||
|
created_at, updated_at, last_fired_at
|
||||||
|
FROM notification_channels WHERE enabled = 1 ORDER BY created_at ASC`)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("store: list enabled: %w", err)
|
||||||
|
}
|
||||||
|
defer func() { _ = rows.Close() }()
|
||||||
|
var out []NotificationChannel
|
||||||
|
for rows.Next() {
|
||||||
|
c, err := scanChannel(rows.Scan)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
out = append(out, *c)
|
||||||
|
}
|
||||||
|
return out, rows.Err()
|
||||||
|
}
|
||||||
|
|
||||||
|
// AppendNotificationLog records a delivery attempt + bumps the
|
||||||
|
// channel's last_fired_at on success.
|
||||||
|
func (s *Store) AppendNotificationLog(ctx context.Context, e NotificationLogEntry) error {
|
||||||
|
tx, err := s.db.BeginTx(ctx, nil)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("store: begin: %w", err)
|
||||||
|
}
|
||||||
|
defer func() { _ = tx.Rollback() }()
|
||||||
|
|
||||||
|
ok := 0
|
||||||
|
if e.OK {
|
||||||
|
ok = 1
|
||||||
|
}
|
||||||
|
_, err = tx.ExecContext(ctx,
|
||||||
|
`INSERT INTO notification_log
|
||||||
|
(id, channel_id, alert_id, event, ok, status_code, latency_ms, error, fired_at)
|
||||||
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`,
|
||||||
|
e.ID, e.ChannelID, nullable(e.AlertID), e.Event, ok,
|
||||||
|
nullableInt(e.StatusCode), nullableInt(e.LatencyMS),
|
||||||
|
nullable(e.Error),
|
||||||
|
e.FiredAt.UTC().Format(time.RFC3339Nano))
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("store: append notification_log: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if e.OK {
|
||||||
|
if _, err := tx.ExecContext(ctx,
|
||||||
|
`UPDATE notification_channels SET last_fired_at = ? WHERE id = ?`,
|
||||||
|
e.FiredAt.UTC().Format(time.RFC3339Nano), e.ChannelID); err != nil {
|
||||||
|
return fmt.Errorf("store: bump last_fired_at: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return tx.Commit()
|
||||||
|
}
|
||||||
|
|
||||||
|
func scanChannel(scan func(...any) error) (*NotificationChannel, error) {
|
||||||
|
var c NotificationChannel
|
||||||
|
var enabled int
|
||||||
|
var defaultPri, lastFired sql.NullString
|
||||||
|
var createdAt, updatedAt string
|
||||||
|
if err := scan(&c.ID, &c.Kind, &c.Name, &enabled, &c.Config,
|
||||||
|
&defaultPri, &createdAt, &updatedAt, &lastFired); err != nil {
|
||||||
|
if errors.Is(err, sql.ErrNoRows) {
|
||||||
|
return nil, ErrNotFound
|
||||||
|
}
|
||||||
|
return nil, fmt.Errorf("store: scan channel: %w", err)
|
||||||
|
}
|
||||||
|
c.Enabled = enabled == 1
|
||||||
|
if defaultPri.Valid {
|
||||||
|
v := defaultPri.String
|
||||||
|
c.DefaultPriority = &v
|
||||||
|
}
|
||||||
|
t, err := time.Parse(time.RFC3339Nano, createdAt)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("store: parse created_at: %w", err)
|
||||||
|
}
|
||||||
|
c.CreatedAt = t
|
||||||
|
t, err = time.Parse(time.RFC3339Nano, updatedAt)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("store: parse updated_at: %w", err)
|
||||||
|
}
|
||||||
|
c.UpdatedAt = t
|
||||||
|
if lastFired.Valid {
|
||||||
|
t, _ := time.Parse(time.RFC3339Nano, lastFired.String)
|
||||||
|
c.LastFiredAt = &t
|
||||||
|
}
|
||||||
|
return &c, nil
|
||||||
|
}
|
||||||
@@ -0,0 +1,96 @@
|
|||||||
|
package store
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"path/filepath"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/oklog/ulid/v2"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestNotificationChannelCRUD(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
dir := t.TempDir()
|
||||||
|
st, err := Open(context.Background(), filepath.Join(dir, "rm.db"))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("open: %v", err)
|
||||||
|
}
|
||||||
|
defer st.Close()
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
ch := NotificationChannel{
|
||||||
|
ID: ulid.Make().String(), Kind: "webhook", Name: "team-slack",
|
||||||
|
Enabled: true, Config: []byte("encrypted-blob"),
|
||||||
|
CreatedAt: time.Now().UTC(), UpdatedAt: time.Now().UTC(),
|
||||||
|
}
|
||||||
|
if err := st.CreateNotificationChannel(ctx, ch); err != nil {
|
||||||
|
t.Fatalf("create: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
got, err := st.GetNotificationChannel(ctx, ch.ID)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("get: %v", err)
|
||||||
|
}
|
||||||
|
if got.Name != ch.Name || got.Kind != "webhook" || string(got.Config) != "encrypted-blob" {
|
||||||
|
t.Fatalf("got %+v", got)
|
||||||
|
}
|
||||||
|
|
||||||
|
got.Name = "team-slack-renamed"
|
||||||
|
got.Enabled = false
|
||||||
|
got.UpdatedAt = time.Now().UTC()
|
||||||
|
if err := st.UpdateNotificationChannel(ctx, *got); err != nil {
|
||||||
|
t.Fatalf("update: %v", err)
|
||||||
|
}
|
||||||
|
got2, _ := st.GetNotificationChannel(ctx, ch.ID)
|
||||||
|
if got2.Name != "team-slack-renamed" || got2.Enabled {
|
||||||
|
t.Fatalf("update not applied: %+v", got2)
|
||||||
|
}
|
||||||
|
|
||||||
|
all, _ := st.ListEnabledNotificationChannels(ctx)
|
||||||
|
if len(all) != 0 {
|
||||||
|
t.Errorf("disabled channel returned by ListEnabled: %d", len(all))
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := st.DeleteNotificationChannel(ctx, ch.ID); err != nil {
|
||||||
|
t.Fatalf("delete: %v", err)
|
||||||
|
}
|
||||||
|
if _, err := st.GetNotificationChannel(ctx, ch.ID); err == nil {
|
||||||
|
t.Errorf("expected ErrNotFound after delete")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestAppendNotificationLog(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
dir := t.TempDir()
|
||||||
|
st, _ := Open(context.Background(), filepath.Join(dir, "rm.db"))
|
||||||
|
defer st.Close()
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
chID := ulid.Make().String()
|
||||||
|
if err := st.CreateNotificationChannel(ctx, NotificationChannel{
|
||||||
|
ID: chID, Kind: "ntfy", Name: "n", Enabled: true,
|
||||||
|
Config: []byte{1, 2, 3},
|
||||||
|
CreatedAt: time.Now().UTC(), UpdatedAt: time.Now().UTC(),
|
||||||
|
}); err != nil {
|
||||||
|
t.Fatalf("create channel: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
code := 200
|
||||||
|
lat := 287
|
||||||
|
if err := st.AppendNotificationLog(ctx, NotificationLogEntry{
|
||||||
|
ID: ulid.Make().String(), ChannelID: chID, Event: "alert.test",
|
||||||
|
OK: true, StatusCode: &code, LatencyMS: &lat,
|
||||||
|
FiredAt: time.Now().UTC(),
|
||||||
|
}); err != nil {
|
||||||
|
t.Fatalf("append: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// LastFiredAt projection: the channel's last_fired_at is updated
|
||||||
|
// either by the append helper or by the callers; if you choose the
|
||||||
|
// helper does the bump, assert it.
|
||||||
|
got, _ := st.GetNotificationChannel(ctx, chID)
|
||||||
|
if got.LastFiredAt == nil {
|
||||||
|
t.Errorf("last_fired_at should bump on AppendNotificationLog success")
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user