ee3ee241ea
Cohesive batch from a smoke-test session against a real rest-server.
Themed bullets:
* Agent runs as root, sandboxed via systemd. CapabilityBoundingSet
drops to CAP_DAC_READ_SEARCH + restore caps; ProtectSystem=strict
with ReadWritePaths confined to /etc + /var/lib/restic-manager;
NoNewPrivileges blocks escalation. Install script no longer
creates a service user. spec.md §4.2 / §14.1 / §14.3 explain the
rationale (matches UrBackup / Veeam / Bareos defaults; trying to
back up "everything" as an unprivileged user creates silent skips
on /home, /root, /var/lib/* with no upside vs the threat model
the agent already implies).
* Init-repo end-to-end. New JobKind="init" wired through agent
runner, restic.Env.RunInit, server dispatcher, and a UI button
(red "Initialise repo" in the run-now panel). hosts.repo_initialised_at
flips on init success, on backup success, or on a non-empty
snapshots.report. The "Run now" / "Init" / "Retry" branching now
drives both the dashboard host row and the host-detail panel.
Migrations 0004 (column), 0005 (jobs.kind CHECK widened using the
safe create-new-then-rename pattern; the first version of this
migration corrupted the job_logs.job_id FK), 0006 (repairs the
job_logs FK on already-affected DBs).
* rest-server creds embedded at exec time only. restic.Env gains
RepoUsername; mergeRestCreds() builds the user:pass@-prefixed URL
inside envSlice() and never assigns it back to the struct, so
nothing slog-able ever sees the cleartext form. RedactURL helper
for any future surface that needs to log a URL safely. Both
helpers tested.
* Add-host UX. Repo password is now optional — server mints a
24-byte URL-safe random one and surfaces it once, alongside an
htpasswd snippet ("echo PASS | htpasswd -B -i ... USERNAME") so
the operator pastes one command on the rest-server host and one
on the endpoint. Result page also links the install snippet at
/install/install.sh (was /install.sh — 404'd before) and pipes
to bash (not sh — script uses set -o pipefail and other
bashisms; on Debian/Ubuntu sh is dash).
* Late-subscriber race in JobHub. A fast-failing job could finish
(DB write + Broadcast) before the browser's HX-Redirect → page
load → WS-connect path completed, so the JS sat forever waiting
on a job.finished that already passed. JobHub split into
Register + Send + Run; handleJobStream now subscribes first,
re-fetches the job, and sends a synthetic job.finished if the
state is already terminal.
* HTMX error visibility. New toast partial listens to
htmx:responseError and surfaces the response body as a
bottom-right toast — every server-side validation error now
becomes visible without per-handler JS wiring. Also handles
custom rm:toast events for future server-pushed notifications
via the HX-Trigger header. Themed via existing CSS vars.
* Dashboard rows are now whole-row clickable to host detail
(CSS card-link pattern: absolute-positioned anchor + .row-action
z-index restoration so the action button stays clickable).
"View →" on a running job links to /jobs/<id> rather than
/hosts/<id> since the row click already covers the host page.
* "Run first" / "Run first backup" → "Run now" everywhere for
consistency.
* runbook (docs/e2e-smoke.md) updated — live-log streaming step
now reflects P1-26; mentions the browser-driven Run-now flow.
* _diag/dump-creds — moved out of cmd/ so go build doesn't pick
it up; .gitignore now excludes /_diag/ entirely.
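The credential handling in the rest-server and add-host bullets above can be sketched roughly as follows. This is an illustration only, not the code from this commit: the function names (mintRepoPassword, withRestCreds, redactRepoURL), their signatures, and the example host are hypothetical, and the sketch assumes the repo URL uses restic's "rest:" prefix in front of an ordinary HTTP(S) URL.

```go
package main

import (
	"crypto/rand"
	"encoding/base64"
	"fmt"
	"net/url"
	"strings"
)

const restPrefix = "rest:" // assumed restic REST-backend prefix

// mintRepoPassword (hypothetical) encodes 24 random bytes as
// unpadded URL-safe base64, which yields a 32-character string.
func mintRepoPassword() (string, error) {
	b := make([]byte, 24)
	if _, err := rand.Read(b); err != nil {
		return "", err
	}
	return base64.RawURLEncoding.EncodeToString(b), nil
}

// withRestCreds (hypothetical) returns repo with user:pass@ embedded.
// The result is meant to be used only when building the child
// process environment and is never stored back on a struct.
func withRestCreds(repo, user, pass string) (string, error) {
	if !strings.HasPrefix(repo, restPrefix) || user == "" {
		return repo, nil // not a REST repo, or nothing to embed
	}
	u, err := url.Parse(strings.TrimPrefix(repo, restPrefix))
	if err != nil {
		return "", err
	}
	u.User = url.UserPassword(user, pass)
	return restPrefix + u.String(), nil
}

// redactRepoURL (hypothetical) masks any embedded password so the
// URL can be logged.
func redactRepoURL(repo string) string {
	prefix, rest := "", repo
	if strings.HasPrefix(repo, restPrefix) {
		prefix, rest = restPrefix, strings.TrimPrefix(repo, restPrefix)
	}
	u, err := url.Parse(rest)
	if err != nil {
		return prefix + "<unparseable>"
	}
	return prefix + u.Redacted()
}

func main() {
	pass, err := mintRepoPassword()
	if err != nil {
		panic(err)
	}
	full, err := withRestCreds("rest:https://backup.example:8000/host1/", "host1", pass)
	if err != nil {
		panic(err)
	}
	// Only the redacted form is printed; full would go into the
	// restic child environment and nowhere else.
	fmt.Println(redactRepoURL(full))
}
```

Keeping the merged URL as a local return value, rather than a struct field, is what keeps the cleartext form away from anything that might later be passed to a logger.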
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
146 lines
4.1 KiB
Go
package ws

import (
	"context"
	"log/slog"
	"sync"
	"time"

	"gitea.dcglab.co.uk/steve/restic-manager/internal/api"
)

// JobHub fans agent-emitted job messages (job.progress, log.stream,
// job.started, job.finished) out to every browser currently watching
// the matching job_id over /api/jobs/{id}/stream.
//
// Decoupled from the agent Hub: many subscribers per job_id, all
// read-only, lifecycle tied to the browser WS rather than the agent's.
type JobHub struct {
	mu   sync.RWMutex
	subs map[string]map[*Subscriber]struct{} // job_id → set
}

// NewJobHub returns an empty hub.
func NewJobHub() *JobHub {
	return &JobHub{subs: make(map[string]map[*Subscriber]struct{})}
}

// Subscriber is one browser WS subscription. Each gets its own
// buffered channel so a slow client can't block the broadcaster (or,
// transitively, the agent's read loop).
//
// Two-phase usage: Register() returns a Subscriber that's already in
// the hub's set (so concurrent Broadcasts will reach it), but no
// pump goroutine runs yet. The caller can prime the channel via Send
// (useful for late-subscriber catch-up) and then call Run to start
// the pump. Run blocks until ctx is cancelled or conn dies, and
// unregisters on return.
type Subscriber struct {
	hub   *JobHub
	jobID string
	ch    chan api.Envelope
}

// Register adds a subscriber for jobID and returns it. The caller
// MUST call Run to pump messages; until then the subscriber's
// channel buffers silently (up to its capacity, then drops).
//
// Use Register + Send + Run when you need to prime the channel from
// the calling goroutine before the pump starts (e.g. to send a
// synthetic job.finished to a late subscriber whose target job is
// already terminal). For the simple case use Subscribe.
func (h *JobHub) Register(jobID string) *Subscriber {
	const buf = 64
	s := &Subscriber{hub: h, jobID: jobID, ch: make(chan api.Envelope, buf)}
	h.mu.Lock()
	if h.subs[jobID] == nil {
		h.subs[jobID] = make(map[*Subscriber]struct{})
	}
	h.subs[jobID][s] = struct{}{}
	h.mu.Unlock()
	return s
}

// Send pushes env onto the subscriber's channel. Non-blocking: if the
// buffer is full, the message is dropped and a warning is logged.
func (s *Subscriber) Send(env api.Envelope) {
	select {
	case s.ch <- env:
	default:
		slog.Warn("ws browser sub: send buffer full, dropping message",
			"job_id", s.jobID, "type", env.Type)
	}
}

// Run pumps messages from the subscriber's channel onto conn until
// ctx is cancelled or conn dies. Unregisters on return. Caller is
// expected to invoke this from the goroutine that owns conn.
func (s *Subscriber) Run(ctx context.Context, conn *Conn) {
	defer s.unregister()
	for {
		select {
		case <-ctx.Done():
			return
		case env, ok := <-s.ch:
			if !ok {
				return
			}
			sendCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
			err := conn.Send(sendCtx, env)
			cancel()
			if err != nil {
				slog.Info("ws browser send failed; closing subscriber",
					"job_id", s.jobID, "err", err)
				return
			}
		}
	}
}

// unregister removes s from the hub and deletes the per-job set once
// it is empty.
func (s *Subscriber) unregister() {
	s.hub.mu.Lock()
	if set, ok := s.hub.subs[s.jobID]; ok {
		delete(set, s)
		if len(set) == 0 {
			delete(s.hub.subs, s.jobID)
		}
	}
	s.hub.mu.Unlock()
}

// Subscribe is a one-call convenience for callers that don't need to
// prime the channel before the pump. Equivalent to Register + Run.
func (h *JobHub) Subscribe(ctx context.Context, jobID string, conn *Conn) {
	s := h.Register(jobID)
	s.Run(ctx, conn)
}

// Broadcast sends env to every subscriber for jobID. Non-blocking:
// if a subscriber's buffer is full, the message is dropped for that
// subscriber. Other subscribers are unaffected.
func (h *JobHub) Broadcast(jobID string, env api.Envelope) {
	h.mu.RLock()
	set := h.subs[jobID]
	if len(set) == 0 {
		h.mu.RUnlock()
		return
	}
	targets := make([]*Subscriber, 0, len(set))
	for s := range set {
		targets = append(targets, s)
	}
	h.mu.RUnlock()

	for _, s := range targets {
		s.Send(env)
	}
}

// SubscriberCount returns the number of browsers currently watching
// jobID.
func (h *JobHub) SubscriberCount(jobID string) int {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return len(h.subs[jobID])
}
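
The two-phase Register + Send + Run usage documented above exists to close the late-subscriber race. The sketch below is a self-contained toy version of the ordering (subscribe first, re-fetch the job, then prime a synthetic job.finished if it is already terminal). It does not use the real JobHub, api.Envelope, or handleJobStream: the hub, event, jobStore, and attach names are hypothetical stand-ins, as are the "running" / "failed" / "succeeded" state strings.

```go
package main

import (
	"fmt"
	"sync"
)

// event is a stand-in for the real envelope type.
type event struct{ Type string }

// hub is a toy fan-out keyed by job ID.
type hub struct {
	mu   sync.Mutex
	subs map[string][]chan event
}

func newHub() *hub { return &hub{subs: make(map[string][]chan event)} }

// register adds a buffered channel to the job's subscriber list, so
// any broadcast that happens after this call reaches it.
func (h *hub) register(jobID string) chan event {
	ch := make(chan event, 64)
	h.mu.Lock()
	h.subs[jobID] = append(h.subs[jobID], ch)
	h.mu.Unlock()
	return ch
}

// broadcast delivers ev without blocking; full buffers drop it.
func (h *hub) broadcast(jobID string, ev event) {
	h.mu.Lock()
	defer h.mu.Unlock()
	for _, ch := range h.subs[jobID] {
		select {
		case ch <- ev:
		default:
		}
	}
}

// jobStore is a hypothetical stand-in for the jobs table.
type jobStore struct {
	mu    sync.Mutex
	state map[string]string
}

func (s *jobStore) get(id string) string {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.state[id]
}

func (s *jobStore) set(id, st string) {
	s.mu.Lock()
	s.state[id] = st
	s.mu.Unlock()
}

func isTerminal(st string) bool { return st == "succeeded" || st == "failed" }

// attach registers BEFORE reading the job state. If the job already
// finished, the broadcast was missed, so a synthetic job.finished is
// primed onto the channel.
func attach(h *hub, store *jobStore, jobID string) chan event {
	ch := h.register(jobID)
	if isTerminal(store.get(jobID)) {
		select {
		case ch <- event{Type: "job.finished"}:
		default:
		}
	}
	return ch
}

func main() {
	h := newHub()
	store := &jobStore{state: map[string]string{"j1": "running"}}

	// The job fails fast: DB write and broadcast both happen before
	// any browser has connected, so the broadcast reaches nobody.
	store.set("j1", "failed")
	h.broadcast("j1", event{Type: "job.finished"})

	ch := attach(h, store, "j1")
	fmt.Println((<-ch).Type) // prints "job.finished"
}
```

One consequence of this ordering, at least in the sketch: if the job finishes between register and the state re-fetch, the subscriber receives both the real and the synthetic job.finished, so the receiving side should treat that message as idempotent.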