a781e95c94
Three small follow-ups from review:
1. Restore target is now operator-editable. Default value is the
literal '\$HOME/rm-restore/<job-id>/' (agent expands \$HOME at
run time using os.UserHomeDir(); also handles \${HOME} and ~/
prefixes). Operator can replace with any absolute path.
- ui_restore.go validates the input is either absolute or starts
with one of the recognised prefixes; other env-var refs (\$PATH
etc.) are deliberately rejected so operator paths can't pick up
arbitrary agent env values.
- host_restore.html replaces the read-only mono-text display with
a real <input>; help text spells out that \$HOME resolves
agent-side and <job-id> is substituted on dispatch.
- install.sh + the systemd unit prep /root/rm-restore so the
default works under the sandbox: ReadWritePaths gains a soft
'-/root/rm-restore' entry (the '-' makes the bind-mount soft-fail
if missing, but install.sh pre-creates it root-owned 0700).
2. --no-ownership flag now gated on restic version. The flag was
added in restic 0.17; 0.16 rejects it. Previously the agent dropped
the flag wholesale, which meant new-dir restores silently preserved
ownership against design intent on 0.17+. Now the agent threads
its detected restic version (sysinfo already collects it) through
runner.Config -> restic.Env, and RunRestore appends --no-ownership
only when AtLeastVersion(0, 17) returns true. 0.16 hosts still
restore with original uid/gid; help text in the wizard explicitly
notes this. The previous 'Original ownership is preserved' copy
was wrong for new-dir mode and is corrected.
3. golangci-lint misspell locale switched US -> UK and the codebase
swept (73 corrections, mostly behaviour/serialise/recognise/honour).
Wire-format ErrorCode 'unauthorized' -> 'unauthorised' is a tiny
contract change but the agent doesn't parse those codes today and
no external API consumers exist yet. Tests passed before + after.
Tests:
- internal/restic/version_test.go covers Env.AtLeastVersion across
edge cases (empty, exact match, patch above, minor below, non-
numeric) and expandHome on \$HOME / \${HOME} / ~/, plus
pass-through for absolute paths and refusal of other env vars.
- ui_restore_test updated: TargetDir now starts '\$HOME/rm-restore/'
with the job_id substituted into the placeholder.
Live verified on the smoke env: default target restored to
/root/rm-restore/<job-id>/ as the agent's expanded \$HOME (2 files,
14 bytes); custom override '/tmp/custom-restore/<job-id>/' restored
into the agent's PrivateTmp namespace (1 file, 6 bytes); both jobs
'succeeded', exit 0.
556 lines
18 KiB
Go
556 lines
18 KiB
Go
// Package runner spawns restic processes for the agent. It owns one
|
|
// Run() invocation per command.run; concurrency limits live a layer
|
|
// up (the WS handler).
|
|
package runner
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"log/slog"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"gitea.dcglab.co.uk/steve/restic-manager/internal/api"
|
|
"gitea.dcglab.co.uk/steve/restic-manager/internal/restic"
|
|
)
|
|
|
|
// Sender is the agent's outbound message channel. Provided by
|
|
// wsclient so the runner can push job.started / job.progress /
|
|
// job.finished / log.stream back to the server.
|
|
type Sender interface {
|
|
Send(env api.Envelope) error
|
|
}
|
|
|
|
// Config carries the long-lived settings a Runner needs. The values
// originate from the agent's config file; server-pushed config.update
// payloads override them in memory.
type Config struct {
	// ResticBin is the restic executable handed to restic.Env.
	ResticBin string
	// ResticVersion is the detected restic version, e.g. "0.17.1".
	// It is empty when the version is unknown.
	ResticVersion string

	// Repository location and credentials.
	RepoURL      string
	RepoUsername string
	RepoPassword string

	// Bandwidth caps in KB/s, applied to every restic invocation.
	// A value <= 0 means "no cap". Callers that build a Runner per
	// dispatch can place a per-job override here directly.
	LimitUploadKBps   int
	LimitDownloadKBps int
}
|
|
|
|
// Runner owns the restic invocations.
|
|
type Runner struct {
|
|
cfg Config
|
|
tx Sender
|
|
|
|
// progress throttling: we receive a status event from restic
|
|
// every ~100ms; the UI doesn't need anywhere near that rate.
|
|
// Cap WS sends to one per N (configurable; default 1s).
|
|
progressMinPeriod time.Duration
|
|
}
|
|
|
|
// New builds a Runner. progressMinPeriod = 0 uses the default 1s.
|
|
func New(cfg Config, tx Sender, progressMinPeriod time.Duration) *Runner {
|
|
if progressMinPeriod <= 0 {
|
|
progressMinPeriod = time.Second
|
|
}
|
|
return &Runner{cfg: cfg, tx: tx, progressMinPeriod: progressMinPeriod}
|
|
}
|
|
|
|
// resticEnv builds the shared restic.Env from r.cfg.
|
|
func (r *Runner) resticEnv() restic.Env {
|
|
return restic.Env{
|
|
Bin: r.cfg.ResticBin,
|
|
Version: r.cfg.ResticVersion,
|
|
RepoURL: r.cfg.RepoURL,
|
|
RepoUsername: r.cfg.RepoUsername,
|
|
RepoPassword: r.cfg.RepoPassword,
|
|
LimitUploadKBps: r.cfg.LimitUploadKBps,
|
|
LimitDownloadKBps: r.cfg.LimitDownloadKBps,
|
|
}
|
|
}
|
|
|
|
// sendStarted ships a job.started envelope.
|
|
func (r *Runner) sendStarted(jobID string, kind api.JobKind, startedAt time.Time) {
|
|
env, _ := api.Marshal(api.MsgJobStarted, jobID, api.JobStartedPayload{
|
|
JobID: jobID, Kind: kind, StartedAt: startedAt,
|
|
})
|
|
if err := r.tx.Send(env); err != nil {
|
|
slog.Warn("runner: send job.started", "job_id", jobID, "kind", kind, "err", err)
|
|
}
|
|
}
|
|
|
|
// streamHandler returns a LineHandler that ships log.stream envelopes.
|
|
func (r *Runner) streamHandler(jobID string, seq *atomic.Int64) restic.LineHandler {
|
|
return func(stream string, line string, _ any) {
|
|
now := time.Now().UTC()
|
|
logEnv, _ := api.Marshal(api.MsgLogStream, "", api.LogStreamLine{
|
|
JobID: jobID,
|
|
Seq: seq.Add(1),
|
|
TS: now,
|
|
Stream: api.LogStream(stream),
|
|
Payload: line,
|
|
})
|
|
_ = r.tx.Send(logEnv)
|
|
}
|
|
}
|
|
|
|
// sendFinished ships a job.finished envelope. err==nil → succeeded;
|
|
// otherwise failed (or canceled if ctx was canceled — operator
|
|
// hit the Cancel button or the agent is shutting down).
|
|
// statsBlob is forwarded as JobFinishedPayload.Stats.
|
|
func (r *Runner) sendFinished(ctx context.Context, jobID string, finishedAt time.Time, err error, statsBlob json.RawMessage) {
|
|
status := api.JobSucceeded
|
|
exit := 0
|
|
errMsg := ""
|
|
if err != nil {
|
|
status = api.JobFailed
|
|
exit = -1
|
|
errMsg = err.Error()
|
|
// If the context was canceled, the failure is operator-driven
|
|
// (or shutdown). Surface as JobCancelled so the UI shows a
|
|
// neutral "canceled" state rather than a red "failed" one.
|
|
// exec.CommandContext returns the process's exit error on
|
|
// ctx-cancel, which we'd otherwise rebadge as failed.
|
|
if ctxErr := ctx.Err(); ctxErr != nil {
|
|
status = api.JobCancelled
|
|
exit = 130 // POSIX convention for SIGINT/SIGTERM-killed
|
|
errMsg = "" // no need to surface the underlying restic error
|
|
}
|
|
}
|
|
finEnv, _ := api.Marshal(api.MsgJobFinished, jobID, api.JobFinishedPayload{
|
|
JobID: jobID,
|
|
Status: status,
|
|
ExitCode: exit,
|
|
FinishedAt: finishedAt,
|
|
Stats: statsBlob,
|
|
Error: errMsg,
|
|
})
|
|
_ = r.tx.Send(finEnv)
|
|
}
|
|
|
|
// BackupHooks holds the optional shell snippets run around a backup.
// An empty field skips that phase. The server resolves the effective
// hooks (group, then host default) before dispatch; the agent simply
// executes whatever arrives in the payload.
type BackupHooks struct {
	// Pre runs before restic; Post runs after it.
	Pre, Post string
}
|
|
|
|
// RunBackup executes a backup job and reports back via the sender.
|
|
// Returns nil on a clean (or "incomplete-but-snapshot-created") finish.
|
|
func (r *Runner) RunBackup(ctx context.Context, jobID string, paths, excludes, tags []string, hooks BackupHooks) error {
|
|
startedAt := time.Now().UTC()
|
|
r.sendStarted(jobID, api.JobBackup, startedAt)
|
|
|
|
var seq atomic.Int64
|
|
|
|
// pre_hook: non-zero exit aborts the backup. The job is recorded
|
|
// as failed with the hook's error and restic never runs.
|
|
if hooks.Pre != "" {
|
|
if err := r.runHook(ctx, jobID, "pre", hooks.Pre, "", &seq); err != nil {
|
|
finishedAt := time.Now().UTC()
|
|
r.sendFinished(ctx, jobID, finishedAt, err, nil)
|
|
return fmt.Errorf("pre_hook failed: %w", err)
|
|
}
|
|
}
|
|
|
|
env := r.resticEnv()
|
|
lastProgress := time.Time{} // zero time → first status event always emits
|
|
|
|
handle := func(stream string, line string, ev any) {
|
|
// Throttled progress events come from restic's `status` JSON.
|
|
// We deliberately do NOT forward the raw status line to
|
|
// log.stream — it's emitted ~every 16ms by restic --json and
|
|
// would drown the live log in dupes for any short backup. The
|
|
// progress widget already covers the same information at a
|
|
// sane sample rate.
|
|
status, isStatus := ev.(restic.BackupStatus)
|
|
if !isStatus {
|
|
now := time.Now().UTC()
|
|
logEnv, _ := api.Marshal(api.MsgLogStream, "", api.LogStreamLine{
|
|
JobID: jobID,
|
|
Seq: seq.Add(1),
|
|
TS: now,
|
|
Stream: api.LogStream(stream),
|
|
Payload: line,
|
|
})
|
|
_ = r.tx.Send(logEnv)
|
|
}
|
|
|
|
if isStatus {
|
|
if time.Since(lastProgress) < r.progressMinPeriod {
|
|
return
|
|
}
|
|
lastProgress = time.Now()
|
|
progEnv, _ := api.Marshal(api.MsgJobProgress, jobID, api.JobProgressPayload{
|
|
JobID: jobID,
|
|
PercentDone: status.PercentDone,
|
|
FilesDone: status.FilesDone,
|
|
TotalFiles: status.TotalFiles,
|
|
BytesDone: status.BytesDone,
|
|
TotalBytes: status.TotalBytes,
|
|
ETASeconds: status.SecondsRem,
|
|
ThroughputBps: throughput(status.BytesDone, status.SecondsElapsed),
|
|
})
|
|
_ = r.tx.Send(progEnv)
|
|
}
|
|
}
|
|
|
|
summary, err := env.RunBackup(ctx, paths, excludes, tags, handle)
|
|
finishedAt := time.Now().UTC()
|
|
|
|
var statsBlob json.RawMessage
|
|
if summary != nil {
|
|
statsBlob, _ = json.Marshal(summary)
|
|
}
|
|
|
|
// post_hook: always runs regardless of backup outcome. Receives
|
|
// RM_JOB_STATUS=succeeded|failed in env. Non-zero exit is logged
|
|
// but does not change the recorded job status.
|
|
if hooks.Post != "" {
|
|
status := "succeeded"
|
|
if err != nil {
|
|
status = "failed"
|
|
}
|
|
if perr := r.runHook(ctx, jobID, "post", hooks.Post, status, &seq); perr != nil {
|
|
slog.Warn("runner: post_hook exited non-zero", "job_id", jobID, "err", perr)
|
|
}
|
|
}
|
|
|
|
r.sendFinished(ctx, jobID, finishedAt, err, statsBlob)
|
|
|
|
// On a successful backup, refresh the server's snapshot projection.
|
|
// We do this *after* job.finished so the UI sees the job land first;
|
|
// the snapshot list is a follow-up that the host detail page polls
|
|
// or the dashboard sees on its next refresh. A failure here is
|
|
// logged but doesn't fail the job — the next successful backup will
|
|
// catch the projection up.
|
|
if err == nil {
|
|
if rerr := r.reportSnapshots(ctx, env); rerr != nil {
|
|
slog.Warn("runner: snapshots.report failed", "job_id", jobID, "err", rerr)
|
|
}
|
|
if rerr := r.reportStats(ctx, env, api.RepoStatsPayload{}); rerr != nil {
|
|
slog.Warn("runner: stats.report after backup failed", "job_id", jobID, "err", rerr)
|
|
}
|
|
}
|
|
|
|
if err != nil {
|
|
return fmt.Errorf("runner backup: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// RunInit executes a repo-init job and reports back via the sender.
|
|
// Returns nil on success. Same envelope shape as RunBackup so the
|
|
// browser-side log viewer just works.
|
|
func (r *Runner) RunInit(ctx context.Context, jobID string) error {
|
|
startedAt := time.Now().UTC()
|
|
r.sendStarted(jobID, api.JobInit, startedAt)
|
|
|
|
env := r.resticEnv()
|
|
var seq atomic.Int64
|
|
err := env.RunInit(ctx, r.streamHandler(jobID, &seq))
|
|
finishedAt := time.Now().UTC()
|
|
r.sendFinished(ctx, jobID, finishedAt, err, nil)
|
|
if err != nil {
|
|
return fmt.Errorf("runner init: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// RunForget executes a forget job against the configured repo by
|
|
// invoking `restic forget --tag <Tag> --keep-* …` once per group.
|
|
// Same envelope shape as RunBackup so the live log viewer + job
|
|
// lifecycle work without special-casing. On success refreshes the
|
|
// snapshot projection (forget rewrites the snapshot index — the
|
|
// host's snapshot list shrinks). Snapshot refresh runs once after
|
|
// every group completes, not per-group.
|
|
func (r *Runner) RunForget(ctx context.Context, jobID string, groups []restic.ForgetGroup) error {
|
|
startedAt := time.Now().UTC()
|
|
r.sendStarted(jobID, api.JobForget, startedAt)
|
|
|
|
env := r.resticEnv()
|
|
var seq atomic.Int64
|
|
err := env.RunForget(ctx, groups, r.streamHandler(jobID, &seq))
|
|
finishedAt := time.Now().UTC()
|
|
r.sendFinished(ctx, jobID, finishedAt, err, nil)
|
|
|
|
// Refresh the server's snapshot projection — forget rewrites the
|
|
// index so the host's snapshot list almost certainly shrunk.
|
|
if err == nil {
|
|
if rerr := r.reportSnapshots(ctx, env); rerr != nil {
|
|
slog.Warn("runner: snapshots.report after forget failed",
|
|
"job_id", jobID, "err", rerr)
|
|
}
|
|
}
|
|
|
|
if err != nil {
|
|
return fmt.Errorf("runner forget: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// RunPrune executes a prune job against the configured repo. On
|
|
// success it ships a repo.stats envelope with LastPruneAt set (plus
|
|
// a full size refresh via RunStats) before the job.finished envelope,
|
|
// so the UI can display updated size information alongside the
|
|
// completed job. On failure no stats refresh is attempted.
|
|
func (r *Runner) RunPrune(ctx context.Context, jobID string) error {
|
|
startedAt := time.Now().UTC()
|
|
r.sendStarted(jobID, api.JobPrune, startedAt)
|
|
|
|
env := r.resticEnv()
|
|
var seq atomic.Int64
|
|
err := env.RunPrune(ctx, r.streamHandler(jobID, &seq))
|
|
finishedAt := time.Now().UTC()
|
|
|
|
if err == nil {
|
|
pruneAt := finishedAt
|
|
if rerr := r.reportStats(ctx, env, api.RepoStatsPayload{LastPruneAt: &pruneAt}); rerr != nil {
|
|
slog.Warn("runner: stats.report after prune failed", "job_id", jobID, "err", rerr)
|
|
}
|
|
}
|
|
|
|
r.sendFinished(ctx, jobID, finishedAt, err, nil)
|
|
|
|
if err != nil {
|
|
return fmt.Errorf("runner prune: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// RunCheck executes a `restic check` job. Always ships a repo.stats
|
|
// envelope (success or failure) with LastCheckAt, LastCheckStatus,
|
|
// and LockPresent populated from the check result.
|
|
func (r *Runner) RunCheck(ctx context.Context, jobID string, subsetPct int) error {
|
|
startedAt := time.Now().UTC()
|
|
r.sendStarted(jobID, api.JobCheck, startedAt)
|
|
|
|
env := r.resticEnv()
|
|
var seq atomic.Int64
|
|
res, err := env.RunCheck(ctx, subsetPct, r.streamHandler(jobID, &seq))
|
|
finishedAt := time.Now().UTC()
|
|
|
|
// Determine check status string.
|
|
checkStatus := "ok"
|
|
if err != nil {
|
|
checkStatus = "failed"
|
|
} else if res.ErrorsFound {
|
|
checkStatus = "errors_found"
|
|
}
|
|
|
|
lockPresent := res.LockPresent
|
|
now := finishedAt
|
|
patch := api.RepoStatsPayload{
|
|
LastCheckAt: &now,
|
|
LastCheckStatus: checkStatus,
|
|
LockPresent: &lockPresent,
|
|
}
|
|
if rerr := r.reportStats(ctx, env, patch); rerr != nil {
|
|
slog.Warn("runner: stats.report after check failed", "job_id", jobID, "err", rerr)
|
|
}
|
|
|
|
r.sendFinished(ctx, jobID, finishedAt, err, nil)
|
|
|
|
if err != nil {
|
|
return fmt.Errorf("runner check: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// RunRestore executes a restic restore job and reports back via the
|
|
// sender. paths is the operator-selected file/dir list to restore.
|
|
// inPlace=true preserves uid/gid/mode and writes at "/"; inPlace=false
|
|
// writes at targetDir with --no-ownership.
|
|
//
|
|
// Status events from restic are throttled into job.progress in the
|
|
// same shape as backup; raw status lines are dropped from log.stream
|
|
// (they would drown the log on a fast restore — the progress widget
|
|
// already covers them).
|
|
func (r *Runner) RunRestore(ctx context.Context, jobID, snapshotID string, paths []string, inPlace bool, targetDir string) error {
|
|
startedAt := time.Now().UTC()
|
|
r.sendStarted(jobID, api.JobRestore, startedAt)
|
|
|
|
env := r.resticEnv()
|
|
var seq atomic.Int64
|
|
lastProgress := time.Time{} // zero time → first status event always emits
|
|
|
|
handle := func(stream string, line string, ev any) {
|
|
status, isStatus := ev.(restic.RestoreStatus)
|
|
if !isStatus {
|
|
now := time.Now().UTC()
|
|
logEnv, _ := api.Marshal(api.MsgLogStream, "", api.LogStreamLine{
|
|
JobID: jobID,
|
|
Seq: seq.Add(1),
|
|
TS: now,
|
|
Stream: api.LogStream(stream),
|
|
Payload: line,
|
|
})
|
|
_ = r.tx.Send(logEnv)
|
|
}
|
|
if isStatus {
|
|
if time.Since(lastProgress) < r.progressMinPeriod {
|
|
return
|
|
}
|
|
lastProgress = time.Now()
|
|
progEnv, _ := api.Marshal(api.MsgJobProgress, jobID, api.JobProgressPayload{
|
|
JobID: jobID,
|
|
PercentDone: status.PercentDone,
|
|
FilesDone: status.FilesRestored,
|
|
TotalFiles: status.TotalFiles,
|
|
BytesDone: status.BytesRestored,
|
|
TotalBytes: status.TotalBytes,
|
|
ETASeconds: estimateETA(status.BytesRestored, status.TotalBytes, status.SecondsElapsed),
|
|
ThroughputBps: throughput(status.BytesRestored, status.SecondsElapsed),
|
|
})
|
|
_ = r.tx.Send(progEnv)
|
|
}
|
|
}
|
|
|
|
summary, err := env.RunRestore(ctx, snapshotID, paths, inPlace, targetDir, handle)
|
|
finishedAt := time.Now().UTC()
|
|
|
|
var statsBlob json.RawMessage
|
|
if summary != nil {
|
|
statsBlob, _ = json.Marshal(summary)
|
|
}
|
|
r.sendFinished(ctx, jobID, finishedAt, err, statsBlob)
|
|
if err != nil {
|
|
return fmt.Errorf("runner restore: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// estimateETA extrapolates the remaining time in seconds from the
// bytes transferred so far and the seconds elapsed, assuming a
// constant rate. RunRestore uses it to fill ETASeconds in its
// progress updates.
//
// It returns 0 when there is not enough data to extrapolate: any
// argument is <= 0, or bytesDone has already reached totalBytes. The
// result is truncated toward zero.
func estimateETA(bytesDone, totalBytes, secondsElapsed int64) int64 {
	switch {
	case bytesDone <= 0, totalBytes <= 0, secondsElapsed <= 0:
		return 0
	case bytesDone >= totalBytes:
		return 0
	}

	// Both operands are strictly positive here, so the rate is too.
	bytesPerSecond := float64(bytesDone) / float64(secondsElapsed)
	remaining := float64(totalBytes - bytesDone)
	return int64(remaining / bytesPerSecond)
}
|
|
|
|
// RunDiff executes `restic diff --json <a> <b>` and forwards output
|
|
// as log.stream lines. No snapshot-list refresh, no stats update —
|
|
// diff is purely informational.
|
|
func (r *Runner) RunDiff(ctx context.Context, jobID, snapshotA, snapshotB string) error {
|
|
startedAt := time.Now().UTC()
|
|
r.sendStarted(jobID, api.JobDiff, startedAt)
|
|
|
|
env := r.resticEnv()
|
|
var seq atomic.Int64
|
|
err := env.RunDiff(ctx, snapshotA, snapshotB, r.streamHandler(jobID, &seq))
|
|
finishedAt := time.Now().UTC()
|
|
r.sendFinished(ctx, jobID, finishedAt, err, nil)
|
|
if err != nil {
|
|
return fmt.Errorf("runner diff: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// RunUnlock executes a `restic unlock` job. On success it ships a
|
|
// repo.stats envelope with LockPresent=false so the UI banner clears.
|
|
func (r *Runner) RunUnlock(ctx context.Context, jobID string) error {
|
|
startedAt := time.Now().UTC()
|
|
r.sendStarted(jobID, api.JobUnlock, startedAt)
|
|
|
|
env := r.resticEnv()
|
|
var seq atomic.Int64
|
|
err := env.RunUnlock(ctx, r.streamHandler(jobID, &seq))
|
|
finishedAt := time.Now().UTC()
|
|
|
|
if err == nil {
|
|
lockFalse := false
|
|
patch := api.RepoStatsPayload{LockPresent: &lockFalse}
|
|
if rerr := r.reportStats(ctx, env, patch); rerr != nil {
|
|
slog.Warn("runner: stats.report after unlock failed", "job_id", jobID, "err", rerr)
|
|
}
|
|
}
|
|
|
|
r.sendFinished(ctx, jobID, finishedAt, err, nil)
|
|
|
|
if err != nil {
|
|
return fmt.Errorf("runner unlock: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// reportStats ships a repo.stats envelope. If the patch doesn't
|
|
// already include size fields, fills them in by invoking env.RunStats.
|
|
// Errors from RunStats are non-fatal — the patch is shipped anyway
|
|
// with whatever the caller did populate.
|
|
func (r *Runner) reportStats(ctx context.Context, env restic.Env, patch api.RepoStatsPayload) error {
|
|
listCtx, cancel := context.WithTimeout(ctx, 60*time.Second)
|
|
defer cancel()
|
|
if patch.TotalSizeBytes == nil {
|
|
if s, err := env.RunStats(listCtx, nil); err == nil {
|
|
total := s.TotalSize
|
|
raw := s.TotalUncompressed
|
|
files := s.TotalFileCount
|
|
snaps := s.SnapshotsCount
|
|
patch.TotalSizeBytes = &total
|
|
patch.RawSizeBytes = &raw
|
|
patch.UniqueFiles = &files
|
|
patch.SnapshotCount = &snaps
|
|
} else {
|
|
slog.Debug("runner: stats refresh failed (non-fatal)", "err", err)
|
|
}
|
|
}
|
|
envOut, err := api.Marshal(api.MsgRepoStats, "", patch)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return r.tx.Send(envOut)
|
|
}
|
|
|
|
// reportSnapshots calls `restic snapshots --json`, translates the
|
|
// payload into the wire shape, and ships it as a snapshots.report
|
|
// envelope. Bounded by a separate timeout so a sluggish repo doesn't
|
|
// hang the runner forever; restic snapshots is normally sub-second.
|
|
func (r *Runner) reportSnapshots(ctx context.Context, env restic.Env) error {
|
|
listCtx, cancel := context.WithTimeout(ctx, 60*time.Second)
|
|
defer cancel()
|
|
snaps, err := env.ListSnapshots(listCtx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
out := make([]api.Snapshot, len(snaps))
|
|
for i, s := range snaps {
|
|
out[i] = api.Snapshot{
|
|
ID: s.ID,
|
|
ShortID: s.ShortID,
|
|
Time: s.Time.UTC(),
|
|
Hostname: s.Hostname,
|
|
Paths: s.Paths,
|
|
Tags: s.Tags,
|
|
}
|
|
if s.Summary != nil {
|
|
out[i].SizeBytes = s.Summary.TotalBytesProcessed
|
|
out[i].FileCount = s.Summary.TotalFilesProcessed
|
|
}
|
|
}
|
|
envOut, err := api.Marshal(api.MsgSnapshotsRpt, "", api.SnapshotsReportPayload{
|
|
Snapshots: out,
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return r.tx.Send(envOut)
|
|
}
|
|
|
|
// throughput returns the average rate bytesDone / secondsElapsed
// using integer division. It returns 0 when secondsElapsed is <= 0,
// which avoids a division by zero before the first second has passed.
func throughput(bytesDone, secondsElapsed int64) int64 {
	if secondsElapsed > 0 {
		return bytesDone / secondsElapsed
	}
	return 0
}
|