# P3 Alerts Implementation Plan

> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.

**Goal:** Build the alerts subsystem (engine + three notification channels + UI) per `docs/superpowers/specs/2026-05-04-p3-alerts-design.md`. End state: a hardcoded six-rule engine raises alerts on real events; webhook / ntfy / SMTP channels notify on raise/ack/resolve; operators see alerts at `/alerts` and configure channels at `/settings/notifications`.

**Architecture:** Three loosely-coupled units behind one `AlertEngine` goroutine — event hooks fed by existing call sites (MarkJobFinished, offline sweeper, ws hello), 60s ticker for stale-schedule + auto-resolution, fan-out via `notification.Hub`. All persisted state in two new tables (`notification_channels`, `notification_log`) plus one new column on the existing `alerts` table.

**Tech Stack:** Go 1.25, modernc.org/sqlite, chi router, html/template, AEAD-encrypted blobs (existing `crypto.AEAD`), `net/smtp` + `crypto/tls` for SMTP, `net/http` for webhook + ntfy.
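
The single-goroutine design described under Architecture can be pictured as one `select` loop that multiplexes hook events and a ticker. The following is only a sketch of that shape: `Engine`, `JobFinished`, the `NotifyJobFinished` signature, and the `raised` slice (standing in for store writes and hub fan-out) are illustrative assumptions, not the types this plan ships.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// JobFinished is a hypothetical event emitted after MarkJobFinished.
type JobFinished struct {
	HostID string
	OK     bool
}

// Engine is an illustrative stand-in for the alert engine.
type Engine struct {
	jobs   chan JobFinished // fed by call-site hooks
	raised []string         // stands in for store writes + hub fan-out
}

func NewEngine() *Engine {
	// Unbuffered keeps the demo deterministic; a real engine would
	// likely buffer so hooks never wait on the loop.
	return &Engine{jobs: make(chan JobFinished)}
}

// NotifyJobFinished hands an event to the loop, giving up if ctx ends.
func (e *Engine) NotifyJobFinished(ctx context.Context, ev JobFinished) {
	select {
	case e.jobs <- ev:
	case <-ctx.Done():
	}
}

// Run owns all engine state; only this goroutine touches e.raised.
func (e *Engine) Run(ctx context.Context, tick <-chan time.Time) {
	for {
		select {
		case <-ctx.Done():
			return
		case ev := <-e.jobs:
			if !ev.OK {
				e.raised = append(e.raised, "backup_failed:"+ev.HostID)
			}
		case <-tick:
			// Periodic rules (stale schedule, auto-resolution) would run here.
			e.raised = append(e.raised, "tick")
		}
	}
}

// demo drives the loop with a hand-fed tick channel instead of a real ticker.
func demo() []string {
	e := NewEngine()
	ctx, cancel := context.WithCancel(context.Background())
	tick := make(chan time.Time)
	done := make(chan struct{})
	go func() { e.Run(ctx, tick); close(done) }()

	e.NotifyJobFinished(ctx, JobFinished{HostID: "h1", OK: false})
	tick <- time.Now() // returns once Run has received the tick
	cancel()
	<-done // Run has exited, so reading e.raised is race-free
	return e.raised
}

func main() {
	fmt.Println(demo())
}
```

Injecting the tick channel rather than constructing a `time.Ticker` inside `Run` is a design choice for testability: production code can pass `time.NewTicker(60 * time.Second).C`, while tests feed ticks by hand.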

---

## File Structure

| File | Status | Purpose |
| --- | --- | --- |
| `internal/store/migrations/0013_alerts_last_seen.sql` | Create | Adds `alerts.last_seen_at` column. |
| `internal/store/migrations/0014_notifications.sql` | Create | New `notification_channels` + `notification_log` tables. |
| `internal/store/alerts.go` | Modify | Existing file ships the `Alert` type only. Add `RaiseOrTouch`, `Acknowledge`, `Resolve`, `AutoResolve`, `ListAlerts`, `GetAlert`. |
| `internal/store/notification_channels.go` | Create | CRUD for `notification_channels` (encrypted config blob), `AppendNotificationLog`. |
| `internal/notification/payload.go` | Create | `Event` enum + `Payload` struct shared across channels. |
| `internal/notification/channel.go` | Create | `Channel` interface; helpers (build link, etc). |
| `internal/notification/webhook.go` | Create | Webhook impl (HTTP POST + bearer + custom header). |
| `internal/notification/ntfy.go` | Create | Ntfy impl (POST with Title/Priority/Tags/Click). |
| `internal/notification/smtp.go` | Create | SMTP impl using `net/smtp` + `crypto/tls`. |
| `internal/notification/hub.go` | Create | Per-event fan-out across enabled channels; logs results. |
| `internal/alert/engine.go` | Create | Goroutine, event channels, ticker, rule dispatch. |
| `internal/alert/rules.go` | Create | Rule registry + per-rule logic for the six rules. |
| `internal/server/http/ui_alerts.go` | Create | `/alerts` GET + `acknowledge` / `resolve` POST handlers. |
| `internal/server/http/ui_notifications.go` | Create | `/settings/notifications` CRUD + `POST /api/notifications/{id}/test`. |
| `internal/server/http/server.go` | Modify | Wire the new routes; add `Engine` to `Deps`. |
| `internal/server/ui/ui.go` | Modify | Add `alerts.html`, `notifications.html`, `notification_edit.html`, `settings.html`, `partials/alert_row.html`, `partials/crit_banner.html` to commonPaths. |
| `web/templates/pages/alerts.html` | Create | Fleet alerts list + filter strip. |
| `web/templates/pages/settings.html` | Create | Settings shell with sub-tabs (Notifications / Users / Auth). |
| `web/templates/pages/notifications.html` | Create | Channel list (Notifications sub-tab). |
| `web/templates/pages/notification_edit.html` | Create | Channel kind picker + per-kind form + test result + payload preview. |
| `web/templates/partials/alert_row.html` | Create | One alert row (used standalone + on swap). |
| `web/templates/partials/crit_banner.html` | Create | Dashboard-top critical banner. |
| `web/templates/pages/dashboard.html` | Modify | Render the crit banner partial. |
| `web/templates/partials/nav.html` | Modify | Show alert count badge on Alerts tab. |
| `cmd/server/main.go` | Modify | Construct `alert.Engine` + `notification.Hub` + start engine goroutine. |
| `internal/server/ws/handler.go` | Modify | Hook `engine.NotifyHostOnline` on hello + `NotifyJobFinished` after MarkJobFinished. |
| `tasks.md` | Modify | Tick P3-05/06/07 with as-shipped notes. |

Tests live in `_test.go` files alongside the source (existing convention).
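
The table lists a `Channel` interface (`channel.go`) and a hub that fans each event out across enabled channels and records results (`hub.go`). One possible shape for that pairing is sketched below. The method set, the `Payload` fields, and the `Result` type are assumptions made for illustration; the real definitions belong to the later slices of this plan.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"strings"
)

// Payload is a hypothetical event body shared by all channels.
type Payload struct {
	Event   string // e.g. "alert.raised"
	AlertID string
	Message string
}

// Channel is an assumed minimal interface for one destination.
type Channel interface {
	Name() string
	Send(ctx context.Context, p Payload) error
}

// Result records one delivery attempt, roughly what a log row would hold.
type Result struct {
	Channel string
	OK      bool
	Err     string
}

// Hub fans a payload out to every channel. A failure on one channel
// is recorded and does not stop delivery to the others.
type Hub struct{ channels []Channel }

func (h *Hub) Dispatch(ctx context.Context, p Payload) []Result {
	out := make([]Result, 0, len(h.channels))
	for _, ch := range h.channels {
		r := Result{Channel: ch.Name(), OK: true}
		if err := ch.Send(ctx, p); err != nil {
			r.OK, r.Err = false, err.Error()
		}
		out = append(out, r)
	}
	return out
}

// fakeChannel lets the demo exercise success and failure paths.
type fakeChannel struct {
	name string
	err  error
}

func (f fakeChannel) Name() string                        { return f.name }
func (f fakeChannel) Send(context.Context, Payload) error { return f.err }

func demo() string {
	hub := &Hub{channels: []Channel{
		fakeChannel{name: "webhook"},
		fakeChannel{name: "ntfy", err: errors.New("503")},
		fakeChannel{name: "smtp"},
	}}
	res := hub.Dispatch(context.Background(),
		Payload{Event: "alert.raised", AlertID: "a1", Message: "Backup failed"})
	parts := make([]string, 0, len(res))
	for _, r := range res {
		parts = append(parts, fmt.Sprintf("%s=%t", r.Channel, r.OK))
	}
	return strings.Join(parts, " ")
}

func main() {
	fmt.Println(demo())
}
```

The sketch dispatches sequentially for clarity; whether the real hub sends concurrently or applies per-channel timeouts is not something this section specifies.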

---

## Slice A — Schema groundwork

### Task A1: Migration 0013 — `alerts.last_seen_at`

**Files:**
- Create: `internal/store/migrations/0013_alerts_last_seen.sql`
- Test: `internal/store/migrate_test.go` (existing — gains a small assertion)

- [ ] **Step 1: Write the failing test**

Append to `internal/store/migrate_test.go`:

```go
func TestMigration0013AlertsLastSeen(t *testing.T) {
	t.Parallel()
	dir := t.TempDir()
	st, err := Open(context.Background(), filepath.Join(dir, "rm.db"))
	if err != nil {
		t.Fatalf("open: %v", err)
	}
	defer st.Close()

	// Column must exist after migration. Best signal: PRAGMA table_info.
	rows, err := st.DB().Query(`SELECT name FROM pragma_table_info('alerts')`)
	if err != nil {
		t.Fatalf("pragma: %v", err)
	}
	defer rows.Close()
	cols := map[string]bool{}
	for rows.Next() {
		var n string
		if err := rows.Scan(&n); err != nil {
			t.Fatalf("scan: %v", err)
		}
		cols[n] = true
	}
	// Surface iteration errors instead of reporting a misleading "missing column".
	if err := rows.Err(); err != nil {
		t.Fatalf("rows: %v", err)
	}
	if !cols["last_seen_at"] {
		t.Fatalf("alerts.last_seen_at not present after migration; cols=%v", cols)
	}
}
```

- [ ] **Step 2: Run to verify it fails**

```sh
go test ./internal/store/ -run TestMigration0013AlertsLastSeen -count=1
```
Expected: FAIL — `alerts.last_seen_at not present`.

- [ ] **Step 3: Write the migration**

`internal/store/migrations/0013_alerts_last_seen.sql`:

```sql
-- 0013_alerts_last_seen.sql
--
-- Add alerts.last_seen_at to support open-alert dedup with
-- recurrence-tracking. The engine bumps this column on every tick
-- where a rule still matches an existing open alert, so the UI can
-- render "still happening · Ns ago" without sending a fresh
-- notification.
--
-- Column-level ALTER per CLAUDE.md (no rebuild — alerts has inbound
-- FK from acknowledged_by → users; rebuild would risk cascade).
-- Backfill last_seen_at = created_at for any pre-existing rows so
-- the column is non-null in practice (stays nullable in the schema
-- for forwards-compat with rows that haven't been touched yet).

ALTER TABLE alerts ADD COLUMN last_seen_at TEXT;
UPDATE alerts SET last_seen_at = created_at WHERE last_seen_at IS NULL;
```

- [ ] **Step 4: Run test to verify it passes**

```sh
go test ./internal/store/ -run TestMigration0013AlertsLastSeen -count=1
```
Expected: PASS.
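
The migration comment says the UI can render "still happening · Ns ago" from `last_seen_at`. A small helper along these lines could produce that label. This is a sketch only: the function name `stillHappening` and the minute/hour rounding are assumptions, since the plan only shows the seconds form.

```go
package main

import (
	"fmt"
	"time"
)

// stillHappening formats the recurrence label from a nullable
// last_seen_at value. A nil pointer models a row never touched.
func stillHappening(lastSeen *time.Time, now time.Time) string {
	if lastSeen == nil {
		return "still happening"
	}
	d := now.Sub(*lastSeen)
	if d < 0 {
		d = 0 // clock skew: never show a negative age
	}
	switch {
	case d < time.Minute:
		return fmt.Sprintf("still happening · %ds ago", int(d.Seconds()))
	case d < time.Hour:
		return fmt.Sprintf("still happening · %dm ago", int(d.Minutes()))
	default:
		return fmt.Sprintf("still happening · %dh ago", int(d.Hours()))
	}
}

// demo uses fixed timestamps so the output does not depend on the wall clock.
func demo() string {
	now := time.Date(2026, 5, 4, 12, 0, 0, 0, time.UTC)
	seen := now.Add(-42 * time.Second)
	return stillHappening(&seen, now)
}

func main() {
	fmt.Println(demo())
}
```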

- [ ] **Step 5: Commit**

```sh
git add internal/store/migrations/0013_alerts_last_seen.sql internal/store/migrate_test.go
git commit -m "store: migration 0013 — alerts.last_seen_at"
```

---

### Task A2: Migration 0014 — `notification_channels` + `notification_log`

**Files:**
- Create: `internal/store/migrations/0014_notifications.sql`
- Test: `internal/store/migrate_test.go`

- [ ] **Step 1: Append failing test**

```go
func TestMigration0014NotificationsTables(t *testing.T) {
	t.Parallel()
	dir := t.TempDir()
	st, err := Open(context.Background(), filepath.Join(dir, "rm.db"))
	if err != nil {
		t.Fatalf("open: %v", err)
	}
	defer st.Close()

	for _, want := range []string{"notification_channels", "notification_log"} {
		var n int
		if err := st.DB().QueryRow(
			`SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name=?`, want,
		).Scan(&n); err != nil {
			t.Fatalf("scan: %v", err)
		}
		if n != 1 {
			t.Errorf("table %q missing after migration", want)
		}
	}

	// Sanity: kind CHECK accepts all three v1 kinds.
	for _, k := range []string{"webhook", "ntfy", "smtp"} {
		_, err := st.DB().Exec(
			`INSERT INTO notification_channels (id, kind, name, config, created_at, updated_at)
			 VALUES (?, ?, ?, x'00', '2026-01-01T00:00:00Z', '2026-01-01T00:00:00Z')`,
			"test-"+k, k, "test-"+k)
		if err != nil {
			t.Errorf("insert %q rejected by CHECK: %v", k, err)
		}
	}
}
```

- [ ] **Step 2: Run to verify it fails**

```sh
go test ./internal/store/ -run TestMigration0014NotificationsTables -count=1
```
Expected: FAIL — both tables missing.

- [ ] **Step 3: Write the migration**

`internal/store/migrations/0014_notifications.sql`:

```sql
-- 0014_notifications.sql
--
-- Notification channels (operator-configured destinations: webhook,
-- ntfy, SMTP) and the dispatch log. Both are net-new — no rebuild
-- pattern needed.
--
-- config is an AEAD-encrypted JSON blob. Per-kind shape lives in
-- internal/notification/{webhook,ntfy,smtp}.go. The CHECK keeps wire
-- consistency — adding a new kind requires a follow-up migration
-- (forces the implementer to think about it).

CREATE TABLE notification_channels (
    id               TEXT PRIMARY KEY,
    kind             TEXT NOT NULL CHECK (kind IN ('webhook', 'ntfy', 'smtp')),
    name             TEXT NOT NULL,
    enabled          INTEGER NOT NULL DEFAULT 1 CHECK (enabled IN (0, 1)),
    config           BLOB NOT NULL, -- AEAD-encrypted JSON; per-kind shape
    default_priority TEXT,          -- ntfy only; null for webhook + smtp
    created_at       TEXT NOT NULL,
    updated_at       TEXT NOT NULL,
    last_fired_at    TEXT
);

CREATE INDEX notification_channels_enabled
    ON notification_channels(enabled) WHERE enabled = 1;

CREATE TABLE notification_log (
    id          TEXT PRIMARY KEY,
    channel_id  TEXT NOT NULL REFERENCES notification_channels(id) ON DELETE CASCADE,
    alert_id    TEXT REFERENCES alerts(id) ON DELETE SET NULL,
    event       TEXT NOT NULL, -- alert.raised | alert.acknowledged | alert.resolved | alert.test
    ok          INTEGER NOT NULL CHECK (ok IN (0, 1)),
    status_code INTEGER,
    latency_ms  INTEGER,
    error       TEXT,
    fired_at    TEXT NOT NULL
);

CREATE INDEX notification_log_channel
    ON notification_log(channel_id, fired_at DESC);
CREATE INDEX notification_log_alert
    ON notification_log(alert_id);
```

- [ ] **Step 4: Run test to verify it passes**

```sh
go test ./internal/store/ -run TestMigration0014NotificationsTables -count=1
```
Expected: PASS.
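
The migration stores `config` as an AEAD-encrypted JSON blob. To make that round trip concrete, here is a sketch using AES-256-GCM from the standard library as a stand-in for the project's existing `crypto.AEAD`, whose API is not shown in this plan. `WebhookConfig`, its JSON field names, `sealConfig`, and `openConfig` are all hypothetical; the nonce-prefixed layout is one common convention, not necessarily the one the project uses.

```go
package main

import (
	"crypto/aes"
	"crypto/cipher"
	"crypto/rand"
	"encoding/json"
	"errors"
	"fmt"
)

// WebhookConfig is a hypothetical per-kind config shape.
type WebhookConfig struct {
	URL         string `json:"url"`
	BearerToken string `json:"bearer_token,omitempty"`
}

func newGCM(key []byte) (cipher.AEAD, error) {
	block, err := aes.NewCipher(key) // a 32-byte key selects AES-256
	if err != nil {
		return nil, err
	}
	return cipher.NewGCM(block)
}

// sealConfig marshals cfg to JSON and returns nonce || ciphertext.
func sealConfig(key []byte, cfg any) ([]byte, error) {
	plain, err := json.Marshal(cfg)
	if err != nil {
		return nil, err
	}
	gcm, err := newGCM(key)
	if err != nil {
		return nil, err
	}
	nonce := make([]byte, gcm.NonceSize())
	if _, err := rand.Read(nonce); err != nil {
		return nil, err
	}
	// Passing nonce as dst prepends it to the sealed output.
	return gcm.Seal(nonce, nonce, plain, nil), nil
}

// openConfig authenticates and decrypts a blob produced by sealConfig.
func openConfig(key, blob []byte, out any) error {
	gcm, err := newGCM(key)
	if err != nil {
		return err
	}
	n := gcm.NonceSize()
	if len(blob) < n {
		return errors.New("config blob too short")
	}
	plain, err := gcm.Open(nil, blob[:n], blob[n:], nil)
	if err != nil {
		return err
	}
	return json.Unmarshal(plain, out)
}

func demo() string {
	key := make([]byte, 32) // all-zero key: acceptable in a demo only
	blob, err := sealConfig(key, WebhookConfig{URL: "https://example.invalid/hook", BearerToken: "s3cret"})
	if err != nil {
		return "seal: " + err.Error()
	}
	var got WebhookConfig
	if err := openConfig(key, blob, &got); err != nil {
		return "open: " + err.Error()
	}
	blob[len(blob)-1] ^= 0xff // flip one byte to show tampering is detected
	tamperErr := openConfig(key, blob, &WebhookConfig{})
	return fmt.Sprintf("%s tampered_rejected=%t", got.URL, tamperErr != nil)
}

func main() {
	fmt.Println(demo())
}
```

Because the store layer treats the blob as opaque bytes, this encryption step would sit in the notification package, matching the note that callers decrypt before use.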

- [ ] **Step 5: Commit**

```sh
git add internal/store/migrations/0014_notifications.sql internal/store/migrate_test.go
git commit -m "store: migration 0014 — notification_channels + notification_log"
```

---

### Task A3: Alerts store API — `RaiseOrTouch`, `Acknowledge`, `Resolve`, `AutoResolve`, `ListAlerts`, `GetAlert`

**Files:**
- Modify: `internal/store/alerts.go`
- Test: `internal/store/alerts_test.go` (create)
- Modify: `internal/store/types.go` (extend `Alert` with `LastSeenAt *time.Time` — check current shape first)

- [ ] **Step 1: Extend the Alert type**

Read `internal/store/types.go` for the existing `Alert` struct. Add `LastSeenAt *time.Time` after `CreatedAt`. The whole struct should look like:

```go
type Alert struct {
	ID             string
	HostID         *string
	Kind           string
	Severity       string
	Message        string
	CreatedAt      time.Time
	LastSeenAt     *time.Time
	AcknowledgedAt *time.Time
	AcknowledgedBy *string
	ResolvedAt     *time.Time
}
```

- [ ] **Step 2: Write the failing test**

`internal/store/alerts_test.go`:

```go
package store

import (
	"context"
	"path/filepath"
	"testing"
	"time"

	"github.com/oklog/ulid/v2"
)

func newTestStoreWithHost(t *testing.T) (*Store, string) {
	t.Helper()
	dir := t.TempDir()
	st, err := Open(context.Background(), filepath.Join(dir, "rm.db"))
	if err != nil {
		t.Fatalf("open: %v", err)
	}
	t.Cleanup(func() { _ = st.Close() })
	hostID := ulid.Make().String()
	if err := st.CreateHost(context.Background(), Host{
		ID: hostID, Name: "h", OS: "linux", Arch: "amd64",
		EnrolledAt: time.Now().UTC(),
	}, "deadbeef", ""); err != nil {
		t.Fatalf("create host: %v", err)
	}
	return st, hostID
}

func TestRaiseOrTouchInsertsThenTouches(t *testing.T) {
	t.Parallel()
	st, hostID := newTestStoreWithHost(t)
	ctx := context.Background()

	t0 := time.Now().UTC()
	id1, didRaise, err := st.RaiseOrTouch(ctx, hostID, "backup_failed", "warning",
		"Backup failed: 401", t0)
	if err != nil {
		t.Fatalf("first raise: %v", err)
	}
	if !didRaise {
		t.Fatalf("first call must didRaise=true")
	}
	if id1 == "" {
		t.Fatalf("expected non-empty id")
	}

	// Second call within the same open window should touch, not insert.
	t1 := t0.Add(60 * time.Second)
	id2, didRaise2, err := st.RaiseOrTouch(ctx, hostID, "backup_failed", "warning",
		"Backup failed: 401 (still)", t1)
	if err != nil {
		t.Fatalf("touch: %v", err)
	}
	if didRaise2 {
		t.Fatalf("second call must didRaise=false")
	}
	if id2 != id1 {
		t.Fatalf("touch returned a different id: got %q want %q", id2, id1)
	}

	// last_seen_at and message must be updated.
	got, err := st.GetAlert(ctx, id1)
	if err != nil {
		t.Fatalf("get: %v", err)
	}
	if got.LastSeenAt == nil || !got.LastSeenAt.Equal(t1) {
		t.Errorf("last_seen_at: got %v want %v", got.LastSeenAt, t1)
	}
	if got.Message != "Backup failed: 401 (still)" {
		t.Errorf("message not refreshed: %q", got.Message)
	}
}

func TestResolveAndReRaiseStartsFreshAlert(t *testing.T) {
	t.Parallel()
	st, hostID := newTestStoreWithHost(t)
	ctx := context.Background()

	t0 := time.Now().UTC()
	id1, _, err := st.RaiseOrTouch(ctx, hostID, "backup_failed", "warning", "first", t0)
	if err != nil {
		t.Fatalf("raise: %v", err)
	}
	if err := st.Resolve(ctx, id1, t0.Add(time.Minute)); err != nil {
		t.Fatalf("resolve: %v", err)
	}

	id2, didRaise, err := st.RaiseOrTouch(ctx, hostID, "backup_failed", "warning", "second", t0.Add(2*time.Minute))
	if err != nil {
		t.Fatalf("re-raise: %v", err)
	}
	if !didRaise {
		t.Fatalf("post-resolve raise must didRaise=true")
	}
	if id2 == id1 {
		t.Fatalf("re-raise reused the resolved id; want a fresh row")
	}
}

func TestAcknowledgeKeepsAlertOpen(t *testing.T) {
	t.Parallel()
	st, hostID := newTestStoreWithHost(t)
	ctx := context.Background()

	id, _, err := st.RaiseOrTouch(ctx, hostID, "backup_failed", "warning", "m", time.Now().UTC())
	if err != nil {
		t.Fatalf("raise: %v", err)
	}
	userID := "u-1"
	if err := st.Acknowledge(ctx, id, userID, time.Now().UTC()); err != nil {
		t.Fatalf("ack: %v", err)
	}
	got, err := st.GetAlert(ctx, id)
	if err != nil {
		t.Fatalf("get: %v", err)
	}
	if got.AcknowledgedAt == nil {
		t.Errorf("acknowledged_at not set")
	}
	if got.AcknowledgedBy == nil || *got.AcknowledgedBy != userID {
		t.Errorf("acknowledged_by: got %v want %q", got.AcknowledgedBy, userID)
	}
	if got.ResolvedAt != nil {
		t.Errorf("ack must not set resolved_at; got %v", got.ResolvedAt)
	}
}

func TestAutoResolveClearsOpenAlerts(t *testing.T) {
	t.Parallel()
	st, hostID := newTestStoreWithHost(t)
	ctx := context.Background()

	t0 := time.Now().UTC()
	id, _, _ := st.RaiseOrTouch(ctx, hostID, "backup_failed", "warning", "m", t0)
	if err := st.AutoResolve(ctx, hostID, "backup_failed", t0.Add(time.Minute)); err != nil {
		t.Fatalf("auto-resolve: %v", err)
	}
	got, _ := st.GetAlert(ctx, id)
	if got.ResolvedAt == nil {
		t.Errorf("expected resolved_at set")
	}
}

func TestListAlertsFilters(t *testing.T) {
	t.Parallel()
	st, hostID := newTestStoreWithHost(t)
	ctx := context.Background()
	t0 := time.Now().UTC()

	// One open warning + one resolved info.
	_, _, _ = st.RaiseOrTouch(ctx, hostID, "backup_failed", "warning", "open", t0)
	id2, _, _ := st.RaiseOrTouch(ctx, hostID, "stale_schedule", "info", "done", t0)
	_ = st.Resolve(ctx, id2, t0.Add(time.Minute))

	open, err := st.ListAlerts(ctx, AlertFilter{Status: "open"})
	if err != nil {
		t.Fatalf("list open: %v", err)
	}
	if len(open) != 1 || open[0].Severity != "warning" {
		t.Errorf("open filter: got %+v", open)
	}

	all, err := st.ListAlerts(ctx, AlertFilter{Status: "all"})
	if err != nil {
		t.Fatalf("list all: %v", err)
	}
	if len(all) != 2 {
		t.Errorf("all filter: got %d, want 2", len(all))
	}
}
```

- [ ] **Step 3: Run to verify it fails**

```sh
go test ./internal/store/ -run "TestRaiseOrTouchInsertsThenTouches|TestResolveAndReRaiseStartsFreshAlert|TestAcknowledgeKeepsAlertOpen|TestAutoResolveClearsOpenAlerts|TestListAlertsFilters" -count=1
```
Expected: FAIL — methods don't exist yet.
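
The tests above pin down the dedup contract: a repeat of the same (host, kind) while an alert is unresolved only touches it, and a repeat after resolution starts a fresh alert. An in-memory model of that contract, independent of SQLite, may help when reasoning about when notifications fire. `memAlerts` and its integer ids are purely illustrative and are not part of the store.

```go
package main

import "fmt"

type key struct{ hostID, kind string }

// memAlerts models only the "one open alert per (host, kind)" rule.
type memAlerts struct {
	open map[key]int // (host, kind) -> id of the unresolved alert
	next int
}

// RaiseOrTouch returns the existing open id with didRaise=false,
// or allocates a new id with didRaise=true.
func (m *memAlerts) RaiseOrTouch(hostID, kind string) (id int, didRaise bool) {
	k := key{hostID, kind}
	if existing, ok := m.open[k]; ok {
		return existing, false
	}
	m.next++
	m.open[k] = m.next
	return m.next, true
}

// AutoResolve closes the open alert for the pair, if any.
func (m *memAlerts) AutoResolve(hostID, kind string) {
	delete(m.open, key{hostID, kind})
}

// demo returns the ticks on which a notification would be sent.
func demo() []string {
	m := &memAlerts{open: map[key]int{}}
	var notified []string
	step := func(label string) {
		if id, didRaise := m.RaiseOrTouch("h1", "backup_failed"); didRaise {
			notified = append(notified, fmt.Sprintf("%s:alert-%d", label, id))
		}
	}
	step("tick1") // first failure: raise and notify
	step("tick2") // still failing: touch only, no notification
	m.AutoResolve("h1", "backup_failed")
	step("tick3") // fails again after resolution: fresh alert
	return notified
}

func main() {
	fmt.Println(demo())
}
```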

- [ ] **Step 4: Implement**

Append to `internal/store/alerts.go`:

```go
// AlertFilter narrows ListAlerts.
type AlertFilter struct {
	Status   string // "open" | "acknowledged" | "resolved" | "all" | "" (same as "all")
	Severity string // "info" | "warning" | "critical" | "" (any severity)
	HostID   string // empty = any host
	Search   string // substring match on message
	Limit    int    // 0 = no limit
}

// RaiseOrTouch implements the dedup + last_seen_at bump pattern. If
// an alert with (host_id, kind, resolved_at IS NULL) already exists,
// it touches last_seen_at + message and returns (id, false). Otherwise
// inserts a fresh row and returns (id, true). Caller fires a
// notification only when didRaise=true.
func (s *Store) RaiseOrTouch(ctx context.Context, hostID, kind, severity, message string, when time.Time) (id string, didRaise bool, err error) {
	tx, err := s.db.BeginTx(ctx, nil)
	if err != nil {
		return "", false, fmt.Errorf("store: begin: %w", err)
	}
	// Rollback after a successful Commit is a no-op.
	defer func() { _ = tx.Rollback() }()

	row := tx.QueryRowContext(ctx,
		`SELECT id FROM alerts WHERE host_id = ? AND kind = ? AND resolved_at IS NULL LIMIT 1`,
		hostID, kind)
	var existing string
	switch err := row.Scan(&existing); {
	case err == nil:
		_, uerr := tx.ExecContext(ctx,
			`UPDATE alerts SET last_seen_at = ?, message = ? WHERE id = ?`,
			when.UTC().Format(time.RFC3339Nano), message, existing)
		if uerr != nil {
			return "", false, fmt.Errorf("store: touch alert: %w", uerr)
		}
		if err := tx.Commit(); err != nil {
			return "", false, fmt.Errorf("store: commit touch: %w", err)
		}
		return existing, false, nil
	case errors.Is(err, sql.ErrNoRows):
		// fall through to insert
	default:
		return "", false, fmt.Errorf("store: lookup alert: %w", err)
	}

	id = ulid.Make().String()
	whenStr := when.UTC().Format(time.RFC3339Nano)
	_, err = tx.ExecContext(ctx,
		`INSERT INTO alerts (id, host_id, kind, severity, message, created_at, last_seen_at)
		 VALUES (?, ?, ?, ?, ?, ?, ?)`,
		id, hostID, kind, severity, message, whenStr, whenStr)
	if err != nil {
		return "", false, fmt.Errorf("store: insert alert: %w", err)
	}
	if err := tx.Commit(); err != nil {
		return "", false, fmt.Errorf("store: commit insert: %w", err)
	}
	return id, true, nil
}

// Acknowledge sets acknowledged_at + acknowledged_by; does NOT set
// resolved_at. Idempotent — re-acknowledging just refreshes the timestamp.
// Returns ErrNotFound when the id is unknown or the alert is already resolved.
func (s *Store) Acknowledge(ctx context.Context, id, userID string, when time.Time) error {
	res, err := s.db.ExecContext(ctx,
		`UPDATE alerts SET acknowledged_at = ?, acknowledged_by = ?
		 WHERE id = ? AND resolved_at IS NULL`,
		when.UTC().Format(time.RFC3339Nano), userID, id)
	if err != nil {
		return fmt.Errorf("store: ack alert: %w", err)
	}
	n, _ := res.RowsAffected()
	if n == 0 {
		return ErrNotFound
	}
	return nil
}

// Resolve marks the alert resolved. Idempotent on already-resolved rows
// (no-op).
func (s *Store) Resolve(ctx context.Context, id string, when time.Time) error {
	_, err := s.db.ExecContext(ctx,
		`UPDATE alerts SET resolved_at = ?
		 WHERE id = ? AND resolved_at IS NULL`,
		when.UTC().Format(time.RFC3339Nano), id)
	if err != nil {
		return fmt.Errorf("store: resolve alert: %w", err)
	}
	return nil
}

// AutoResolve closes every open alert for the (host_id, kind) pair.
// Used by the engine when a rule's underlying condition clears (e.g.
// next backup succeeded so backup_failed clears).
func (s *Store) AutoResolve(ctx context.Context, hostID, kind string, when time.Time) error {
	_, err := s.db.ExecContext(ctx,
		`UPDATE alerts SET resolved_at = ?
		 WHERE host_id = ? AND kind = ? AND resolved_at IS NULL`,
		when.UTC().Format(time.RFC3339Nano), hostID, kind)
	if err != nil {
		return fmt.Errorf("store: auto-resolve: %w", err)
	}
	return nil
}

// GetAlert reads one row.
func (s *Store) GetAlert(ctx context.Context, id string) (*Alert, error) {
	row := s.db.QueryRowContext(ctx,
		`SELECT id, host_id, kind, severity, message, created_at, last_seen_at,
		        acknowledged_at, acknowledged_by, resolved_at
		 FROM alerts WHERE id = ?`, id)
	return scanAlert(row.Scan)
}

// ListAlerts is the filtered list. Sort: open-first, then by created_at desc.
func (s *Store) ListAlerts(ctx context.Context, f AlertFilter) ([]Alert, error) {
	q := `SELECT id, host_id, kind, severity, message, created_at, last_seen_at,
	             acknowledged_at, acknowledged_by, resolved_at FROM alerts`
	conds := []string{}
	args := []any{}
	switch f.Status {
	case "open":
		conds = append(conds, "resolved_at IS NULL AND acknowledged_at IS NULL")
	case "acknowledged":
		conds = append(conds, "resolved_at IS NULL AND acknowledged_at IS NOT NULL")
	case "resolved":
		conds = append(conds, "resolved_at IS NOT NULL")
	case "all", "":
		// no-op
	}
	if f.Severity != "" {
		conds = append(conds, "severity = ?")
		args = append(args, f.Severity)
	}
	if f.HostID != "" {
		conds = append(conds, "host_id = ?")
		args = append(args, f.HostID)
	}
	if f.Search != "" {
		conds = append(conds, "message LIKE ?")
		args = append(args, "%"+f.Search+"%")
	}
	if len(conds) > 0 {
		q += " WHERE " + strings.Join(conds, " AND ")
	}
	q += ` ORDER BY (resolved_at IS NULL) DESC, created_at DESC`
	if f.Limit > 0 {
		q += ` LIMIT ?`
		args = append(args, f.Limit)
	}
	rows, err := s.db.QueryContext(ctx, q, args...)
	if err != nil {
		return nil, fmt.Errorf("store: list alerts: %w", err)
	}
	defer func() { _ = rows.Close() }()
	var out []Alert
	for rows.Next() {
		a, err := scanAlert(rows.Scan)
		if err != nil {
			return nil, err
		}
		out = append(out, *a)
	}
	return out, rows.Err()
}

// scanAlert centralises the column read so the GetAlert and
// ListAlerts paths agree on column order. Pass row.Scan or rows.Scan.
// Timestamp parse failures are returned rather than silently dropped.
func scanAlert(scan func(...any) error) (*Alert, error) {
	var a Alert
	var hostID, lastSeen, ackedAt, ackedBy, resolvedAt sql.NullString
	var createdAt string
	if err := scan(&a.ID, &hostID, &a.Kind, &a.Severity, &a.Message,
		&createdAt, &lastSeen, &ackedAt, &ackedBy, &resolvedAt); err != nil {
		if errors.Is(err, sql.ErrNoRows) {
			return nil, ErrNotFound
		}
		return nil, fmt.Errorf("store: scan alert: %w", err)
	}
	if hostID.Valid {
		v := hostID.String
		a.HostID = &v
	}
	t, err := time.Parse(time.RFC3339Nano, createdAt)
	if err != nil {
		return nil, fmt.Errorf("store: parse created_at: %w", err)
	}
	a.CreatedAt = t
	if lastSeen.Valid {
		t, err := time.Parse(time.RFC3339Nano, lastSeen.String)
		if err != nil {
			return nil, fmt.Errorf("store: parse last_seen_at: %w", err)
		}
		a.LastSeenAt = &t
	}
	if ackedAt.Valid {
		t, err := time.Parse(time.RFC3339Nano, ackedAt.String)
		if err != nil {
			return nil, fmt.Errorf("store: parse acknowledged_at: %w", err)
		}
		a.AcknowledgedAt = &t
	}
	if ackedBy.Valid {
		v := ackedBy.String
		a.AcknowledgedBy = &v
	}
	if resolvedAt.Valid {
		t, err := time.Parse(time.RFC3339Nano, resolvedAt.String)
		if err != nil {
			return nil, fmt.Errorf("store: parse resolved_at: %w", err)
		}
		a.ResolvedAt = &t
	}
	return &a, nil
}
```

Add the imports if missing: `context`, `database/sql`, `errors`, `fmt`, `strings`, `time`, plus `github.com/oklog/ulid/v2`.

- [ ] **Step 5: Run tests to verify they pass**

```sh
go test ./internal/store/ -count=1 -timeout=30s
```
Expected: PASS.
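
One caveat about the `Search` filter in `ListAlerts`: it passes user input straight into a `LIKE` pattern, so `%` and `_` typed by an operator act as wildcards. If literal matching is wanted, a helper like the one sketched here could escape them. `likePattern` is a hypothetical name and this is an optional hardening, not part of the plan; the query would also need `message LIKE ? ESCAPE '\'`.

```go
package main

import (
	"fmt"
	"strings"
)

// likePattern wraps search in %...% and escapes LIKE metacharacters
// with a backslash so they match literally. The SQL must declare the
// escape character: message LIKE ? ESCAPE '\'.
func likePattern(search string) string {
	// NewReplacer scans the input once, so the backslashes it inserts
	// are never themselves re-escaped.
	r := strings.NewReplacer(`\`, `\\`, `%`, `\%`, `_`, `\_`)
	return "%" + r.Replace(search) + "%"
}

func main() {
	fmt.Println(likePattern("100%_done"))
	fmt.Println(likePattern(`a\b`))
}
```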

- [ ] **Step 6: Commit**

```sh
git add internal/store/alerts.go internal/store/alerts_test.go internal/store/types.go
git commit -m "store: alerts CRUD with dedup + last_seen_at bump"
```

---

### Task A4: Notification-channels store API + log writer

**Files:**
- Create: `internal/store/notification_channels.go`
- Test: `internal/store/notification_channels_test.go`

- [ ] **Step 1: Define types in this file**

```go
package store

import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"time"
)

// NotificationChannel mirrors a row in notification_channels. The
// Config field is the AEAD-encrypted JSON blob; callers (in the
// notification package) decrypt before use.
type NotificationChannel struct {
	ID              string
	Kind            string // "webhook" | "ntfy" | "smtp"
	Name            string
	Enabled         bool
	Config          []byte // AEAD ciphertext; opaque at this layer
	DefaultPriority *string
	CreatedAt       time.Time
	UpdatedAt       time.Time
	LastFiredAt     *time.Time
}

// NotificationLogEntry is one row in notification_log.
type NotificationLogEntry struct {
	ID         string
	ChannelID  string
	AlertID    *string
	Event      string // alert.raised | alert.acknowledged | alert.resolved | alert.test
	OK         bool
	StatusCode *int
	LatencyMS  *int
	Error      *string
	FiredAt    time.Time
}
```

- [ ] **Step 2: Write the failing test**

`internal/store/notification_channels_test.go`:

```go
package store

import (
	"context"
	"errors"
	"path/filepath"
	"testing"
	"time"

	"github.com/oklog/ulid/v2"
)

func TestNotificationChannelCRUD(t *testing.T) {
	t.Parallel()
	dir := t.TempDir()
	st, err := Open(context.Background(), filepath.Join(dir, "rm.db"))
	if err != nil {
		t.Fatalf("open: %v", err)
	}
	defer st.Close()
	ctx := context.Background()

	ch := NotificationChannel{
		ID: ulid.Make().String(), Kind: "webhook", Name: "team-slack",
		Enabled: true, Config: []byte("encrypted-blob"),
		CreatedAt: time.Now().UTC(), UpdatedAt: time.Now().UTC(),
	}
	if err := st.CreateNotificationChannel(ctx, ch); err != nil {
		t.Fatalf("create: %v", err)
	}

	got, err := st.GetNotificationChannel(ctx, ch.ID)
	if err != nil {
		t.Fatalf("get: %v", err)
	}
	if got.Name != ch.Name || got.Kind != "webhook" || string(got.Config) != "encrypted-blob" {
		t.Fatalf("got %+v", got)
	}

	got.Name = "team-slack-renamed"
	got.Enabled = false
	got.UpdatedAt = time.Now().UTC()
	if err := st.UpdateNotificationChannel(ctx, *got); err != nil {
		t.Fatalf("update: %v", err)
	}
	got2, _ := st.GetNotificationChannel(ctx, ch.ID)
	if got2.Name != "team-slack-renamed" || got2.Enabled {
		t.Fatalf("update not applied: %+v", got2)
	}

	all, _ := st.ListEnabledNotificationChannels(ctx)
	if len(all) != 0 {
		t.Errorf("disabled channel returned by ListEnabled: %d", len(all))
	}

	if err := st.DeleteNotificationChannel(ctx, ch.ID); err != nil {
		t.Fatalf("delete: %v", err)
	}
	if _, err := st.GetNotificationChannel(ctx, ch.ID); !errors.Is(err, ErrNotFound) {
		t.Errorf("expected ErrNotFound after delete; got %v", err)
	}
}

func TestAppendNotificationLog(t *testing.T) {
	t.Parallel()
	dir := t.TempDir()
	st, err := Open(context.Background(), filepath.Join(dir, "rm.db"))
	if err != nil {
		t.Fatalf("open: %v", err)
	}
	defer st.Close()
	ctx := context.Background()

	chID := ulid.Make().String()
	if err := st.CreateNotificationChannel(ctx, NotificationChannel{
		ID: chID, Kind: "ntfy", Name: "n", Enabled: true,
		Config:    []byte{1, 2, 3},
		CreatedAt: time.Now().UTC(), UpdatedAt: time.Now().UTC(),
	}); err != nil {
		t.Fatalf("create channel: %v", err)
	}

	code := 200
	lat := 287
	if err := st.AppendNotificationLog(ctx, NotificationLogEntry{
		ID: ulid.Make().String(), ChannelID: chID, Event: "alert.test",
		OK: true, StatusCode: &code, LatencyMS: &lat,
		FiredAt: time.Now().UTC(),
	}); err != nil {
		t.Fatalf("append: %v", err)
	}

	// LastFiredAt projection: last_fired_at could be bumped either by
	// the append helper or by its callers. This plan has the helper do
	// the bump on success, so assert it here.
	got, err := st.GetNotificationChannel(ctx, chID)
	if err != nil {
		t.Fatalf("get: %v", err)
	}
	if got.LastFiredAt == nil {
		t.Errorf("last_fired_at should bump on AppendNotificationLog success")
	}
}
```

- [ ] **Step 3: Run to verify it fails**

```sh
go test ./internal/store/ -run "TestNotificationChannelCRUD|TestAppendNotificationLog" -count=1
```
Expected: FAIL.
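
`NotificationLogEntry` carries `OK`, `StatusCode`, `LatencyMS`, and `Error` for each delivery attempt. As a rough illustration of how a caller might fill those fields from an HTTP delivery, consider the sketch below. `LogEntry`, `attempt`, and `stubTransport` are hypothetical names; treating any 2xx status as success is an assumption, and the stub transport answers in-process so no network is involved.

```go
package main

import (
	"fmt"
	"net/http"
	"strings"
	"time"
)

// LogEntry loosely mirrors the notification_log columns (assumed shape).
type LogEntry struct {
	Event      string
	OK         bool
	StatusCode *int
	LatencyMS  *int
	Error      *string
	FiredAt    time.Time
}

// attempt performs one POST and records the outcome.
func attempt(client *http.Client, url, event, body string) LogEntry {
	start := time.Now()
	e := LogEntry{Event: event, FiredAt: start.UTC()}
	resp, err := client.Post(url, "application/json", strings.NewReader(body))
	lat := int(time.Since(start).Milliseconds())
	e.LatencyMS = &lat
	if err != nil {
		// Transport-level failure: no status code is available.
		msg := err.Error()
		e.Error = &msg
		return e
	}
	defer resp.Body.Close()
	code := resp.StatusCode
	e.StatusCode = &code
	e.OK = code >= 200 && code < 300
	if !e.OK {
		msg := resp.Status
		e.Error = &msg
	}
	return e
}

// stubTransport answers every request with a fixed status, in-process.
type stubTransport struct{ status int }

func (s stubTransport) RoundTrip(r *http.Request) (*http.Response, error) {
	return &http.Response{
		StatusCode: s.status,
		Status:     fmt.Sprintf("%d %s", s.status, http.StatusText(s.status)),
		Header:     make(http.Header),
		Body:       http.NoBody,
		Request:    r,
	}, nil
}

func demo() string {
	const url = "http://channel.invalid/hook"
	ok := attempt(&http.Client{Transport: stubTransport{status: 204}}, url, "alert.test", `{}`)
	bad := attempt(&http.Client{Transport: stubTransport{status: 503}}, url, "alert.test", `{}`)
	return fmt.Sprintf("%d ok=%t, %d ok=%t err=%s",
		*ok.StatusCode, ok.OK, *bad.StatusCode, bad.OK, *bad.Error)
}

func main() {
	fmt.Println(demo())
}
```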
|
||
|
||
- [ ] **Step 4: Implement**
|
||
|
||
Append to `internal/store/notification_channels.go`:
|
||
|
||
```go
func (s *Store) CreateNotificationChannel(ctx context.Context, ch NotificationChannel) error {
	enabled := 0
	if ch.Enabled {
		enabled = 1
	}
	_, err := s.db.ExecContext(ctx,
		`INSERT INTO notification_channels
		   (id, kind, name, enabled, config, default_priority, created_at, updated_at)
		 VALUES (?, ?, ?, ?, ?, ?, ?, ?)`,
		ch.ID, ch.Kind, ch.Name, enabled, ch.Config,
		nullable(ch.DefaultPriority),
		ch.CreatedAt.UTC().Format(time.RFC3339Nano),
		ch.UpdatedAt.UTC().Format(time.RFC3339Nano))
	if err != nil {
		return fmt.Errorf("store: create channel: %w", err)
	}
	return nil
}

func (s *Store) UpdateNotificationChannel(ctx context.Context, ch NotificationChannel) error {
	enabled := 0
	if ch.Enabled {
		enabled = 1
	}
	_, err := s.db.ExecContext(ctx,
		`UPDATE notification_channels
		    SET kind = ?, name = ?, enabled = ?, config = ?,
		        default_priority = ?, updated_at = ?
		  WHERE id = ?`,
		ch.Kind, ch.Name, enabled, ch.Config,
		nullable(ch.DefaultPriority),
		ch.UpdatedAt.UTC().Format(time.RFC3339Nano),
		ch.ID)
	if err != nil {
		return fmt.Errorf("store: update channel: %w", err)
	}
	return nil
}

func (s *Store) DeleteNotificationChannel(ctx context.Context, id string) error {
	_, err := s.db.ExecContext(ctx,
		`DELETE FROM notification_channels WHERE id = ?`, id)
	if err != nil {
		return fmt.Errorf("store: delete channel: %w", err)
	}
	return nil
}

func (s *Store) GetNotificationChannel(ctx context.Context, id string) (*NotificationChannel, error) {
	row := s.db.QueryRowContext(ctx,
		`SELECT id, kind, name, enabled, config, default_priority,
		        created_at, updated_at, last_fired_at
		   FROM notification_channels WHERE id = ?`, id)
	return scanChannel(row.Scan)
}

func (s *Store) ListNotificationChannels(ctx context.Context) ([]NotificationChannel, error) {
	rows, err := s.db.QueryContext(ctx,
		`SELECT id, kind, name, enabled, config, default_priority,
		        created_at, updated_at, last_fired_at
		   FROM notification_channels ORDER BY created_at ASC`)
	if err != nil {
		return nil, fmt.Errorf("store: list channels: %w", err)
	}
	defer func() { _ = rows.Close() }()
	var out []NotificationChannel
	for rows.Next() {
		c, err := scanChannel(rows.Scan)
		if err != nil {
			return nil, err
		}
		out = append(out, *c)
	}
	return out, rows.Err()
}

func (s *Store) ListEnabledNotificationChannels(ctx context.Context) ([]NotificationChannel, error) {
	rows, err := s.db.QueryContext(ctx,
		`SELECT id, kind, name, enabled, config, default_priority,
		        created_at, updated_at, last_fired_at
		   FROM notification_channels WHERE enabled = 1 ORDER BY created_at ASC`)
	if err != nil {
		return nil, fmt.Errorf("store: list enabled: %w", err)
	}
	defer func() { _ = rows.Close() }()
	var out []NotificationChannel
	for rows.Next() {
		c, err := scanChannel(rows.Scan)
		if err != nil {
			return nil, err
		}
		out = append(out, *c)
	}
	return out, rows.Err()
}

// AppendNotificationLog records a delivery attempt and, on success,
// bumps the channel's last_fired_at. Both writes share one transaction.
func (s *Store) AppendNotificationLog(ctx context.Context, e NotificationLogEntry) error {
	tx, err := s.db.BeginTx(ctx, nil)
	if err != nil {
		return fmt.Errorf("store: begin: %w", err)
	}
	defer func() { _ = tx.Rollback() }()

	ok := 0
	if e.OK {
		ok = 1
	}
	_, err = tx.ExecContext(ctx,
		`INSERT INTO notification_log
		   (id, channel_id, alert_id, event, ok, status_code, latency_ms, error, fired_at)
		 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`,
		e.ID, e.ChannelID, nullable(e.AlertID), e.Event, ok,
		nullableInt(e.StatusCode), nullableInt(e.LatencyMS),
		nullable(e.Error),
		e.FiredAt.UTC().Format(time.RFC3339Nano))
	if err != nil {
		return fmt.Errorf("store: append notification_log: %w", err)
	}

	if e.OK {
		if _, err := tx.ExecContext(ctx,
			`UPDATE notification_channels SET last_fired_at = ? WHERE id = ?`,
			e.FiredAt.UTC().Format(time.RFC3339Nano), e.ChannelID); err != nil {
			return fmt.Errorf("store: bump last_fired_at: %w", err)
		}
	}
	return tx.Commit()
}

// scanChannel reads one notification_channels row through the supplied
// Scan function, so it serves both *sql.Row and *sql.Rows.
func scanChannel(scan func(...any) error) (*NotificationChannel, error) {
	var c NotificationChannel
	var enabled int
	var defaultPri, lastFired sql.NullString
	var createdAt, updatedAt string
	if err := scan(&c.ID, &c.Kind, &c.Name, &enabled, &c.Config,
		&defaultPri, &createdAt, &updatedAt, &lastFired); err != nil {
		if errors.Is(err, sql.ErrNoRows) {
			return nil, ErrNotFound
		}
		return nil, fmt.Errorf("store: scan channel: %w", err)
	}
	c.Enabled = enabled == 1
	if defaultPri.Valid {
		v := defaultPri.String
		c.DefaultPriority = &v
	}
	t, err := time.Parse(time.RFC3339Nano, createdAt)
	if err != nil {
		return nil, fmt.Errorf("store: parse created_at: %w", err)
	}
	c.CreatedAt = t
	t, err = time.Parse(time.RFC3339Nano, updatedAt)
	if err != nil {
		return nil, fmt.Errorf("store: parse updated_at: %w", err)
	}
	c.UpdatedAt = t
	if lastFired.Valid {
		// Surface a malformed timestamp instead of storing a zero time.
		t, err := time.Parse(time.RFC3339Nano, lastFired.String)
		if err != nil {
			return nil, fmt.Errorf("store: parse last_fired_at: %w", err)
		}
		c.LastFiredAt = &t
	}
	return &c, nil
}

// nullableInt mirrors store/util.go's nullable for *int.
func nullableInt(p *int) any {
	if p == nil {
		return nil
	}
	return *p
}
```

Check `internal/store/util.go` first: if `nullable` and `nullableStr` already exist there, reuse them rather than redefining them. Add `nullableInt` only if the package does not already define it.
|
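
The `nullable*` helpers exist so that a nil pointer is bound as SQL `NULL` rather than as an empty string or zero. A minimal sketch of the pattern follows. The real signature of `nullable` in `internal/store/util.go` is not shown in this plan, so `nullableString` below is a hypothetical stand-in that assumes it takes a `*string`.

```go
package main

import "fmt"

// nullableString is a hypothetical stand-in for the store's nullable
// helper: a nil pointer becomes an untyped nil, which database/sql
// binds as NULL; otherwise the pointed-to value is passed through.
func nullableString(p *string) any {
	if p == nil {
		return nil
	}
	return *p
}

// nullableInt is the *int counterpart, matching the plan's definition.
func nullableInt(p *int) any {
	if p == nil {
		return nil
	}
	return *p
}

func main() {
	code := 200
	var missing *int
	fmt.Println(nullableInt(&code), nullableInt(missing) == nil)
}
```

Passing the helper's result straight into `ExecContext` keeps "no status code" distinguishable from "status code 0" in `notification_log`.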

- [ ] **Step 5: Run tests to verify they pass**

```sh
go test ./internal/store/ -count=1 -timeout=30s
```
Expected: PASS.

- [ ] **Step 6: Commit**

```sh
git add internal/store/notification_channels.go internal/store/notification_channels_test.go
git commit -m "store: notification_channels CRUD + AppendNotificationLog"
```
|

---

## Slice B — Notification channels (transport)

### Task B1: Channel interface + payload type

**Files:**
- Create: `internal/notification/payload.go`
- Create: `internal/notification/channel.go`

- [ ] **Step 1: Define the payload + interface**

`internal/notification/payload.go`:

```go
// Package notification owns the fan-out of alert events to operator-
// configured channels. Three channels in v1: webhook, ntfy, smtp.
// Each channel implements Channel.Send for one Payload at a time;
// the Hub orchestrates fan-out, persists to notification_log.
package notification

import "time"

// Event identifies the lifecycle hook this notification is for.
type Event string

const (
	EventRaised       Event = "alert.raised"
	EventAcknowledged Event = "alert.acknowledged"
	EventResolved     Event = "alert.resolved"
	EventTest         Event = "alert.test"
)

// Payload is the per-event blob every channel renders into its own
// shape. Severity maps to channel-specific priority (ntfy) or stays
// in the body (webhook/smtp).
type Payload struct {
	Event    Event  // alert.raised | … | alert.test
	AlertID  string // ULID
	Severity string // info | warning | critical
	Kind     string // backup_failed | …
	HostID   string
	HostName string
	Message  string
	RaisedAt time.Time
	Link     string // Absolute URL to /alerts/<id>; built by Hub
}
```

`internal/notification/channel.go`:

```go
package notification

import (
	"context"
	"time"
)

// Channel is the per-kind transport. Implementations live in
// webhook.go / ntfy.go / smtp.go. Send must respect ctx (5s for HTTP,
// 10s for SMTP) and never panic.
type Channel interface {
	// Kind returns the kind string ("webhook", "ntfy", "smtp"). Used
	// for log enrichment and dispatcher routing.
	Kind() string

	// Send delivers one payload. Returns (statusCode, latency, err).
	// statusCode is HTTP for HTTP channels, the SMTP final-line code
	// (e.g. 250) for SMTP, 0 if the call didn't reach a wire response.
	Send(ctx context.Context, p Payload) (statusCode int, latency time.Duration, err error)
}
```

- [ ] **Step 2: Build to verify it compiles**

```sh
go build ./internal/notification/...
```
Expected: clean build.

- [ ] **Step 3: Commit**

```sh
git add internal/notification/payload.go internal/notification/channel.go
git commit -m "notification: payload + Channel interface"
```
|

---

### Task B2: Webhook channel

**Files:**
- Create: `internal/notification/webhook.go`
- Test: `internal/notification/webhook_test.go`

- [ ] **Step 1: Define the config + impl skeleton**

`internal/notification/webhook.go`:

```go
package notification

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"time"
)

// WebhookConfig is the per-channel JSON shape stored AEAD-encrypted
// in notification_channels.config.
type WebhookConfig struct {
	URL         string `json:"url"`
	BearerToken string `json:"bearer_token,omitempty"`
	HeaderName  string `json:"header_name,omitempty"`
	HeaderValue string `json:"header_value,omitempty"`
}

// WebhookChannel is the HTTP-POST channel. One per configured channel
// row. Reused across sends — the http.Client is goroutine-safe.
type WebhookChannel struct {
	cfg    WebhookConfig
	client *http.Client
}

// NewWebhookChannel builds a webhook with a 5s overall timeout enforced
// by the http.Client; ctx in Send is layered on top for caller-driven
// cancel.
func NewWebhookChannel(cfg WebhookConfig) *WebhookChannel {
	return &WebhookChannel{
		cfg:    cfg,
		client: &http.Client{Timeout: 5 * time.Second},
	}
}

func (c *WebhookChannel) Kind() string { return "webhook" }

// webhookBody is the wire-stable envelope. Documented in the spec; do
// not reorder fields freely — operators write switch statements on
// "event" and "severity".
type webhookBody struct {
	Event    string `json:"event"`
	AlertID  string `json:"alert_id"`
	Severity string `json:"severity"`
	Kind     string `json:"kind"`
	HostID   string `json:"host_id"`
	HostName string `json:"host_name"`
	Message  string `json:"message"`
	RaisedAt string `json:"raised_at"`
	Link     string `json:"link"`
}

func (c *WebhookChannel) Send(ctx context.Context, p Payload) (int, time.Duration, error) {
	body := webhookBody{
		Event: string(p.Event), AlertID: p.AlertID,
		Severity: p.Severity, Kind: p.Kind,
		HostID: p.HostID, HostName: p.HostName,
		Message:  p.Message,
		RaisedAt: p.RaisedAt.UTC().Format(time.RFC3339Nano),
		Link:     p.Link,
	}
	buf, err := json.Marshal(body)
	if err != nil {
		return 0, 0, fmt.Errorf("webhook: marshal body: %w", err)
	}

	req, err := http.NewRequestWithContext(ctx, http.MethodPost, c.cfg.URL, bytes.NewReader(buf))
	if err != nil {
		return 0, 0, fmt.Errorf("webhook: build request: %w", err)
	}
	req.Header.Set("Content-Type", "application/json")
	if c.cfg.BearerToken != "" {
		req.Header.Set("Authorization", "Bearer "+c.cfg.BearerToken)
	}
	if c.cfg.HeaderName != "" {
		req.Header.Set(c.cfg.HeaderName, c.cfg.HeaderValue)
	}

	t0 := time.Now()
	res, err := c.client.Do(req)
	latency := time.Since(t0)
	if err != nil {
		return 0, latency, fmt.Errorf("webhook: do: %w", err)
	}
	defer func() { _ = res.Body.Close() }()
	// Drain body — keep the connection reusable.
	_, _ = io.Copy(io.Discard, res.Body)
	if res.StatusCode >= 400 {
		return res.StatusCode, latency, fmt.Errorf("webhook: http %d", res.StatusCode)
	}
	return res.StatusCode, latency, nil
}
```

- [ ] **Step 2: Write the tests**

`internal/notification/webhook_test.go`:

```go
package notification

import (
	"context"
	"encoding/json"
	"net/http"
	"net/http/httptest"
	"testing"
	"time"
)

func TestWebhookSendsCorrectPayloadAndHeaders(t *testing.T) {
	t.Parallel()
	var got webhookBody
	var auth, custom string
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		auth = r.Header.Get("Authorization")
		custom = r.Header.Get("X-Test")
		_ = json.NewDecoder(r.Body).Decode(&got)
		w.WriteHeader(http.StatusOK)
	}))
	defer srv.Close()

	ch := NewWebhookChannel(WebhookConfig{
		URL: srv.URL, BearerToken: "tok-123",
		HeaderName: "X-Test", HeaderValue: "yes",
	})
	code, _, err := ch.Send(context.Background(), Payload{
		Event: EventRaised, AlertID: "01K",
		Severity: "warning", Kind: "backup_failed",
		HostID: "h1", HostName: "alfa-01",
		Message:  "Backup failed",
		RaisedAt: time.Date(2026, 5, 4, 15, 42, 1, 0, time.UTC),
		Link:     "https://rm.example/alerts/01K",
	})
	if err != nil {
		t.Fatalf("send: %v", err)
	}
	if code != 200 {
		t.Errorf("status: %d", code)
	}
	if got.Event != "alert.raised" || got.Kind != "backup_failed" || got.Message != "Backup failed" {
		t.Errorf("body: %+v", got)
	}
	if auth != "Bearer tok-123" {
		t.Errorf("auth: %q", auth)
	}
	if custom != "yes" {
		t.Errorf("custom header: %q", custom)
	}
}

func TestWebhookReturnsErrorOn4xx(t *testing.T) {
	t.Parallel()
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		w.WriteHeader(http.StatusUnauthorized)
	}))
	defer srv.Close()
	ch := NewWebhookChannel(WebhookConfig{URL: srv.URL})
	code, _, err := ch.Send(context.Background(), Payload{Event: EventRaised})
	if err == nil {
		t.Fatal("expected error for 401")
	}
	if code != 401 {
		t.Errorf("code: %d", code)
	}
}

func TestWebhookRespectsCtxTimeout(t *testing.T) {
	t.Parallel()
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		time.Sleep(2 * time.Second)
		w.WriteHeader(200)
	}))
	defer srv.Close()
	ch := NewWebhookChannel(WebhookConfig{URL: srv.URL})
	ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
	defer cancel()
	_, _, err := ch.Send(ctx, Payload{Event: EventRaised})
	if err == nil {
		t.Fatal("expected timeout error")
	}
}
```

- [ ] **Step 3: Run tests**

```sh
go test ./internal/notification/ -run TestWebhook -count=1 -timeout=30s
```
Expected: PASS.

- [ ] **Step 4: Commit**

```sh
git add internal/notification/webhook.go internal/notification/webhook_test.go
git commit -m "notification: webhook channel"
```
|

---

### Task B3: Ntfy channel

**Files:**
- Create: `internal/notification/ntfy.go`
- Test: `internal/notification/ntfy_test.go`

- [ ] **Step 1: Implementation**

`internal/notification/ntfy.go`:

```go
package notification

import (
	"context"
	"fmt"
	"io"
	"net/http"
	"strings"
	"time"
)

type NtfyConfig struct {
	ServerURL   string `json:"server_url"` // default https://ntfy.sh
	Topic       string `json:"topic"`
	AccessToken string `json:"access_token,omitempty"`
}

type NtfyChannel struct {
	cfg             NtfyConfig
	defaultPriority string
	client          *http.Client
}

// NewNtfyChannel builds the channel; defaultPriority is the channel-
// configured fallback (one of "min" | "low" | "default" | "high" |
// "urgent" or empty).
func NewNtfyChannel(cfg NtfyConfig, defaultPriority string) *NtfyChannel {
	return &NtfyChannel{
		cfg: cfg, defaultPriority: defaultPriority,
		client: &http.Client{Timeout: 5 * time.Second},
	}
}

func (c *NtfyChannel) Kind() string { return "ntfy" }

func (c *NtfyChannel) Send(ctx context.Context, p Payload) (int, time.Duration, error) {
	server := c.cfg.ServerURL
	if server == "" {
		server = "https://ntfy.sh"
	}
	url := strings.TrimRight(server, "/") + "/" + c.cfg.Topic
	body := strings.NewReader(p.Message)
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, body)
	if err != nil {
		return 0, 0, fmt.Errorf("ntfy: build req: %w", err)
	}
	req.Header.Set("Content-Type", "text/plain")
	req.Header.Set("Title", "["+p.Severity+"] "+p.HostName+" "+p.Kind)
	req.Header.Set("Tags", p.Severity+","+p.Kind)
	if p.Link != "" {
		req.Header.Set("Click", p.Link)
	}
	req.Header.Set("Priority", priorityForSeverity(p.Severity, c.defaultPriority))
	if c.cfg.AccessToken != "" {
		req.Header.Set("Authorization", "Bearer "+c.cfg.AccessToken)
	}

	t0 := time.Now()
	res, err := c.client.Do(req)
	latency := time.Since(t0)
	if err != nil {
		return 0, latency, fmt.Errorf("ntfy: do: %w", err)
	}
	defer func() { _ = res.Body.Close() }()
	_, _ = io.Copy(io.Discard, res.Body)
	if res.StatusCode >= 400 {
		return res.StatusCode, latency, fmt.Errorf("ntfy: http %d", res.StatusCode)
	}
	return res.StatusCode, latency, nil
}

// priorityForSeverity maps severity → ntfy priority. Critical always
// wins (operator's default is overridden).
func priorityForSeverity(severity, defaultPri string) string {
	switch severity {
	case "critical":
		return "5" // urgent
	case "warning":
		if defaultPri != "" {
			return defaultPri
		}
		return "4"
	default:
		if defaultPri != "" {
			return defaultPri
		}
		return "3"
	}
}
```

- [ ] **Step 2: Write the tests**

`internal/notification/ntfy_test.go`:

```go
package notification

import (
	"context"
	"io"
	"net/http"
	"net/http/httptest"
	"testing"
)

func TestNtfySendsHeadersAndBody(t *testing.T) {
	t.Parallel()
	type captured struct {
		title, tags, click, priority, auth, body string
	}
	var got captured
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		got.title = r.Header.Get("Title")
		got.tags = r.Header.Get("Tags")
		got.click = r.Header.Get("Click")
		got.priority = r.Header.Get("Priority")
		got.auth = r.Header.Get("Authorization")
		b, _ := io.ReadAll(r.Body)
		got.body = string(b)
		w.WriteHeader(200)
	}))
	defer srv.Close()

	ch := NewNtfyChannel(NtfyConfig{
		ServerURL: srv.URL, Topic: "rmf",
		AccessToken: "tk1",
	}, "")
	_, _, err := ch.Send(context.Background(), Payload{
		Severity: "critical", HostName: "alfa-01", Kind: "check_failed",
		Message: "errors found", Link: "https://rm.example/a",
	})
	if err != nil {
		t.Fatalf("send: %v", err)
	}
	if got.title != "[critical] alfa-01 check_failed" {
		t.Errorf("title: %q", got.title)
	}
	if got.priority != "5" {
		t.Errorf("priority: %q want 5 (critical → urgent)", got.priority)
	}
	if got.tags != "critical,check_failed" {
		t.Errorf("tags: %q", got.tags)
	}
	if got.click != "https://rm.example/a" {
		t.Errorf("click: %q", got.click)
	}
	if got.auth != "Bearer tk1" {
		t.Errorf("auth: %q", got.auth)
	}
	if got.body != "errors found" {
		t.Errorf("body: %q", got.body)
	}
}

func TestNtfyDefaultPriorityRespected(t *testing.T) {
	t.Parallel()
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		w.WriteHeader(200)
	}))
	defer srv.Close()
	ch := NewNtfyChannel(NtfyConfig{ServerURL: srv.URL, Topic: "t"}, "min")
	// Use info severity — default should win.
	if got := priorityForSeverity("info", "min"); got != "min" {
		t.Errorf("info+default=min: got %q", got)
	}
	// Critical always overrides default.
	if got := priorityForSeverity("critical", "min"); got != "5" {
		t.Errorf("critical: got %q", got)
	}
	_ = ch
}
```

- [ ] **Step 3: Run tests**

```sh
go test ./internal/notification/ -run TestNtfy -count=1
```
Expected: PASS.

- [ ] **Step 4: Commit**

```sh
git add internal/notification/ntfy.go internal/notification/ntfy_test.go
git commit -m "notification: ntfy channel"
```
|
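
`priorityForSeverity` returns numeric strings for its built-in levels but passes the operator's `defaultPriority` through unchanged, so the `Priority` header can carry either a number or a name such as `min`. ntfy accepts both forms. If a single form is preferred for logs or assertions, a small normaliser could map names to numbers. `normalizePriority` below is a hypothetical helper and is not part of the plan.

```go
package main

import "fmt"

// normalizePriority (hypothetical) maps ntfy priority names to their
// numeric form ("min"=1 ... "urgent"/"max"=5). Numeric input is returned
// unchanged; anything unrecognised falls back to "3" (default).
func normalizePriority(p string) string {
	switch p {
	case "1", "2", "3", "4", "5":
		return p
	case "min":
		return "1"
	case "low":
		return "2"
	case "default":
		return "3"
	case "high":
		return "4"
	case "urgent", "max":
		return "5"
	default:
		return "3"
	}
}

func main() {
	for _, p := range []string{"min", "high", "urgent", "4", ""} {
		fmt.Printf("%q -> %s\n", p, normalizePriority(p))
	}
}
```

Falling back to `3` for unknown input is a choice made for this sketch. Rejecting unknown values when the channel is saved would be an equally reasonable design.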

---

### Task B4: SMTP channel

**Files:**
- Create: `internal/notification/smtp.go`
- Test: `internal/notification/smtp_test.go`

- [ ] **Step 1: Implementation**

`internal/notification/smtp.go`:

```go
package notification

import (
	"context"
	"crypto/tls"
	"fmt"
	"net"
	"net/smtp"
	"strings"
	"time"
)

type SMTPConfig struct {
	Host       string `json:"host"`
	Port       int    `json:"port"`
	Encryption string `json:"encryption"` // "starttls" | "tls" | "none"
	Username   string `json:"username"`
	Password   string `json:"password"`
	From       string `json:"from"`
	To         string `json:"to"`
}

type SMTPChannel struct {
	cfg SMTPConfig
	// messageIDDomain holds the public base hostname of restic-manager so
	// Message-IDs include a stable right-hand side. Falls back to
	// "restic-manager.local" when unset.
	messageIDDomain string
}

// NewSMTPChannel builds an SMTP channel. messageIDDomain comes from
// cfg.Cfg.BaseURL — caller passes it through.
func NewSMTPChannel(cfg SMTPConfig, messageIDDomain string) *SMTPChannel {
	if messageIDDomain == "" {
		messageIDDomain = "restic-manager.local"
	}
	return &SMTPChannel{cfg: cfg, messageIDDomain: messageIDDomain}
}

func (c *SMTPChannel) Kind() string { return "smtp" }

func (c *SMTPChannel) Send(ctx context.Context, p Payload) (int, time.Duration, error) {
	t0 := time.Now()
	addr := fmt.Sprintf("%s:%d", c.cfg.Host, c.cfg.Port)

	// Dial respects ctx (we use net.Dialer).
	dialer := &net.Dialer{Timeout: 10 * time.Second}
	rawConn, err := dialer.DialContext(ctx, "tcp", addr)
	if err != nil {
		return 0, time.Since(t0), fmt.Errorf("smtp: dial %s: %w", addr, err)
	}
	// net/smtp is not ctx-aware after the dial, so bound the dialogue
	// with a connection deadline: 10s overall, or ctx's deadline if sooner.
	deadline := t0.Add(10 * time.Second)
	if d, ok := ctx.Deadline(); ok && d.Before(deadline) {
		deadline = d
	}
	_ = rawConn.SetDeadline(deadline)

	var client *smtp.Client
	switch strings.ToLower(c.cfg.Encryption) {
	case "tls":
		conn := tls.Client(rawConn, &tls.Config{ServerName: c.cfg.Host, MinVersion: tls.VersionTLS12})
		client, err = smtp.NewClient(conn, c.cfg.Host)
	case "starttls", "":
		client, err = smtp.NewClient(rawConn, c.cfg.Host)
		if err == nil {
			err = client.StartTLS(&tls.Config{ServerName: c.cfg.Host, MinVersion: tls.VersionTLS12})
		}
	case "none":
		client, err = smtp.NewClient(rawConn, c.cfg.Host)
	default:
		_ = rawConn.Close()
		return 0, time.Since(t0), fmt.Errorf("smtp: unknown encryption %q", c.cfg.Encryption)
	}
	if err != nil {
		_ = rawConn.Close()
		return 0, time.Since(t0), fmt.Errorf("smtp: handshake: %w", err)
	}
	defer func() { _ = client.Quit() }()

	if c.cfg.Username != "" {
		auth := smtp.PlainAuth("", c.cfg.Username, c.cfg.Password, c.cfg.Host)
		if err := client.Auth(auth); err != nil {
			return 0, time.Since(t0), fmt.Errorf("smtp: auth: %w", err)
		}
	}

	if err := client.Mail(extractAddr(c.cfg.From)); err != nil {
		return 0, time.Since(t0), fmt.Errorf("smtp: MAIL FROM: %w", err)
	}
	// To may also be in "Name <addr@host>" form; the envelope needs the bare address.
	if err := client.Rcpt(extractAddr(c.cfg.To)); err != nil {
		return 0, time.Since(t0), fmt.Errorf("smtp: RCPT TO: %w", err)
	}
	wc, err := client.Data()
	if err != nil {
		return 0, time.Since(t0), fmt.Errorf("smtp: DATA: %w", err)
	}
	msg := buildEmailBody(c.cfg, c.messageIDDomain, p)
	if _, err := wc.Write(msg); err != nil {
		return 0, time.Since(t0), fmt.Errorf("smtp: write: %w", err)
	}
	if err := wc.Close(); err != nil {
		return 0, time.Since(t0), fmt.Errorf("smtp: close DATA: %w", err)
	}

	return 250, time.Since(t0), nil
}

// extractAddr pulls the bare email out of a "Name <addr@host>" form.
func extractAddr(s string) string {
	if i, j := strings.LastIndex(s, "<"), strings.LastIndex(s, ">"); i >= 0 && j > i {
		return s[i+1 : j]
	}
	return s
}

// buildEmailBody assembles the RFC 5322 message bytes per the spec.
// Plain text only; subject hardcoded.
func buildEmailBody(cfg SMTPConfig, msgIDDomain string, p Payload) []byte {
	var b strings.Builder
	b.WriteString("From: " + cfg.From + "\r\n")
	b.WriteString("To: " + cfg.To + "\r\n")
	b.WriteString(fmt.Sprintf("Subject: [restic-manager] [%s] %s: %s\r\n", p.Severity, p.HostName, p.Kind))
	b.WriteString("Date: " + p.RaisedAt.UTC().Format(time.RFC1123Z) + "\r\n")
	b.WriteString("Message-ID: <" + p.AlertID + "@" + msgIDDomain + ">\r\n")
	b.WriteString("MIME-Version: 1.0\r\n")
	b.WriteString("Content-Type: text/plain; charset=utf-8\r\n")
	b.WriteString("\r\n")
	b.WriteString(p.Message + "\r\n\r\n")
	b.WriteString("—\r\n")
	b.WriteString("Raised at: " + p.RaisedAt.UTC().Format(time.RFC3339) + "\r\n")
	b.WriteString("Severity:  " + p.Severity + "\r\n")
	b.WriteString("Host:      " + p.HostName + "\r\n")
	b.WriteString("Kind:      " + p.Kind + "\r\n")
	if p.Link != "" {
		b.WriteString("\r\nOpen in restic-manager:\r\n")
		b.WriteString(p.Link + "\r\n")
	}
	b.WriteString("\r\n(This message was sent by restic-manager. Acknowledge or resolve in the UI.)\r\n")
	return []byte(b.String())
}
```

- [ ] **Step 2: Write the test using a fake SMTP server**

`internal/notification/smtp_test.go`:

```go
package notification

import (
	"context"
	"net"
	"strings"
	"sync"
	"testing"
	"time"
)

// fakeSMTPServer accepts a single connection, runs the minimal SMTP
// dialogue (HELO/EHLO, MAIL FROM, RCPT TO, DATA, QUIT) and stores
// what came across the wire. Plain (no TLS) — we test the protocol
// shape, not crypto.
type fakeSMTPServer struct {
	mu       sync.Mutex
	mailFrom string
	rcptTo   string
	data     string
	authed   bool
}

func startFakeSMTP(t *testing.T) (string, *fakeSMTPServer) {
	t.Helper()
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		t.Fatalf("listen: %v", err)
	}
	srv := &fakeSMTPServer{}
	t.Cleanup(func() { _ = ln.Close() })
	go func() {
		conn, err := ln.Accept()
		if err != nil {
			return
		}
		defer func() { _ = conn.Close() }()
		readLine := func() string {
			buf := make([]byte, 1024)
			n, err := conn.Read(buf)
			if err != nil {
				return ""
			}
			return string(buf[:n])
		}
		write := func(s string) { _, _ = conn.Write([]byte(s)) }

		write("220 fake.smtp ESMTP\r\n")
		for {
			line := readLine()
			if line == "" {
				return
			}
			cmd := strings.ToUpper(strings.TrimSpace(line))
			switch {
			case strings.HasPrefix(cmd, "EHLO"), strings.HasPrefix(cmd, "HELO"):
				write("250-fake.smtp\r\n250 AUTH PLAIN\r\n")
			case strings.HasPrefix(cmd, "AUTH "):
				srv.mu.Lock()
				srv.authed = true
				srv.mu.Unlock()
				write("235 OK\r\n")
			case strings.HasPrefix(cmd, "MAIL FROM"):
				srv.mu.Lock()
				srv.mailFrom = strings.TrimSpace(strings.TrimPrefix(line, "MAIL FROM:"))
				srv.mu.Unlock()
				write("250 OK\r\n")
			case strings.HasPrefix(cmd, "RCPT TO"):
				srv.mu.Lock()
				srv.rcptTo = strings.TrimSpace(strings.TrimPrefix(line, "RCPT TO:"))
				srv.mu.Unlock()
				write("250 OK\r\n")
			case cmd == "DATA":
				write("354 OK\r\n")
				// read until "\r\n.\r\n"
				var data strings.Builder
				for {
					chunk := readLine()
					if chunk == "" {
						break
					}
					data.WriteString(chunk)
					if strings.Contains(data.String(), "\r\n.\r\n") {
						break
					}
				}
				srv.mu.Lock()
				srv.data = data.String()
				srv.mu.Unlock()
				write("250 OK\r\n")
			case cmd == "QUIT":
				write("221 bye\r\n")
				return
			default:
				write("500 unknown\r\n")
			}
		}
	}()
	return ln.Addr().String(), srv
}

func TestSMTPSendsExpectedHeaders(t *testing.T) {
	t.Parallel()
	addr, srv := startFakeSMTP(t)
	host, port := splitHostPort(addr)

	ch := NewSMTPChannel(SMTPConfig{
		Host: host, Port: port, Encryption: "none",
		Username: "u", Password: "p",
		From: "Restic-Manager <alerts@example.com>",
		To:   "ops@example.com",
	}, "rm.example")

	_, _, err := ch.Send(context.Background(), Payload{
		Event: EventRaised, AlertID: "01ABC",
		Severity: "warning", Kind: "backup_failed",
		HostName: "alfa-01", Message: "Backup failed: 401",
		RaisedAt: time.Date(2026, 5, 4, 15, 42, 1, 0, time.UTC),
		Link:     "https://rm.example/alerts/01ABC",
	})
	if err != nil {
		t.Fatalf("send: %v", err)
	}

	srv.mu.Lock()
	defer srv.mu.Unlock()
	if !srv.authed {
		t.Errorf("AUTH never sent")
	}
	if !strings.Contains(srv.mailFrom, "alerts@example.com") {
		t.Errorf("MAIL FROM: %q", srv.mailFrom)
	}
	if !strings.Contains(srv.rcptTo, "ops@example.com") {
		t.Errorf("RCPT TO: %q", srv.rcptTo)
	}
	if !strings.Contains(srv.data, "Subject: [restic-manager] [warning] alfa-01: backup_failed") {
		t.Errorf("subject missing or wrong: %q", srv.data)
	}
	if !strings.Contains(srv.data, "Message-ID: <01ABC@rm.example>") {
		t.Errorf("Message-ID wrong: %q", srv.data)
	}
	if !strings.Contains(srv.data, "Backup failed: 401") {
		t.Errorf("body missing: %q", srv.data)
	}
}

func splitHostPort(addr string) (string, int) {
	host, portStr, _ := net.SplitHostPort(addr)
	var port int
	for _, r := range portStr {
		port = port*10 + int(r-'0')
	}
	return host, port
}
```

- [ ] **Step 3: Run the test**

```sh
go test ./internal/notification/ -run TestSMTP -count=1 -timeout=30s
```
Expected: PASS.

- [ ] **Step 4: Commit**

```sh
git add internal/notification/smtp.go internal/notification/smtp_test.go
git commit -m "notification: smtp channel"
```
|
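
`extractAddr` in this task handles the `Name <addr@host>` form with string indexing and returns its input unchanged when no angle brackets are present. The standard library's `net/mail` package parses the same form and additionally rejects malformed input. The sketch below shows that alternative; `envelopeAddr` is a hypothetical name, and swapping it in is a design choice the plan does not make.

```go
package main

import (
	"fmt"
	"net/mail"
)

// envelopeAddr (hypothetical) returns the bare address suitable for
// MAIL FROM / RCPT TO, or an error when s does not parse as an address.
func envelopeAddr(s string) (string, error) {
	a, err := mail.ParseAddress(s)
	if err != nil {
		return "", fmt.Errorf("parse address %q: %w", s, err)
	}
	return a.Address, nil
}

func main() {
	for _, s := range []string{
		"Restic-Manager <alerts@example.com>",
		"ops@example.com",
		"not an address",
	} {
		addr, err := envelopeAddr(s)
		fmt.Printf("%q -> %q (error: %v)\n", s, addr, err != nil)
	}
}
```

Validating addresses when the channel is saved, rather than at send time, would let the settings form report a bad `From` or `To` before any alert fires.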

---

### Task B5: notification.Hub — fan-out + log writer

**Files:**
- Create: `internal/notification/hub.go`
- Test: `internal/notification/hub_test.go`

- [ ] **Step 1: Implementation**

`internal/notification/hub.go`:

```go
package notification

import (
	"context"
	"crypto/rand"
	"encoding/hex"
	"encoding/json"
	"log/slog"
	"sync"
	"time"

	"gitea.dcglab.co.uk/steve/restic-manager/internal/crypto"
	"gitea.dcglab.co.uk/steve/restic-manager/internal/store"
)

// Hub fans Payload events out to every enabled channel and persists
// the result to notification_log. One Hub per process; thread-safe.
type Hub struct {
	store       *store.Store
	aead        *crypto.AEAD
	baseURL     string // e.g. https://restic-manager.example
	msgIDDomain string // hostname extracted from baseURL for SMTP Message-ID
}

func NewHub(st *store.Store, aead *crypto.AEAD, baseURL string) *Hub {
	return &Hub{
		store: st, aead: aead, baseURL: baseURL,
		msgIDDomain: extractDomain(baseURL),
	}
}

// Dispatch fans out to every enabled channel. Best-effort — failures
// are logged to notification_log but don't propagate. Each channel
// runs in its own goroutine; Dispatch returns when all have settled
// (so the caller can block briefly for the test-button case).
func (h *Hub) Dispatch(ctx context.Context, p Payload) {
	chans, err := h.store.ListEnabledNotificationChannels(ctx)
	if err != nil {
		slog.Error("notification: list channels", "err", err)
		return
	}
	// Stamp the link if not already set.
	if p.Link == "" {
		p.Link = h.baseURL + "/alerts/" + p.AlertID
	}

	var wg sync.WaitGroup
	for _, c := range chans {
		wg.Add(1)
		go func(c store.NotificationChannel) {
			defer wg.Done()
			h.send(ctx, c, p)
		}(c)
	}
	wg.Wait()
}

// DispatchOne fires a single channel — used by the "Send test
// notification" button. Returns the log entry it just persisted so
// the handler can render the result inline.
func (h *Hub) DispatchOne(ctx context.Context, channelID string, p Payload) (store.NotificationLogEntry, error) {
	c, err := h.store.GetNotificationChannel(ctx, channelID)
	if err != nil {
		return store.NotificationLogEntry{}, err
	}
	if p.Link == "" {
		p.Link = h.baseURL + "/alerts/" + p.AlertID
	}
	return h.send(ctx, *c, p), nil
}

func (h *Hub) send(ctx context.Context, c store.NotificationChannel, p Payload) store.NotificationLogEntry {
	ch, err := h.buildChannel(c)
	logID := newID()
	logEntry := store.NotificationLogEntry{
		ID: logID, ChannelID: c.ID,
		Event: string(p.Event), FiredAt: time.Now().UTC(),
	}
	if p.AlertID != "" {
		aid := p.AlertID
		logEntry.AlertID = &aid
	}
	if err != nil {
		errStr := err.Error()
		logEntry.OK = false
		logEntry.Error = &errStr
		_ = h.store.AppendNotificationLog(ctx, logEntry)
		return logEntry
	}
	code, latency, sendErr := ch.Send(ctx, p)
	statusCode := code
	latencyMS := int(latency.Milliseconds())
	logEntry.StatusCode = &statusCode
	logEntry.LatencyMS = &latencyMS
	if sendErr != nil {
		errStr := sendErr.Error()
		logEntry.OK = false
		logEntry.Error = &errStr
	} else {
		logEntry.OK = true
	}
	if err := h.store.AppendNotificationLog(ctx, logEntry); err != nil {
		slog.Warn("notification: persist log", "err", err)
	}
	return logEntry
}

// buildChannel decrypts the channel config and returns a Channel impl.
func (h *Hub) buildChannel(row store.NotificationChannel) (Channel, error) {
	plain, err := h.aead.Open(row.Config, []byte("notification-channel:"+row.ID))
	if err != nil {
		return nil, err
	}
	switch row.Kind {
	case "webhook":
		var cfg WebhookConfig
		if err := json.Unmarshal(plain, &cfg); err != nil {
			return nil, err
		}
		return NewWebhookChannel(cfg), nil
	case "ntfy":
		var cfg NtfyConfig
		if err := json.Unmarshal(plain, &cfg); err != nil {
			return nil, err
		}
		dp := ""
		if row.DefaultPriority != nil {
			dp = *row.DefaultPriority
||
}
|
||
return NewNtfyChannel(cfg, dp), nil
|
||
case "smtp":
|
||
var cfg SMTPConfig
|
||
if err := json.Unmarshal(plain, &cfg); err != nil {
|
||
return nil, err
|
||
}
|
||
return NewSMTPChannel(cfg, h.msgIDDomain), nil
|
||
}
|
||
return nil, errUnknownKind(row.Kind)
|
||
}
|
||
|
||
func newID() string {
|
||
var b [16]byte
|
||
_, _ = rand.Read(b[:])
|
||
return hex.EncodeToString(b[:])
|
||
}
|
||
|
||
func extractDomain(baseURL string) string {
|
||
// Tiny: strip scheme + path. Good enough for Message-ID right-hand-side.
|
||
s := baseURL
|
||
if i := indexOf(s, "://"); i >= 0 {
|
||
s = s[i+3:]
|
||
}
|
||
if i := indexOf(s, "/"); i >= 0 {
|
||
s = s[:i]
|
||
}
|
||
if s == "" {
|
||
return "restic-manager.local"
|
||
}
|
||
return s
|
||
}
|
||
|
||
func indexOf(s, sub string) int {
|
||
for i := 0; i+len(sub) <= len(s); i++ {
|
||
if s[i:i+len(sub)] == sub {
|
||
return i
|
||
}
|
||
}
|
||
return -1
|
||
}
|
||
|
||
type errUnknownKind string
|
||
|
||
func (e errUnknownKind) Error() string { return "notification: unknown kind: " + string(e) }
|
||
```

- [ ] **Step 2: Write the tests**

`internal/notification/hub_test.go`:

```go
package notification

import (
	"context"
	"encoding/json"
	"net/http"
	"net/http/httptest"
	"path/filepath"
	"testing"
	"time"

	"github.com/oklog/ulid/v2"

	"gitea.dcglab.co.uk/steve/restic-manager/internal/crypto"
	"gitea.dcglab.co.uk/steve/restic-manager/internal/store"
)

func setupHub(t *testing.T) (*Hub, *store.Store) {
	t.Helper()
	dir := t.TempDir()
	st, err := store.Open(context.Background(), filepath.Join(dir, "rm.db"))
	if err != nil {
		t.Fatalf("store: %v", err)
	}
	t.Cleanup(func() { _ = st.Close() })
	keyPath := filepath.Join(dir, "secret.key")
	_ = crypto.GenerateKeyFile(keyPath)
	key, _ := crypto.LoadKeyFromFile(keyPath)
	aead, _ := crypto.NewAEAD(key)
	return NewHub(st, aead, "https://rm.example"), st
}

func TestHubDispatchRecordsLogEntries(t *testing.T) {
	t.Parallel()
	hub, st := setupHub(t)

	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		w.WriteHeader(200)
	}))
	defer srv.Close()

	cfg, _ := json.Marshal(WebhookConfig{URL: srv.URL})
	enc, err := hub.aead.Seal(cfg, []byte("notification-channel:test-ch"))
	if err != nil {
		t.Fatalf("seal: %v", err)
	}
	if err := st.CreateNotificationChannel(context.Background(), store.NotificationChannel{
		ID: "test-ch", Kind: "webhook", Name: "test", Enabled: true,
		Config: enc, CreatedAt: time.Now().UTC(), UpdatedAt: time.Now().UTC(),
	}); err != nil {
		t.Fatalf("create channel: %v", err)
	}

	hub.Dispatch(context.Background(), Payload{
		Event: EventRaised, AlertID: ulid.Make().String(),
		Severity: "warning", Kind: "backup_failed",
		HostName: "alfa-01", Message: "x", RaisedAt: time.Now().UTC(),
	})

	// Verify a log row landed.
	var n int
	if err := st.DB().QueryRow(`SELECT COUNT(*) FROM notification_log WHERE channel_id = ? AND ok = 1`, "test-ch").Scan(&n); err != nil {
		t.Fatalf("count: %v", err)
	}
	if n != 1 {
		t.Fatalf("expected 1 log row, got %d", n)
	}
}

func TestHubSkipsDisabledChannels(t *testing.T) {
	t.Parallel()
	hub, st := setupHub(t)
	cfg, _ := json.Marshal(WebhookConfig{URL: "http://no-such-host.invalid"})
	enc, _ := hub.aead.Seal(cfg, []byte("notification-channel:dis"))
	_ = st.CreateNotificationChannel(context.Background(), store.NotificationChannel{
		ID: "dis", Kind: "webhook", Name: "off", Enabled: false,
		Config: enc, CreatedAt: time.Now().UTC(), UpdatedAt: time.Now().UTC(),
	})
	hub.Dispatch(context.Background(), Payload{
		Event: EventRaised, AlertID: "x", Severity: "warning",
		Kind: "backup_failed", HostName: "h", Message: "m", RaisedAt: time.Now().UTC(),
	})
	var n int
	_ = st.DB().QueryRow(`SELECT COUNT(*) FROM notification_log`).Scan(&n)
	if n != 0 {
		t.Errorf("disabled channel produced log rows: %d", n)
	}
}
```

- [ ] **Step 3: Run tests**

```sh
go test ./internal/notification/ -count=1 -timeout=30s
```
Expected: PASS.

- [ ] **Step 4: Commit**

```sh
git add internal/notification/hub.go internal/notification/hub_test.go
git commit -m "notification: Hub fan-out + log writer"
```

---

## Slice C — Alert engine

### Task C1: Engine struct + dispatch loop + auto-resolve sweep

**Files:**
- Create: `internal/alert/engine.go`
- Test: `internal/alert/engine_test.go`

- [ ] **Step 1: Engine skeleton + types**

`internal/alert/engine.go`:

```go
// Package alert evaluates the hardcoded rule set and persists raises
// / acknowledges / resolves. Three event sources feed it:
//   - JobFinishedEvent: pushed when a job lands a terminal state
//     (the existing MarkJobFinished site)
//   - host offline / online notifications (NotifyHostOffline and
//     NotifyHostOnline): pushed by the offline sweeper and by the
//     ws hello handler
//   - 60s ticker (internal): drives stale-schedule + auto-resolve
//
// All output goes through store.RaiseOrTouch / Acknowledge / Resolve
// and the notification.Hub. The engine is one goroutine started at
// boot; non-blocking sends from hot paths.
package alert

import (
	"context"
	"log/slog"
	"sync"
	"time"

	"gitea.dcglab.co.uk/steve/restic-manager/internal/notification"
	"gitea.dcglab.co.uk/steve/restic-manager/internal/store"
)

// JobFinishedEvent carries everything the engine needs to evaluate
// the failed-X rules. Pushed via Engine.NotifyJobFinished from the
// MarkJobFinished site.
type JobFinishedEvent struct {
	HostID string
	JobID  string
	Kind   string // backup | forget | prune | check | unlock | restore | diff
	Status string // succeeded | failed | cancelled
	When   time.Time
}

type Engine struct {
	store *store.Store
	hub   *notification.Hub

	jobs     chan JobFinishedEvent
	hostDown chan string // host_id
	hostUp   chan string

	// agentOfflineFloor is the duration a host must be offline before
	// we raise. Configurable for tests; default 15m.
	agentOfflineFloor time.Duration
	tickPeriod        time.Duration

	closeOnce sync.Once
	done      chan struct{}
}

// NewEngine builds the engine with the default agentOfflineFloor (15m)
// and tickPeriod (60s). Tests in this package can override the
// unexported fields directly.
func NewEngine(st *store.Store, hub *notification.Hub) *Engine {
	return &Engine{
		store:             st,
		hub:               hub,
		jobs:              make(chan JobFinishedEvent, 32),
		hostDown:          make(chan string, 32),
		hostUp:            make(chan string, 32),
		agentOfflineFloor: 15 * time.Minute,
		tickPeriod:        60 * time.Second,
		done:              make(chan struct{}),
	}
}

// Run drives the event loop. Returns when ctx is done. Blocks; call in
// its own goroutine.
func (e *Engine) Run(ctx context.Context) {
	t := time.NewTicker(e.tickPeriod)
	defer t.Stop()
	for {
		select {
		case <-ctx.Done():
			e.closeOnce.Do(func() { close(e.done) })
			return
		case ev := <-e.jobs:
			e.handleJobFinished(ctx, ev)
		case hostID := <-e.hostDown:
			e.handleHostOffline(ctx, hostID)
		case hostID := <-e.hostUp:
			e.handleHostOnline(ctx, hostID)
		case now := <-t.C:
			e.tick(ctx, now)
		}
	}
}

// NotifyJobFinished is the hot-path hook called from MarkJobFinished's
// caller (ws.handler.dispatchAgentMessage). Non-blocking: drops on a
// full channel with a slog warning.
func (e *Engine) NotifyJobFinished(ev JobFinishedEvent) {
	select {
	case e.jobs <- ev:
	default:
		slog.Warn("alert: jobs channel full; dropping event", "kind", ev.Kind, "host_id", ev.HostID)
	}
}

func (e *Engine) NotifyHostOffline(hostID string) {
	select {
	case e.hostDown <- hostID:
	default:
		slog.Warn("alert: hostDown channel full; dropping", "host_id", hostID)
	}
}

func (e *Engine) NotifyHostOnline(hostID string) {
	select {
	case e.hostUp <- hostID:
	default:
		slog.Warn("alert: hostUp channel full; dropping", "host_id", hostID)
	}
}
```
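
The three `Notify*` hooks share one idiom: a `select` with a `default` branch, so a full buffer drops the event instead of blocking the caller. A minimal, self-contained sketch of that idiom follows; `tryNotify` is a hypothetical helper used only for illustration.

```go
package main

import "fmt"

// tryNotify attempts a non-blocking send and reports whether the event
// was queued. When the buffer is full the event is dropped.
func tryNotify(ch chan<- string, ev string) bool {
	select {
	case ch <- ev:
		return true
	default:
		return false
	}
}

func main() {
	// A buffer of two with no reader: the third send has nowhere to go.
	events := make(chan string, 2)
	for _, id := range []string{"host-a", "host-b", "host-c"} {
		fmt.Println(id, "queued:", tryNotify(events, id))
	}
}
```

The trade-off is that a burst larger than the buffer loses events; the 60s ticker in the engine is what catches state that a dropped event would otherwise miss.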

(In `engine.go`, `handleJobFinished`, `handleHostOffline`, `handleHostOnline`, and
`tick` come in C2. `Run` already calls them, so add temporary no-op stubs with
matching signatures for now; without them the build in Step 2 fails with
undefined-method errors.)

- [ ] **Step 2: Build to confirm it compiles**

```sh
go build ./internal/alert/...
```
Expected: clean.

- [ ] **Step 3: Commit**

```sh
git add internal/alert/engine.go
git commit -m "alert: engine skeleton + event channels"
```

---

### Task C2: Engine — rule logic for the six rules

**Files:**
- Create: `internal/alert/rules.go`
- Modify: `internal/alert/engine.go` (fill in handle* methods)
- Test: `internal/alert/rules_test.go`

- [ ] **Step 1: Rule helper module**

`internal/alert/rules.go`:

```go
package alert

import (
	"context"
	"fmt"
	"log/slog"
	"time"

	"gitea.dcglab.co.uk/steve/restic-manager/internal/notification"
	"gitea.dcglab.co.uk/steve/restic-manager/internal/store"
)

// Rule kinds: keep in lockstep with the engine logic + UI tag-color
// table.
const (
	KindBackupFailed  = "backup_failed"
	KindForgetFailed  = "forget_failed"
	KindPruneFailed   = "prune_failed"
	KindCheckFailed   = "check_failed"
	KindStaleSchedule = "stale_schedule"
	KindAgentOffline  = "agent_offline"
)

// raiseAndNotify is the standard pattern: store.RaiseOrTouch +
// notification.Hub.Dispatch only on first raise.
func (e *Engine) raiseAndNotify(ctx context.Context, hostID, kind, severity, message string, when time.Time) {
	id, didRaise, err := e.store.RaiseOrTouch(ctx, hostID, kind, severity, message, when)
	if err != nil {
		// Not fatal: log and move on.
		slog.Warn("alert: raise", "kind", kind, "host_id", hostID, "err", err)
		return
	}
	if !didRaise {
		return
	}
	host, err := e.store.GetHost(ctx, hostID)
	hostName := hostID
	if err == nil {
		hostName = host.Name
	}
	go e.hub.Dispatch(ctx, notification.Payload{
		Event: notification.EventRaised, AlertID: id,
		Severity: severity, Kind: kind,
		HostID: hostID, HostName: hostName,
		Message:  message,
		RaisedAt: when,
	})
}

// resolveAndNotify clears any open alert for (host_id, kind) and
// fires alert.resolved on each that was actually open. Best-effort.
func (e *Engine) resolveAndNotify(ctx context.Context, hostID, kind string, when time.Time) {
	open, err := e.store.ListAlerts(ctx, store.AlertFilter{
		Status: "open", HostID: hostID,
	})
	if err != nil {
		return
	}
	openAcked, _ := e.store.ListAlerts(ctx, store.AlertFilter{
		Status: "acknowledged", HostID: hostID,
	})
	all := append(open, openAcked...)
	if err := e.store.AutoResolve(ctx, hostID, kind, when); err != nil {
		slog.Warn("alert: auto-resolve", "kind", kind, "host_id", hostID, "err", err)
		return
	}
	host, _ := e.store.GetHost(ctx, hostID)
	hostName := hostID
	if host != nil {
		hostName = host.Name
	}
	for _, a := range all {
		if a.Kind != kind {
			continue
		}
		go e.hub.Dispatch(ctx, notification.Payload{
			Event: notification.EventResolved, AlertID: a.ID,
			Severity: a.Severity, Kind: a.Kind,
			HostID: hostID, HostName: hostName,
			Message:  fmt.Sprintf("Auto-resolved (%s)", kind),
			RaisedAt: when,
		})
	}
}
```

- [ ] **Step 2: Fill handleJobFinished / handleHostOffline / handleHostOnline / tick**

Append to `internal/alert/engine.go`, replacing any placeholder versions of these methods, and add `"fmt"` to its import block (the handlers call `fmt.Sprintf`):

```go
func (e *Engine) handleJobFinished(ctx context.Context, ev JobFinishedEvent) {
	switch ev.Kind {
	case "backup":
		if ev.Status == "failed" {
			e.raiseAndNotify(ctx, ev.HostID, KindBackupFailed, "warning",
				fmt.Sprintf("Backup job %s failed", ev.JobID), ev.When)
		} else if ev.Status == "succeeded" {
			e.resolveAndNotify(ctx, ev.HostID, KindBackupFailed, ev.When)
		}
	case "forget":
		if ev.Status == "failed" {
			e.raiseAndNotify(ctx, ev.HostID, KindForgetFailed, "warning",
				fmt.Sprintf("Forget job %s failed", ev.JobID), ev.When)
		} else if ev.Status == "succeeded" {
			e.resolveAndNotify(ctx, ev.HostID, KindForgetFailed, ev.When)
		}
	case "prune":
		if ev.Status == "failed" {
			e.raiseAndNotify(ctx, ev.HostID, KindPruneFailed, "warning",
				fmt.Sprintf("Prune job %s failed", ev.JobID), ev.When)
		} else if ev.Status == "succeeded" {
			e.resolveAndNotify(ctx, ev.HostID, KindPruneFailed, ev.When)
		}
	case "check":
		if ev.Status == "failed" {
			e.raiseAndNotify(ctx, ev.HostID, KindCheckFailed, "critical",
				fmt.Sprintf("Check job %s failed", ev.JobID), ev.When)
		} else if ev.Status == "succeeded" {
			e.resolveAndNotify(ctx, ev.HostID, KindCheckFailed, ev.When)
		}
	}
	// init / unlock / restore / diff don't trigger alerts in v1.
}

func (e *Engine) handleHostOffline(ctx context.Context, hostID string) {
	host, err := e.store.GetHost(ctx, hostID)
	if err != nil {
		return
	}
	// Apply the 15-min floor: the host has been offline "long enough"
	// only when last_seen_at is older than the floor.
	if time.Since(host.LastSeenAt) < e.agentOfflineFloor {
		return
	}
	e.raiseAndNotify(ctx, hostID, KindAgentOffline, "warning",
		fmt.Sprintf("Agent offline for %s (threshold %s)",
			roundDur(time.Since(host.LastSeenAt)), e.agentOfflineFloor),
		time.Now().UTC())
}

func (e *Engine) handleHostOnline(ctx context.Context, hostID string) {
	e.resolveAndNotify(ctx, hostID, KindAgentOffline, time.Now().UTC())
}

// tick is the 60s sweep. Two responsibilities:
//  1. Re-evaluate agent_offline against every offline host (catches
//     hosts that crossed the floor between events).
//  2. Stale-schedule detection: any schedule whose next-fire was
//     more than 5 minutes ago with no matching job since.
func (e *Engine) tick(ctx context.Context, now time.Time) {
	hosts, err := e.store.ListHosts(ctx)
	if err != nil {
		slog.Warn("alert: tick list hosts", "err", err)
		return
	}
	for _, h := range hosts {
		if h.Status == "offline" && now.Sub(h.LastSeenAt) >= e.agentOfflineFloor {
			e.raiseAndNotify(ctx, h.ID, KindAgentOffline, "warning",
				fmt.Sprintf("Agent offline for %s (threshold %s)",
					roundDur(now.Sub(h.LastSeenAt)), e.agentOfflineFloor), now)
		}
	}
	// Stale-schedule sweep: left as a future tick body if/when the
	// store grows the helper. For v1 we skip it cleanly: the rule is
	// declared but the trigger is "lands later if anyone asks".
	// (Document this in tasks.md when you tick P3-05.)
}

func roundDur(d time.Duration) string {
	if d < time.Minute {
		return "less than a minute"
	}
	d = d.Round(time.Minute)
	return d.String()
}
```

> **Note:** the `stale_schedule` rule is declared in the spec but
> left as a no-op in the v1 ticker. The precise definition of
> "expected to have fired but didn't" needs a small store helper
> we can add later. Mention this in the tasks.md tick when you
> close P3-05.

- [ ] **Step 3: Write the tests**

`internal/alert/rules_test.go`:

```go
package alert

import (
	"context"
	"path/filepath"
	"testing"
	"time"

	"github.com/oklog/ulid/v2"

	"gitea.dcglab.co.uk/steve/restic-manager/internal/crypto"
	"gitea.dcglab.co.uk/steve/restic-manager/internal/notification"
	"gitea.dcglab.co.uk/steve/restic-manager/internal/store"
)

func setupEngine(t *testing.T) (*Engine, *store.Store, string) {
	t.Helper()
	dir := t.TempDir()
	st, _ := store.Open(context.Background(), filepath.Join(dir, "rm.db"))
	t.Cleanup(func() { _ = st.Close() })
	keyPath := filepath.Join(dir, "secret.key")
	_ = crypto.GenerateKeyFile(keyPath)
	key, _ := crypto.LoadKeyFromFile(keyPath)
	aead, _ := crypto.NewAEAD(key)
	hub := notification.NewHub(st, aead, "https://rm.example")
	eng := NewEngine(st, hub)
	hostID := ulid.Make().String()
	if err := st.CreateHost(context.Background(), store.Host{
		ID: hostID, Name: "alfa-01", OS: "linux", Arch: "amd64",
		EnrolledAt: time.Now().UTC(),
	}, "deadbeef", ""); err != nil {
		t.Fatalf("create host: %v", err)
	}
	return eng, st, hostID
}

func TestEngineBackupFailedRaisesThenResolves(t *testing.T) {
	t.Parallel()
	eng, st, hostID := setupEngine(t)
	ctx := context.Background()

	eng.handleJobFinished(ctx, JobFinishedEvent{
		HostID: hostID, JobID: "j1", Kind: "backup", Status: "failed",
		When: time.Now().UTC(),
	})
	open, _ := st.ListAlerts(ctx, store.AlertFilter{Status: "open", HostID: hostID})
	if len(open) != 1 || open[0].Kind != KindBackupFailed {
		t.Fatalf("expected one backup_failed open; got %+v", open)
	}

	// Second failed job should TOUCH (not raise a fresh row).
	eng.handleJobFinished(ctx, JobFinishedEvent{
		HostID: hostID, JobID: "j2", Kind: "backup", Status: "failed",
		When: time.Now().UTC().Add(time.Minute),
	})
	open, _ = st.ListAlerts(ctx, store.AlertFilter{Status: "open", HostID: hostID})
	if len(open) != 1 {
		t.Fatalf("expected dedup to stay at 1 open; got %d", len(open))
	}

	// Success auto-resolves.
	eng.handleJobFinished(ctx, JobFinishedEvent{
		HostID: hostID, JobID: "j3", Kind: "backup", Status: "succeeded",
		When: time.Now().UTC().Add(2 * time.Minute),
	})
	open, _ = st.ListAlerts(ctx, store.AlertFilter{Status: "open", HostID: hostID})
	if len(open) != 0 {
		t.Fatalf("expected zero open after success; got %d", len(open))
	}
}

func TestEngineCheckFailedSeverityCritical(t *testing.T) {
	t.Parallel()
	eng, st, hostID := setupEngine(t)
	eng.handleJobFinished(context.Background(), JobFinishedEvent{
		HostID: hostID, Kind: "check", Status: "failed", When: time.Now().UTC(),
	})
	open, _ := st.ListAlerts(context.Background(),
		store.AlertFilter{Status: "open", HostID: hostID})
	if len(open) != 1 || open[0].Severity != "critical" {
		t.Fatalf("got %+v", open)
	}
}

func TestEngineAgentOfflineRespects15MinFloor(t *testing.T) {
	t.Parallel()
	eng, st, hostID := setupEngine(t)
	// Host's last_seen_at defaulted to ~now via CreateHost. Force a
	// stale value for the test by direct DB update.
	if _, err := st.DB().Exec(
		`UPDATE hosts SET last_seen_at = ? WHERE id = ?`,
		time.Now().UTC().Add(-20*time.Minute).Format(time.RFC3339Nano), hostID,
	); err != nil {
		t.Fatalf("update last_seen_at: %v", err)
	}
	eng.handleHostOffline(context.Background(), hostID)
	open, _ := st.ListAlerts(context.Background(),
		store.AlertFilter{Status: "open", HostID: hostID})
	if len(open) != 1 {
		t.Fatalf("expected agent_offline raised; got %d", len(open))
	}

	// Bring back online: should auto-resolve.
	eng.handleHostOnline(context.Background(), hostID)
	open, _ = st.ListAlerts(context.Background(),
		store.AlertFilter{Status: "open", HostID: hostID})
	if len(open) != 0 {
		t.Fatalf("expected agent_offline resolved; got %d", len(open))
	}
}

func TestEngineAgentOfflineUnderFloorNoRaise(t *testing.T) {
	t.Parallel()
	eng, st, hostID := setupEngine(t)
	// last_seen_at defaulted to "now" by CreateHost, so the floor
	// hasn't elapsed. handleHostOffline must skip the raise.
	eng.handleHostOffline(context.Background(), hostID)
	open, _ := st.ListAlerts(context.Background(),
		store.AlertFilter{Status: "open", HostID: hostID})
	if len(open) != 0 {
		t.Fatalf("expected no raise within 15-min floor; got %d", len(open))
	}
}
```

- [ ] **Step 4: Run tests**

```sh
go test ./internal/alert/ -count=1 -timeout=30s
```
Expected: PASS.

- [ ] **Step 5: Commit**

```sh
git add internal/alert/engine.go internal/alert/rules.go internal/alert/rules_test.go
git commit -m "alert: rule logic for the six v1 rules"
```

---

### Task C3: Wire the engine into MarkJobFinished + ws hello + offline sweep

**Files:**
- Modify: `internal/server/ws/handler.go` (MarkJobFinished call site)
- Modify: `internal/server/ws/handler.go` (hello path)
- Modify: `cmd/server/main.go` (offline sweeper)
- Modify: `internal/store/hosts.go` (`MarkHostsOfflineStaleReturnIDs`)

The engine has no effect until it is wired in. There are three call sites:

- [ ] **Step 1: Add Engine to ws.HandlerDeps**

In `internal/server/ws/handler.go`, extend `HandlerDeps`:

```go
type HandlerDeps struct {
	Hub    *Hub
	Store  *store.Store
	JobHub *JobHub
	// NEW:
	AlertEngine AlertNotifier // interface, so ws never names the concrete *alert.Engine
	// (existing fields…)
}

// AlertNotifier is the slice of alert.Engine that ws needs. Declaring
// it here keeps ws decoupled from the concrete engine type. The method
// signature still references alert.JobFinishedEvent, so ws imports
// the alert package for that one type.
type AlertNotifier interface {
	NotifyJobFinished(alert.JobFinishedEvent) // dispatched after MarkJobFinished
	NotifyHostOnline(hostID string)
}
```
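
If importing the alert package from ws ever does produce a cycle, one way out is for ws to own its event type and for the wire-up code to adapt between the two. The sketch below shows the shape of that adapter with stand-in types in a single file. Every name in it (`wsJobFinished`, `engineEvent`, `engine`, `engineAdapter`) is hypothetical; in the real tree the pieces would live in `internal/server/ws`, `internal/alert`, and `cmd/server` respectively.

```go
package main

import (
	"fmt"
	"time"
)

// wsJobFinished is a hypothetical event type owned by the ws package.
type wsJobFinished struct {
	HostID, JobID, Kind, Status string
	When                        time.Time
}

// alertNotifier is what ws would depend on. It mentions only ws-owned types.
type alertNotifier interface {
	NotifyJobFinished(wsJobFinished)
	NotifyHostOnline(hostID string)
}

// engineEvent and engine stand in for alert.JobFinishedEvent and alert.Engine.
type engineEvent struct {
	HostID, JobID, Kind, Status string
	When                        time.Time
}

type engine struct {
	jobs   []engineEvent
	online []string
}

func (e *engine) NotifyJobFinished(ev engineEvent) { e.jobs = append(e.jobs, ev) }
func (e *engine) NotifyHostOnline(hostID string)   { e.online = append(e.online, hostID) }

// engineAdapter would live at the wire-up site and convert between the types.
type engineAdapter struct{ eng *engine }

func (a engineAdapter) NotifyJobFinished(ev wsJobFinished) {
	a.eng.NotifyJobFinished(engineEvent{
		HostID: ev.HostID, JobID: ev.JobID,
		Kind: ev.Kind, Status: ev.Status, When: ev.When,
	})
}

func (a engineAdapter) NotifyHostOnline(hostID string) { a.eng.NotifyHostOnline(hostID) }

func main() {
	eng := &engine{}
	var n alertNotifier = engineAdapter{eng: eng}
	n.NotifyJobFinished(wsJobFinished{HostID: "h1", JobID: "j1", Kind: "backup", Status: "failed"})
	n.NotifyHostOnline("h1")
	fmt.Println(len(eng.jobs), len(eng.online))
}
```

With this layout neither package imports the other; only the wire-up site knows both types.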

> **Cycle warning:** the `AlertNotifier` signature above imports
> `internal/alert.JobFinishedEvent`. If that creates a cycle, define
> a local `JobFinishedEvent` in `internal/server/ws` and convert at
> the wire-up site in `cmd/server/main.go`. Verify with
> `go build ./...` after the edit.

- [ ] **Step 2: Hook MarkJobFinished**

In `dispatchAgentMessage`'s `case api.MsgJobFinished` block, after the
existing `MarkJobFinished` call + JobHub broadcast:

```go
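if deps.AlertEngine != nil {
	// The WS payload has no kind, so read it from the stored job.
	// Assumes GetJob returns (job, error) and that ctx is in scope here.
	job, err := deps.Store.GetJob(ctx, p.JobID)
	if err != nil {
		slog.Warn("alert: job lookup for notify", "job_id", p.JobID, "err", err)
	} else {
		deps.AlertEngine.NotifyJobFinished(alert.JobFinishedEvent{
			HostID: hostID, JobID: p.JobID,
			Kind:   string(job.Kind),
			Status: string(p.Status),
			When:   p.FinishedAt,
		})
	}
}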
```

> **Subtlety:** the WS `JobFinishedPayload` doesn't carry the kind.
> The agent ran against a stored job, so the kind lives only in the
> DB. Fetch via `deps.Store.GetJob(ctx, p.JobID)` and use
> `job.Kind`. Caching these lookups is not necessary for v1 traffic.

- [ ] **Step 3: Hook the hello path for HostOnline**

In `runAgentLoop`, after `MarkHostHello` succeeds:

```go
if deps.AlertEngine != nil {
	deps.AlertEngine.NotifyHostOnline(hostID)
}
```

- [ ] **Step 4: Hook the offline sweeper**

In `cmd/server/main.go` the `offlineTick` case currently calls
`MarkHostsOfflineStale` and logs the count. Replace it with a version
that also notifies the engine for each newly-marked host:

```go
case <-offlineTick.C:
	cutoff := time.Now().Add(-90 * time.Second)
	ids, err := st.MarkHostsOfflineStaleReturnIDs(ctx, cutoff)
	if err == nil && len(ids) > 0 {
		slog.Info("marked hosts offline (stale heartbeat)", "n", len(ids))
		for _, id := range ids {
			engine.NotifyHostOffline(id)
		}
	}
```

`MarkHostsOfflineStaleReturnIDs` is a small new variant of the
existing `MarkHostsOfflineStale` that returns the IDs of the hosts it
flipped. Add it in `internal/store/hosts.go`: SELECT the matching IDs,
then run the existing UPDATE, ideally inside one transaction so the
returned list matches the rows actually updated.

- [ ] **Step 5: Build to verify the wiring compiles**

```sh
go build ./...
```
Expected: clean. If you hit an import cycle, apply the local
`JobFinishedEvent` approach from the cycle warning in Step 1.

- [ ] **Step 6: Existing tests still pass**

```sh
go test ./internal/server/ws/ ./internal/server/http/ -count=1 -timeout=120s
```
Expected: PASS.

- [ ] **Step 7: Commit**

```sh
git add internal/server/ws/handler.go cmd/server/main.go internal/store/hosts.go
git commit -m "alert: wire engine into ws hello + MarkJobFinished + offline sweep"
```

---

## Slice D — HTTP routes for /alerts page

### Task D1: GET /alerts list page + JSON variant

**Files:**
- Create: `internal/server/http/ui_alerts.go`
- Test: `internal/server/http/ui_alerts_test.go`
- Modify: `internal/server/http/server.go` (route)

- [ ] **Step 1: Page model + handler**

`internal/server/http/ui_alerts.go`:

```go
package http

import (
	"encoding/json"
	"log/slog"
	stdhttp "net/http"
	"strings"
	"time"

	"github.com/go-chi/chi/v5"
	"github.com/oklog/ulid/v2"

	"gitea.dcglab.co.uk/steve/restic-manager/internal/store"
)

type alertsPage struct {
	Filter    store.AlertFilter
	Alerts    []store.Alert
	Counts    alertCounts
	HostNames map[string]string // host_id → name for table rendering
}

type alertCounts struct {
	Open         int
	Acknowledged int
	Resolved24h  int
}

// handleUIAlerts renders the alerts page with the chosen filters.
func (s *Server) handleUIAlerts(w stdhttp.ResponseWriter, r *stdhttp.Request) {
	u := s.requireUIUser(w, r)
	if u == nil {
		return
	}
	q := r.URL.Query()
	f := store.AlertFilter{
		Status:   q.Get("status"),
		Severity: q.Get("severity"),
		HostID:   q.Get("host_id"),
		Search:   strings.TrimSpace(q.Get("q")),
		Limit:    200,
	}
	if f.Status == "" {
		f.Status = "open"
	}

	alerts, err := s.deps.Store.ListAlerts(r.Context(), f)
	if err != nil {
		slog.Error("ui alerts: list", "err", err)
		stdhttp.Error(w, "internal", stdhttp.StatusInternalServerError)
		return
	}

	page := alertsPage{Filter: f, Alerts: alerts, HostNames: map[string]string{}}
	if hosts, err := s.deps.Store.ListHosts(r.Context()); err == nil {
		for _, h := range hosts {
			page.HostNames[h.ID] = h.Name
		}
	}
	page.Counts = computeAlertCounts(s, r)

	view := s.baseView(u)
	view.Title = "Alerts · restic-manager"
	view.Active = "alerts"
	view.Page = page
	if err := s.deps.UI.Render(w, "alerts", view); err != nil {
		slog.Error("ui alerts: render", "err", err)
	}
}

func computeAlertCounts(s *Server, r *stdhttp.Request) alertCounts {
	open, _ := s.deps.Store.ListAlerts(r.Context(),
		store.AlertFilter{Status: "open"})
	acked, _ := s.deps.Store.ListAlerts(r.Context(),
		store.AlertFilter{Status: "acknowledged"})
	cutoff := time.Now().UTC().Add(-24 * time.Hour)
	all, _ := s.deps.Store.ListAlerts(r.Context(),
		store.AlertFilter{Status: "resolved"})
	res := 0
	for _, a := range all {
		if a.ResolvedAt != nil && a.ResolvedAt.After(cutoff) {
			res++
		}
	}
	return alertCounts{Open: len(open), Acknowledged: len(acked), Resolved24h: res}
}

// handleAPIAlerts is the JSON list; same filter shape.
func (s *Server) handleAPIAlerts(w stdhttp.ResponseWriter, r *stdhttp.Request) {
	if _, ok := s.requireUser(r); !ok {
		writeJSONError(w, stdhttp.StatusUnauthorized, "unauthorised", "")
		return
	}
	q := r.URL.Query()
	f := store.AlertFilter{
		Status:   q.Get("status"),
		Severity: q.Get("severity"),
		HostID:   q.Get("host_id"),
		Search:   strings.TrimSpace(q.Get("q")),
		Limit:    200,
	}
	alerts, err := s.deps.Store.ListAlerts(r.Context(), f)
	if err != nil {
		writeJSONError(w, stdhttp.StatusInternalServerError, "internal", "")
		return
	}
	w.Header().Set("Content-Type", "application/json")
	_ = json.NewEncoder(w).Encode(alerts)
}

// handleUIAlertAcknowledge is POST /alerts/{id}/acknowledge.
func (s *Server) handleUIAlertAcknowledge(w stdhttp.ResponseWriter, r *stdhttp.Request) {
	u := s.requireUIUser(w, r)
	if u == nil {
		return
	}
	id := chi.URLParam(r, "id")
	if id == "" {
		stdhttp.Error(w, "missing id", stdhttp.StatusBadRequest)
		return
	}
	if err := s.deps.Store.Acknowledge(r.Context(), id, u.ID, time.Now().UTC()); err != nil {
		slog.Warn("ui alerts: ack", "err", err)
	}
	_ = s.deps.Store.AppendAudit(r.Context(), store.AuditEntry{
		ID: ulid.Make().String(), UserID: &u.ID, Actor: "user",
		Action:     "alert.acknowledge",
		TargetKind: ptr("alert"), TargetID: &id,
		TS: time.Now().UTC(),
	})
	if r.Header.Get("HX-Request") == "true" {
		w.Header().Set("HX-Redirect", "/alerts?"+r.URL.RawQuery)
		w.WriteHeader(stdhttp.StatusNoContent)
		return
	}
	stdhttp.Redirect(w, r, "/alerts", stdhttp.StatusSeeOther)
}

// handleUIAlertResolve is POST /alerts/{id}/resolve.
func (s *Server) handleUIAlertResolve(w stdhttp.ResponseWriter, r *stdhttp.Request) {
	u := s.requireUIUser(w, r)
	if u == nil {
		return
	}
	id := chi.URLParam(r, "id")
	if id == "" {
		stdhttp.Error(w, "missing id", stdhttp.StatusBadRequest)
		return
	}
	if err := s.deps.Store.Resolve(r.Context(), id, time.Now().UTC()); err != nil {
		slog.Warn("ui alerts: resolve", "err", err)
	}
	_ = s.deps.Store.AppendAudit(r.Context(), store.AuditEntry{
		ID: ulid.Make().String(), UserID: &u.ID, Actor: "user",
		Action:     "alert.resolve",
		TargetKind: ptr("alert"), TargetID: &id,
		TS: time.Now().UTC(),
	})
	if r.Header.Get("HX-Request") == "true" {
		w.Header().Set("HX-Redirect", "/alerts?"+r.URL.RawQuery)
		w.WriteHeader(stdhttp.StatusNoContent)
		return
	}
	stdhttp.Redirect(w, r, "/alerts", stdhttp.StatusSeeOther)
}
```
|
||
- [ ] **Step 2: Wire routes**
|
||
|
||
In `internal/server/http/server.go`, inside the `if s.deps.UI != nil` block:
|
||
|
||
```go
|
||
r.Get("/alerts", s.handleUIAlerts)
|
||
r.Post("/alerts/{id}/acknowledge", s.handleUIAlertAcknowledge)
|
||
r.Post("/alerts/{id}/resolve", s.handleUIAlertResolve)
|
||
```
|
||
|
||
And inside `r.Route("/api", ...)`:
|
||
|
||
```go
|
||
r.Get("/alerts", s.handleAPIAlerts)
|
||
```
|
||
|
||
- [ ] **Step 3: Build to verify**
|
||
|
||
```sh
|
||
go build ./...
|
||
```
|
||
Expected: clean. (Template doesn't exist yet → handler will fail at
|
||
runtime, but build succeeds.)
|
||
|
||
- [ ] **Step 4: Test (the page handler can't render without templates yet — write the test that drives it once templates land in slice F)**
|
||
|
||
For now skip the rendering test; cover the JSON handler:
|
||
|
||
`internal/server/http/ui_alerts_test.go`:
|
||
|
||
```go
|
||
package http
|
||
|
||
import (
|
||
"context"
|
||
"encoding/json"
|
||
stdhttp "net/http"
|
||
"testing"
|
||
"time"
|
||
|
||
"github.com/oklog/ulid/v2"
|
||
|
||
"gitea.dcglab.co.uk/steve/restic-manager/internal/store"
|
||
)
|
||
|
||
func TestAPIAlertsListsOpen(t *testing.T) {
|
||
t.Parallel()
|
||
srv, ts, st := rawTestServer(t)
|
||
hostID, _ := enrolHostForWS(t, srv, st, "host-alerts")
|
||
_, _, _ = st.RaiseOrTouch(context.Background(), hostID,
|
||
"backup_failed", "warning", "x", time.Now().UTC())
|
||
cookie := loginAsAdmin(t, st)
|
||
|
||
req, _ := stdhttp.NewRequest("GET", ts.URL+"/api/alerts?status=open", nil)
|
||
req.AddCookie(cookie)
|
||
res, err := stdhttp.DefaultClient.Do(req)
|
||
if err != nil {
|
||
t.Fatalf("do: %v", err)
|
||
}
|
||
defer res.Body.Close()
|
||
if res.StatusCode != 200 {
|
||
t.Fatalf("status: %d", res.StatusCode)
|
||
}
|
||
var got []store.Alert
|
||
if err := json.NewDecoder(res.Body).Decode(&got); err != nil {
|
||
t.Fatalf("decode: %v", err)
|
||
}
|
||
if len(got) != 1 || got[0].Kind != "backup_failed" {
|
||
t.Fatalf("got %+v", got)
|
||
}
|
||
_ = ulid.Make() // import keep
|
||
}
|
||
```
|
||
|
||
```sh
|
||
go test ./internal/server/http/ -run TestAPIAlertsListsOpen -count=1 -timeout=30s
|
||
```
|
||
Expected: PASS.
|
||
|
||
- [ ] **Step 5: Commit**
|
||
|
||
```sh
|
||
git add internal/server/http/ui_alerts.go internal/server/http/ui_alerts_test.go internal/server/http/server.go
|
||
git commit -m "http: /alerts list + ack/resolve handlers + /api/alerts JSON"
|
||
```
|
||
|
||
---
|
||
|
||
## Slice E — HTTP routes for /settings/notifications
|
||
|
||
### Task E1: Channel CRUD handlers
|
||
|
||
**Files:**
|
||
- Create: `internal/server/http/ui_notifications.go`
|
||
- Test: `internal/server/http/ui_notifications_test.go`
|
||
- Modify: `internal/server/http/server.go`
|
||
|
||
- [ ] **Step 1: CRUD handlers**
|
||
|
||
`internal/server/http/ui_notifications.go` — too long to inline here in
|
||
full. Mirror the shape of `ui_repo.go` (see existing). Required handlers:
|
||
|
||
- `handleUISettings(w, r)` — render `settings` shell with the
|
||
Notifications sub-tab as the body. Pre-fetches channel list.
|
||
- `handleUINotificationsList(w, r)` — same as above; the page is
|
||
the same template, rendered with the Notifications sub-tab active.
|
||
- `handleUINotificationNewGet / Post` — render the kind picker +
|
||
empty form for the chosen kind; POST validates + AEAD-encrypts the
|
||
config blob via `s.deps.AEAD.Seal(rawJSON, []byte("notification-channel:"+id))`,
|
||
inserts via `Store.CreateNotificationChannel`, redirects.
|
||
- `handleUINotificationEditGet / Post` — pre-decrypts existing
|
||
config, renders the form with the operator's prior values
|
||
(passwords show "•••• stored, leave blank to keep" placeholder),
|
||
POST merges + re-encrypts.
|
||
- `handleUINotificationDelete` — typed-confirm name pattern (mirror
|
||
`ui_repo_reinit.go`) — operator types the channel name to confirm.
|
||
- `handleAPINotificationTest` — `POST /api/notifications/{id}/test`
|
||
builds a synthetic info-severity Payload + calls
|
||
`s.deps.NotificationHub.DispatchOne`, returns the resulting log
|
||
entry as JSON.
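
The "leave blank to keep" behaviour of the edit POST could be sketched as a small merge step. This is only an illustration: `smtpSettings` and `mergeSMTP` are hypothetical names, and the real field set lives in `notification.SMTPConfig`, whose fields this plan does not list.

```go
package main

import "fmt"

// smtpSettings is a hypothetical stand-in for notification.SMTPConfig;
// the real struct's field names are not shown in this plan.
type smtpSettings struct {
	Host     string
	Port     int
	Username string
	Password string
}

// mergeSMTP overlays the submitted form values on the stored config.
// An empty submitted password means "keep the stored one", matching
// the "stored, leave blank to keep" placeholder on the edit form.
func mergeSMTP(stored, submitted smtpSettings) smtpSettings {
	merged := submitted
	if submitted.Password == "" {
		merged.Password = stored.Password
	}
	return merged
}

func main() {
	stored := smtpSettings{Host: "mail.example.com", Port: 587, Username: "ops", Password: "s3cret"}
	edited := smtpSettings{Host: "mail2.example.com", Port: 465, Username: "ops"}
	fmt.Printf("%+v\n", mergeSMTP(stored, edited))
}
```

The merged struct is what would then be JSON-marshalled and re-sealed, so the stored secret never has to be echoed back into the form.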
|
||
|
||
Each kind's form parsing produces the per-kind config struct from
|
||
`internal/notification` (`WebhookConfig`, `NtfyConfig`, `SMTPConfig`),
|
||
JSON-marshals it, and feeds into `aead.Seal`. Validation:
|
||
|
||
- name non-empty + ≤100 chars
|
||
- kind ∈ {webhook, ntfy, smtp}
|
||
- webhook: URL parses; scheme is http or https
|
||
- ntfy: server_url parses; topic non-empty
|
||
- smtp: host non-empty; port 1..65535; encryption ∈ {starttls, tls, none};
|
||
to + from look like RFC 5322 addresses (use `mail.ParseAddress`)
|
||
|
||
On any validation failure, re-render the form with the operator's
|
||
input intact + an error banner (mirror P2-04's pattern).
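
The validation rules above can be sketched as one function. This is a minimal sketch under assumptions: `channelForm` is a hypothetical flat view of the submitted form (the real handlers build `WebhookConfig`, `NtfyConfig` or `SMTPConfig`), and the error strings are illustrative only.

```go
package main

import (
	"errors"
	"fmt"
	"net/mail"
	"net/url"
	"unicode/utf8"
)

// channelForm is a hypothetical flattened form; field names are assumptions.
type channelForm struct {
	Name       string
	Kind       string // "webhook", "ntfy" or "smtp"
	URL        string // webhook URL, or ntfy server_url
	Topic      string // ntfy only
	Host       string // smtp only
	Port       int    // smtp only
	Encryption string // "starttls", "tls" or "none"
	From, To   string // smtp only
}

// validateChannel applies the per-kind rules listed in the plan and
// returns the first failure, or nil when the form is acceptable.
func validateChannel(f channelForm) error {
	if f.Name == "" || utf8.RuneCountInString(f.Name) > 100 {
		return errors.New("name must be 1..100 characters")
	}
	switch f.Kind {
	case "webhook":
		u, err := url.Parse(f.URL)
		if err != nil {
			return fmt.Errorf("webhook url: %w", err)
		}
		if u.Scheme != "http" && u.Scheme != "https" {
			return errors.New("webhook url: scheme must be http or https")
		}
	case "ntfy":
		if _, err := url.Parse(f.URL); err != nil {
			return fmt.Errorf("ntfy server_url: %w", err)
		}
		if f.Topic == "" {
			return errors.New("ntfy topic must not be empty")
		}
	case "smtp":
		if f.Host == "" {
			return errors.New("smtp host must not be empty")
		}
		if f.Port < 1 || f.Port > 65535 {
			return errors.New("smtp port must be in 1..65535")
		}
		switch f.Encryption {
		case "starttls", "tls", "none":
		default:
			return errors.New("smtp encryption must be starttls, tls or none")
		}
		for _, addr := range []string{f.From, f.To} {
			if _, err := mail.ParseAddress(addr); err != nil {
				return fmt.Errorf("smtp address %q: %w", addr, err)
			}
		}
	default:
		return fmt.Errorf("unknown kind %q", f.Kind)
	}
	return nil
}

func main() {
	err := validateChannel(channelForm{Name: "pager", Kind: "webhook", URL: "ftp://example.com/hook"})
	fmt.Println(err)
}
```

Returning a single error keeps the re-render path simple: the handler can put the message in the banner and pass the untouched form values back to the template.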
|
||
|
||
- [ ] **Step 2: Add routes**
|
||
|
||
In `server.go`'s `if s.deps.UI != nil` block:
|
||
|
||
```go
|
||
r.Get("/settings", s.handleUISettings)
|
||
r.Get("/settings/notifications", s.handleUINotificationsList)
|
||
r.Get("/settings/notifications/new", s.handleUINotificationNewGet)
|
||
r.Post("/settings/notifications/new", s.handleUINotificationNewPost)
|
||
r.Get("/settings/notifications/{id}/edit", s.handleUINotificationEditGet)
|
||
r.Post("/settings/notifications/{id}/edit", s.handleUINotificationEditPost)
|
||
r.Post("/settings/notifications/{id}/delete", s.handleUINotificationDelete)
|
||
```
|
||
|
||
And inside `r.Route("/api", ...)`:
|
||
|
||
```go
|
||
r.Post("/notifications/{id}/test", s.handleAPINotificationTest)
|
||
```
|
||
|
||
- [ ] **Step 3: Test the test-notification path end-to-end**
|
||
|
||
Mirror P3-X1's `cancel_test.go` shape: spin up an `httptest` server as
|
||
the webhook target, configure a channel, POST to the test endpoint,
|
||
assert the synthetic event landed at the sink + a `notification_log`
|
||
row with `event=alert.test, ok=1`.
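
A rough sketch of the webhook sink half of that test, assuming nothing about the real channel code: `sink`, `newSink` and `postTestEvent` are hypothetical helpers, and the JSON body here is a made-up placeholder rather than the envelope defined in the spec. In the real test the POST would come from the server's test endpoint, not from a direct `http.Post`.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"sync"
)

// sink is a throwaway webhook target that records every request body.
type sink struct {
	mu     sync.Mutex
	bodies [][]byte
	srv    *httptest.Server
}

func newSink() *sink {
	s := &sink{}
	s.srv = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		body, _ := io.ReadAll(r.Body)
		s.mu.Lock()
		s.bodies = append(s.bodies, body)
		s.mu.Unlock()
		w.WriteHeader(http.StatusNoContent)
	}))
	return s
}

// received returns a copy of the bodies captured so far.
func (s *sink) received() [][]byte {
	s.mu.Lock()
	defer s.mu.Unlock()
	return append([][]byte(nil), s.bodies...)
}

// postTestEvent stands in for the dispatch triggered by the test endpoint.
// The payload shape is a placeholder, not the spec's webhook envelope.
func postTestEvent(target string) (int, error) {
	payload, err := json.Marshal(map[string]string{"event": "alert.test", "severity": "info"})
	if err != nil {
		return 0, err
	}
	res, err := http.Post(target, "application/json", bytes.NewReader(payload))
	if err != nil {
		return 0, err
	}
	defer res.Body.Close()
	return res.StatusCode, nil
}

func main() {
	s := newSink()
	defer s.srv.Close()
	status, err := postTestEvent(s.srv.URL)
	if err != nil {
		fmt.Println("post failed:", err)
		return
	}
	fmt.Println("status:", status, "events received:", len(s.received()))
}
```

The mutex matters because the sink's handler runs on the server's goroutine while the test reads the captured bodies from its own.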
|
||
|
||
- [ ] **Step 4: Run tests + build**
|
||
|
||
```sh
|
||
go build ./...
|
||
go test ./internal/server/http/ -count=1 -timeout=60s
|
||
```
|
||
|
||
- [ ] **Step 5: Commit**
|
||
|
||
```sh
|
||
git add internal/server/http/ui_notifications.go internal/server/http/ui_notifications_test.go internal/server/http/server.go
|
||
git commit -m "http: /settings/notifications CRUD + test endpoint"
|
||
```
|
||
|
||
---
|
||
|
||
## Slice F — UI templates
|
||
|
||
### Task F1: alerts.html + alert_row.html partial + nav badge
|
||
|
||
**Files:**
|
||
- Create: `web/templates/pages/alerts.html`
|
||
- Create: `web/templates/partials/alert_row.html`
|
||
- Modify: `web/templates/partials/nav.html`
|
||
- Modify: `internal/server/ui/ui.go` (add to commonPaths)
- Modify: `internal/server/http/ui_handlers.go` (`baseView` sets `OpenAlerts`)
|
||
|
||
- [ ] **Step 1: Templates from the wireframe**
|
||
|
||
Translate `_diag/p3-alerts-wireframe/wireframe.html` surface 1 into
|
||
real Go templates. The shape should match exactly: filter strip
|
||
(status / severity / host / search), alert-row grid with severity
|
||
border, dot, kind tag, host name, message, raised + last_seen,
|
||
ack/resolve actions, plus the empty state.
|
||
|
||
Notes:
|
||
- Use the existing `relTime` template func.
|
||
- Render "still happening · Ns ago" when `last_seen_at` is < 60s ago.
|
||
- Form action for ack/resolve: `<form method="post"
|
||
action="/alerts/{{.ID}}/acknowledge?{{.PreservedQuery}}"
|
||
hx-post="..." hx-swap="none">` so HTMX bounces back via
|
||
HX-Redirect to the same filtered list.
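
The two view helpers implied by these notes might look like the sketch below. Both function names are hypothetical. One detail worth checking during implementation: `html/template` percent-encodes a plain string substituted after the `?` of a URL attribute, so this sketch returns the already-encoded query as `template.URL` to keep `&` and `=` intact.

```go
package main

import (
	"fmt"
	"html/template"
	"net/url"
	"os"
	"time"
)

// preservedQuery re-encodes the active list filters so the ack/resolve
// forms can carry them through the POST. Parameter names match the ones
// read by the list handler (status, severity, host_id, q).
func preservedQuery(status, severity, hostID, search string) template.URL {
	v := url.Values{}
	for key, val := range map[string]string{
		"status": status, "severity": severity, "host_id": hostID, "q": search,
	} {
		if val != "" {
			v.Set(key, val)
		}
	}
	// Encode sorts by key and escapes values, so the result is safe to
	// mark as a trusted URL fragment.
	return template.URL(v.Encode())
}

// stillHappening returns the "still happening" label when the alert was
// last seen less than 60 seconds ago, and "" otherwise.
func stillHappening(age time.Duration) string {
	if age < 0 || age >= time.Minute {
		return ""
	}
	return fmt.Sprintf("still happening · %ds ago", int(age.Seconds()))
}

var rowActions = template.Must(template.New("row").Parse(
	`<form method="post" action="/alerts/{{.ID}}/acknowledge?{{.PreservedQuery}}"></form>` + "\n"))

func main() {
	data := struct {
		ID             string
		PreservedQuery template.URL
	}{"01HEXAMPLE", preservedQuery("open", "critical", "", "disk full")}
	if err := rowActions.Execute(os.Stdout, data); err != nil {
		fmt.Fprintln(os.Stderr, err)
	}
	fmt.Println(stillHappening(12 * time.Second))
}
```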
|
||
|
||
- [ ] **Step 2: nav.html badge**
|
||
|
||
Add to nav.html: `{{if gt .OpenAlerts 0}}<span class="tag tag-critical mono">{{.OpenAlerts}}</span>{{end}}`
|
||
inside the Alerts tab. Wire `view.OpenAlerts` from a quick `len(open)`
|
||
query in `s.baseView`.
|
||
|
||
- [ ] **Step 3: Commit**
|
||
|
||
```sh
|
||
git add web/templates/pages/alerts.html web/templates/partials/alert_row.html web/templates/partials/nav.html internal/server/ui/ui.go internal/server/http/ui_handlers.go
|
||
git commit -m "ui: alerts list page + alert row partial + nav badge"
|
||
```
|
||
|
||
---
|
||
|
||
### Task F2: settings.html + notifications.html + notification_edit.html
|
||
|
||
**Files:**
|
||
- Create: `web/templates/pages/settings.html`
|
||
- Create: `web/templates/pages/notifications.html`
|
||
- Create: `web/templates/pages/notification_edit.html`
|
||
- Modify: `internal/server/ui/ui.go` (add the three pages)
|
||
|
||
- [ ] **Step 1: Settings shell**
|
||
|
||
`settings.html` is the page; it renders the sub-tab nav (Notifications
|
||
| Users | Authentication) and slots in the body. For v1 only
|
||
Notifications is wired; the other two render an inline "Lands later"
|
||
notice.
|
||
|
||
- [ ] **Step 2: Notifications list + edit form**
|
||
|
||
Translate wireframe surfaces 2, 3, 3b, 3c into real templates. The edit
form has to handle all three kinds: render the picker with the
operator's selected kind highlighted, and show only the matching
field set below (use `{{if eq .Channel.Kind "webhook"}}…{{end}}`).
|
||
|
||
Right-rail payload preview is per-kind: webhook envelope JSON for
|
||
webhook, ntfy header shape for ntfy, RFC 5322 layout for smtp.
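
As a sketch of the per-kind switch, the snippet below renders only the field set for the selected kind. The `<input>` names are placeholders loosely based on the validation list (`server_url`, `topic`, `host`, `port`); the real markup comes from the wireframe, and `renderFields` is a hypothetical helper.

```go
package main

import (
	"fmt"
	"html/template"
	"strings"
)

// fieldsTmpl picks one field set per channel kind. The trim markers
// ({{- and -}}) strip the newlines so each branch emits only its inputs.
var fieldsTmpl = template.Must(template.New("fields").Parse(`
{{- if eq .Channel.Kind "webhook" -}}
<input name="url">
{{- else if eq .Channel.Kind "ntfy" -}}
<input name="server_url"><input name="topic">
{{- else if eq .Channel.Kind "smtp" -}}
<input name="host"><input name="port">
{{- end -}}
`))

// channel is a hypothetical view model; only Kind matters here.
type channel struct{ Kind string }

// renderFields executes the template for a single kind.
func renderFields(kind string) (string, error) {
	var b strings.Builder
	err := fieldsTmpl.Execute(&b, struct{ Channel channel }{channel{Kind: kind}})
	return b.String(), err
}

func main() {
	for _, kind := range []string{"webhook", "ntfy", "smtp"} {
		out, err := renderFields(kind)
		if err != nil {
			fmt.Println("render error:", err)
			continue
		}
		fmt.Printf("%s: %s\n", kind, out)
	}
}
```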
|
||
|
||
- [ ] **Step 3: Send-test feedback**
|
||
|
||
The "Send test notification" button should be an HTMX POST that
|
||
swaps a small result chip (`#test-result`) with the green ✓ /
|
||
red ✗ pill rendered server-side from the
|
||
`handleAPINotificationTest` JSON. Easiest: wrap in a
|
||
`<div id="test-result" hx-target="this" hx-swap="innerHTML">`
|
||
and have the test handler render a tiny inline partial.
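
One way to reconcile the JSON response described for the test endpoint with the inline partial suggested here is to branch on the `HX-Request` header, as the ack/resolve handlers already do. The sketch below assumes that approach; `testResult`, `renderChip`, `writeTestResult` and the `tag-ok` class are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
	"html/template"
	"net/http"
	"net/http/httptest"
	"strings"
)

// testResult is a hypothetical summary of the resulting notification_log row.
type testResult struct {
	OK     bool   `json:"ok"`
	Detail string `json:"detail,omitempty"`
}

var chipTmpl = template.Must(template.New("chip").Parse(
	`{{if .OK}}<span class="tag tag-ok">✓ delivered</span>` +
		`{{else}}<span class="tag tag-critical">✗ {{.Detail}}</span>{{end}}`))

// renderChip renders the small result pill; Detail is HTML-escaped by
// html/template, so an error string from a remote server is safe to show.
func renderChip(res testResult) (string, error) {
	var b strings.Builder
	err := chipTmpl.Execute(&b, res)
	return b.String(), err
}

// writeTestResult answers HTMX callers with the chip and everyone else with JSON.
func writeTestResult(w http.ResponseWriter, r *http.Request, res testResult) {
	if r.Header.Get("HX-Request") == "true" {
		chip, err := renderChip(res)
		if err != nil {
			http.Error(w, "render failed", http.StatusInternalServerError)
			return
		}
		w.Header().Set("Content-Type", "text/html; charset=utf-8")
		fmt.Fprint(w, chip)
		return
	}
	w.Header().Set("Content-Type", "application/json")
	_ = json.NewEncoder(w).Encode(res)
}

func main() {
	req := httptest.NewRequest(http.MethodPost, "/api/notifications/abc/test", nil)
	req.Header.Set("HX-Request", "true")
	rec := httptest.NewRecorder()
	writeTestResult(rec, req, testResult{OK: false, Detail: "connection refused"})
	fmt.Println(rec.Body.String())
}
```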
|
||
|
||
- [ ] **Step 4: Commit**
|
||
|
||
```sh
|
||
git add web/templates/pages/settings.html web/templates/pages/notifications.html web/templates/pages/notification_edit.html internal/server/ui/ui.go
|
||
git commit -m "ui: /settings/notifications list + edit form (3 kinds)"
|
||
```
|
||
|
||
---
|
||
|
||
### Task F3: Crit banner partial + dashboard wiring
|
||
|
||
**Files:**
|
||
- Create: `web/templates/partials/crit_banner.html`
|
||
- Modify: `web/templates/pages/dashboard.html`
|
||
- Modify: `internal/server/http/ui_handlers.go` (`handleUIDashboard` adds `CritOpenCount`)
- Modify: `internal/server/ui/ui.go` (add `crit_banner.html` to `commonPaths`)
|
||
|
||
- [ ] **Step 1: Banner partial**
|
||
|
||
```html
|
||
{{define "crit_banner"}}
|
||
{{if gt .CritOpenCount 0}}
|
||
<div class="crit-banner" style="…">
|
||
<div class="flex items-center gap-3">
|
||
<span class="dot dot-critical"></span>
|
||
<span><span class="text-bad font-medium">{{.CritOpenCount}} critical alert{{if ne .CritOpenCount 1}}s{{end}}</span> open across the fleet</span>
|
||
</div>
|
||
<a href="/alerts?severity=critical&status=open" class="btn btn-danger">Review →</a>
|
||
</div>
|
||
{{end}}
|
||
{{end}}
|
||
```
|
||
|
||
- [ ] **Step 2: Dashboard handler**
|
||
|
||
In `handleUIDashboard`, fetch the count of open critical alerts into
`CritOpenCount` and render the `crit_banner` partial at the top of the
dashboard page. Mirror the existing pattern.
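
A minimal sketch of the count and the banner's pluralisation, under stated assumptions: `alertLister`, `alertFilter`, `alert` and `fakeStore` are local stand-ins for the store types, and `bannerText` reduces the banner to its text so the `gt` / `ne` logic is easy to check.

```go
package main

import (
	"context"
	"fmt"
	"html/template"
	"strings"
)

// Local stand-ins for store.AlertFilter and store.Alert (assumed shapes).
type alertFilter struct{ Status, Severity string }
type alert struct{ Kind string }

type alertLister interface {
	ListAlerts(ctx context.Context, f alertFilter) ([]alert, error)
}

// critOpenCount asks the store for open critical alerts and counts them.
// If the real ListAlerts caps results with a Limit, the count is capped too.
func critOpenCount(ctx context.Context, st alertLister) (int, error) {
	open, err := st.ListAlerts(ctx, alertFilter{Status: "open", Severity: "critical"})
	if err != nil {
		return 0, err
	}
	return len(open), nil
}

var bannerTmpl = template.Must(template.New("crit_banner").Parse(
	`{{if gt .CritOpenCount 0}}{{.CritOpenCount}} critical alert` +
		`{{if ne .CritOpenCount 1}}s{{end}} open across the fleet{{end}}`))

// bannerText renders only the banner's sentence for a given count.
func bannerText(n int) (string, error) {
	var b strings.Builder
	err := bannerTmpl.Execute(&b, struct{ CritOpenCount int }{n})
	return b.String(), err
}

// fakeStore returns a fixed list regardless of the filter.
type fakeStore struct{ critical []alert }

func (f fakeStore) ListAlerts(_ context.Context, _ alertFilter) ([]alert, error) {
	return f.critical, nil
}

func main() {
	st := fakeStore{critical: []alert{{Kind: "backup_failed"}, {Kind: "backup_failed"}}}
	n, err := critOpenCount(context.Background(), st)
	if err != nil {
		fmt.Println("count failed:", err)
		return
	}
	text, _ := bannerText(n)
	fmt.Println(text)
}
```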
|
||
|
||
- [ ] **Step 3: Add `crit_banner.html` to commonPaths**
|
||
|
||
- [ ] **Step 4: Commit**
|
||
|
||
```sh
|
||
git add web/templates/partials/crit_banner.html web/templates/pages/dashboard.html internal/server/http/ui_handlers.go internal/server/ui/ui.go
|
||
git commit -m "ui: dashboard crit-alerts banner"
|
||
```
|
||
|
||
---
|
||
|
||
## Slice G — Wire engine + hub into cmd/server
|
||
|
||
### Task G1: Construct + start engine; expose to handlers
|
||
|
||
**Files:**
|
||
- Modify: `cmd/server/main.go`
|
||
- Modify: `internal/server/http/server.go` (`Deps` gains `AlertEngine` + `NotificationHub`)
|
||
- Modify: `internal/server/ws/handler.go` (use `deps.AlertEngine`)
|
||
|
||
- [ ] **Step 1: Boot wiring**
|
||
|
||
In `cmd/server/main.go`, after creating the AEAD + store + Hub:
|
||
|
||
```go
|
||
notifHub := notification.NewHub(st, aead, cfg.BaseURL)
|
||
engine := alert.NewEngine(st, notifHub)
|
||
// Run the engine until ctx is done.
|
||
go engine.Run(ctx)
|
||
```
|
||
|
||
Pass `engine` and `notifHub` into the HTTP `Deps` struct + ws
`HandlerDeps`. The `notification.Hub` and engine satisfy whatever
interfaces the slices above depend on.
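
For orientation, the lifetime contract behind `go engine.Run(ctx)` can be sketched as a ticker loop that exits when the context is cancelled. This is not the real `alert.Engine`: the `engine` type, its `onTick` callback and `runFor` are hypothetical, and the real engine also consumes event hooks alongside its 60s tick.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// engine is a hypothetical stand-in that only models the periodic tick.
type engine struct {
	interval time.Duration
	onTick   func(now time.Time)
}

// Run blocks until ctx is done, calling onTick once per interval.
func (e *engine) Run(ctx context.Context) {
	ticker := time.NewTicker(e.interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case now := <-ticker.C:
			e.onTick(now)
		}
	}
}

// runFor starts an engine, lets it run for the given duration, waits for
// Run to return, and reports how many ticks fired.
func runFor(interval, total time.Duration) int {
	ticks := 0
	e := &engine{interval: interval, onTick: func(time.Time) { ticks++ }}
	ctx, cancel := context.WithTimeout(context.Background(), total)
	defer cancel()
	done := make(chan struct{})
	go func() {
		defer close(done)
		e.Run(ctx)
	}()
	<-done // Run has returned, so reading ticks is race-free.
	return ticks
}

func main() {
	fmt.Println("ticks:", runFor(10*time.Millisecond, 105*time.Millisecond))
}
```

Because `Run` returns only on `ctx.Done()`, shutdown ordering in `main.go` comes down to cancelling the same context the HTTP server uses.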
|
||
|
||
- [ ] **Step 2: Build to verify wiring compiles**
|
||
|
||
```sh
|
||
go build ./...
|
||
```
|
||
|
||
- [ ] **Step 3: Integration smoke**
|
||
|
||
Run an existing test that exercises the WS layer, and confirm a
|
||
`backup_failed` alert lands in the DB after `MarkJobFinished` is
|
||
called from the dispatcher. New test: extend
|
||
`internal/server/http/ui_alerts_test.go` to drive a job-failed event
|
||
through the WS round-trip and assert the alert exists.
|
||
|
||
- [ ] **Step 4: Commit**
|
||
|
||
```sh
|
||
git add cmd/server/main.go internal/server/http/server.go internal/server/ws/handler.go
|
||
git commit -m "alert: construct + run engine; expose hub to handlers"
|
||
```
|
||
|
||
---
|
||
|
||
## Slice H — Playwright sweep + tasks.md tick
|
||
|
||
### Task H1: Live sweep against the smoke env
|
||
|
||
**Files:**
|
||
- `_diag/p3-alerts-sweep/` — screenshots dropped here.
|
||
|
||
- [ ] **Step 1: Restage the binaries (per CLAUDE.md)**
|
||
|
||
```sh
|
||
make build
|
||
cp bin/restic-manager-agent /tmp/rm-smoke/data/agent-binaries/restic-manager-agent-linux-amd64
|
||
sudo -n install -m 0755 bin/restic-manager-agent /usr/local/bin/restic-manager-agent
|
||
sudo -n systemctl restart restic-manager-agent
|
||
pkill -9 -f restic-manager-server
|
||
RM_LISTEN=:8080 RM_DATA_DIR=/tmp/rm-smoke/data RM_BASE_URL=http://127.0.0.1:8080 \
|
||
RM_SECRET_KEY_FILE=/tmp/rm-smoke/data/secret.key RM_COOKIE_SECURE=false \
|
||
./bin/restic-manager-server >> /tmp/rm-smoke/server.log 2>&1 &
|
||
```
|
||
|
||
- [ ] **Step 2: Walk the sweep**
|
||
|
||
Run the eleven-step Playwright sweep documented in the spec under
|
||
"Playwright sweep". Drop screenshots into `_diag/p3-alerts-sweep/`.
|
||
Local MailHog (Docker, ports 1025+8025) covers the SMTP step.
|
||
|
||
- [ ] **Step 3: Fix anything that breaks**
|
||
|
||
Common things to look for, mirroring the P3-restore sweep:
|
||
|
||
- CSS tokens not defined in `web/styles/input.css` (e.g. anything
|
||
used in the templates that wasn't in the wireframe → add).
|
||
- Form-state preservation on validation re-render (operator typed
|
||
values lost).
|
||
- AEAD seal/open key mismatch on edit (use
|
||
`[]byte("notification-channel:"+id)` consistently).
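
To see why the additional data has to match, here is a sketch using the standard library's AES-GCM as a stand-in for the project's `crypto.AEAD` (whose `Seal` signature differs). `sealConfig`, `openConfig` and `roundTrip` are hypothetical helpers; the only part taken from this plan is the `"notification-channel:"+id` additional data.

```go
package main

import (
	"crypto/aes"
	"crypto/cipher"
	"crypto/rand"
	"errors"
	"fmt"
)

// channelAAD binds a ciphertext to one channel ID.
func channelAAD(id string) []byte { return []byte("notification-channel:" + id) }

func newGCM(key []byte) (cipher.AEAD, error) {
	block, err := aes.NewCipher(key)
	if err != nil {
		return nil, err
	}
	return cipher.NewGCM(block)
}

// sealConfig returns nonce || ciphertext, authenticated against the channel ID.
func sealConfig(aead cipher.AEAD, id string, plaintext []byte) ([]byte, error) {
	nonce := make([]byte, aead.NonceSize())
	if _, err := rand.Read(nonce); err != nil {
		return nil, err
	}
	return aead.Seal(nonce, nonce, plaintext, channelAAD(id)), nil
}

// openConfig fails unless id is the same one used when sealing.
func openConfig(aead cipher.AEAD, id string, blob []byte) ([]byte, error) {
	n := aead.NonceSize()
	if len(blob) < n {
		return nil, errors.New("blob too short")
	}
	return aead.Open(nil, blob[:n], blob[n:], channelAAD(id))
}

// roundTrip seals under sealID and opens under openID with a fresh key.
func roundTrip(sealID, openID, plaintext string) (string, error) {
	key := make([]byte, 32)
	if _, err := rand.Read(key); err != nil {
		return "", err
	}
	aead, err := newGCM(key)
	if err != nil {
		return "", err
	}
	blob, err := sealConfig(aead, sealID, []byte(plaintext))
	if err != nil {
		return "", err
	}
	out, err := openConfig(aead, openID, blob)
	return string(out), err
}

func main() {
	same, err := roundTrip("ch-1", "ch-1", `{"topic":"backups"}`)
	fmt.Println("same id:", same, err)
	_, err = roundTrip("ch-1", "ch-2", `{"topic":"backups"}`)
	fmt.Println("different id:", err)
}
```

A mismatch surfaces as an authentication error on open, which is the symptom to look for if the edit form cannot decrypt a channel that the create path sealed.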
|
||
|
||
- [ ] **Step 4: Commit fixes as you find them**
|
||
|
||
Small commits per category: "ui: …", "fix: …", etc.
|
||
|
||
---
|
||
|
||
### Task H2: tasks.md tick + final commit
|
||
|
||
**Files:**
|
||
- Modify: `tasks.md`
|
||
|
||
- [ ] **Step 1: Mark P3-05/06/07 done**
|
||
|
||
In the "Phase 3 — Alerts" section of `tasks.md`, tick the three
|
||
checkboxes and add an as-shipped block matching the P3-restore
|
||
pattern (rule set, channels, scope decisions, link to spec +
|
||
sweep screenshots).
|
||
|
||
- [ ] **Step 2: Move "Phase 3 — Alerts" status from `(not started)` to ✅**
|
||
|
||
- [ ] **Step 3: Commit**
|
||
|
||
```sh
|
||
git add tasks.md
|
||
git commit -m "tasks: tick P3-05/06/07 (alerts sub-phase)"
|
||
```
|
||
|
||
---
|
||
|
||
## Self-Review
|
||
|
||
**1. Spec coverage check.** Walked the spec section-by-section:
|
||
|
||
- Decisions 1–10 mapped to tasks (engine cadence in C1+C2, dedup in
|
||
A3, notification shape in B2/B3/B4, channel scope = global covered
|
||
by the channel-list page rendering all channels regardless of host).
|
||
- Six rules each have a case in `handleJobFinished` (4 of them) +
|
||
`handleHostOffline`/`handleHostOnline` (1) + a tick branch (1
|
||
declared, no-op for v1, called out).
|
||
- Three v1 channels each have their own task (B2/B3/B4) + Hub
|
||
fan-out (B5).
|
||
- Two migrations ship in A1 + A2.
|
||
- All routes from the spec's "Routes added" table are wired (D1,
|
||
E1, F1).
|
||
- Webhook payload shape matches the spec exactly.
|
||
- SMTP body assembly matches the spec exactly (subject pattern,
|
||
Message-ID right-hand-side, plain-text body shape).
|
||
|
||
**2. Placeholder scan.** No "TBD" / "TODO" / "implement later"
|
||
in any task body. The stale-schedule sweep is intentionally a no-op
|
||
in v1 with a documented reason (the spec acknowledges this rule
|
||
needs a small store helper that's not blocking the rest); the tick
|
||
function still lists it explicitly.
|
||
|
||
**3. Type consistency.** Method names checked across slices:
|
||
`RaiseOrTouch` (A3) is called from `raiseAndNotify` (C2);
|
||
`AutoResolve` (A3) from `resolveAndNotify` (C2); `ListAlerts`
|
||
+ `AlertFilter` shape consistent A3 ↔ D1; `notification.Hub.Dispatch`
|
||
+ `DispatchOne` consistent B5 ↔ C2 ↔ E1.
|
||
|
||
**Plan complete.**
|
||
|
||
---
|
||
|
||
Plan saved to `docs/superpowers/plans/2026-05-04-p3-alerts.md`.
|