notification: ntfy channel
This commit is contained in:
@@ -0,0 +1,103 @@
|
||||
package notification
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"time"
|
||||
)
|
||||
|
||||
// NtfyConfig is the per-channel JSON shape stored AEAD-encrypted in
|
||||
// notification_channels.config.
|
||||
type NtfyConfig struct {
|
||||
ServerURL string `json:"server_url"`
|
||||
Topic string `json:"topic"`
|
||||
AccessToken string `json:"access_token,omitempty"`
|
||||
}
|
||||
|
||||
// NtfyChannel delivers alerts to an ntfy server using POST with
|
||||
// ntfy-specific headers (Title, Priority, Tags, Click). One instance
|
||||
// per configured channel row. Reused across sends — http.Client is
|
||||
// goroutine-safe.
|
||||
type NtfyChannel struct {
|
||||
cfg NtfyConfig
|
||||
defaultPriority string // "min"/"low"/"default"/"high"/"urgent" or ""
|
||||
client *http.Client
|
||||
}
|
||||
|
||||
// NewNtfyChannel builds an ntfy channel with a 5s http.Client timeout.
|
||||
// defaultPriority is the channel-configured fallback when no
|
||||
// severity-specific mapping applies; pass "" to use the built-in
|
||||
// fallbacks (4 for warning, 3 for everything else).
|
||||
func NewNtfyChannel(cfg NtfyConfig, defaultPriority string) *NtfyChannel {
|
||||
if cfg.ServerURL == "" {
|
||||
cfg.ServerURL = "https://ntfy.sh"
|
||||
}
|
||||
return &NtfyChannel{
|
||||
cfg: cfg,
|
||||
defaultPriority: defaultPriority,
|
||||
client: &http.Client{Timeout: 5 * time.Second},
|
||||
}
|
||||
}
|
||||
|
||||
// Kind returns "ntfy" for log enrichment and dispatcher routing.
|
||||
func (c *NtfyChannel) Kind() string { return "ntfy" }
|
||||
|
||||
// Send delivers the payload as a plain-text POST to <server>/<topic>
|
||||
// with ntfy headers. Returns (statusCode, latency, err). 4xx/5xx
|
||||
// responses are returned as errors with the status code set.
|
||||
func (c *NtfyChannel) Send(ctx context.Context, p Payload) (int, time.Duration, error) {
|
||||
url := c.cfg.ServerURL + "/" + c.cfg.Topic
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewBufferString(p.Message))
|
||||
if err != nil {
|
||||
return 0, 0, fmt.Errorf("ntfy: build request: %w", err)
|
||||
}
|
||||
|
||||
req.Header.Set("Title", fmt.Sprintf("[%s] %s %s", p.Severity, p.HostName, p.Kind))
|
||||
req.Header.Set("Tags", p.Severity+","+p.Kind)
|
||||
req.Header.Set("Priority", priorityForSeverity(p.Severity, c.defaultPriority))
|
||||
if p.Link != "" {
|
||||
req.Header.Set("Click", p.Link)
|
||||
}
|
||||
if c.cfg.AccessToken != "" {
|
||||
req.Header.Set("Authorization", "Bearer "+c.cfg.AccessToken)
|
||||
}
|
||||
|
||||
t0 := time.Now()
|
||||
res, err := c.client.Do(req)
|
||||
latency := time.Since(t0)
|
||||
if err != nil {
|
||||
return 0, latency, fmt.Errorf("ntfy: do: %w", err)
|
||||
}
|
||||
defer func() { _ = res.Body.Close() }()
|
||||
// Drain body to keep the connection reusable.
|
||||
_, _ = io.Copy(io.Discard, res.Body)
|
||||
if res.StatusCode >= 400 {
|
||||
return res.StatusCode, latency, fmt.Errorf("ntfy: http %d", res.StatusCode)
|
||||
}
|
||||
return res.StatusCode, latency, nil
|
||||
}
|
||||
|
||||
// priorityForSeverity maps a severity string to an ntfy numeric priority
|
||||
// string. critical always returns "5" regardless of defaultPri. For
|
||||
// other severities, defaultPri is returned when non-empty, otherwise
|
||||
// "4" for warning and "3" for everything else.
|
||||
func priorityForSeverity(severity, defaultPri string) string {
|
||||
switch severity {
|
||||
case "critical":
|
||||
return "5"
|
||||
case "warning":
|
||||
if defaultPri != "" {
|
||||
return defaultPri
|
||||
}
|
||||
return "4"
|
||||
default:
|
||||
if defaultPri != "" {
|
||||
return defaultPri
|
||||
}
|
||||
return "3"
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,92 @@
|
||||
package notification
|
||||
|
||||
import (
|
||||
"io"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestNtfySendsHeadersAndBody(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
var (
|
||||
gotTitle string
|
||||
gotPri string
|
||||
gotTags string
|
||||
gotClick string
|
||||
gotAuth string
|
||||
gotBody string
|
||||
)
|
||||
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
gotTitle = r.Header.Get("Title")
|
||||
gotPri = r.Header.Get("Priority")
|
||||
gotTags = r.Header.Get("Tags")
|
||||
gotClick = r.Header.Get("Click")
|
||||
gotAuth = r.Header.Get("Authorization")
|
||||
b, _ := io.ReadAll(r.Body)
|
||||
gotBody = string(b)
|
||||
w.WriteHeader(http.StatusOK)
|
||||
}))
|
||||
defer srv.Close()
|
||||
|
||||
cfg := NtfyConfig{
|
||||
ServerURL: srv.URL,
|
||||
Topic: "alerts",
|
||||
AccessToken: "tk1",
|
||||
}
|
||||
ch := NewNtfyChannel(cfg, "") // no default priority; critical must still be "5"
|
||||
|
||||
p := Payload{
|
||||
Event: EventRaised,
|
||||
AlertID: "01HZ",
|
||||
Severity: "critical",
|
||||
Kind: "check_failed",
|
||||
HostName: "alfa-01",
|
||||
Message: "errors found",
|
||||
RaisedAt: time.Now(),
|
||||
Link: "https://rm.example/a",
|
||||
}
|
||||
|
||||
code, _, err := ch.Send(t.Context(), p)
|
||||
if err != nil {
|
||||
t.Fatalf("Send: %v", err)
|
||||
}
|
||||
if code != http.StatusOK {
|
||||
t.Fatalf("want 200, got %d", code)
|
||||
}
|
||||
|
||||
if want := "[critical] alfa-01 check_failed"; gotTitle != want {
|
||||
t.Errorf("Title: got %q want %q", gotTitle, want)
|
||||
}
|
||||
if gotPri != "5" {
|
||||
t.Errorf("Priority: got %q want \"5\"", gotPri)
|
||||
}
|
||||
if want := "critical,check_failed"; gotTags != want {
|
||||
t.Errorf("Tags: got %q want %q", gotTags, want)
|
||||
}
|
||||
if gotClick != "https://rm.example/a" {
|
||||
t.Errorf("Click: got %q want %q", gotClick, "https://rm.example/a")
|
||||
}
|
||||
if want := "Bearer tk1"; gotAuth != want {
|
||||
t.Errorf("Authorization: got %q want %q", gotAuth, want)
|
||||
}
|
||||
if gotBody != "errors found" {
|
||||
t.Errorf("body: got %q want %q", gotBody, "errors found")
|
||||
}
|
||||
}
|
||||
|
||||
func TestNtfyDefaultPriorityRespected(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
// info + defaultPri="min" → "min"
|
||||
if got := priorityForSeverity("info", "min"); got != "min" {
|
||||
t.Errorf("info+min: got %q want \"min\"", got)
|
||||
}
|
||||
// critical → "5" regardless of default
|
||||
if got := priorityForSeverity("critical", "min"); got != "5" {
|
||||
t.Errorf("critical+min: got %q want \"5\"", got)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user