Files
restic-manager/internal/agent/runner/runner.go
T
steve ee3ee241ea
CI / Test (linux/amd64) (push) Has been cancelled
CI / Lint (push) Has been cancelled
CI / Build (windows/amd64) (push) Has been cancelled
CI / Build (linux/amd64) (push) Has been cancelled
CI / Build (linux/arm64) (push) Has been cancelled
P1 polish: agent-as-root, init-repo flow, rest creds passthrough, UX fixes
Cohesive batch from a smoke-test session against a real rest-server.
Themed bullets:

* Agent runs as root, sandboxed via systemd. CapabilityBoundingSet
  drops to CAP_DAC_READ_SEARCH + restore caps; ProtectSystem=strict
  with ReadWritePaths confined to /etc + /var/lib/restic-manager;
  NoNewPrivileges blocks escalation. Install script no longer
  creates a service user. spec.md §4.2 / §14.1 / §14.3 explain the
  rationale (matches UrBackup / Veeam / Bareos defaults; trying to
  back up "everything" as an unprivileged user creates silent skips
  on /home, /root, /var/lib/* with no upside vs the threat model
  the agent already implies).

* Init-repo end-to-end. New JobKind="init" wired through agent
  runner, restic.Env.RunInit, server dispatcher, and a UI button
  (red "Initialise repo" in the run-now panel). hosts.repo_initialised_at
  flips on init success, on backup success, or on a non-empty
  snapshots.report. The "Run now" / "Init" / "Retry" branching now
  drives both the dashboard host row and the host-detail panel.
  Migrations 0004 (column), 0005 (jobs.kind CHECK widened — using
  the safe create-new-then-rename pattern; first version corrupted
  job_logs.job_id FK), 0006 (cleans up job_logs FK on already-
  affected DBs).

* rest-server creds embedded at exec time only. restic.Env gains
  RepoUsername; mergeRestCreds() builds the user:pass@-prefixed URL
  inside envSlice() and never assigns it back to the struct, so
  nothing slog-able ever sees the cleartext form. RedactURL helper
  for any future surface that needs to log a URL safely. Both
  helpers tested.

* Add-host UX. Repo password is now optional — server mints a
  24-byte URL-safe random one and surfaces it once, alongside an
  htpasswd snippet ("echo PASS | htpasswd -B -i ... USERNAME") so
  the operator pastes one command on the rest-server host and one
  on the endpoint. Result page also links the install snippet at
  /install/install.sh (was /install.sh — 404'd before) and pipes
  to bash (not sh — script uses set -o pipefail and other
  bashisms; on Debian/Ubuntu sh is dash).

* Late-subscriber race in JobHub. A fast-failing job could finish
  (DB write + Broadcast) before the browser's HX-Redirect → page
  load → WS-connect path completed, so the JS sat forever waiting
  on a job.finished that already passed. JobHub split into
  Register + Send + Run; handleJobStream now subscribes first,
  re-fetches the job, and sends a synthetic job.finished if the
  state is already terminal.

* HTMX error visibility. New toast partial listens to
  htmx:responseError and surfaces the response body as a
  bottom-right toast — every server-side validation error now
  becomes visible without per-handler JS wiring. Also handles
  custom rm:toast events for future server-pushed notifications
  via the HX-Trigger header. Themed via existing CSS vars.

* Dashboard rows are now whole-row clickable to host detail
  (CSS card-link pattern: absolute-positioned anchor + .row-action
  z-index restoration so the action button stays clickable).
  "View →" on a running job links to /jobs/<id> rather than
  /hosts/<id> since the row click already covers the host page.

* "Run first" / "Run first backup" → "Run now" everywhere for
  consistency.

* runbook (docs/e2e-smoke.md) updated — live-log streaming step
  now reflects P1-26; mentions the browser-driven Run-now flow.

* _diag/dump-creds — moved out of cmd/ so go build doesn't pick
  it up; .gitignore now excludes /_diag/ entirely.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-02 11:02:12 +01:00

249 lines
6.8 KiB
Go

// Package runner spawns restic processes for the agent. Each
// command.run is served by a single Run* call (RunBackup, RunInit);
// concurrency limits live a layer up (the WS handler).
package runner
import (
"context"
"encoding/json"
"fmt"
"log/slog"
"sync/atomic"
"time"
"gitea.dcglab.co.uk/steve/restic-manager/internal/api"
"gitea.dcglab.co.uk/steve/restic-manager/internal/restic"
)
// Sender is the agent's outbound message channel. Provided by
// wsclient so the runner can push job.started / job.progress /
// job.finished / log.stream back to the server. The runner also uses
// it to ship snapshots.report after a successful backup.
type Sender interface {
// Send hands one envelope to the transport. The runner treats a
// non-nil error as a failed delivery of that single message; it
// does not abort the running job because of it.
Send(env api.Envelope) error
}
// Config bundles the long-lived settings the runner needs. They come
// from the agent's config file (server-pushed config.update payloads
// override these in memory).
//
// Every field is copied verbatim into the restic.Env built for each
// job; the runner itself does not interpret or validate them.
type Config struct {
// ResticBin becomes restic.Env.Bin — presumably the path or name
// of the restic executable; confirm against the restic package.
ResticBin string
// RepoURL becomes restic.Env.RepoURL.
RepoURL string
// RepoUsername becomes restic.Env.RepoUsername.
RepoUsername string
// RepoPassword becomes restic.Env.RepoPassword.
RepoPassword string
}
// Runner owns the restic invocations. Build one with New; RunBackup
// and RunInit are its job entry points.
type Runner struct {
// cfg supplies the binary and repository settings for each restic.Env.
cfg Config
// tx is the channel every job, log and snapshot envelope is sent on.
tx Sender
// progress throttling: we receive a status event from restic
// every ~100ms; the UI doesn't need anywhere near that rate.
// Cap WS sends to one job.progress per progressMinPeriod (set via
// New; default 1s).
progressMinPeriod time.Duration
}
// New constructs a Runner that runs restic with cfg and reports through
// tx. progressMinPeriod is the minimum gap between job.progress sends;
// any non-positive value selects the default of one second.
func New(cfg Config, tx Sender, progressMinPeriod time.Duration) *Runner {
	period := progressMinPeriod
	if period <= 0 {
		period = time.Second
	}
	runner := &Runner{
		cfg:               cfg,
		tx:                tx,
		progressMinPeriod: period,
	}
	return runner
}
// RunBackup executes a backup job and reports back via the sender.
//
// Message sequence:
//   - job.started is sent before restic is launched;
//   - every output line is forwarded as log.stream, and restic status
//     events become job.progress at most once per progressMinPeriod;
//   - job.finished carries the final status, the error text (if any)
//     and the JSON-encoded backup summary when one was produced;
//   - after a successful backup a fresh snapshots.report follows.
//
// ExitCode in job.finished is 0 on success and -1 on any failure; the
// real process exit code is not surfaced here.
//
// Failing to marshal or send job.started / job.finished is logged and
// never aborts the job. The return value is nil when
// restic.Env.RunBackup returned nil (per the original note this covers
// a clean or "incomplete-but-snapshot-created" finish), otherwise that
// error wrapped with "runner backup".
func (r *Runner) RunBackup(ctx context.Context, jobID string, paths, excludes, tags []string) error {
	// emit ships one lifecycle envelope. A marshal failure is logged
	// and the send skipped (a zero-value envelope is useless to the
	// server); a send failure is logged so a lost job.finished is at
	// least visible in the agent log.
	emit := func(what string, msg api.Envelope, marshalErr error) {
		if marshalErr != nil {
			slog.Warn("runner: marshal "+what, "job_id", jobID, "err", marshalErr)
			return
		}
		if err := r.tx.Send(msg); err != nil {
			slog.Warn("runner: send "+what, "job_id", jobID, "err", err)
		}
	}

	startedAt := time.Now().UTC()
	startEnv, merr := api.Marshal(api.MsgJobStarted, jobID, api.JobStartedPayload{
		JobID: jobID, Kind: api.JobBackup, StartedAt: startedAt,
	})
	emit("job.started", startEnv, merr)

	env := restic.Env{
		Bin:          r.cfg.ResticBin,
		RepoURL:      r.cfg.RepoURL,
		RepoUsername: r.cfg.RepoUsername,
		RepoPassword: r.cfg.RepoPassword,
	}

	var seq atomic.Int64
	// Starts at "now", so the first progress event is throttled too.
	// NOTE(review): lastProgress is unsynchronised; this assumes status
	// events reach handle from a single goroutine — confirm against
	// restic.Env.RunBackup.
	lastProgress := time.Now()
	handle := func(stream string, line string, ev any) {
		// Forward every line to the server as log.stream. Deliberately
		// best-effort and unlogged: warning per dropped line would
		// flood the agent log whenever the connection is down.
		now := time.Now().UTC()
		logEnv, lerr := api.Marshal(api.MsgLogStream, "", api.LogStreamLine{
			JobID:   jobID,
			Seq:     seq.Add(1),
			TS:      now,
			Stream:  api.LogStream(stream),
			Payload: line,
		})
		if lerr == nil {
			_ = r.tx.Send(logEnv)
		}

		// Throttled progress events.
		st, ok := ev.(restic.BackupStatus)
		if !ok {
			return
		}
		if time.Since(lastProgress) < r.progressMinPeriod {
			return
		}
		lastProgress = time.Now()
		progEnv, perr := api.Marshal(api.MsgJobProgress, jobID, api.JobProgressPayload{
			JobID:         jobID,
			PercentDone:   st.PercentDone,
			FilesDone:     st.FilesDone,
			TotalFiles:    st.TotalFiles,
			BytesDone:     st.BytesDone,
			TotalBytes:    st.TotalBytes,
			ETASeconds:    st.SecondsRem,
			ThroughputBps: throughput(st.BytesDone, st.SecondsElapsed),
		})
		if perr == nil {
			// Best-effort for the same reason as log.stream above.
			_ = r.tx.Send(progEnv)
		}
	}

	summary, err := env.RunBackup(ctx, paths, excludes, tags, handle)
	finishedAt := time.Now().UTC()

	status := api.JobSucceeded
	exit := 0
	errMsg := ""
	if err != nil {
		status = api.JobFailed
		exit = -1
		errMsg = err.Error()
	}

	var statsBlob json.RawMessage
	if summary != nil {
		blob, serr := json.Marshal(summary)
		if serr != nil {
			// Still report the job outcome; only the stats are lost.
			slog.Warn("runner: marshal backup summary", "job_id", jobID, "err", serr)
		} else {
			statsBlob = blob
		}
	}

	finEnv, merr := api.Marshal(api.MsgJobFinished, jobID, api.JobFinishedPayload{
		JobID:      jobID,
		Status:     status,
		ExitCode:   exit,
		FinishedAt: finishedAt,
		Stats:      statsBlob,
		Error:      errMsg,
	})
	emit("job.finished", finEnv, merr)

	if err != nil {
		return fmt.Errorf("runner backup: %w", err)
	}

	// On a successful backup, refresh the server's snapshot projection.
	// We do this *after* job.finished so the UI sees the job land first;
	// the snapshot list is a follow-up that the host detail page polls
	// or the dashboard sees on its next refresh. A failure here is
	// logged but doesn't fail the job — the next successful backup will
	// catch the projection up.
	if rerr := r.reportSnapshots(ctx, env); rerr != nil {
		slog.Warn("runner: snapshots.report failed", "job_id", jobID, "err", rerr)
	}
	return nil
}
// RunInit executes a repo-init job and reports back via the sender.
// It uses the same envelope shapes as RunBackup (job.started,
// log.stream per output line, job.finished) so the browser-side log
// viewer just works; no progress events or stats are produced.
//
// ExitCode in job.finished is 0 on success and -1 on any failure.
// Failing to marshal or send job.started / job.finished is logged and
// never aborts the job. The return value is nil when
// restic.Env.RunInit returned nil, otherwise that error wrapped with
// "runner init".
func (r *Runner) RunInit(ctx context.Context, jobID string) error {
	// emit ships one lifecycle envelope. A marshal failure is logged
	// and the send skipped (a zero-value envelope is useless to the
	// server); a send failure is logged so a lost job.finished is at
	// least visible in the agent log.
	emit := func(what string, msg api.Envelope, marshalErr error) {
		if marshalErr != nil {
			slog.Warn("runner: marshal "+what, "job_id", jobID, "err", marshalErr)
			return
		}
		if err := r.tx.Send(msg); err != nil {
			slog.Warn("runner: send "+what, "job_id", jobID, "err", err)
		}
	}

	startedAt := time.Now().UTC()
	startEnv, merr := api.Marshal(api.MsgJobStarted, jobID, api.JobStartedPayload{
		JobID: jobID, Kind: api.JobInit, StartedAt: startedAt,
	})
	emit("job.started (init)", startEnv, merr)

	env := restic.Env{
		Bin:          r.cfg.ResticBin,
		RepoURL:      r.cfg.RepoURL,
		RepoUsername: r.cfg.RepoUsername,
		RepoPassword: r.cfg.RepoPassword,
	}

	var seq atomic.Int64
	handle := func(stream string, line string, _ any) {
		// Forward every line as log.stream. Deliberately best-effort
		// and unlogged: warning per dropped line would flood the
		// agent log whenever the connection is down.
		now := time.Now().UTC()
		logEnv, lerr := api.Marshal(api.MsgLogStream, "", api.LogStreamLine{
			JobID:   jobID,
			Seq:     seq.Add(1),
			TS:      now,
			Stream:  api.LogStream(stream),
			Payload: line,
		})
		if lerr == nil {
			_ = r.tx.Send(logEnv)
		}
	}

	err := env.RunInit(ctx, handle)
	finishedAt := time.Now().UTC()

	status := api.JobSucceeded
	exit := 0
	errMsg := ""
	if err != nil {
		status = api.JobFailed
		exit = -1
		errMsg = err.Error()
	}

	finEnv, merr := api.Marshal(api.MsgJobFinished, jobID, api.JobFinishedPayload{
		JobID:      jobID,
		Status:     status,
		ExitCode:   exit,
		FinishedAt: finishedAt,
		Error:      errMsg,
	})
	emit("job.finished (init)", finEnv, merr)

	if err != nil {
		return fmt.Errorf("runner init: %w", err)
	}
	return nil
}
// reportSnapshots fetches the repository's snapshot list through
// restic.Env.ListSnapshots, converts each entry to the wire type, and
// sends the lot as one snapshots.report envelope. The listing runs
// under its own 60-second deadline derived from ctx so a sluggish repo
// cannot hang the runner forever. Errors from listing, marshalling or
// sending are returned unchanged.
func (r *Runner) reportSnapshots(ctx context.Context, env restic.Env) error {
	boundedCtx, stop := context.WithTimeout(ctx, 60*time.Second)
	defer stop()

	listed, err := env.ListSnapshots(boundedCtx)
	if err != nil {
		return err
	}

	// Pre-sized and non-nil even when the repository has no snapshots.
	wire := make([]api.Snapshot, 0, len(listed))
	for _, src := range listed {
		snap := api.Snapshot{
			ID:       src.ID,
			ShortID:  src.ShortID,
			Time:     src.Time.UTC(),
			Hostname: src.Hostname,
			Paths:    src.Paths,
			Tags:     src.Tags,
		}
		// Size and file count are only filled in when restic
		// attached a summary to the snapshot.
		if sum := src.Summary; sum != nil {
			snap.SizeBytes = sum.TotalBytesProcessed
			snap.FileCount = sum.TotalFilesProcessed
		}
		wire = append(wire, snap)
	}

	report, err := api.Marshal(api.MsgSnapshotsRpt, "", api.SnapshotsReportPayload{
		Snapshots: wire,
	})
	if err != nil {
		return err
	}
	return r.tx.Send(report)
}
// throughput returns the average transfer rate in bytes per second,
// using integer division. A zero or negative secondsElapsed yields 0
// rather than dividing.
func throughput(bytesDone, secondsElapsed int64) int64 {
	var rate int64
	if secondsElapsed > 0 {
		rate = bytesDone / secondsElapsed
	}
	return rate
}