agent+server: P2R-11 pre/post hook execution for backup jobs

Agent: new runner.BackupHooks struct + runHook helper invoked via
/bin/sh -c (cmd.exe /C on Windows). pre_hook non-zero exit aborts
the backup; post_hook always runs with RM_JOB_STATUS=succeeded|failed
in env. Output streamed as 'hook(<phase>): …' log.stream lines.
Hooks only run for kind=backup (other kinds skip both phases).

Server: resolveBackupHooks resolves group → host default → empty,
decrypts via crypto.AEAD with per-slot ad bytes, plumbs plaintext
into CommandRunPayload for both schedule.fire and per-group
Run-now dispatch sites. Decrypt failures degrade silently to no
hook so a malformed blob can't poison every backup.
This commit is contained in:
2026-05-04 10:57:28 +01:00
parent 18b0bf976d
commit 7b1990cf11
11 changed files with 379 additions and 52 deletions
+106
View File
@@ -0,0 +1,106 @@
// hooks.go — pre/post backup hooks for the agent runner (P2R-11).
//
// Hooks fire only for backup jobs (the runner's other kinds —
// init/forget/prune/check/unlock — call shell scripts that touch
// repo internals; running operator hooks for those would be
// surprising). Hook bodies arrive plaintext on the wire (server
// decrypted before the WS push). The agent never persists them
// to disk; they live in memory for the lifetime of one job.
//
// Failure semantics:
// - pre_hook non-zero exit aborts the backup: the runner returns
// the error, the job is recorded as failed, and the actual
// restic invocation never runs.
// - post_hook non-zero exit is logged with a warning prefix in
// the job log but does NOT change the job status — the operator
// wants the backup result preserved even if the cleanup step
// misbehaved.
//
// Streaming: each line of the hook's stdout/stderr is shipped as a
// log.stream envelope with payload prefixed `hook: ` so the live
// log viewer can visually separate it from restic's own output.
package runner
import (
"bufio"
"context"
"fmt"
"io"
"os/exec"
"runtime"
"sync/atomic"
"time"
"gitea.dcglab.co.uk/steve/restic-manager/internal/api"
)
// runHook executes script via the host shell. status is the value
// passed as RM_JOB_STATUS in the env (empty for pre-hooks; the
// final job status — "succeeded" or "failed" — for post-hooks).
// Returns an error iff the hook exited non-zero. ctx cancellation
// kills the subprocess.
func (r *Runner) runHook(ctx context.Context, jobID, phase, script, status string, seq *atomic.Int64) error {
if script == "" {
return nil
}
shell, flag := defaultShell()
cmd := exec.CommandContext(ctx, shell, flag, script)
cmd.Env = []string{
"PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin",
}
if status != "" {
cmd.Env = append(cmd.Env, "RM_JOB_STATUS="+status)
}
cmd.Env = append(cmd.Env, "RM_JOB_ID="+jobID, "RM_HOOK_PHASE="+phase)
stdout, err := cmd.StdoutPipe()
if err != nil {
return fmt.Errorf("hook %s: stdout pipe: %w", phase, err)
}
stderr, err := cmd.StderrPipe()
if err != nil {
return fmt.Errorf("hook %s: stderr pipe: %w", phase, err)
}
if err := cmd.Start(); err != nil {
return fmt.Errorf("hook %s: start: %w", phase, err)
}
done := make(chan struct{}, 2)
go func() { r.pumpHookLines(stdout, "stdout", phase, jobID, seq); done <- struct{}{} }()
go func() { r.pumpHookLines(stderr, "stderr", phase, jobID, seq); done <- struct{}{} }()
<-done
<-done
if werr := cmd.Wait(); werr != nil {
return fmt.Errorf("hook %s exited non-zero: %w", phase, werr)
}
return nil
}
// pumpHookLines streams lines as log.stream envelopes prefixed with
// "hook(<phase>): " so the live log can visually separate them.
func (r *Runner) pumpHookLines(rd io.Reader, stream, phase, jobID string, seq *atomic.Int64) {
scanner := bufio.NewScanner(rd)
scanner.Buffer(make([]byte, 0, 64*1024), 256*1024)
for scanner.Scan() {
line := "hook(" + phase + "): " + scanner.Text()
env, _ := api.Marshal(api.MsgLogStream, "", api.LogStreamLine{
JobID: jobID,
Seq: seq.Add(1),
TS: time.Now().UTC(),
Stream: api.LogStream(stream),
Payload: line,
})
_ = r.tx.Send(env)
}
}
// defaultShell returns the (binary, single-arg-flag) pair to use for
// `<shell> <flag> "<script>"`. /bin/sh -c on Unix; cmd.exe /C on
// Windows. The hook author writes whichever shell they prefer
// inside the script body itself (PowerShell, bash, etc) — this is
// just the bootstrap interpreter.
func defaultShell() (string, string) {
if runtime.GOOS == "windows" {
return "cmd.exe", "/C"
}
return "/bin/sh", "-c"
}
+90
View File
@@ -0,0 +1,90 @@
// hooks_test.go — pre/post backup hook semantics (P2R-11).
package runner
import (
"context"
"strings"
"testing"
"gitea.dcglab.co.uk/steve/restic-manager/internal/api"
)
// TestPreHookFailureAbortsBackup: pre_hook exits 1 → restic never
// runs, job is recorded failed with the hook's error.
func TestPreHookFailureAbortsBackup(t *testing.T) {
t.Parallel()
// Restic script that records every invocation. If restic was
// called we'll see "restic-was-here" in the captured log.
bin := setupScript(t, `echo "restic-was-here"`)
tx := &fakeSender{}
r := New(Config{ResticBin: bin}, tx, 0)
err := r.RunBackup(context.Background(), "job-pre",
[]string{"/etc"}, nil, []string{"tag"},
BackupHooks{Pre: "exit 1"})
if err == nil {
t.Fatal("expected RunBackup to return an error from failed pre_hook")
}
if !strings.Contains(err.Error(), "pre_hook failed") {
t.Fatalf("error message: %q (want 'pre_hook failed')", err)
}
// job.finished arrived with status=failed.
finEnv := firstEnvOfType(t, tx.envs, api.MsgJobFinished)
var fin api.JobFinishedPayload
_ = finEnv.UnmarshalPayload(&fin)
if fin.Status != api.JobFailed {
t.Fatalf("status: %q, want failed", fin.Status)
}
// restic must NOT have run.
for _, env := range tx.envs {
if env.Type != api.MsgLogStream {
continue
}
var l api.LogStreamLine
_ = env.UnmarshalPayload(&l)
if strings.Contains(l.Payload, "restic-was-here") {
t.Fatal("restic was invoked despite pre_hook failure")
}
}
}
// TestPostHookRunsAfterBackup: post_hook fires after a successful
// backup and receives RM_JOB_STATUS=succeeded in the env.
func TestPostHookRunsAfterBackup(t *testing.T) {
t.Parallel()
bin := setupScript(t, `
case "$1" in
backup) echo '{"message_type":"summary","snapshot_id":"abc"}' ;;
snapshots) echo '[]' ;;
stats) echo '{"total_size":0,"total_uncompressed_size":0,"snapshots_count":0,"total_file_count":0,"total_blob_count":0}' ;;
*) exit 0 ;;
esac
`)
tx := &fakeSender{}
r := New(Config{ResticBin: bin}, tx, 0)
post := `echo "post-status=$RM_JOB_STATUS phase=$RM_HOOK_PHASE"`
if err := r.RunBackup(context.Background(), "job-post",
[]string{"/etc"}, nil, nil, BackupHooks{Post: post}); err != nil {
t.Fatalf("RunBackup: %v", err)
}
// Walk log.stream envelopes; one of them should be the post-hook
// line with the expected status.
var found bool
for _, env := range tx.envs {
if env.Type != api.MsgLogStream {
continue
}
var l api.LogStreamLine
_ = env.UnmarshalPayload(&l)
if strings.Contains(l.Payload, "post-status=succeeded") &&
strings.Contains(l.Payload, "phase=post") {
found = true
break
}
}
if !found {
t.Fatal("post_hook output not found in log.stream envelopes")
}
}
+36 -3
View File
@@ -116,15 +116,34 @@ func (r *Runner) sendFinished(jobID string, finishedAt time.Time, err error, sta
_ = r.tx.Send(finEnv)
}
// BackupHooks bundles the optional pre/post shell snippets that fire
// around a backup. Empty fields skip that phase. Resolved server-side
// (group → host default) before dispatch; the agent just executes
// whatever arrives in the payload.
type BackupHooks struct {
Pre string
Post string
}
// RunBackup executes a backup job and reports back via the sender.
// Returns nil on a clean (or "incomplete-but-snapshot-created") finish.
func (r *Runner) RunBackup(ctx context.Context, jobID string, paths, excludes, tags []string) error {
func (r *Runner) RunBackup(ctx context.Context, jobID string, paths, excludes, tags []string, hooks BackupHooks) error {
startedAt := time.Now().UTC()
r.sendStarted(jobID, api.JobBackup, startedAt)
env := r.resticEnv()
var seq atomic.Int64
// pre_hook: non-zero exit aborts the backup. The job is recorded
// as failed with the hook's error and restic never runs.
if hooks.Pre != "" {
if err := r.runHook(ctx, jobID, "pre", hooks.Pre, "", &seq); err != nil {
finishedAt := time.Now().UTC()
r.sendFinished(jobID, finishedAt, err, nil)
return fmt.Errorf("pre_hook failed: %w", err)
}
}
env := r.resticEnv()
lastProgress := time.Now()
handle := func(stream string, line string, ev any) {
@@ -173,6 +192,20 @@ func (r *Runner) RunBackup(ctx context.Context, jobID string, paths, excludes, t
if summary != nil {
statsBlob, _ = json.Marshal(summary)
}
// post_hook: always runs regardless of backup outcome. Receives
// RM_JOB_STATUS=succeeded|failed in env. Non-zero exit is logged
// but does not change the recorded job status.
if hooks.Post != "" {
status := "succeeded"
if err != nil {
status = "failed"
}
if perr := r.runHook(ctx, jobID, "post", hooks.Post, status, &seq); perr != nil {
slog.Warn("runner: post_hook exited non-zero", "job_id", jobID, "err", perr)
}
}
r.sendFinished(jobID, finishedAt, err, statsBlob)
// On a successful backup, refresh the server's snapshot projection.