Files
restic-manager/internal/agent/runner/runner.go
T
steve 94149a7324 P3-X1: cancel-job feature
Wires the existing job_detail Cancel button (which was a UI stub) into
real backend behaviour:

- internal/api already declared MsgCommandCancel + CommandCancelPayload;
  promote those from forward-declarations to a working envelope. Agent
  side: cmd/agent/main.go drops the TODO-stub and gains a per-job
  ctx.CancelFunc map. runJob's switch is refactored around a small
  spawn() helper so each kind's goroutine derives a per-job context,
  registers the cancel, and removes itself on completion regardless of
  outcome. command.cancel looks up the func and fires it.
- internal/agent/runner.sendFinished now takes ctx and rebadges
  ctx.Canceled errors as JobCancelled (exit 130) rather than
  JobFailed. All Run* call sites updated.
- internal/restic.resticCmd sets cmd.Cancel to send SIGTERM (via
  build-tagged sigterm constant; os.Kill on Windows since SIGTERM
  isn't deliverable there) and cmd.WaitDelay=5s for the SIGKILL
  fallback. SIGTERM lets restic remove its lock file before exiting.
- New POST /api/jobs/{id}/cancel server endpoint validates the job
  is non-terminal and the host is online, sends command.cancel via
  the hub, writes a job.cancel audit row, returns 202. The agent's
  resulting job.finished (status=cancelled) is what actually
  transitions the row.

Tests:
- internal/server/http/cancel_test.go covers happy path (envelope
  shape + audit row), 409 for terminal jobs, 404 for missing jobs,
  503 for offline hosts.
- internal/agent/runner/cancel_test.go covers cancel mid-run: a fake
  restic that exec'd into 'sleep 30' is canceled 150ms after start
  and the resulting job.finished reports JobCancelled with exit 130
  in well under the WaitDelay.

Foundational for P3 restore (operator needs to be able to cancel a
running backup if they need to restore urgently). Independently useful
for prune/check/backup that are stuck.
2026-05-04 15:11:49 +01:00

458 lines
14 KiB
Go

// Package runner spawns restic processes for the agent. It owns one
// Run* invocation (RunBackup, RunInit, RunForget, …) per command.run;
// concurrency limits live a layer up (the WS handler).
package runner
import (
"context"
"encoding/json"
"fmt"
"log/slog"
"sync/atomic"
"time"
"gitea.dcglab.co.uk/steve/restic-manager/internal/api"
"gitea.dcglab.co.uk/steve/restic-manager/internal/restic"
)
// Sender is the agent's outbound message channel. Provided by
// wsclient so the runner can push job.started / job.progress /
// job.finished / log.stream back to the server.
//
// Within this file the runner also uses it to ship repo.stats and
// snapshots.report envelopes.
type Sender interface {
// Send ships one envelope and returns a non-nil error on failure
// (the exact failure modes are defined by the implementation).
// Several call sites in this file treat that error as non-fatal.
Send(env api.Envelope) error
}
// Config carries the runner's long-lived settings. The values
// originate in the agent's config file; server-pushed config.update
// payloads override them in memory.
type Config struct {
	// ResticBin is the restic executable handed to restic.Env.
	ResticBin string
	// Repository location and the credentials used to reach it.
	RepoURL, RepoUsername, RepoPassword string
	// Bandwidth caps in KB/s, applied to every restic invocation.
	// A value <=0 means "no cap". Callers that build a runner per
	// dispatch can place a per-job override here directly.
	LimitUploadKBps, LimitDownloadKBps int
}
// Runner owns the restic invocations. Build one with New; each Run*
// method executes one job and reports its lifecycle through tx.
type Runner struct {
// cfg holds the restic binary, repository credentials and bandwidth
// caps that resticEnv copies into every restic.Env.
cfg Config
// tx is the outbound channel for every envelope this runner emits.
tx Sender
// progress throttling: we receive a status event from restic
// every ~100ms; the UI doesn't need anywhere near that rate.
// Cap WS sends to one per N (configurable; default 1s).
progressMinPeriod time.Duration
}
// New constructs a Runner that reports through tx. A progressMinPeriod
// of zero or less selects the default throttle of one second.
func New(cfg Config, tx Sender, progressMinPeriod time.Duration) *Runner {
	period := time.Second
	if progressMinPeriod > 0 {
		period = progressMinPeriod
	}
	return &Runner{cfg: cfg, tx: tx, progressMinPeriod: period}
}
// resticEnv copies the runner's configuration into a fresh restic.Env
// that the Run* methods use for their restic invocations.
func (r *Runner) resticEnv() restic.Env {
	cfg := r.cfg

	var env restic.Env
	env.Bin = cfg.ResticBin
	env.RepoURL = cfg.RepoURL
	env.RepoUsername = cfg.RepoUsername
	env.RepoPassword = cfg.RepoPassword
	env.LimitUploadKBps = cfg.LimitUploadKBps
	env.LimitDownloadKBps = cfg.LimitDownloadKBps
	return env
}
// sendStarted emits the job.started envelope for jobID. A send
// failure is logged as a warning and otherwise ignored.
func (r *Runner) sendStarted(jobID string, kind api.JobKind, startedAt time.Time) {
	payload := api.JobStartedPayload{JobID: jobID, Kind: kind, StartedAt: startedAt}
	msg, _ := api.Marshal(api.MsgJobStarted, jobID, payload)

	err := r.tx.Send(msg)
	if err == nil {
		return
	}
	slog.Warn("runner: send job.started", "job_id", jobID, "kind", kind, "err", err)
}
// streamHandler builds the LineHandler used for plain log forwarding:
// every line becomes one log.stream envelope tagged with jobID and
// the next value of the shared sequence counter. The parsed-event
// argument is ignored and send failures are dropped (best effort).
func (r *Runner) streamHandler(jobID string, seq *atomic.Int64) restic.LineHandler {
	emit := func(stream string, line string, _ any) {
		ts := time.Now().UTC()
		entry := api.LogStreamLine{
			JobID:   jobID,
			Seq:     seq.Add(1),
			TS:      ts,
			Stream:  api.LogStream(stream),
			Payload: line,
		}
		msg, _ := api.Marshal(api.MsgLogStream, "", entry)
		_ = r.tx.Send(msg)
	}
	return emit
}
// sendFinished ships the job.finished envelope for jobID.
//
// Status mapping:
//   - err == nil                  → JobSucceeded, exit 0
//   - err != nil, ctx still live  → JobFailed, exit -1, Error = err.Error()
//   - err != nil, ctx done        → JobCancelled, exit 130, empty Error
//     (operator hit Cancel or the agent is shutting down)
//
// statsBlob is forwarded verbatim as JobFinishedPayload.Stats.
//
// Marshal and send failures are logged: job.finished is the message
// that moves the job to a terminal state, so losing it silently would
// leave no trace of why a job appears stuck.
func (r *Runner) sendFinished(ctx context.Context, jobID string, finishedAt time.Time, err error, statsBlob json.RawMessage) {
	status := api.JobSucceeded
	exit := 0
	errMsg := ""
	if err != nil {
		status, exit, errMsg = api.JobFailed, -1, err.Error()
		// If the context is done, the failure is operator-driven (or
		// shutdown). The process's own exit error would otherwise be
		// rebadged as a red "failed"; surface a neutral "cancelled"
		// instead and drop the underlying restic error text.
		if ctx.Err() != nil {
			status = api.JobCancelled
			exit = 130 // 128 + SIGINT(2): shell convention for an interrupted process
			errMsg = ""
		}
	}

	finEnv, merr := api.Marshal(api.MsgJobFinished, jobID, api.JobFinishedPayload{
		JobID:      jobID,
		Status:     status,
		ExitCode:   exit,
		FinishedAt: finishedAt,
		Stats:      statsBlob,
		Error:      errMsg,
	})
	if merr != nil {
		// Nothing meaningful to put on the wire; report and stop.
		slog.Warn("runner: marshal job.finished", "job_id", jobID, "status", status, "err", merr)
		return
	}
	if serr := r.tx.Send(finEnv); serr != nil {
		slog.Warn("runner: send job.finished", "job_id", jobID, "status", status, "err", serr)
	}
}
// BackupHooks holds the optional shell snippets run around a backup.
// An empty field skips that phase. Resolution (group → host default)
// happens server-side before dispatch; the agent simply executes
// whatever the payload carries.
type BackupHooks struct {
	// Pre runs before restic; Post runs after it.
	Pre, Post string
}
// RunBackup executes a backup job and reports back via the sender.
//
// Sequence: job.started → optional pre_hook → restic backup (log lines
// forwarded as log.stream, status events throttled into job.progress)
// → optional post_hook → job.finished → on success, a snapshot and
// repo-stats refresh.
//
// Returns nil when env.RunBackup reports no error; otherwise the error
// wrapped with context. A failing pre_hook aborts the job before
// restic runs and is returned wrapped as "pre_hook failed".
func (r *Runner) RunBackup(ctx context.Context, jobID string, paths, excludes, tags []string, hooks BackupHooks) error {
	// Upper bound for a post_hook that has to run after ctx is already
	// done. Matches the 60s bound used for the other follow-up calls
	// in this file.
	const detachedPostHookTimeout = 60 * time.Second

	startedAt := time.Now().UTC()
	r.sendStarted(jobID, api.JobBackup, startedAt)
	var seq atomic.Int64

	// pre_hook: non-zero exit aborts the backup. The job is recorded
	// as failed with the hook's error and restic never runs.
	if hooks.Pre != "" {
		if err := r.runHook(ctx, jobID, "pre", hooks.Pre, "", &seq); err != nil {
			r.sendFinished(ctx, jobID, time.Now().UTC(), err, nil)
			return fmt.Errorf("pre_hook failed: %w", err)
		}
	}

	env := r.resticEnv()
	// Plain log lines go through the same forwarder every other job
	// kind uses, sharing seq with the hooks so numbering stays monotonic.
	logLine := r.streamHandler(jobID, &seq)
	lastProgress := time.Now()
	handle := func(stream string, line string, ev any) {
		// Status events are deliberately NOT forwarded to log.stream:
		// restic --json emits them many times per second and they
		// would drown the live log. The progress widget carries the
		// same information at a sane sample rate.
		status, isStatus := ev.(restic.BackupStatus)
		if !isStatus {
			logLine(stream, line, ev)
			return
		}
		if time.Since(lastProgress) < r.progressMinPeriod {
			return
		}
		lastProgress = time.Now()
		progEnv, _ := api.Marshal(api.MsgJobProgress, jobID, api.JobProgressPayload{
			JobID:         jobID,
			PercentDone:   status.PercentDone,
			FilesDone:     status.FilesDone,
			TotalFiles:    status.TotalFiles,
			BytesDone:     status.BytesDone,
			TotalBytes:    status.TotalBytes,
			ETASeconds:    status.SecondsRem,
			ThroughputBps: throughput(status.BytesDone, status.SecondsElapsed),
		})
		// Progress is best effort; a dropped sample is superseded by the next.
		_ = r.tx.Send(progEnv)
	}

	summary, err := env.RunBackup(ctx, paths, excludes, tags, handle)
	finishedAt := time.Now().UTC()

	var statsBlob json.RawMessage
	if summary != nil {
		blob, merr := json.Marshal(summary)
		if merr != nil {
			// Stats are optional on job.finished; ship without them.
			slog.Warn("runner: marshal backup summary", "job_id", jobID, "err", merr)
		} else {
			statsBlob = blob
		}
	}

	// post_hook: always runs regardless of backup outcome. Receives
	// RM_JOB_STATUS=succeeded|failed in env. Non-zero exit is logged
	// but does not change the recorded job status.
	if hooks.Post != "" {
		status := "succeeded"
		if err != nil {
			status = "failed"
		}
		hookCtx := ctx
		if ctx.Err() != nil {
			// The job context is already done (operator cancel or
			// agent shutdown). Handing it to the hook would most
			// likely stop the hook from running at all, which breaks
			// the "always runs" contract exactly when cleanup matters
			// most (e.g. undoing what pre_hook did). Run it on a
			// detached, time-bounded context instead.
			// NOTE(review): assumes runHook honours its ctx; it is
			// defined outside this view — confirm.
			detached, stop := context.WithTimeout(context.WithoutCancel(ctx), detachedPostHookTimeout)
			defer stop()
			hookCtx = detached
		}
		if perr := r.runHook(hookCtx, jobID, "post", hooks.Post, status, &seq); perr != nil {
			slog.Warn("runner: post_hook exited non-zero", "job_id", jobID, "err", perr)
		}
	}

	// The original ctx is passed on purpose so a cancelled run is
	// still reported as cancelled.
	r.sendFinished(ctx, jobID, finishedAt, err, statsBlob)
	if err != nil {
		return fmt.Errorf("runner backup: %w", err)
	}

	// On a successful backup, refresh the server's snapshot projection.
	// This happens *after* job.finished so the UI sees the job land
	// first. A failure here is logged but doesn't fail the job — the
	// next successful backup will catch the projection up.
	if rerr := r.reportSnapshots(ctx, env); rerr != nil {
		slog.Warn("runner: snapshots.report failed", "job_id", jobID, "err", rerr)
	}
	if rerr := r.reportStats(ctx, env, api.RepoStatsPayload{}); rerr != nil {
		slog.Warn("runner: stats.report after backup failed", "job_id", jobID, "err", rerr)
	}
	return nil
}
// RunInit runs a repo-init job: job.started, then env.RunInit with
// every output line forwarded as log.stream, then job.finished. It
// uses the same envelope shape as RunBackup, so the browser-side log
// viewer needs no special casing. Returns nil on success, otherwise
// the restic error wrapped with context.
func (r *Runner) RunInit(ctx context.Context, jobID string) error {
	r.sendStarted(jobID, api.JobInit, time.Now().UTC())

	var seq atomic.Int64
	env := r.resticEnv()
	runErr := env.RunInit(ctx, r.streamHandler(jobID, &seq))
	r.sendFinished(ctx, jobID, time.Now().UTC(), runErr, nil)

	if runErr == nil {
		return nil
	}
	return fmt.Errorf("runner init: %w", runErr)
}
// RunForget runs a forget job for the given groups via env.RunForget
// (documented as one `restic forget --tag <Tag> --keep-* …` call per
// group). It uses the same envelope shape as RunBackup, so the live
// log viewer and job lifecycle need no special casing.
//
// After a successful run the snapshot projection is refreshed once —
// after all groups, not per group — because forget shrinks the host's
// snapshot list. A failed refresh is only logged.
func (r *Runner) RunForget(ctx context.Context, jobID string, groups []restic.ForgetGroup) error {
	r.sendStarted(jobID, api.JobForget, time.Now().UTC())

	env := r.resticEnv()
	var seq atomic.Int64
	runErr := env.RunForget(ctx, groups, r.streamHandler(jobID, &seq))
	r.sendFinished(ctx, jobID, time.Now().UTC(), runErr, nil)

	if runErr != nil {
		return fmt.Errorf("runner forget: %w", runErr)
	}

	// Success path only: bring the server's snapshot list up to date.
	if rerr := r.reportSnapshots(ctx, env); rerr != nil {
		slog.Warn("runner: snapshots.report after forget failed", "job_id", jobID, "err", rerr)
	}
	return nil
}
// RunPrune runs a prune job against the configured repo.
//
// On success a repo.stats envelope carrying LastPruneAt (plus the size
// refresh reportStats performs) is shipped *before* job.finished, so
// the UI can show updated sizes alongside the completed job. On
// failure no stats refresh is attempted and the error is returned
// wrapped with context.
func (r *Runner) RunPrune(ctx context.Context, jobID string) error {
	r.sendStarted(jobID, api.JobPrune, time.Now().UTC())

	env := r.resticEnv()
	var seq atomic.Int64
	runErr := env.RunPrune(ctx, r.streamHandler(jobID, &seq))
	finishedAt := time.Now().UTC()

	if runErr != nil {
		r.sendFinished(ctx, jobID, finishedAt, runErr, nil)
		return fmt.Errorf("runner prune: %w", runErr)
	}

	prunedAt := finishedAt
	if rerr := r.reportStats(ctx, env, api.RepoStatsPayload{LastPruneAt: &prunedAt}); rerr != nil {
		slog.Warn("runner: stats.report after prune failed", "job_id", jobID, "err", rerr)
	}
	r.sendFinished(ctx, jobID, finishedAt, nil, nil)
	return nil
}
// RunCheck executes a `restic check` job over subsetPct of the repo.
//
// When the check ran to a verdict — success, errors found, or a
// genuine failure — it ships a repo.stats envelope with LastCheckAt,
// LastCheckStatus ("ok" | "errors_found" | "failed") and LockPresent
// taken from the check result, before job.finished.
//
// When the run was cancelled (ctx done and restic returned an error)
// no repo.stats envelope is sent: an aborted check says nothing about
// repository health, and recording it as "failed" with a fresh
// LastCheckAt would overwrite the last real verdict.
//
// Returns nil on success, otherwise the error wrapped with context.
func (r *Runner) RunCheck(ctx context.Context, jobID string, subsetPct int) error {
	startedAt := time.Now().UTC()
	r.sendStarted(jobID, api.JobCheck, startedAt)

	env := r.resticEnv()
	var seq atomic.Int64
	res, err := env.RunCheck(ctx, subsetPct, r.streamHandler(jobID, &seq))
	finishedAt := time.Now().UTC()

	// Same test sendFinished uses to classify a run as cancelled.
	cancelled := err != nil && ctx.Err() != nil
	if !cancelled {
		checkStatus := "ok"
		switch {
		case err != nil:
			checkStatus = "failed"
		case res.ErrorsFound:
			checkStatus = "errors_found"
		}
		// NOTE(review): res is read even when err != nil; this assumes
		// env.RunCheck returns a usable result on failure — confirm.
		lockPresent := res.LockPresent
		checkedAt := finishedAt
		patch := api.RepoStatsPayload{
			LastCheckAt:     &checkedAt,
			LastCheckStatus: checkStatus,
			LockPresent:     &lockPresent,
		}
		if rerr := r.reportStats(ctx, env, patch); rerr != nil {
			slog.Warn("runner: stats.report after check failed", "job_id", jobID, "err", rerr)
		}
	}

	r.sendFinished(ctx, jobID, finishedAt, err, nil)
	if err != nil {
		return fmt.Errorf("runner check: %w", err)
	}
	return nil
}
// RunUnlock runs a `restic unlock` job. After a successful unlock it
// ships a repo.stats envelope with LockPresent=false — so the UI's
// lock banner clears — and only then job.finished. On failure no
// stats are sent and the error is returned wrapped with context.
func (r *Runner) RunUnlock(ctx context.Context, jobID string) error {
	r.sendStarted(jobID, api.JobUnlock, time.Now().UTC())

	env := r.resticEnv()
	var seq atomic.Int64
	runErr := env.RunUnlock(ctx, r.streamHandler(jobID, &seq))
	finishedAt := time.Now().UTC()

	if runErr != nil {
		r.sendFinished(ctx, jobID, finishedAt, runErr, nil)
		return fmt.Errorf("runner unlock: %w", runErr)
	}

	lockPresent := false
	if rerr := r.reportStats(ctx, env, api.RepoStatsPayload{LockPresent: &lockPresent}); rerr != nil {
		slog.Warn("runner: stats.report after unlock failed", "job_id", jobID, "err", rerr)
	}
	r.sendFinished(ctx, jobID, finishedAt, nil, nil)
	return nil
}
// reportStats sends patch as a repo.stats envelope. When the caller
// left TotalSizeBytes unset, the size fields (total, raw, unique
// files, snapshot count) are filled from env.RunStats first, bounded
// by a 60-second timeout. A RunStats failure is non-fatal: it is
// logged at debug level and the patch goes out with whatever the
// caller populated. Only marshal and send errors are returned.
func (r *Runner) reportStats(ctx context.Context, env restic.Env, patch api.RepoStatsPayload) error {
	statsCtx, cancel := context.WithTimeout(ctx, 60*time.Second)
	defer cancel()

	if patch.TotalSizeBytes == nil {
		s, statsErr := env.RunStats(statsCtx, nil)
		if statsErr != nil {
			slog.Debug("runner: stats refresh failed (non-fatal)", "err", statsErr)
		} else {
			// Copy into locals so the payload holds its own values.
			totalSize, rawSize := s.TotalSize, s.TotalUncompressed
			fileCount, snapCount := s.TotalFileCount, s.SnapshotsCount
			patch.TotalSizeBytes = &totalSize
			patch.RawSizeBytes = &rawSize
			patch.UniqueFiles = &fileCount
			patch.SnapshotCount = &snapCount
		}
	}

	msg, err := api.Marshal(api.MsgRepoStats, "", patch)
	if err != nil {
		return err
	}
	return r.tx.Send(msg)
}
// reportSnapshots lists the repo's snapshots through
// env.ListSnapshots, converts each one to the wire shape and ships
// the set as a single snapshots.report envelope. The listing runs
// under its own 60-second timeout so a sluggish repo cannot hang the
// runner. Size and file count are only set for snapshots that carry
// a summary.
func (r *Runner) reportSnapshots(ctx context.Context, env restic.Env) error {
	listCtx, cancel := context.WithTimeout(ctx, 60*time.Second)
	defer cancel()

	snaps, err := env.ListSnapshots(listCtx)
	if err != nil {
		return err
	}

	wire := make([]api.Snapshot, 0, len(snaps))
	for _, s := range snaps {
		item := api.Snapshot{
			ID:       s.ID,
			ShortID:  s.ShortID,
			Time:     s.Time.UTC(),
			Hostname: s.Hostname,
			Paths:    s.Paths,
			Tags:     s.Tags,
		}
		if sum := s.Summary; sum != nil {
			item.SizeBytes = sum.TotalBytesProcessed
			item.FileCount = sum.TotalFilesProcessed
		}
		wire = append(wire, item)
	}

	msg, err := api.Marshal(api.MsgSnapshotsRpt, "", api.SnapshotsReportPayload{Snapshots: wire})
	if err != nil {
		return err
	}
	return r.tx.Send(msg)
}
// throughput returns the average rate bytesDone/secondsElapsed using
// integer division. It returns 0 when secondsElapsed is zero or
// negative, which avoids a division by zero on the first status event.
func throughput(bytesDone, secondsElapsed int64) int64 {
	if secondsElapsed > 0 {
		return bytesDone / secondsElapsed
	}
	return 0
}