Files
restic-manager/internal/notification/hub.go
T

188 lines
5.0 KiB
Go

package notification
import (
"context"
"crypto/rand"
"encoding/hex"
"encoding/json"
"log/slog"
"sync"
"time"
"gitea.dcglab.co.uk/steve/restic-manager/internal/crypto"
"gitea.dcglab.co.uk/steve/restic-manager/internal/store"
)
// Hub fans Payload events out to every enabled channel and persists
// the result to notification_log. One Hub per process; thread-safe.
type Hub struct {
store *store.Store
aead *crypto.AEAD
baseURL string // e.g. https://restic-manager.example
msgIDDomain string // hostname extracted from baseURL for SMTP Message-ID
}
// NewHub constructs a Hub. baseURL is the public root of the server
// (used to build /alerts/<id> links and the SMTP Message-ID domain).
func NewHub(st *store.Store, aead *crypto.AEAD, baseURL string) *Hub {
return &Hub{
store: st,
aead: aead,
baseURL: baseURL,
msgIDDomain: extractDomain(baseURL),
}
}
// Dispatch fans out to every enabled channel. Best-effort — failures
// are logged to notification_log but do not propagate to the caller.
// Each channel runs in its own goroutine; Dispatch returns only when
// all goroutines have settled, so the caller can block briefly for
// the test-button case.
func (h *Hub) Dispatch(ctx context.Context, p Payload) {
chans, err := h.store.ListEnabledNotificationChannels(ctx)
if err != nil {
slog.Error("notification: list channels", "err", err)
return
}
// Stamp the alert link if the caller left it empty.
if p.Link == "" {
p.Link = h.baseURL + "/alerts/" + p.AlertID
}
var wg sync.WaitGroup
for _, c := range chans {
wg.Add(1)
go func(c store.NotificationChannel) {
defer wg.Done()
h.send(ctx, c, p)
}(c)
}
wg.Wait()
}
// DispatchOne fires a single channel — used by the "Send test
// notification" button. Returns the log entry that was persisted so
// the handler can render the result inline.
func (h *Hub) DispatchOne(ctx context.Context, channelID string, p Payload) (store.NotificationLogEntry, error) {
c, err := h.store.GetNotificationChannel(ctx, channelID)
if err != nil {
return store.NotificationLogEntry{}, err
}
if p.Link == "" {
p.Link = h.baseURL + "/alerts/" + p.AlertID
}
return h.send(ctx, *c, p), nil
}
// send builds the channel impl, delivers the payload, and persists a
// notification_log row regardless of success or failure.
func (h *Hub) send(ctx context.Context, c store.NotificationChannel, p Payload) store.NotificationLogEntry {
ch, buildErr := h.buildChannel(c)
logEntry := store.NotificationLogEntry{
ID: newID(),
ChannelID: c.ID,
Event: string(p.Event),
FiredAt: time.Now().UTC(),
}
if p.AlertID != "" {
aid := p.AlertID
logEntry.AlertID = &aid
}
if buildErr != nil {
errStr := buildErr.Error()
logEntry.OK = false
logEntry.Error = &errStr
_ = h.store.AppendNotificationLog(ctx, logEntry)
return logEntry
}
code, latency, sendErr := ch.Send(ctx, p)
statusCode := code
latencyMS := int(latency.Milliseconds())
logEntry.StatusCode = &statusCode
logEntry.LatencyMS = &latencyMS
if sendErr != nil {
errStr := sendErr.Error()
logEntry.OK = false
logEntry.Error = &errStr
} else {
logEntry.OK = true
}
if err := h.store.AppendNotificationLog(ctx, logEntry); err != nil {
slog.Warn("notification: persist log", "err", err)
}
return logEntry
}
// buildChannel decrypts the channel config and returns a concrete
// Channel implementation for the channel's kind.
func (h *Hub) buildChannel(row store.NotificationChannel) (Channel, error) {
plain, err := h.aead.Decrypt(string(row.Config), []byte("notification-channel:"+row.ID))
if err != nil {
return nil, err
}
switch row.Kind {
case "webhook":
var cfg WebhookConfig
if err := json.Unmarshal(plain, &cfg); err != nil {
return nil, err
}
return NewWebhookChannel(cfg), nil
case "ntfy":
var cfg NtfyConfig
if err := json.Unmarshal(plain, &cfg); err != nil {
return nil, err
}
dp := ""
if row.DefaultPriority != nil {
dp = *row.DefaultPriority
}
return NewNtfyChannel(cfg, dp), nil
case "smtp":
var cfg SMTPConfig
if err := json.Unmarshal(plain, &cfg); err != nil {
return nil, err
}
return NewSMTPChannel(cfg, h.msgIDDomain), nil
}
return nil, errUnknownKind(row.Kind)
}
// newID returns a 32-hex-char random identifier for notification_log rows.
func newID() string {
var b [16]byte
_, _ = rand.Read(b[:])
return hex.EncodeToString(b[:])
}
// extractDomain strips the scheme and path from baseURL, leaving only
// the host[:port] component. Used as the right-hand side of SMTP
// Message-IDs.
func extractDomain(baseURL string) string {
s := baseURL
if i := indexOf(s, "://"); i >= 0 {
s = s[i+3:]
}
if i := indexOf(s, "/"); i >= 0 {
s = s[:i]
}
if s == "" {
return "restic-manager.local"
}
return s
}
// indexOf returns the index of the first occurrence of sub in s, or -1.
func indexOf(s, sub string) int {
for i := 0; i+len(sub) <= len(s); i++ {
if s[i:i+len(sub)] == sub {
return i
}
}
return -1
}
type errUnknownKind string
func (e errUnknownKind) Error() string { return "notification: unknown kind: " + string(e) }