P1 polish: agent-as-root, init-repo flow, rest creds passthrough, UX fixes

Cohesive batch from a smoke-test session against a real rest-server.
Themed bullets:

* Agent runs as root, sandboxed via systemd. CapabilityBoundingSet
  drops to CAP_DAC_READ_SEARCH + restore caps; ProtectSystem=strict
  with ReadWritePaths confined to /etc + /var/lib/restic-manager;
  NoNewPrivileges blocks escalation. Install script no longer
  creates a service user. spec.md §4.2 / §14.1 / §14.3 explain the
  rationale (matches UrBackup / Veeam / Bareos defaults; trying to
  back up "everything" as an unprivileged user creates silent skips
  on /home, /root, /var/lib/* with no upside vs the threat model
  the agent already implies).

* Init-repo end-to-end. New JobKind="init" wired through agent
  runner, restic.Env.RunInit, server dispatcher, and a UI button
  (red "Initialise repo" in the run-now panel). hosts.repo_initialised_at
  flips on init success, on backup success, or on a non-empty
  snapshots.report. The "Run now" / "Init" / "Retry" branching now
  drives both the dashboard host row and the host-detail panel.
  Migrations 0004 (column), 0005 (jobs.kind CHECK widened — using
  the safe create-new-then-rename pattern; first version corrupted
  job_logs.job_id FK), 0006 (cleans up job_logs FK on already-
  affected DBs).

* rest-server creds embedded at exec time only. restic.Env gains
  RepoUsername; mergeRestCreds() builds the user:pass@-prefixed URL
  inside envSlice() and never assigns it back to the struct, so
  nothing slog-able ever sees the cleartext form. RedactURL helper
  for any future surface that needs to log a URL safely. Both
  helpers tested.

* Add-host UX. Repo password is now optional — when it is left blank
  the server mints one from 24 random bytes (32 URL-safe base64
  characters) and surfaces it once, alongside an
  htpasswd snippet ("echo PASS | htpasswd -B -i ... USERNAME") so
  the operator pastes one command on the rest-server host and one
  on the endpoint. Result page also links the install snippet at
  /install/install.sh (was /install.sh — 404'd before) and pipes
  to bash (not sh — script uses set -o pipefail and other
  bashisms; on Debian/Ubuntu sh is dash).

* Late-subscriber race in JobHub. A fast-failing job could finish
  (DB write + Broadcast) before the browser's HX-Redirect → page
  load → WS-connect path completed, so the JS sat forever waiting
  on a job.finished that already passed. JobHub split into
  Register + Send + Run; handleJobStream now subscribes first,
  re-fetches the job, and sends a synthetic job.finished if the
  state is already terminal.

* HTMX error visibility. New toast partial listens to
  htmx:responseError and surfaces the response body as a
  bottom-right toast — every server-side validation error now
  becomes visible without per-handler JS wiring. Also handles
  custom rm:toast events for future server-pushed notifications
  via the HX-Trigger header. Themed via existing CSS vars.

* Dashboard rows are now whole-row clickable to host detail
  (CSS card-link pattern: absolute-positioned anchor + .row-action
  z-index restoration so the action button stays clickable).
  "View →" on a running job links to /jobs/<id> rather than
  /hosts/<id> since the row click already covers the host page.

* "Run first" / "Run first backup" → "Run now" everywhere for
  consistency.

* runbook (docs/e2e-smoke.md) updated — live-log streaming step
  now reflects P1-26; mentions the browser-driven Run-now flow.

* _diag/dump-creds — moved out of cmd/ so go build doesn't pick
  it up; .gitignore now excludes /_diag/ entirely.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-02 11:02:12 +01:00
parent 8aa635f0c1
commit c8ead66f08
29 changed files with 885 additions and 129 deletions
+59
View File
@@ -28,6 +28,7 @@ type Sender interface {
type Config struct {
// ResticBin is copied into restic.Env.Bin by RunBackup and RunInit.
ResticBin string
// RepoURL is copied into restic.Env.RepoURL.
RepoURL string
// RepoUsername is copied into restic.Env.RepoUsername.
RepoUsername string
// RepoPassword is copied into restic.Env.RepoPassword.
RepoPassword string
}
@@ -65,6 +66,7 @@ func (r *Runner) RunBackup(ctx context.Context, jobID string, paths, excludes, t
env := restic.Env{
Bin: r.cfg.ResticBin,
RepoURL: r.cfg.RepoURL,
RepoUsername: r.cfg.RepoUsername,
RepoPassword: r.cfg.RepoPassword,
}
@@ -146,6 +148,63 @@ func (r *Runner) RunBackup(ctx context.Context, jobID string, paths, excludes, t
return nil
}
// RunInit executes a repo-init job and reports back via the sender.
// Same envelope shape as RunBackup so the browser-side log viewer
// just works: one job.started, a log.stream envelope per output line,
// then one job.finished.
//
// Returns nil when restic init succeeds, otherwise the underlying
// error wrapped with "runner init". Problems talking to the sender
// are logged but never returned — the caller cares about the outcome
// of the init itself.
func (r *Runner) RunInit(ctx context.Context, jobID string) error {
	startedAt := time.Now().UTC()
	startEnv, err := api.Marshal(api.MsgJobStarted, jobID, api.JobStartedPayload{
		JobID: jobID, Kind: api.JobInit, StartedAt: startedAt,
	})
	if err != nil {
		// Never push a half-built envelope onto the wire.
		slog.Warn("runner: marshal job.started (init)", "job_id", jobID, "err", err)
	} else if err := r.tx.Send(startEnv); err != nil {
		slog.Warn("runner: send job.started (init)", "err", err)
	}

	env := restic.Env{
		Bin:          r.cfg.ResticBin,
		RepoURL:      r.cfg.RepoURL,
		RepoUsername: r.cfg.RepoUsername,
		RepoPassword: r.cfg.RepoPassword,
	}

	// seq numbers the log lines; atomic because stdout and stderr are
	// pumped from separate goroutines inside env.RunInit.
	var seq atomic.Int64
	handle := func(stream string, line string, _ any) {
		now := time.Now().UTC()
		logEnv, err := api.Marshal(api.MsgLogStream, "", api.LogStreamLine{
			JobID:   jobID,
			Seq:     seq.Add(1),
			TS:      now,
			Stream:  api.LogStream(stream),
			Payload: line,
		})
		if err != nil {
			return
		}
		// Log lines are best-effort: a dropped line must not fail the
		// job, and warning per line would flood the agent log.
		_ = r.tx.Send(logEnv)
	}

	runErr := env.RunInit(ctx, handle)
	finishedAt := time.Now().UTC()

	status := api.JobSucceeded
	exit := 0
	errMsg := ""
	if runErr != nil {
		status = api.JobFailed
		exit = -1 // any failure is reported as -1, not restic's own exit code
		errMsg = runErr.Error()
	}

	finEnv, err := api.Marshal(api.MsgJobFinished, jobID, api.JobFinishedPayload{
		JobID:      jobID,
		Status:     status,
		ExitCode:   exit,
		FinishedAt: finishedAt,
		Error:      errMsg,
	})
	if err != nil {
		slog.Warn("runner: marshal job.finished (init)", "job_id", jobID, "err", err)
	} else if err := r.tx.Send(finEnv); err != nil {
		// Unlike log lines, losing job.finished is worth a trace.
		slog.Warn("runner: send job.finished (init)", "job_id", jobID, "err", err)
	}

	if runErr != nil {
		return fmt.Errorf("runner init: %w", runErr)
	}
	return nil
}
// reportSnapshots calls `restic snapshots --json`, translates the
// payload into the wire shape, and ships it as a snapshots.report
// envelope. Bounded by a separate timeout so a sluggish repo doesn't
+1
View File
@@ -47,6 +47,7 @@ type JobKind string
const (
JobBackup JobKind = "backup"
JobInit JobKind = "init"
JobForget JobKind = "forget"
JobPrune JobKind = "prune"
JobCheck JobKind = "check"
+61 -3
View File
@@ -33,10 +33,19 @@ func Locate(override string) (string, error) {
}
// Env is the per-invocation context for a restic command.
//
// RepoURL is the bare URL as the operator typed it — no embedded
// credentials. RepoUsername (optional) carries the HTTP basic-auth
// user for `rest:` repos. The merged URL (with `user:pass@host`
// embedded) is built once inside envSlice() at the moment of exec
// and fed straight to the subprocess via RESTIC_REPOSITORY; we
// never assign it back to Env, never pass it to slog. If anything
// in this package ever needs to *log* a URL, use RedactURL.
type Env struct {
Bin string // path to restic binary
RepoURL string // RESTIC_REPOSITORY
RepoPassword string // RESTIC_PASSWORD (passed via env, never argv)
RepoURL string // RESTIC_REPOSITORY (no embedded creds)
RepoUsername string // optional HTTP basic-auth user for rest: URLs
RepoPassword string // doubles as RESTIC_PASSWORD and (for rest:) HTTP basic-auth password
ExtraEnv map[string]string // any other RESTIC_* / passthrough
WorkDir string // CWD; default = current
}
@@ -140,6 +149,55 @@ func (e Env) RunBackup(ctx context.Context, paths, excludes, tags []string, hand
return summary, nil
}
// RunInit runs `restic init` against the repo described by e, with
// the environment from e.envSlice() and e.WorkDir as working
// directory.
//
// Output is not parsed: every stdout and stderr line is forwarded
// verbatim to handle (tagged "stdout" / "stderr") so the operator
// sees what the CLI would print. A failure while reading either pipe
// is reported to handle as an "event" line and does not by itself
// fail the call. handle may be nil.
//
// Returns nil on success, or the pipe/start/wait error wrapped with
// a "restic init" prefix.
func (e Env) RunInit(ctx context.Context, handle LineHandler) error {
	initCmd := exec.CommandContext(ctx, e.Bin, "init")
	initCmd.Dir = e.WorkDir
	initCmd.Env = e.envSlice()

	outPipe, err := initCmd.StdoutPipe()
	if err != nil {
		return fmt.Errorf("restic init: stdout pipe: %w", err)
	}
	errPipe, err := initCmd.StderrPipe()
	if err != nil {
		return fmt.Errorf("restic init: stderr pipe: %w", err)
	}
	if err = initCmd.Start(); err != nil {
		return fmt.Errorf("restic init: start: %w", err)
	}

	// One pump per pipe; the channel is buffered so neither goroutine
	// can block on reporting its result.
	results := make(chan error, 2)
	startPump := func(src io.Reader, name string) {
		go func() { results <- pumpPlain(src, name, handle) }()
	}
	startPump(outPipe, "stdout")
	startPump(errPipe, "stderr")

	// Both pipes are read to completion before Wait is called.
	for pending := 2; pending > 0; pending-- {
		pumpErr := <-results
		if pumpErr == nil || handle == nil {
			continue
		}
		handle("event", fmt.Sprintf("pump error: %v", pumpErr), nil)
	}

	if waitErr := initCmd.Wait(); waitErr != nil {
		return fmt.Errorf("restic init: %w", waitErr)
	}
	return nil
}
// pumpPlain reads r line by line and forwards each line to handle,
// tagged with stream. A nil handle discards the lines but still
// consumes r. Lines may be up to 1 MiB long.
//
// If scanning stops early (for example on a line above that limit)
// the rest of r is drained and discarded before returning, so a
// subprocess writing to the other end of a pipe can never block on a
// full pipe buffer just because we stopped reading. The scanner's
// error is returned; nil means r was read cleanly to EOF.
func pumpPlain(r io.Reader, stream string, handle LineHandler) error {
	scanner := bufio.NewScanner(r)
	scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024)
	for scanner.Scan() {
		if handle != nil {
			handle(stream, scanner.Text(), nil)
		}
	}
	err := scanner.Err()
	if err != nil {
		// Keep the writer unblocked; the scan error is what we report,
		// so a secondary failure while draining is ignored.
		_, _ = io.Copy(io.Discard, r)
	}
	return err
}
// envSlice converts Env's typed fields into the os/exec env shape.
//
// Deliberately does NOT inherit the parent process's environment:
@@ -164,7 +222,7 @@ func (e Env) envSlice() []string {
xdg = x
}
out := []string{
"RESTIC_REPOSITORY=" + e.RepoURL,
"RESTIC_REPOSITORY=" + mergeRestCreds(e.RepoURL, e.RepoUsername, e.RepoPassword),
"RESTIC_PASSWORD=" + e.RepoPassword,
// Feed restic via env-only — keeps creds off ps(1).
"PATH=/usr/local/bin:/usr/bin:/bin",
+85
View File
@@ -0,0 +1,85 @@
package restic
import (
"net/url"
"strings"
)
// mergeRestCreds returns rawURL with username and password embedded
// as HTTP basic-auth userinfo, for handing to the restic subprocess
// as RESTIC_REPOSITORY. The result contains the cleartext password:
// callers must feed it straight into the exec environment and never
// store it on Env or log it.
//
// rawURL is returned unchanged when:
//   - it does not start with `rest:` (other backends take their
//     credentials through their own env vars);
//   - username is empty;
//   - the part after `rest:` does not parse or has no host — restic
//     is left to reject it with its own error message;
//   - it already carries userinfo (the operator typed creds inline,
//     and those are not overwritten).
func mergeRestCreds(rawURL, username, password string) string {
	const restPrefix = "rest:"

	if username == "" || !strings.HasPrefix(rawURL, restPrefix) {
		return rawURL
	}

	target, err := url.Parse(rawURL[len(restPrefix):])
	if err != nil || target.Host == "" || target.User != nil {
		return rawURL
	}

	target.User = url.UserPassword(username, password)
	return restPrefix + target.String()
}
// RedactURL returns a copy of u that is safe to log: if u carries a
// password in its userinfo, the password is replaced by ***. Use it —
// never the bare URL — wherever a URL might reach slog output, audit
// entries, or any other operator-visible surface.
//
// u is returned unchanged when it has no userinfo, has a username but
// no password, or cannot be parsed. Both plain `scheme://…` URLs and
// restic-style `rest:scheme://…` URLs are understood; the leading
// `scheme:` / `rest:` part is preserved as typed.
func RedactURL(u string) string {
	const (
		restPrefix = "rest:"
		slashes    = "//"
		mask       = ":***@"
	)

	// Split u into a prefix that is kept verbatim and the remainder
	// that is handed to url.Parse.
	head, tail := "", u
	colon := strings.Index(u, ":")
	switch {
	case colon > 0 && colon+3 < len(u) && u[colon+1:colon+3] == slashes:
		// scheme://… — keep "scheme:" intact, parse from "//" onward.
		head, tail = u[:colon+1], u[colon+1:]
	case strings.HasPrefix(u, restPrefix):
		head, tail = restPrefix, u[len(restPrefix):]
	}

	target, err := url.Parse(tail)
	if err != nil || target.User == nil {
		return u
	}
	if _, hasPassword := target.User.Password(); !hasPassword {
		return u
	}

	// Rebuild without userinfo and splice "user:***@" back in by hand:
	// going through url.URL.String() would percent-encode the marker
	// into "%2A%2A%2A".
	name := target.User.Username()
	target.User = nil
	before, after, found := strings.Cut(target.String(), slashes)
	if !found {
		return u
	}
	return head + before + slashes + name + mask + after
}
+41
View File
@@ -0,0 +1,41 @@
package restic
import "testing"
// TestMergeRestCreds checks that credentials are embedded only into
// parseable `rest:` URLs that do not already carry userinfo, and that
// every other input comes back untouched.
func TestMergeRestCreds(t *testing.T) {
	type testCase struct {
		name, url, user, pass, want string
	}
	table := []testCase{
		{"rest with creds", "rest:http://h:8000/p/", "u", "p", "rest:http://u:p@h:8000/p/"},
		{"rest no user — no-op", "rest:http://h:8000/p/", "", "p", "rest:http://h:8000/p/"},
		{"rest creds already inline — no-op",
			"rest:http://existing:secret@h:8000/p/", "u", "p",
			"rest:http://existing:secret@h:8000/p/"},
		{"non-rest s3 — no-op", "s3:s3.amazonaws.com/bucket", "u", "p", "s3:s3.amazonaws.com/bucket"},
		{"unparseable — pass through", "rest:not a url", "u", "p", "rest:not a url"},
		{"https URL kept intact", "rest:https://h/p/", "u", "p", "rest:https://u:p@h/p/"},
	}
	for _, tc := range table {
		t.Run(tc.name, func(t *testing.T) {
			// The password is deliberately left out of the failure message.
			if got := mergeRestCreds(tc.url, tc.user, tc.pass); got != tc.want {
				t.Fatalf("mergeRestCreds(%q,%q,***) = %q; want %q", tc.url, tc.user, got, tc.want)
			}
		})
	}
}
// TestRedactURL checks that a password in the userinfo is masked and
// that URLs without one are returned unchanged. Each input runs as
// its own subtest (named after the input) so one failing case does
// not hide the results of the others.
func TestRedactURL(t *testing.T) {
	cases := []struct{ in, want string }{
		{"rest:http://u:p@h:8000/p/", "rest:http://u:***@h:8000/p/"},
		{"rest:http://h:8000/p/", "rest:http://h:8000/p/"},
		{"https://u:p@example/", "https://u:***@example/"},
		{"s3:s3.amazonaws.com/bucket", "s3:s3.amazonaws.com/bucket"},
	}
	for _, c := range cases {
		t.Run(c.in, func(t *testing.T) {
			if got := RedactURL(c.in); got != c.want {
				t.Fatalf("RedactURL(%q) = %q; want %q", c.in, got, c.want)
			}
		})
	}
}
+1 -1
View File
@@ -135,7 +135,7 @@ func (s *Server) requireUser(r *stdhttp.Request) (*store.User, bool) {
func validJobKind(k api.JobKind) bool {
switch k {
case api.JobBackup, api.JobForget, api.JobPrune, api.JobCheck, api.JobUnlock:
case api.JobBackup, api.JobInit, api.JobForget, api.JobPrune, api.JobCheck, api.JobUnlock:
return true
}
return false
+3
View File
@@ -135,6 +135,9 @@ func (s *Server) routes(r chi.Router) {
r.Post("/logout", s.handleUILogoutPost)
// HTMX action endpoint for "Run now" buttons on the dashboard.
r.Post("/hosts/{id}/run-backup", s.handleUIRunBackup)
// HTMX action endpoint for the red "Initialise repo" button
// shown in the run-now panel until the repo is confirmed init'd.
r.Post("/hosts/{id}/init-repo", s.handleUIInitRepo)
// Add host flow.
r.Get("/hosts/new", s.handleUIAddHostGet)
r.Post("/hosts/new", s.handleUIAddHostPost)
+133 -3
View File
@@ -1,6 +1,8 @@
package http
import (
"crypto/rand"
"encoding/base64"
"errors"
"io/fs"
"log/slog"
@@ -178,6 +180,12 @@ func (s *Server) handleUIRunBackup(w stdhttp.ResponseWriter, r *stdhttp.Request)
stdhttp.StatusBadRequest)
return
}
if host.RepoInitialisedAt == nil {
stdhttp.Error(w,
"this host's repo hasn't been initialised yet — click Initialise repo first",
stdhttp.StatusBadRequest)
return
}
res, status, code, msg := s.dispatchJob(r.Context(), storeUser, hostID, api.JobBackup, host.DefaultPaths)
if code != "" {
stdhttp.Error(w, msg, status)
@@ -197,6 +205,47 @@ func (s *Server) handleUIRunBackup(w stdhttp.ResponseWriter, r *stdhttp.Request)
stdhttp.Redirect(w, r, target, stdhttp.StatusSeeOther)
}
// handleUIInitRepo is the POST handler behind the red "Initialise
// repo" button in the run-now panel. It dispatches a one-shot init
// job for the host named by the {id} URL parameter and, once the job
// is dispatched, sends the browser to the job's live log page
// (/jobs/<id>) — via the HX-Redirect header for HTMX requests, or a
// 303 redirect otherwise.
//
// Error responses: 400 when the host id is missing, 404 when the host
// does not exist, 500 for store/user lookup failures, and whatever
// status dispatchJob reports when it refuses the job.
func (s *Server) handleUIInitRepo(w stdhttp.ResponseWriter, r *stdhttp.Request) {
	sessionUser := s.requireUIUser(w, r)
	if sessionUser == nil {
		return
	}

	hostID := chi.URLParam(r, "id")
	if hostID == "" {
		stdhttp.Error(w, "missing host id", stdhttp.StatusBadRequest)
		return
	}

	storeUser, _, err := s.userByID(r, sessionUser.ID)
	if err != nil {
		stdhttp.Error(w, "internal", stdhttp.StatusInternalServerError)
		return
	}

	// The host record itself is not needed — only proof that it exists.
	_, err = s.deps.Store.GetHost(r.Context(), hostID)
	switch {
	case err == nil:
		// Host exists; carry on.
	case errors.Is(err, store.ErrNotFound):
		stdhttp.NotFound(w, r)
		return
	default:
		stdhttp.Error(w, "internal", stdhttp.StatusInternalServerError)
		return
	}

	res, status, code, msg := s.dispatchJob(r.Context(), storeUser, hostID, api.JobInit, nil)
	if code != "" {
		stdhttp.Error(w, msg, status)
		return
	}

	jobURL := "/jobs/" + res.JobID
	if r.Header.Get("HX-Request") != "true" {
		stdhttp.Redirect(w, r, jobURL, stdhttp.StatusSeeOther)
		return
	}
	w.Header().Set("HX-Redirect", jobURL)
	w.WriteHeader(stdhttp.StatusOK)
}
// addHostPage carries the form state into the Add host template.
// In State A (form), Token is empty. In State B (result), Token is
// populated and the template renders the install command.
@@ -223,6 +272,16 @@ type addHostPage struct {
// install command panel instead of the form.
Token string
ExpiresAt time.Time
// RepoPassword is the password the agent will use against the
// rest-server. When the operator left the password field blank
// we generate one server-side; PasswordGenerated tracks which
// path produced it so the result page can label it appropriately.
// Either way it's surfaced on the result page exactly once,
// inside the htpasswd snippet — same one-time-view rule as the
// enrolment token. Reload = gone.
RepoPassword string
PasswordGenerated bool
}
// handleUIAddHostGet renders the empty Add host form.
@@ -264,8 +323,22 @@ func (s *Server) handleUIAddHostPost(w stdhttp.ResponseWriter, r *stdhttp.Reques
if page.Hostname == "" {
page.Error = "Hostname is required."
} else if page.RepoURL == "" || repoPassword == "" {
page.Error = "Repo URL and password are both required so the agent can back up the moment it comes online."
} else if page.RepoURL == "" {
page.Error = "Repo URL is required so the agent can back up the moment it comes online."
}
// If the operator didn't type a password, mint one. We surface it
// once on the result page (inside the htpasswd snippet) so they
// can paste it into the rest-server's htpasswd file.
if page.Error == "" && repoPassword == "" {
gen, err := generateRepoPassword()
if err != nil {
slog.Error("ui add_host: generate repo password", "err", err)
page.Error = "Couldnt generate a password — see the server log for details."
} else {
repoPassword = gen
page.PasswordGenerated = true
}
}
defaultPaths := splitPaths(page.Paths)
@@ -276,6 +349,7 @@ func (s *Server) handleUIAddHostPost(w stdhttp.ResponseWriter, r *stdhttp.Reques
case nil:
page.Token = token
page.ExpiresAt = expires
page.RepoPassword = repoPassword
case errMissingRepoCreds:
page.Error = "Repo URL and password are both required."
default:
@@ -355,6 +429,18 @@ func (s *Server) handleUIHostDetail(w stdhttp.ResponseWriter, r *stdhttp.Request
}
}
// generateRepoPassword returns a fresh per-host rest-server password:
// 24 bytes from crypto/rand, encoded as 32 characters of unpadded
// URL-safe base64. That alphabet (A–Z, a–z, 0–9, '-', '_') contains
// nothing a shell treats specially inside single quotes — important
// since the operator pastes it into an `htpasswd -i` invocation on
// the rest-server.
//
// Returns an error only if the system random source fails.
func generateRepoPassword() (string, error) {
	raw := make([]byte, 24)
	if _, err := rand.Read(raw); err != nil {
		return "", err
	}
	return base64.RawURLEncoding.EncodeToString(raw), nil
}
// splitPaths parses the textarea content into a clean []string —
// one path per line, leading/trailing whitespace trimmed, blanks
// dropped.
@@ -479,7 +565,51 @@ func (s *Server) handleJobStream(w stdhttp.ResponseWriter, r *stdhttp.Request) {
// Wrap so we get the same Send semantics as the agent path.
c := ws.NewConn("browser-"+jobID, conn)
s.deps.JobHub.Subscribe(r.Context(), jobID, c)
// Register first so future broadcasts reach us, then re-fetch the
// job to close the late-subscriber race: a fast-failing job can
// finish (DB write + Broadcast) before the browser's WS hop
// completes, leaving the JS waiting forever for a job.finished
// that already passed. If the job is already terminal here, prime
// the subscriber with a synthetic job.finished so the JS reloads.
sub := s.deps.JobHub.Register(jobID)
if cur, gerr := s.deps.Store.GetJob(r.Context(), jobID); gerr == nil && isTerminalJobStatus(cur.Status) {
if env, ferr := buildSyntheticJobFinished(cur); ferr == nil {
sub.Send(env)
}
}
sub.Run(r.Context(), c)
}
// isTerminalJobStatus reports whether s names a job status after
// which no further updates are expected: succeeded, failed or
// cancelled. Any other value, including an unknown one, is treated
// as not terminal.
func isTerminalJobStatus(s string) bool {
	status := api.JobStatus(s)
	return status == api.JobSucceeded ||
		status == api.JobFailed ||
		status == api.JobCancelled
}
// buildSyntheticJobFinished builds a job.finished envelope from a
// persisted job record, for a browser that subscribed after the real
// job.finished was broadcast. Optional fields that are nil on the
// record (FinishedAt, ExitCode, Error) are left at their zero values
// in the payload.
func buildSyntheticJobFinished(job *store.Job) (api.Envelope, error) {
	payload := api.JobFinishedPayload{
		JobID:  job.ID,
		Status: api.JobStatus(job.Status),
		Stats:  job.Stats,
	}
	if job.FinishedAt != nil {
		payload.FinishedAt = *job.FinishedAt
	}
	if job.ExitCode != nil {
		payload.ExitCode = *job.ExitCode
	}
	if job.Error != nil {
		payload.Error = *job.Error
	}
	return api.Marshal(api.MsgJobFinished, "", payload)
}
// userByID fetches the full store.User the UI session represents.
+1
View File
@@ -89,6 +89,7 @@ func New() (*Renderer, error) {
"templates/layouts/chromeless.html",
"templates/partials/nav.html",
"templates/partials/host_row.html",
"templates/partials/toast.html",
}
pageEntries, err := fs.Glob(web.FS, "templates/pages/*.html")
+19
View File
@@ -196,6 +196,16 @@ func dispatchAgentMessage(ctx context.Context, c *Conn, hostID string, env api.E
string(p.Status), p.ExitCode, p.Stats, errMsg, p.FinishedAt); err != nil {
slog.Warn("ws: mark job finished", "job_id", p.JobID, "err", err)
}
// A successful backup or init proves the repo exists; flip
// repo_initialised_at on the host (idempotent — set-if-null).
if p.Status == api.JobSucceeded {
if job, err := deps.Store.GetJob(ctx, p.JobID); err == nil &&
(job.Kind == string(api.JobBackup) || job.Kind == string(api.JobInit)) {
if _, err := deps.Store.MarkHostRepoInitialised(ctx, hostID, p.FinishedAt); err != nil {
slog.Warn("ws: mark repo initialised", "host_id", hostID, "err", err)
}
}
}
if deps.JobHub != nil {
deps.JobHub.Broadcast(p.JobID, env)
}
@@ -235,6 +245,15 @@ func dispatchAgentMessage(ctx context.Context, c *Conn, hostID string, env api.E
} else {
slog.Info("ws: snapshots refreshed", "host_id", hostID, "count", len(snaps))
}
// A non-empty snapshot list also proves the repo is initialised
// (catches the case where an external job — `restic init` from
// the CLI, or a backup ran outside this control plane —
// initialised it before our first job dispatched).
if len(snaps) > 0 {
if _, err := deps.Store.MarkHostRepoInitialised(ctx, hostID, time.Now().UTC()); err != nil {
slog.Warn("ws: mark repo initialised (snapshots)", "host_id", hostID, "err", err)
}
}
case api.MsgRepoStats, api.MsgScheduleAck, api.MsgCommandResult:
// TODO(P2): persist these projections.
+65 -46
View File
@@ -17,54 +17,66 @@ import (
// read-only, lifecycle tied to the browser WS rather than the agent's.
type JobHub struct {
mu sync.RWMutex
subs map[string]map[*subscriber]struct{} // job_id → set
subs map[string]map[*Subscriber]struct{} // job_id → set
}
// NewJobHub returns an empty hub.
func NewJobHub() *JobHub {
return &JobHub{subs: make(map[string]map[*subscriber]struct{})}
return &JobHub{subs: make(map[string]map[*Subscriber]struct{})}
}
// subscriber is one browser WS subscription. Each gets its own
// buffered channel + writer goroutine so a slow client can't block
// the broadcaster (or, transitively, the agent's read loop).
type subscriber struct {
// Subscriber is one browser WS subscription. Each gets its own
// buffered channel so a slow client can't block the broadcaster (or,
// transitively, the agent's read loop).
//
// Two-phase usage: Register() returns a Subscriber that's already in
// the hub's set (so concurrent Broadcasts will reach it), but no
// pump goroutine runs yet. The caller can prime the channel via Send
// — useful for late-subscriber catch-up — and then call Run to start
// the pump. Run blocks until ctx is cancelled or conn dies, and
// unregisters on return.
type Subscriber struct {
hub *JobHub
jobID string
ch chan api.Envelope
}
// Subscribe registers a new subscriber for jobID. Run pumps messages
// from the subscriber's channel onto conn until ctx is cancelled or
// conn dies; it returns when one of those happens. Caller is
// expected to call this from the goroutine that owns conn.
// Register adds a subscriber for jobID and returns it. The caller
// MUST call Run to pump messages — until then the subscriber's
// channel buffers silently (up to its capacity, then drops).
//
// If the subscriber's send channel fills, broadcasts drop messages
// for that subscriber rather than blocking. The browser will see a
// gap; on completion the page can re-fetch persisted log_lines to
// reconcile.
func (h *JobHub) Subscribe(ctx context.Context, jobID string, conn *Conn) {
// Use Register + Send + Run when you need to prime the channel from
// the calling goroutine before the pump starts (e.g. to send a
// synthetic job.finished to a late subscriber whose target job is
// already terminal). For the simple case use Subscribe.
func (h *JobHub) Register(jobID string) *Subscriber {
const buf = 64
s := &subscriber{jobID: jobID, ch: make(chan api.Envelope, buf)}
s := &Subscriber{hub: h, jobID: jobID, ch: make(chan api.Envelope, buf)}
h.mu.Lock()
if h.subs[jobID] == nil {
h.subs[jobID] = make(map[*subscriber]struct{})
h.subs[jobID] = make(map[*Subscriber]struct{})
}
h.subs[jobID][s] = struct{}{}
h.mu.Unlock()
return s
}
defer func() {
h.mu.Lock()
if set, ok := h.subs[jobID]; ok {
delete(set, s)
if len(set) == 0 {
delete(h.subs, jobID)
}
}
h.mu.Unlock()
}()
// Send queues env for delivery to this subscriber without ever
// blocking the caller. When the subscriber's buffer is already full
// the envelope is dropped and a warning is logged instead.
func (s *Subscriber) Send(env api.Envelope) {
	select {
	case s.ch <- env:
		return
	default:
		// Buffer full — fall through to the drop path.
	}
	slog.Warn("ws browser sub: send buffer full, dropping message",
		"job_id", s.jobID, "type", env.Type)
}
// Drain pump.
// Run pumps messages from the subscriber's channel onto conn until
// ctx is cancelled or conn dies. Unregisters on return. Caller is
// expected to invoke this from the goroutine that owns conn.
func (s *Subscriber) Run(ctx context.Context, conn *Conn) {
defer s.unregister()
for {
select {
case <-ctx.Done():
@@ -77,20 +89,35 @@ func (h *JobHub) Subscribe(ctx context.Context, jobID string, conn *Conn) {
err := conn.Send(sendCtx, env)
cancel()
if err != nil {
slog.Info("ws browser send failed; closing subscriber", "job_id", jobID, "err", err)
slog.Info("ws browser send failed; closing subscriber",
"job_id", s.jobID, "err", err)
return
}
}
}
}
// unregister removes s from its hub's subscriber set for s.jobID and
// deletes the per-job set once it is empty. Calling it for a
// subscriber that is no longer registered is a no-op.
func (s *Subscriber) unregister() {
	hub := s.hub
	hub.mu.Lock()
	defer hub.mu.Unlock()

	peers, ok := hub.subs[s.jobID]
	if !ok {
		return
	}
	delete(peers, s)
	if len(peers) == 0 {
		delete(hub.subs, s.jobID)
	}
}
// Subscribe registers a subscriber for jobID and immediately starts
// pumping its messages onto conn — Register followed by Run in one
// call, for callers that have nothing to send before the pump
// starts. It returns when Run returns.
func (h *JobHub) Subscribe(ctx context.Context, jobID string, conn *Conn) {
	h.Register(jobID).Run(ctx, conn)
}
// Broadcast sends env to every subscriber for jobID. Non-blocking:
// if a subscriber's buffer is full, the message is dropped for that
// subscriber and a warning is logged. Other subscribers are
// unaffected.
//
// Safe to call from any goroutine; holds an RLock briefly to snapshot
// the subscriber set, then releases before sending.
// subscriber. Other subscribers are unaffected.
func (h *JobHub) Broadcast(jobID string, env api.Envelope) {
h.mu.RLock()
set := h.subs[jobID]
@@ -98,27 +125,19 @@ func (h *JobHub) Broadcast(jobID string, env api.Envelope) {
h.mu.RUnlock()
return
}
targets := make([]*subscriber, 0, len(set))
targets := make([]*Subscriber, 0, len(set))
for s := range set {
targets = append(targets, s)
}
h.mu.RUnlock()
for _, s := range targets {
select {
case s.ch <- env:
default:
// Buffer full — drop. Logged once per drop; a flood means
// the browser is genuinely stuck, not just slow.
slog.Warn("ws browser sub: send buffer full, dropping message",
"job_id", jobID, "type", env.Type)
}
s.Send(env)
}
}
// SubscriberCount returns the number of browsers currently watching
// jobID. Used for diagnostics / future "this many people are
// watching" counters.
// jobID.
func (h *JobHub) SubscriberCount(jobID string) int {
h.mu.RLock()
defer h.mu.RUnlock()
+28 -4
View File
@@ -50,7 +50,7 @@ func (s *Store) LookupHostByAgentToken(ctx context.Context, tokenHash string) (*
enrolled_at, last_seen_at, status, repo_id, tags,
current_job_id, last_backup_at, last_backup_status,
repo_size_bytes, snapshot_count, open_alert_count,
applied_schedule_version, default_paths
applied_schedule_version, default_paths, repo_initialised_at
FROM hosts WHERE agent_token_hash = ?`,
tokenHash)
return scanHost(row)
@@ -63,7 +63,7 @@ func (s *Store) GetHost(ctx context.Context, id string) (*Host, error) {
enrolled_at, last_seen_at, status, repo_id, tags,
current_job_id, last_backup_at, last_backup_status,
repo_size_bytes, snapshot_count, open_alert_count,
applied_schedule_version, default_paths
applied_schedule_version, default_paths, repo_initialised_at
FROM hosts WHERE id = ?`, id)
return scanHost(row)
}
@@ -124,7 +124,7 @@ func (s *Store) ListHosts(ctx context.Context) ([]Host, error) {
enrolled_at, last_seen_at, status, repo_id, tags,
current_job_id, last_backup_at, last_backup_status,
repo_size_bytes, snapshot_count, open_alert_count,
applied_schedule_version, default_paths
applied_schedule_version, default_paths, repo_initialised_at
FROM hosts ORDER BY name`)
if err != nil {
return nil, fmt.Errorf("store: list hosts: %w", err)
@@ -163,13 +163,14 @@ func scanHostRow(s hostScanner) (*Host, error) {
enrolled string
tags string
defaultPaths string
repoInitAt sql.NullString
)
err := s.Scan(&h.ID, &h.Name, &h.OS, &h.Arch,
&h.AgentVersion, &h.ResticVersion, &h.ProtocolVersion,
&enrolled, &lastSeen, &h.Status, &repoID, &tags,
&currentJob, &lastBackupAt, &lastBkSt,
&h.RepoSizeBytes, &h.SnapshotCount, &h.OpenAlertCount,
&h.AppliedScheduleVersion, &defaultPaths)
&h.AppliedScheduleVersion, &defaultPaths, &repoInitAt)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil, ErrNotFound
@@ -213,5 +214,28 @@ func scanHostRow(s hostScanner) (*Host, error) {
if defaultPaths != "" {
_ = json.Unmarshal([]byte(defaultPaths), &h.DefaultPaths)
}
if repoInitAt.Valid {
t, err := time.Parse(time.RFC3339Nano, repoInitAt.String)
if err != nil {
return nil, fmt.Errorf("store: parse repo_initialised_at: %w", err)
}
h.RepoInitialisedAt = &t
}
return &h, nil
}
// MarkHostRepoInitialised records `when` (converted to UTC, RFC 3339
// with nanoseconds) as the host's repo_initialised_at, but only if
// the column is still NULL. An already-initialised host keeps its
// original timestamp, so the call is safe to repeat.
//
// Returns true when a row was updated and false when nothing changed
// (timestamp already set, or no host with that id). A non-nil error
// means the UPDATE itself failed.
func (s *Store) MarkHostRepoInitialised(ctx context.Context, hostID string, when time.Time) (bool, error) {
	stamp := when.UTC().Format(time.RFC3339Nano)
	result, err := s.db.ExecContext(ctx,
		`UPDATE hosts SET repo_initialised_at = ?
WHERE id = ? AND repo_initialised_at IS NULL`,
		stamp, hostID)
	if err != nil {
		return false, fmt.Errorf("store: mark repo initialised: %w", err)
	}
	// A RowsAffected error is ignored: the count then stays 0 and the
	// call reports "not updated".
	updated, _ := result.RowsAffected()
	return updated > 0, nil
}
@@ -0,0 +1,15 @@
-- 0004_repo_initialised.sql
--
-- Track whether a host's restic repo has been initialised. Set when:
-- 1. an `init` job succeeds, OR
-- 2. any backup job succeeds (proves the repo exists), OR
-- 3. a snapshots.report arrives with at least one snapshot.
--
-- Once set, never cleared by code — only by the operator deleting the
-- host or wiping the column manually if they re-pointed the agent at
-- a different (empty) repo. The UI keys off NULL/non-NULL to decide
-- whether to surface the red "Initialise repo" affordance in the
-- run-now panel.
--
-- The column is nullable TEXT; NULL means "not known to be initialised".
ALTER TABLE hosts
ADD COLUMN repo_initialised_at TEXT;
@@ -0,0 +1,47 @@
-- 0005_jobs_init_kind.sql
--
-- Add 'init' to the jobs.kind CHECK constraint so the operator can
-- dispatch a `restic init` job from the UI before the first backup.
-- SQLite can't ALTER a CHECK in place, so we rebuild the table.
--
-- Rebuild pattern note: we create jobs_new (with the wider CHECK),
-- copy data over, DROP the original jobs table, then ALTER RENAME
-- jobs_new TO jobs. This avoids the trap of renaming the original
-- first — with legacy_alter_table=OFF (the modern default), a rename
-- propagates into FK references in dependent tables (e.g.
-- job_logs.job_id), leaving them pointing at the temporary name even
-- after we drop it. Migration 0006 cleans up the orphan FK left by
-- the first version of this migration on already-affected DBs.
--
-- NOTE(review): SQLite ignores PRAGMA foreign_keys while a
-- transaction is open. If the migration runner wraps this file in
-- BEGIN/COMMIT, enforcement stays on and DROP TABLE jobs could fire
-- ON DELETE CASCADE actions in dependent tables (e.g. job_logs) —
-- confirm the runner executes this file outside a transaction.
PRAGMA foreign_keys = OFF;
CREATE TABLE jobs_new (
id TEXT PRIMARY KEY,
host_id TEXT NOT NULL REFERENCES hosts(id) ON DELETE CASCADE,
kind TEXT NOT NULL CHECK (kind IN ('backup','init','forget','prune','check','unlock')),
status TEXT NOT NULL CHECK (status IN ('queued','running','succeeded','failed','cancelled')),
scheduled_id TEXT REFERENCES schedules(id) ON DELETE SET NULL,
actor_kind TEXT NOT NULL CHECK (actor_kind IN ('user','schedule','system')),
actor_id TEXT,
started_at TEXT,
finished_at TEXT,
exit_code INTEGER,
stats TEXT,
error TEXT,
created_at TEXT NOT NULL
);
-- The INSERT has no target column list, so the SELECT order below
-- must match the column order of jobs_new above.
INSERT INTO jobs_new
SELECT id, host_id, kind, status, scheduled_id, actor_kind, actor_id,
started_at, finished_at, exit_code, stats, error, created_at
FROM jobs;
DROP TABLE jobs;
ALTER TABLE jobs_new RENAME TO jobs;
-- Indexes are dropped together with the old table; recreate them.
CREATE INDEX jobs_host_id ON jobs(host_id);
CREATE INDEX jobs_status ON jobs(status);
CREATE INDEX jobs_created_at ON jobs(created_at);
PRAGMA foreign_keys = ON;
@@ -0,0 +1,33 @@
-- 0006_fix_job_logs_fk.sql
--
-- The first version of migration 0005 rebuilt the jobs table via the
-- unsafe pattern of renaming the original to jobs_old before dropping
-- it. SQLite (with legacy_alter_table=OFF, the modern default)
-- propagated that rename into the FK declaration of job_logs.job_id,
-- which on databases that ran that version now points at jobs_old — a
-- table that no longer exists. INSERTs into job_logs
-- fail with "no such table: main.jobs_old (1)".
--
-- Rebuild job_logs using the safe pattern: create job_logs_new with
-- a clean FK to jobs, copy rows, drop the broken job_logs, rename
-- job_logs_new to job_logs. Renaming job_logs_new is safe because
-- nothing references it.
--
-- NOTE(review): no indexes or triggers are recreated here; if the
-- original job_logs had any beyond its primary key they are lost with
-- the DROP — confirm against the migration that created job_logs.
PRAGMA foreign_keys = OFF;
CREATE TABLE job_logs_new (
job_id TEXT NOT NULL REFERENCES jobs(id) ON DELETE CASCADE,
seq INTEGER NOT NULL,
ts TEXT NOT NULL,
stream TEXT NOT NULL CHECK (stream IN ('stdout','stderr','event')),
payload TEXT NOT NULL,
PRIMARY KEY (job_id, seq)
);
INSERT INTO job_logs_new (job_id, seq, ts, stream, payload)
SELECT job_id, seq, ts, stream, payload FROM job_logs;
DROP TABLE job_logs;
ALTER TABLE job_logs_new RENAME TO job_logs;
PRAGMA foreign_keys = ON;
+6
View File
@@ -62,6 +62,12 @@ type Host struct {
// operator hits "Run now" without supplying paths. Phase 1
// interim — schedules (P2-01) supersede this.
DefaultPaths []string
// RepoInitialisedAt is non-nil once we've confirmed the host's
// repo has been initialised — either the operator clicked the
// init button, or a backup succeeded, or snapshots.report came
// back non-empty. The host detail run-now panel shows a red
// "Initialise repo" affordance while this is nil.
RepoInitialisedAt *time.Time
}
// EnrollmentToken is the issuer's view of a one-time token. The