From 69fc89143d555b948addc267055488f4ba7f1634 Mon Sep 17 00:00:00 2001 From: Steve Cliff Date: Mon, 4 May 2026 19:28:41 +0100 Subject: [PATCH] store: notification_channels CRUD + AppendNotificationLog --- internal/store/notification_channels.go | 208 +++++++++++++++++++ internal/store/notification_channels_test.go | 96 +++++++++ 2 files changed, 304 insertions(+) create mode 100644 internal/store/notification_channels.go create mode 100644 internal/store/notification_channels_test.go diff --git a/internal/store/notification_channels.go b/internal/store/notification_channels.go new file mode 100644 index 0000000..9b3176d --- /dev/null +++ b/internal/store/notification_channels.go @@ -0,0 +1,208 @@ +package store + +import ( + "context" + "database/sql" + "errors" + "fmt" + "time" +) + +// NotificationChannel mirrors a row in notification_channels. The +// Config field is the AEAD-encrypted JSON blob; callers (in the +// notification package) decrypt before use. +type NotificationChannel struct { + ID string + Kind string // "webhook" | "ntfy" | "smtp" + Name string + Enabled bool + Config []byte // AEAD ciphertext; opaque at this layer + DefaultPriority *string + CreatedAt time.Time + UpdatedAt time.Time + LastFiredAt *time.Time +} + +// NotificationLogEntry is one row in notification_log. +type NotificationLogEntry struct { + ID string + ChannelID string + AlertID *string + Event string // alert.raised | alert.acknowledged | alert.resolved | alert.test + OK bool + StatusCode *int + LatencyMS *int + Error *string + FiredAt time.Time +} + +// CreateNotificationChannel inserts a new notification channel row. +func (s *Store) CreateNotificationChannel(ctx context.Context, ch NotificationChannel) error { + enabled := 0 + if ch.Enabled { + enabled = 1 + } + _, err := s.db.ExecContext(ctx, + `INSERT INTO notification_channels + (id, kind, name, enabled, config, default_priority, created_at, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?)`, + ch.ID, ch.Kind, ch.Name, enabled, ch.Config, + nullable(ch.DefaultPriority), + ch.CreatedAt.UTC().Format(time.RFC3339Nano), + ch.UpdatedAt.UTC().Format(time.RFC3339Nano)) + if err != nil { + return fmt.Errorf("store: create channel: %w", err) + } + return nil +} + +// UpdateNotificationChannel updates mutable fields on an existing channel row. +func (s *Store) UpdateNotificationChannel(ctx context.Context, ch NotificationChannel) error { + enabled := 0 + if ch.Enabled { + enabled = 1 + } + _, err := s.db.ExecContext(ctx, + `UPDATE notification_channels + SET kind = ?, name = ?, enabled = ?, config = ?, + default_priority = ?, updated_at = ? + WHERE id = ?`, + ch.Kind, ch.Name, enabled, ch.Config, + nullable(ch.DefaultPriority), + ch.UpdatedAt.UTC().Format(time.RFC3339Nano), + ch.ID) + if err != nil { + return fmt.Errorf("store: update channel: %w", err) + } + return nil +} + +// DeleteNotificationChannel removes a channel row; cascades to notification_log. +func (s *Store) DeleteNotificationChannel(ctx context.Context, id string) error { + _, err := s.db.ExecContext(ctx, + `DELETE FROM notification_channels WHERE id = ?`, id) + if err != nil { + return fmt.Errorf("store: delete channel: %w", err) + } + return nil +} + +// GetNotificationChannel returns one channel by primary key or ErrNotFound. +func (s *Store) GetNotificationChannel(ctx context.Context, id string) (*NotificationChannel, error) { + row := s.db.QueryRowContext(ctx, + `SELECT id, kind, name, enabled, config, default_priority, + created_at, updated_at, last_fired_at + FROM notification_channels WHERE id = ?`, id) + return scanChannel(row.Scan) +} + +// ListNotificationChannels returns all channels ordered by created_at ascending. +func (s *Store) ListNotificationChannels(ctx context.Context) ([]NotificationChannel, error) { + rows, err := s.db.QueryContext(ctx, + `SELECT id, kind, name, enabled, config, default_priority, + created_at, updated_at, last_fired_at + FROM notification_channels ORDER BY created_at ASC`) + if err != nil { + return nil, fmt.Errorf("store: list channels: %w", err) + } + defer func() { _ = rows.Close() }() + var out []NotificationChannel + for rows.Next() { + c, err := scanChannel(rows.Scan) + if err != nil { + return nil, err + } + out = append(out, *c) + } + return out, rows.Err() +} + +// ListEnabledNotificationChannels returns only channels with enabled=1, ordered by created_at. +func (s *Store) ListEnabledNotificationChannels(ctx context.Context) ([]NotificationChannel, error) { + rows, err := s.db.QueryContext(ctx, + `SELECT id, kind, name, enabled, config, default_priority, + created_at, updated_at, last_fired_at + FROM notification_channels WHERE enabled = 1 ORDER BY created_at ASC`) + if err != nil { + return nil, fmt.Errorf("store: list enabled: %w", err) + } + defer func() { _ = rows.Close() }() + var out []NotificationChannel + for rows.Next() { + c, err := scanChannel(rows.Scan) + if err != nil { + return nil, err + } + out = append(out, *c) + } + return out, rows.Err() +} + +// AppendNotificationLog records a delivery attempt + bumps the +// channel's last_fired_at on success. +func (s *Store) AppendNotificationLog(ctx context.Context, e NotificationLogEntry) error { + tx, err := s.db.BeginTx(ctx, nil) + if err != nil { + return fmt.Errorf("store: begin: %w", err) + } + defer func() { _ = tx.Rollback() }() + + ok := 0 + if e.OK { + ok = 1 + } + _, err = tx.ExecContext(ctx, + `INSERT INTO notification_log + (id, channel_id, alert_id, event, ok, status_code, latency_ms, error, fired_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`, + e.ID, e.ChannelID, nullable(e.AlertID), e.Event, ok, + nullableInt(e.StatusCode), nullableInt(e.LatencyMS), + nullable(e.Error), + e.FiredAt.UTC().Format(time.RFC3339Nano)) + if err != nil { + return fmt.Errorf("store: append notification_log: %w", err) + } + + if e.OK { + if _, err := tx.ExecContext(ctx, + `UPDATE notification_channels SET last_fired_at = ? WHERE id = ?`, + e.FiredAt.UTC().Format(time.RFC3339Nano), e.ChannelID); err != nil { + return fmt.Errorf("store: bump last_fired_at: %w", err) + } + } + return tx.Commit() +} + +func scanChannel(scan func(...any) error) (*NotificationChannel, error) { + var c NotificationChannel + var enabled int + var defaultPri, lastFired sql.NullString + var createdAt, updatedAt string + if err := scan(&c.ID, &c.Kind, &c.Name, &enabled, &c.Config, + &defaultPri, &createdAt, &updatedAt, &lastFired); err != nil { + if errors.Is(err, sql.ErrNoRows) { + return nil, ErrNotFound + } + return nil, fmt.Errorf("store: scan channel: %w", err) + } + c.Enabled = enabled == 1 + if defaultPri.Valid { + v := defaultPri.String + c.DefaultPriority = &v + } + t, err := time.Parse(time.RFC3339Nano, createdAt) + if err != nil { + return nil, fmt.Errorf("store: parse created_at: %w", err) + } + c.CreatedAt = t + t, err = time.Parse(time.RFC3339Nano, updatedAt) + if err != nil { + return nil, fmt.Errorf("store: parse updated_at: %w", err) + } + c.UpdatedAt = t + if lastFired.Valid { + t, _ := time.Parse(time.RFC3339Nano, lastFired.String) + c.LastFiredAt = &t + } + return &c, nil +} diff --git a/internal/store/notification_channels_test.go b/internal/store/notification_channels_test.go new file mode 100644 index 0000000..412d14c --- /dev/null +++ b/internal/store/notification_channels_test.go @@ -0,0 +1,96 @@ +package store + +import ( + "context" + "path/filepath" + "testing" + "time" + + "github.com/oklog/ulid/v2" +) + +func TestNotificationChannelCRUD(t *testing.T) { + t.Parallel() + dir := t.TempDir() + st, err := Open(context.Background(), filepath.Join(dir, "rm.db")) + if err != nil { + t.Fatalf("open: %v", err) + } + defer st.Close() + ctx := context.Background() + + ch := NotificationChannel{ + ID: ulid.Make().String(), Kind: "webhook", Name: "team-slack", + Enabled: true, Config: []byte("encrypted-blob"), + CreatedAt: time.Now().UTC(), UpdatedAt: time.Now().UTC(), + } + if err := st.CreateNotificationChannel(ctx, ch); err != nil { + t.Fatalf("create: %v", err) + } + + got, err := st.GetNotificationChannel(ctx, ch.ID) + if err != nil { + t.Fatalf("get: %v", err) + } + if got.Name != ch.Name || got.Kind != "webhook" || string(got.Config) != "encrypted-blob" { + t.Fatalf("got %+v", got) + } + + got.Name = "team-slack-renamed" + got.Enabled = false + got.UpdatedAt = time.Now().UTC() + if err := st.UpdateNotificationChannel(ctx, *got); err != nil { + t.Fatalf("update: %v", err) + } + got2, _ := st.GetNotificationChannel(ctx, ch.ID) + if got2.Name != "team-slack-renamed" || got2.Enabled { + t.Fatalf("update not applied: %+v", got2) + } + + all, _ := st.ListEnabledNotificationChannels(ctx) + if len(all) != 0 { + t.Errorf("disabled channel returned by ListEnabled: %d", len(all)) + } + + if err := st.DeleteNotificationChannel(ctx, ch.ID); err != nil { + t.Fatalf("delete: %v", err) + } + if _, err := st.GetNotificationChannel(ctx, ch.ID); err == nil { + t.Errorf("expected ErrNotFound after delete") + } +} + +func TestAppendNotificationLog(t *testing.T) { + t.Parallel() + dir := t.TempDir() + st, _ := Open(context.Background(), filepath.Join(dir, "rm.db")) + defer st.Close() + ctx := context.Background() + + chID := ulid.Make().String() + if err := st.CreateNotificationChannel(ctx, NotificationChannel{ + ID: chID, Kind: "ntfy", Name: "n", Enabled: true, + Config: []byte{1, 2, 3}, + CreatedAt: time.Now().UTC(), UpdatedAt: time.Now().UTC(), + }); err != nil { + t.Fatalf("create channel: %v", err) + } + + code := 200 + lat := 287 + if err := st.AppendNotificationLog(ctx, NotificationLogEntry{ + ID: ulid.Make().String(), ChannelID: chID, Event: "alert.test", + OK: true, StatusCode: &code, LatencyMS: &lat, + FiredAt: time.Now().UTC(), + }); err != nil { + t.Fatalf("append: %v", err) + } + + // LastFiredAt projection: the channel's last_fired_at is updated + // either by the append helper or by the callers; if you choose the + // helper does the bump, assert it. + got, _ := st.GetNotificationChannel(ctx, chID) + if got.LastFiredAt == nil { + t.Errorf("last_fired_at should bump on AppendNotificationLog success") + } +}