Files
restic-manager/internal/agent/runner/runner.go
T
steve a110e3c00c agent: secrets fail-loud on corrupt blob + small polish
Save and SaveAdmin now propagate loadBundle errors instead of silently
overwriting a corrupt file (data-loss fix). Tests added for both paths.
reportStats logs a Debug on RunStats failure; r in runJob gets a comment
explaining the prune-runner asymmetry; runner_test comment tightened.
2026-05-04 10:19:15 +01:00

403 lines
12 KiB
Go

// Package runner spawns restic processes for the agent. It owns one
// Run() invocation per command.run; concurrency limits live a layer
// up (the WS handler).
package runner
import (
"context"
"encoding/json"
"fmt"
"log/slog"
"sync/atomic"
"time"
"gitea.dcglab.co.uk/steve/restic-manager/internal/api"
"gitea.dcglab.co.uk/steve/restic-manager/internal/restic"
)
// Sender is the agent's outbound message channel. Provided by
// wsclient so the runner can push job.started / job.progress /
// job.finished / log.stream back to the server. The runner also uses
// it for repo.stats and snapshots.report envelopes.
type Sender interface {
// Send hands one envelope to the transport. The runner treats most
// sends as best-effort: a returned error is logged or ignored and
// never aborts the restic invocation itself.
Send(env api.Envelope) error
}
// Config bundles the long-lived settings the runner needs. They come
// from the agent's config file (server-pushed config.update payloads
// override these in memory).
//
// Every field is copied verbatim into restic.Env by Runner.resticEnv;
// the runner itself does not validate or interpret them.
type Config struct {
// ResticBin is passed through as restic.Env.Bin.
ResticBin string
// RepoURL is passed through as restic.Env.RepoURL.
RepoURL string
// RepoUsername is passed through as restic.Env.RepoUsername.
RepoUsername string
// RepoPassword is passed through as restic.Env.RepoPassword.
RepoPassword string
}
// Runner owns the restic invocations. Build one with New; each Run*
// method performs one restic command and reports its lifecycle
// through tx.
type Runner struct {
// cfg holds the restic binary / repository settings used to build
// a restic.Env for every invocation.
cfg Config
// tx is the outbound channel for all envelopes the runner emits.
tx Sender
// progress throttling: we receive a status event from restic
// every ~100ms; the UI doesn't need anywhere near that rate.
// Cap WS sends to one per N (configurable; default 1s).
progressMinPeriod time.Duration
}
// New builds a Runner that runs restic with cfg and reports through tx.
// progressMinPeriod is the minimum gap between job.progress sends; any
// value <= 0 selects the default of one second.
func New(cfg Config, tx Sender, progressMinPeriod time.Duration) *Runner {
	rn := &Runner{cfg: cfg, tx: tx, progressMinPeriod: time.Second}
	if progressMinPeriod > 0 {
		rn.progressMinPeriod = progressMinPeriod
	}
	return rn
}
// resticEnv copies the binary path and repository credentials out of
// r.cfg into a fresh restic.Env. Fields of restic.Env not listed here
// keep their zero value.
func (r *Runner) resticEnv() restic.Env {
	cfg := r.cfg

	var env restic.Env
	env.Bin = cfg.ResticBin
	env.RepoURL = cfg.RepoURL
	env.RepoUsername = cfg.RepoUsername
	env.RepoPassword = cfg.RepoPassword
	return env
}
// sendStarted ships a job.started envelope for jobID carrying the job
// kind and start time. Delivery is best-effort: a marshal or send
// failure is logged at Warn and the caller carries on with the job.
func (r *Runner) sendStarted(jobID string, kind api.JobKind, startedAt time.Time) {
	env, err := api.Marshal(api.MsgJobStarted, jobID, api.JobStartedPayload{
		JobID: jobID, Kind: kind, StartedAt: startedAt,
	})
	if err != nil {
		// Don't push a zero-value envelope onto the wire.
		slog.Warn("runner: marshal job.started", "job_id", jobID, "kind", kind, "err", err)
		return
	}
	if err := r.tx.Send(env); err != nil {
		slog.Warn("runner: send job.started", "job_id", jobID, "kind", kind, "err", err)
	}
}
// streamHandler returns a LineHandler that forwards every line it is
// given as a log.stream envelope tagged with jobID. Each line takes
// the next value of seq (incremented atomically) and a UTC timestamp
// captured when the handler runs. The parsed-event argument is unused.
func (r *Runner) streamHandler(jobID string, seq *atomic.Int64) restic.LineHandler {
	forward := func(stream string, line string, _ any) {
		ts := time.Now().UTC()
		entry := api.LogStreamLine{
			JobID:   jobID,
			Seq:     seq.Add(1),
			TS:      ts,
			Stream:  api.LogStream(stream),
			Payload: line,
		}
		// Log lines are best-effort: marshal and send failures are
		// dropped on purpose rather than reported once per line.
		logEnv, _ := api.Marshal(api.MsgLogStream, "", entry)
		_ = r.tx.Send(logEnv)
	}
	return forward
}
// sendFinished ships the terminal job.finished envelope for jobID.
//
// err == nil reports api.JobSucceeded with exit code 0; any other err
// reports api.JobFailed with exit code -1 and err.Error() as the
// message. statsBlob is forwarded untouched as JobFinishedPayload.Stats
// and may be nil.
//
// Delivery is best-effort, but a marshal or send failure is logged at
// Warn, matching sendStarted; without the log the server-side job
// would stay unfinished with no trace on the agent.
func (r *Runner) sendFinished(jobID string, finishedAt time.Time, err error, statsBlob json.RawMessage) {
	payload := api.JobFinishedPayload{
		JobID:      jobID,
		Status:     api.JobSucceeded,
		ExitCode:   0,
		FinishedAt: finishedAt,
		Stats:      statsBlob,
	}
	if err != nil {
		payload.Status = api.JobFailed
		payload.ExitCode = -1
		payload.Error = err.Error()
	}

	finEnv, merr := api.Marshal(api.MsgJobFinished, jobID, payload)
	if merr != nil {
		// Don't push a zero-value envelope onto the wire.
		slog.Warn("runner: marshal job.finished", "job_id", jobID, "err", merr)
		return
	}
	if serr := r.tx.Send(finEnv); serr != nil {
		slog.Warn("runner: send job.finished", "job_id", jobID, "err", serr)
	}
}
// RunBackup executes a backup job and reports back via the sender.
//
// Envelope sequence: job.started, then log.stream / job.progress while
// restic runs, then job.finished (with the backup summary as Stats
// when one was returned). On success, snapshots.report and repo.stats
// follow.
//
// Returns nil when the underlying restic.Env.RunBackup returned nil
// (a clean or "incomplete-but-snapshot-created" finish, as that call
// defines it); otherwise the error wrapped with "runner backup".
// Failures of the follow-up reports are logged and never fail the job.
func (r *Runner) RunBackup(ctx context.Context, jobID string, paths, excludes, tags []string) error {
	startedAt := time.Now().UTC()
	r.sendStarted(jobID, api.JobBackup, startedAt)
	env := r.resticEnv()

	var seq atomic.Int64
	// Non-status lines take the same log.stream path as every other job.
	logLine := r.streamHandler(jobID, &seq)

	// NOTE(review): lastProgress is touched only for status events;
	// this assumes the restic wrapper delivers those from a single
	// goroutine — confirm.
	lastProgress := time.Now()
	handle := func(stream string, line string, ev any) {
		status, isStatus := ev.(restic.BackupStatus)
		if !isStatus {
			logLine(stream, line, ev)
			return
		}
		// Status events feed the throttled progress widget only. The
		// raw status line is deliberately NOT forwarded to log.stream:
		// restic --json emits them at a high rate and they would drown
		// the live log in near-duplicates.
		if time.Since(lastProgress) < r.progressMinPeriod {
			return
		}
		lastProgress = time.Now()
		progEnv, merr := api.Marshal(api.MsgJobProgress, jobID, api.JobProgressPayload{
			JobID:         jobID,
			PercentDone:   status.PercentDone,
			FilesDone:     status.FilesDone,
			TotalFiles:    status.TotalFiles,
			BytesDone:     status.BytesDone,
			TotalBytes:    status.TotalBytes,
			ETASeconds:    status.SecondsRem,
			ThroughputBps: throughput(status.BytesDone, status.SecondsElapsed),
		})
		if merr != nil {
			slog.Debug("runner: marshal job.progress", "job_id", jobID, "err", merr)
			return
		}
		// Progress is best-effort; the next sample supersedes a lost one.
		_ = r.tx.Send(progEnv)
	}

	summary, err := env.RunBackup(ctx, paths, excludes, tags, handle)
	finishedAt := time.Now().UTC()

	var statsBlob json.RawMessage
	if summary != nil {
		blob, merr := json.Marshal(summary)
		if merr != nil {
			// Still finish the job, just without stats.
			slog.Warn("runner: marshal backup summary", "job_id", jobID, "err", merr)
		} else {
			statsBlob = blob
		}
	}
	r.sendFinished(jobID, finishedAt, err, statsBlob)

	if err != nil {
		return fmt.Errorf("runner backup: %w", err)
	}

	// On a successful backup, refresh the server's snapshot projection.
	// We do this *after* job.finished so the UI sees the job land first;
	// the snapshot list is a follow-up. A failure here is logged but
	// doesn't fail the job — the next successful backup will catch the
	// projection up.
	if rerr := r.reportSnapshots(ctx, env); rerr != nil {
		slog.Warn("runner: snapshots.report failed", "job_id", jobID, "err", rerr)
	}
	if rerr := r.reportStats(ctx, env, api.RepoStatsPayload{}); rerr != nil {
		slog.Warn("runner: stats.report after backup failed", "job_id", jobID, "err", rerr)
	}
	return nil
}
// RunInit runs a repo-init job: job.started, streamed log lines, then
// job.finished — the same envelope shape RunBackup uses, so the
// browser-side log viewer needs no special case. Returns nil on
// success, otherwise the restic error wrapped with "runner init".
func (r *Runner) RunInit(ctx context.Context, jobID string) error {
	r.sendStarted(jobID, api.JobInit, time.Now().UTC())

	repo := r.resticEnv()
	var lineSeq atomic.Int64
	runErr := repo.RunInit(ctx, r.streamHandler(jobID, &lineSeq))
	r.sendFinished(jobID, time.Now().UTC(), runErr, nil)

	if runErr == nil {
		return nil
	}
	return fmt.Errorf("runner init: %w", runErr)
}
// RunForget runs a forget job with the given retention policy against
// the configured repo. It emits the same job.started / log.stream /
// job.finished sequence as the other jobs. After a successful forget
// it re-sends the snapshot list, since forget is expected to have
// removed snapshots; a failure of that refresh is only logged.
// Returns nil on success, otherwise the restic error wrapped with
// "runner forget".
func (r *Runner) RunForget(ctx context.Context, jobID string, policy restic.ForgetPolicy) error {
	r.sendStarted(jobID, api.JobForget, time.Now().UTC())

	repo := r.resticEnv()
	var lineSeq atomic.Int64
	runErr := repo.RunForget(ctx, policy, r.streamHandler(jobID, &lineSeq))
	r.sendFinished(jobID, time.Now().UTC(), runErr, nil)

	if runErr != nil {
		return fmt.Errorf("runner forget: %w", runErr)
	}

	// Snapshot projection refresh happens only after job.finished.
	if snapErr := r.reportSnapshots(ctx, repo); snapErr != nil {
		slog.Warn("runner: snapshots.report after forget failed",
			"job_id", jobID, "err", snapErr)
	}
	return nil
}
// RunPrune runs a prune job against the configured repo. On success
// it sends a repo.stats envelope with LastPruneAt set to the finish
// time (reportStats adds a size refresh) BEFORE job.finished, so the
// updated size is available alongside the completed job. On failure
// no stats refresh is attempted. Returns nil on success, otherwise
// the restic error wrapped with "runner prune".
func (r *Runner) RunPrune(ctx context.Context, jobID string) error {
	r.sendStarted(jobID, api.JobPrune, time.Now().UTC())

	repo := r.resticEnv()
	var lineSeq atomic.Int64
	runErr := repo.RunPrune(ctx, r.streamHandler(jobID, &lineSeq))
	// Captured before the stats refresh so job.finished carries the
	// time prune itself ended.
	finishedAt := time.Now().UTC()

	if runErr != nil {
		r.sendFinished(jobID, finishedAt, runErr, nil)
		return fmt.Errorf("runner prune: %w", runErr)
	}

	prunedAt := finishedAt
	if statsErr := r.reportStats(ctx, repo, api.RepoStatsPayload{LastPruneAt: &prunedAt}); statsErr != nil {
		slog.Warn("runner: stats.report after prune failed", "job_id", jobID, "err", statsErr)
	}
	r.sendFinished(jobID, finishedAt, nil, nil)
	return nil
}
// RunCheck runs a `restic check` job, passing subsetPct through to
// restic.Env.RunCheck. Whether the check succeeds or fails it sends a
// repo.stats envelope before job.finished, carrying:
//
//   - LastCheckAt: the finish time,
//   - LastCheckStatus: "failed" when the run errored, "errors_found"
//     when it ran but the result flags errors, "ok" otherwise,
//   - LockPresent: taken from the check result.
//
// Returns nil when the run itself did not error (including the
// "errors_found" case), otherwise the error wrapped with
// "runner check".
func (r *Runner) RunCheck(ctx context.Context, jobID string, subsetPct int) error {
	r.sendStarted(jobID, api.JobCheck, time.Now().UTC())

	repo := r.resticEnv()
	var lineSeq atomic.Int64
	result, runErr := repo.RunCheck(ctx, subsetPct, r.streamHandler(jobID, &lineSeq))
	finishedAt := time.Now().UTC()

	var verdict string
	switch {
	case runErr != nil:
		verdict = "failed"
	case result.ErrorsFound:
		verdict = "errors_found"
	default:
		verdict = "ok"
	}

	// The result is read even when the run errored.
	locked := result.LockPresent
	checkedAt := finishedAt
	statsErr := r.reportStats(ctx, repo, api.RepoStatsPayload{
		LastCheckAt:     &checkedAt,
		LastCheckStatus: verdict,
		LockPresent:     &locked,
	})
	if statsErr != nil {
		slog.Warn("runner: stats.report after check failed", "job_id", jobID, "err", statsErr)
	}

	r.sendFinished(jobID, finishedAt, runErr, nil)
	if runErr == nil {
		return nil
	}
	return fmt.Errorf("runner check: %w", runErr)
}
// RunUnlock runs a `restic unlock` job. On success it sends a
// repo.stats envelope with LockPresent=false before job.finished so
// the UI's lock banner can clear; on failure no stats are sent.
// Returns nil on success, otherwise the restic error wrapped with
// "runner unlock".
func (r *Runner) RunUnlock(ctx context.Context, jobID string) error {
	r.sendStarted(jobID, api.JobUnlock, time.Now().UTC())

	repo := r.resticEnv()
	var lineSeq atomic.Int64
	runErr := repo.RunUnlock(ctx, r.streamHandler(jobID, &lineSeq))
	finishedAt := time.Now().UTC()

	if runErr != nil {
		r.sendFinished(jobID, finishedAt, runErr, nil)
		return fmt.Errorf("runner unlock: %w", runErr)
	}

	unlocked := false
	if statsErr := r.reportStats(ctx, repo, api.RepoStatsPayload{LockPresent: &unlocked}); statsErr != nil {
		slog.Warn("runner: stats.report after unlock failed", "job_id", jobID, "err", statsErr)
	}
	r.sendFinished(jobID, finishedAt, nil, nil)
	return nil
}
// reportStats sends patch as a repo.stats envelope. When
// patch.TotalSizeBytes is nil it first calls env.RunStats (bounded by
// a 60s timeout derived from ctx) and fills in TotalSizeBytes,
// RawSizeBytes, UniqueFiles and SnapshotCount from the result. A
// RunStats failure is logged at Debug and the patch is sent with
// whatever the caller populated. The returned error is the marshal or
// send failure, if any.
func (r *Runner) reportStats(ctx context.Context, env restic.Env, patch api.RepoStatsPayload) error {
	statsCtx, cancel := context.WithTimeout(ctx, 60*time.Second)
	defer cancel()

	if patch.TotalSizeBytes == nil {
		stats, statsErr := env.RunStats(statsCtx, nil)
		if statsErr != nil {
			slog.Debug("runner: stats refresh failed (non-fatal)", "err", statsErr)
		} else {
			// Copy into locals so the patch gets its own addressable values.
			totalSize := stats.TotalSize
			rawSize := stats.TotalUncompressed
			fileCount := stats.TotalFileCount
			snapCount := stats.SnapshotsCount
			patch.TotalSizeBytes = &totalSize
			patch.RawSizeBytes = &rawSize
			patch.UniqueFiles = &fileCount
			patch.SnapshotCount = &snapCount
		}
	}

	envelope, marshalErr := api.Marshal(api.MsgRepoStats, "", patch)
	if marshalErr != nil {
		return marshalErr
	}
	return r.tx.Send(envelope)
}
// reportSnapshots lists the repo's snapshots via env.ListSnapshots,
// converts each one to the wire api.Snapshot shape (timestamps
// normalised to UTC; size and file count copied only when the
// snapshot carries a summary) and sends them as one snapshots.report
// envelope. The listing runs under its own 60s timeout derived from
// ctx so a sluggish repo cannot hang the runner indefinitely. Returns
// the list, marshal or send error, if any.
func (r *Runner) reportSnapshots(ctx context.Context, env restic.Env) error {
	listCtx, cancel := context.WithTimeout(ctx, 60*time.Second)
	defer cancel()

	snaps, listErr := env.ListSnapshots(listCtx)
	if listErr != nil {
		return listErr
	}

	wire := make([]api.Snapshot, 0, len(snaps))
	for _, snap := range snaps {
		item := api.Snapshot{
			ID:       snap.ID,
			ShortID:  snap.ShortID,
			Time:     snap.Time.UTC(),
			Hostname: snap.Hostname,
			Paths:    snap.Paths,
			Tags:     snap.Tags,
		}
		if sum := snap.Summary; sum != nil {
			item.SizeBytes = sum.TotalBytesProcessed
			item.FileCount = sum.TotalFilesProcessed
		}
		wire = append(wire, item)
	}

	envelope, marshalErr := api.Marshal(api.MsgSnapshotsRpt, "", api.SnapshotsReportPayload{
		Snapshots: wire,
	})
	if marshalErr != nil {
		return marshalErr
	}
	return r.tx.Send(envelope)
}
// throughput returns the average rate bytesDone / secondsElapsed using
// integer division. It returns 0 when secondsElapsed is zero or
// negative, which also avoids a divide-by-zero.
func throughput(bytesDone, secondsElapsed int64) int64 {
	if secondsElapsed > 0 {
		return bytesDone / secondsElapsed
	}
	return 0
}