notification: webhook channel
This commit is contained in:
@@ -0,0 +1,98 @@
|
|||||||
|
package notification
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"net/http"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// WebhookConfig is the per-channel JSON shape stored AEAD-encrypted
|
||||||
|
// in notification_channels.config.
|
||||||
|
type WebhookConfig struct {
|
||||||
|
URL string `json:"url"`
|
||||||
|
BearerToken string `json:"bearer_token,omitempty"`
|
||||||
|
HeaderName string `json:"header_name,omitempty"`
|
||||||
|
HeaderValue string `json:"header_value,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// WebhookChannel is the HTTP-POST channel. One per configured channel
|
||||||
|
// row. Reused across sends — the http.Client is goroutine-safe.
|
||||||
|
type WebhookChannel struct {
|
||||||
|
cfg WebhookConfig
|
||||||
|
client *http.Client
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewWebhookChannel builds a webhook with a 5s overall timeout enforced
|
||||||
|
// by the http.Client; ctx in Send is layered on top for caller-driven
|
||||||
|
// cancel.
|
||||||
|
func NewWebhookChannel(cfg WebhookConfig) *WebhookChannel {
|
||||||
|
return &WebhookChannel{
|
||||||
|
cfg: cfg,
|
||||||
|
client: &http.Client{Timeout: 5 * time.Second},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Kind returns "webhook" for log enrichment and dispatcher routing.
|
||||||
|
func (c *WebhookChannel) Kind() string { return "webhook" }
|
||||||
|
|
||||||
|
// webhookBody is the wire-stable envelope. Documented in the spec; do
|
||||||
|
// not reorder fields freely — operators write switch statements on
|
||||||
|
// "event" and "severity".
|
||||||
|
type webhookBody struct {
|
||||||
|
Event string `json:"event"`
|
||||||
|
AlertID string `json:"alert_id"`
|
||||||
|
Severity string `json:"severity"`
|
||||||
|
Kind string `json:"kind"`
|
||||||
|
HostID string `json:"host_id"`
|
||||||
|
HostName string `json:"host_name"`
|
||||||
|
Message string `json:"message"`
|
||||||
|
RaisedAt string `json:"raised_at"`
|
||||||
|
Link string `json:"link"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send delivers the payload as a JSON POST. Returns (statusCode, latency, err).
|
||||||
|
// 4xx/5xx responses are returned as errors with the status code set.
|
||||||
|
func (c *WebhookChannel) Send(ctx context.Context, p Payload) (int, time.Duration, error) {
|
||||||
|
body := webhookBody{
|
||||||
|
Event: string(p.Event), AlertID: p.AlertID,
|
||||||
|
Severity: p.Severity, Kind: p.Kind,
|
||||||
|
HostID: p.HostID, HostName: p.HostName,
|
||||||
|
Message: p.Message,
|
||||||
|
RaisedAt: p.RaisedAt.UTC().Format(time.RFC3339Nano),
|
||||||
|
Link: p.Link,
|
||||||
|
}
|
||||||
|
buf, err := json.Marshal(body)
|
||||||
|
if err != nil {
|
||||||
|
return 0, 0, fmt.Errorf("webhook: marshal body: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
req, err := http.NewRequestWithContext(ctx, http.MethodPost, c.cfg.URL, bytes.NewReader(buf))
|
||||||
|
if err != nil {
|
||||||
|
return 0, 0, fmt.Errorf("webhook: build request: %w", err)
|
||||||
|
}
|
||||||
|
req.Header.Set("Content-Type", "application/json")
|
||||||
|
if c.cfg.BearerToken != "" {
|
||||||
|
req.Header.Set("Authorization", "Bearer "+c.cfg.BearerToken)
|
||||||
|
}
|
||||||
|
if c.cfg.HeaderName != "" {
|
||||||
|
req.Header.Set(c.cfg.HeaderName, c.cfg.HeaderValue)
|
||||||
|
}
|
||||||
|
|
||||||
|
t0 := time.Now()
|
||||||
|
res, err := c.client.Do(req)
|
||||||
|
latency := time.Since(t0)
|
||||||
|
if err != nil {
|
||||||
|
return 0, latency, fmt.Errorf("webhook: do: %w", err)
|
||||||
|
}
|
||||||
|
defer func() { _ = res.Body.Close() }()
|
||||||
|
// Drain body — keep the connection reusable.
|
||||||
|
_, _ = io.Copy(io.Discard, res.Body)
|
||||||
|
if res.StatusCode >= 400 {
|
||||||
|
return res.StatusCode, latency, fmt.Errorf("webhook: http %d", res.StatusCode)
|
||||||
|
}
|
||||||
|
return res.StatusCode, latency, nil
|
||||||
|
}
|
||||||
@@ -0,0 +1,83 @@
|
|||||||
|
package notification
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"net/http"
|
||||||
|
"net/http/httptest"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestWebhookSendsCorrectPayloadAndHeaders(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
var got webhookBody
|
||||||
|
var auth, custom string
|
||||||
|
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
auth = r.Header.Get("Authorization")
|
||||||
|
custom = r.Header.Get("X-Test")
|
||||||
|
_ = json.NewDecoder(r.Body).Decode(&got)
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
}))
|
||||||
|
defer srv.Close()
|
||||||
|
|
||||||
|
ch := NewWebhookChannel(WebhookConfig{
|
||||||
|
URL: srv.URL, BearerToken: "tok-123",
|
||||||
|
HeaderName: "X-Test", HeaderValue: "yes",
|
||||||
|
})
|
||||||
|
code, _, err := ch.Send(context.Background(), Payload{
|
||||||
|
Event: EventRaised, AlertID: "01K",
|
||||||
|
Severity: "warning", Kind: "backup_failed",
|
||||||
|
HostID: "h1", HostName: "alfa-01",
|
||||||
|
Message: "Backup failed",
|
||||||
|
RaisedAt: time.Date(2026, 5, 4, 15, 42, 1, 0, time.UTC),
|
||||||
|
Link: "https://rm.example/alerts/01K",
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("send: %v", err)
|
||||||
|
}
|
||||||
|
if code != 200 {
|
||||||
|
t.Errorf("status: %d", code)
|
||||||
|
}
|
||||||
|
if got.Event != "alert.raised" || got.Kind != "backup_failed" || got.Message != "Backup failed" {
|
||||||
|
t.Errorf("body: %+v", got)
|
||||||
|
}
|
||||||
|
if auth != "Bearer tok-123" {
|
||||||
|
t.Errorf("auth: %q", auth)
|
||||||
|
}
|
||||||
|
if custom != "yes" {
|
||||||
|
t.Errorf("custom header: %q", custom)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestWebhookReturnsErrorOn4xx(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
|
||||||
|
w.WriteHeader(http.StatusUnauthorized)
|
||||||
|
}))
|
||||||
|
defer srv.Close()
|
||||||
|
ch := NewWebhookChannel(WebhookConfig{URL: srv.URL})
|
||||||
|
code, _, err := ch.Send(context.Background(), Payload{Event: EventRaised})
|
||||||
|
if err == nil {
|
||||||
|
t.Fatal("expected error for 401")
|
||||||
|
}
|
||||||
|
if code != 401 {
|
||||||
|
t.Errorf("code: %d", code)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestWebhookRespectsCtxTimeout(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
|
||||||
|
time.Sleep(2 * time.Second)
|
||||||
|
w.WriteHeader(200)
|
||||||
|
}))
|
||||||
|
defer srv.Close()
|
||||||
|
ch := NewWebhookChannel(WebhookConfig{URL: srv.URL})
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
|
||||||
|
defer cancel()
|
||||||
|
_, _, err := ch.Send(ctx, Payload{Event: EventRaised})
|
||||||
|
if err == nil {
|
||||||
|
t.Fatal("expected timeout error")
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user