notification: ntfy channel

This commit is contained in:
2026-05-04 19:35:50 +01:00
parent 5031c888ed
commit 1ff0b2dc86
2 changed files with 195 additions and 0 deletions
+103
View File
@@ -0,0 +1,103 @@
package notification
import (
"bytes"
"context"
"fmt"
"io"
"net/http"
"time"
)
// NtfyConfig is the per-channel JSON shape stored AEAD-encrypted in
// notification_channels.config.
type NtfyConfig struct {
ServerURL string `json:"server_url"`
Topic string `json:"topic"`
AccessToken string `json:"access_token,omitempty"`
}
// NtfyChannel delivers alerts to an ntfy server using POST with
// ntfy-specific headers (Title, Priority, Tags, Click). One instance
// per configured channel row. Reused across sends — http.Client is
// goroutine-safe.
type NtfyChannel struct {
cfg NtfyConfig
defaultPriority string // "min"/"low"/"default"/"high"/"urgent" or ""
client *http.Client
}
// NewNtfyChannel builds an ntfy channel with a 5s http.Client timeout.
// defaultPriority is the channel-configured fallback when no
// severity-specific mapping applies; pass "" to use the built-in
// fallbacks (4 for warning, 3 for everything else).
func NewNtfyChannel(cfg NtfyConfig, defaultPriority string) *NtfyChannel {
if cfg.ServerURL == "" {
cfg.ServerURL = "https://ntfy.sh"
}
return &NtfyChannel{
cfg: cfg,
defaultPriority: defaultPriority,
client: &http.Client{Timeout: 5 * time.Second},
}
}
// Kind returns "ntfy" for log enrichment and dispatcher routing.
func (c *NtfyChannel) Kind() string { return "ntfy" }
// Send delivers the payload as a plain-text POST to <server>/<topic>
// with ntfy headers. Returns (statusCode, latency, err). 4xx/5xx
// responses are returned as errors with the status code set.
func (c *NtfyChannel) Send(ctx context.Context, p Payload) (int, time.Duration, error) {
url := c.cfg.ServerURL + "/" + c.cfg.Topic
req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewBufferString(p.Message))
if err != nil {
return 0, 0, fmt.Errorf("ntfy: build request: %w", err)
}
req.Header.Set("Title", fmt.Sprintf("[%s] %s %s", p.Severity, p.HostName, p.Kind))
req.Header.Set("Tags", p.Severity+","+p.Kind)
req.Header.Set("Priority", priorityForSeverity(p.Severity, c.defaultPriority))
if p.Link != "" {
req.Header.Set("Click", p.Link)
}
if c.cfg.AccessToken != "" {
req.Header.Set("Authorization", "Bearer "+c.cfg.AccessToken)
}
t0 := time.Now()
res, err := c.client.Do(req)
latency := time.Since(t0)
if err != nil {
return 0, latency, fmt.Errorf("ntfy: do: %w", err)
}
defer func() { _ = res.Body.Close() }()
// Drain body to keep the connection reusable.
_, _ = io.Copy(io.Discard, res.Body)
if res.StatusCode >= 400 {
return res.StatusCode, latency, fmt.Errorf("ntfy: http %d", res.StatusCode)
}
return res.StatusCode, latency, nil
}
// priorityForSeverity maps a severity string to an ntfy numeric priority
// string. critical always returns "5" regardless of defaultPri. For
// other severities, defaultPri is returned when non-empty, otherwise
// "4" for warning and "3" for everything else.
func priorityForSeverity(severity, defaultPri string) string {
switch severity {
case "critical":
return "5"
case "warning":
if defaultPri != "" {
return defaultPri
}
return "4"
default:
if defaultPri != "" {
return defaultPri
}
return "3"
}
}
+92
View File
@@ -0,0 +1,92 @@
package notification
import (
"io"
"net/http"
"net/http/httptest"
"testing"
"time"
)
func TestNtfySendsHeadersAndBody(t *testing.T) {
t.Parallel()
var (
gotTitle string
gotPri string
gotTags string
gotClick string
gotAuth string
gotBody string
)
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
gotTitle = r.Header.Get("Title")
gotPri = r.Header.Get("Priority")
gotTags = r.Header.Get("Tags")
gotClick = r.Header.Get("Click")
gotAuth = r.Header.Get("Authorization")
b, _ := io.ReadAll(r.Body)
gotBody = string(b)
w.WriteHeader(http.StatusOK)
}))
defer srv.Close()
cfg := NtfyConfig{
ServerURL: srv.URL,
Topic: "alerts",
AccessToken: "tk1",
}
ch := NewNtfyChannel(cfg, "") // no default priority; critical must still be "5"
p := Payload{
Event: EventRaised,
AlertID: "01HZ",
Severity: "critical",
Kind: "check_failed",
HostName: "alfa-01",
Message: "errors found",
RaisedAt: time.Now(),
Link: "https://rm.example/a",
}
code, _, err := ch.Send(t.Context(), p)
if err != nil {
t.Fatalf("Send: %v", err)
}
if code != http.StatusOK {
t.Fatalf("want 200, got %d", code)
}
if want := "[critical] alfa-01 check_failed"; gotTitle != want {
t.Errorf("Title: got %q want %q", gotTitle, want)
}
if gotPri != "5" {
t.Errorf("Priority: got %q want \"5\"", gotPri)
}
if want := "critical,check_failed"; gotTags != want {
t.Errorf("Tags: got %q want %q", gotTags, want)
}
if gotClick != "https://rm.example/a" {
t.Errorf("Click: got %q want %q", gotClick, "https://rm.example/a")
}
if want := "Bearer tk1"; gotAuth != want {
t.Errorf("Authorization: got %q want %q", gotAuth, want)
}
if gotBody != "errors found" {
t.Errorf("body: got %q want %q", gotBody, "errors found")
}
}
func TestNtfyDefaultPriorityRespected(t *testing.T) {
t.Parallel()
// info + defaultPri="min" → "min"
if got := priorityForSeverity("info", "min"); got != "min" {
t.Errorf("info+min: got %q want \"min\"", got)
}
// critical → "5" regardless of default
if got := priorityForSeverity("critical", "min"); got != "5" {
t.Errorf("critical+min: got %q want \"5\"", got)
}
}