// Package runner spawns restic processes for the agent. It owns one
// Run() invocation per command.run; concurrency limits live a layer
// up (the WS handler).
package runner

import (
	"context"
	"encoding/json"
	"fmt"
	"log/slog"
	"sync/atomic"
	"time"

	"gitea.dcglab.co.uk/steve/restic-manager/internal/api"
	"gitea.dcglab.co.uk/steve/restic-manager/internal/restic"
)

// Sender is the agent's outbound message channel. Provided by
// wsclient so the runner can push job.started / job.progress /
// job.finished / log.stream back to the server.
//
// Send takes a fully built api.Envelope and reports a delivery error.
type Sender interface {
	Send(env api.Envelope) error
}

// Config bundles the long-lived settings the runner needs. They come
// from the agent's config file (server-pushed config.update payloads
// override these in memory).
//
// Every field is copied verbatim into a restic.Env by resticEnv.
type Config struct {
	// ResticBin becomes restic.Env.Bin.
	ResticBin string
	// RepoURL, RepoUsername and RepoPassword are forwarded to the
	// restic.Env fields of the same names.
	RepoURL      string
	RepoUsername string
	RepoPassword string

	// Bandwidth caps in KB/s applied to every restic invocation.
	// <=0 means "no cap". Per-job override: callers that build a
	// runner per-dispatch can pass the override value here directly.
	LimitUploadKBps   int
	LimitDownloadKBps int
}

// Runner owns the restic invocations.
type Runner struct {
	// cfg holds the settings used to build a restic.Env for each job.
	cfg Config
	// tx is the outbound channel every envelope is sent through.
	tx Sender

	// progress throttling: we receive a status event from restic
	// every ~100ms; the UI doesn't need anywhere near that rate.
	// Cap WS sends to one per N (configurable; default 1s).
	progressMinPeriod time.Duration
}

// New builds a Runner from cfg that reports through tx.
// A progressMinPeriod of zero or less selects the default of 1s.
func New(cfg Config, tx Sender, progressMinPeriod time.Duration) *Runner {
	if progressMinPeriod <= 0 {
		progressMinPeriod = time.Second
	}
	return &Runner{cfg: cfg, tx: tx, progressMinPeriod: progressMinPeriod}
}

// resticEnv builds the shared restic.Env from r.cfg.
func (r *Runner) resticEnv() restic.Env { return restic.Env{ Bin: r.cfg.ResticBin, RepoURL: r.cfg.RepoURL, RepoUsername: r.cfg.RepoUsername, RepoPassword: r.cfg.RepoPassword, LimitUploadKBps: r.cfg.LimitUploadKBps, LimitDownloadKBps: r.cfg.LimitDownloadKBps, } } // sendStarted ships a job.started envelope. func (r *Runner) sendStarted(jobID string, kind api.JobKind, startedAt time.Time) { env, _ := api.Marshal(api.MsgJobStarted, jobID, api.JobStartedPayload{ JobID: jobID, Kind: kind, StartedAt: startedAt, }) if err := r.tx.Send(env); err != nil { slog.Warn("runner: send job.started", "job_id", jobID, "kind", kind, "err", err) } } // streamHandler returns a LineHandler that ships log.stream envelopes. func (r *Runner) streamHandler(jobID string, seq *atomic.Int64) restic.LineHandler { return func(stream string, line string, _ any) { now := time.Now().UTC() logEnv, _ := api.Marshal(api.MsgLogStream, "", api.LogStreamLine{ JobID: jobID, Seq: seq.Add(1), TS: now, Stream: api.LogStream(stream), Payload: line, }) _ = r.tx.Send(logEnv) } } // sendFinished ships a job.finished envelope. err==nil → succeeded; // otherwise failed (or canceled if ctx was canceled — operator // hit the Cancel button or the agent is shutting down). // statsBlob is forwarded as JobFinishedPayload.Stats. func (r *Runner) sendFinished(ctx context.Context, jobID string, finishedAt time.Time, err error, statsBlob json.RawMessage) { status := api.JobSucceeded exit := 0 errMsg := "" if err != nil { status = api.JobFailed exit = -1 errMsg = err.Error() // If the context was canceled, the failure is operator-driven // (or shutdown). Surface as JobCancelled so the UI shows a // neutral "canceled" state rather than a red "failed" one. // exec.CommandContext returns the process's exit error on // ctx-cancel, which we'd otherwise rebadge as failed. 
if ctxErr := ctx.Err(); ctxErr != nil { status = api.JobCancelled exit = 130 // POSIX convention for SIGINT/SIGTERM-killed errMsg = "" // no need to surface the underlying restic error } } finEnv, _ := api.Marshal(api.MsgJobFinished, jobID, api.JobFinishedPayload{ JobID: jobID, Status: status, ExitCode: exit, FinishedAt: finishedAt, Stats: statsBlob, Error: errMsg, }) _ = r.tx.Send(finEnv) } // BackupHooks bundles the optional pre/post shell snippets that fire // around a backup. Empty fields skip that phase. Resolved server-side // (group → host default) before dispatch; the agent just executes // whatever arrives in the payload. type BackupHooks struct { Pre string Post string } // RunBackup executes a backup job and reports back via the sender. // Returns nil on a clean (or "incomplete-but-snapshot-created") finish. func (r *Runner) RunBackup(ctx context.Context, jobID string, paths, excludes, tags []string, hooks BackupHooks) error { startedAt := time.Now().UTC() r.sendStarted(jobID, api.JobBackup, startedAt) var seq atomic.Int64 // pre_hook: non-zero exit aborts the backup. The job is recorded // as failed with the hook's error and restic never runs. if hooks.Pre != "" { if err := r.runHook(ctx, jobID, "pre", hooks.Pre, "", &seq); err != nil { finishedAt := time.Now().UTC() r.sendFinished(ctx, jobID, finishedAt, err, nil) return fmt.Errorf("pre_hook failed: %w", err) } } env := r.resticEnv() lastProgress := time.Time{} // zero time → first status event always emits handle := func(stream string, line string, ev any) { // Throttled progress events come from restic's `status` JSON. // We deliberately do NOT forward the raw status line to // log.stream — it's emitted ~every 16ms by restic --json and // would drown the live log in dupes for any short backup. The // progress widget already covers the same information at a // sane sample rate. 
status, isStatus := ev.(restic.BackupStatus) if !isStatus { now := time.Now().UTC() logEnv, _ := api.Marshal(api.MsgLogStream, "", api.LogStreamLine{ JobID: jobID, Seq: seq.Add(1), TS: now, Stream: api.LogStream(stream), Payload: line, }) _ = r.tx.Send(logEnv) } if isStatus { if time.Since(lastProgress) < r.progressMinPeriod { return } lastProgress = time.Now() progEnv, _ := api.Marshal(api.MsgJobProgress, jobID, api.JobProgressPayload{ JobID: jobID, PercentDone: status.PercentDone, FilesDone: status.FilesDone, TotalFiles: status.TotalFiles, BytesDone: status.BytesDone, TotalBytes: status.TotalBytes, ETASeconds: status.SecondsRem, ThroughputBps: throughput(status.BytesDone, status.SecondsElapsed), }) _ = r.tx.Send(progEnv) } } summary, err := env.RunBackup(ctx, paths, excludes, tags, handle) finishedAt := time.Now().UTC() var statsBlob json.RawMessage if summary != nil { statsBlob, _ = json.Marshal(summary) } // post_hook: always runs regardless of backup outcome. Receives // RM_JOB_STATUS=succeeded|failed in env. Non-zero exit is logged // but does not change the recorded job status. if hooks.Post != "" { status := "succeeded" if err != nil { status = "failed" } if perr := r.runHook(ctx, jobID, "post", hooks.Post, status, &seq); perr != nil { slog.Warn("runner: post_hook exited non-zero", "job_id", jobID, "err", perr) } } r.sendFinished(ctx, jobID, finishedAt, err, statsBlob) // On a successful backup, refresh the server's snapshot projection. // We do this *after* job.finished so the UI sees the job land first; // the snapshot list is a follow-up that the host detail page polls // or the dashboard sees on its next refresh. A failure here is // logged but doesn't fail the job — the next successful backup will // catch the projection up. 
if err == nil { if rerr := r.reportSnapshots(ctx, env); rerr != nil { slog.Warn("runner: snapshots.report failed", "job_id", jobID, "err", rerr) } if rerr := r.reportStats(ctx, env, api.RepoStatsPayload{}); rerr != nil { slog.Warn("runner: stats.report after backup failed", "job_id", jobID, "err", rerr) } } if err != nil { return fmt.Errorf("runner backup: %w", err) } return nil } // RunInit executes a repo-init job and reports back via the sender. // Returns nil on success. Same envelope shape as RunBackup so the // browser-side log viewer just works. func (r *Runner) RunInit(ctx context.Context, jobID string) error { startedAt := time.Now().UTC() r.sendStarted(jobID, api.JobInit, startedAt) env := r.resticEnv() var seq atomic.Int64 err := env.RunInit(ctx, r.streamHandler(jobID, &seq)) finishedAt := time.Now().UTC() r.sendFinished(ctx, jobID, finishedAt, err, nil) if err != nil { return fmt.Errorf("runner init: %w", err) } return nil } // RunForget executes a forget job against the configured repo by // invoking `restic forget --tag --keep-* …` once per group. // Same envelope shape as RunBackup so the live log viewer + job // lifecycle work without special-casing. On success refreshes the // snapshot projection (forget rewrites the snapshot index — the // host's snapshot list shrinks). Snapshot refresh runs once after // every group completes, not per-group. func (r *Runner) RunForget(ctx context.Context, jobID string, groups []restic.ForgetGroup) error { startedAt := time.Now().UTC() r.sendStarted(jobID, api.JobForget, startedAt) env := r.resticEnv() var seq atomic.Int64 err := env.RunForget(ctx, groups, r.streamHandler(jobID, &seq)) finishedAt := time.Now().UTC() r.sendFinished(ctx, jobID, finishedAt, err, nil) // Refresh the server's snapshot projection — forget rewrites the // index so the host's snapshot list almost certainly shrunk. 
if err == nil { if rerr := r.reportSnapshots(ctx, env); rerr != nil { slog.Warn("runner: snapshots.report after forget failed", "job_id", jobID, "err", rerr) } } if err != nil { return fmt.Errorf("runner forget: %w", err) } return nil } // RunPrune executes a prune job against the configured repo. On // success it ships a repo.stats envelope with LastPruneAt set (plus // a full size refresh via RunStats) before the job.finished envelope, // so the UI can display updated size information alongside the // completed job. On failure no stats refresh is attempted. func (r *Runner) RunPrune(ctx context.Context, jobID string) error { startedAt := time.Now().UTC() r.sendStarted(jobID, api.JobPrune, startedAt) env := r.resticEnv() var seq atomic.Int64 err := env.RunPrune(ctx, r.streamHandler(jobID, &seq)) finishedAt := time.Now().UTC() if err == nil { pruneAt := finishedAt if rerr := r.reportStats(ctx, env, api.RepoStatsPayload{LastPruneAt: &pruneAt}); rerr != nil { slog.Warn("runner: stats.report after prune failed", "job_id", jobID, "err", rerr) } } r.sendFinished(ctx, jobID, finishedAt, err, nil) if err != nil { return fmt.Errorf("runner prune: %w", err) } return nil } // RunCheck executes a `restic check` job. Always ships a repo.stats // envelope (success or failure) with LastCheckAt, LastCheckStatus, // and LockPresent populated from the check result. func (r *Runner) RunCheck(ctx context.Context, jobID string, subsetPct int) error { startedAt := time.Now().UTC() r.sendStarted(jobID, api.JobCheck, startedAt) env := r.resticEnv() var seq atomic.Int64 res, err := env.RunCheck(ctx, subsetPct, r.streamHandler(jobID, &seq)) finishedAt := time.Now().UTC() // Determine check status string. 
checkStatus := "ok" if err != nil { checkStatus = "failed" } else if res.ErrorsFound { checkStatus = "errors_found" } lockPresent := res.LockPresent now := finishedAt patch := api.RepoStatsPayload{ LastCheckAt: &now, LastCheckStatus: checkStatus, LockPresent: &lockPresent, } if rerr := r.reportStats(ctx, env, patch); rerr != nil { slog.Warn("runner: stats.report after check failed", "job_id", jobID, "err", rerr) } r.sendFinished(ctx, jobID, finishedAt, err, nil) if err != nil { return fmt.Errorf("runner check: %w", err) } return nil } // RunRestore executes a restic restore job and reports back via the // sender. paths is the operator-selected file/dir list to restore. // inPlace=true preserves uid/gid/mode and writes at "/"; inPlace=false // writes at targetDir with --no-ownership. // // Status events from restic are throttled into job.progress in the // same shape as backup; raw status lines are dropped from log.stream // (they would drown the log on a fast restore — the progress widget // already covers them). 
func (r *Runner) RunRestore(ctx context.Context, jobID, snapshotID string, paths []string, inPlace bool, targetDir string) error { startedAt := time.Now().UTC() r.sendStarted(jobID, api.JobRestore, startedAt) env := r.resticEnv() var seq atomic.Int64 lastProgress := time.Time{} // zero time → first status event always emits handle := func(stream string, line string, ev any) { status, isStatus := ev.(restic.RestoreStatus) if !isStatus { now := time.Now().UTC() logEnv, _ := api.Marshal(api.MsgLogStream, "", api.LogStreamLine{ JobID: jobID, Seq: seq.Add(1), TS: now, Stream: api.LogStream(stream), Payload: line, }) _ = r.tx.Send(logEnv) } if isStatus { if time.Since(lastProgress) < r.progressMinPeriod { return } lastProgress = time.Now() progEnv, _ := api.Marshal(api.MsgJobProgress, jobID, api.JobProgressPayload{ JobID: jobID, PercentDone: status.PercentDone, FilesDone: status.FilesRestored, TotalFiles: status.TotalFiles, BytesDone: status.BytesRestored, TotalBytes: status.TotalBytes, ETASeconds: estimateETA(status.BytesRestored, status.TotalBytes, status.SecondsElapsed), ThroughputBps: throughput(status.BytesRestored, status.SecondsElapsed), }) _ = r.tx.Send(progEnv) } } summary, err := env.RunRestore(ctx, snapshotID, paths, inPlace, targetDir, handle) finishedAt := time.Now().UTC() var statsBlob json.RawMessage if summary != nil { statsBlob, _ = json.Marshal(summary) } r.sendFinished(ctx, jobID, finishedAt, err, statsBlob) if err != nil { return fmt.Errorf("runner restore: %w", err) } return nil } // estimateETA computes an ETA in seconds based on current bytes // progress + elapsed seconds. Restic restore's --json doesn't emit an // ETA field of its own (unlike backup), so we approximate by linear // extrapolation. Returns 0 when we don't have enough data. 
func estimateETA(bytesDone, totalBytes, secondsElapsed int64) int64 { if bytesDone <= 0 || totalBytes <= 0 || secondsElapsed <= 0 || bytesDone >= totalBytes { return 0 } rate := float64(bytesDone) / float64(secondsElapsed) if rate <= 0 { return 0 } return int64(float64(totalBytes-bytesDone) / rate) } // RunDiff executes `restic diff --json ` and forwards output // as log.stream lines. No snapshot-list refresh, no stats update — // diff is purely informational. func (r *Runner) RunDiff(ctx context.Context, jobID, snapshotA, snapshotB string) error { startedAt := time.Now().UTC() r.sendStarted(jobID, api.JobDiff, startedAt) env := r.resticEnv() var seq atomic.Int64 err := env.RunDiff(ctx, snapshotA, snapshotB, r.streamHandler(jobID, &seq)) finishedAt := time.Now().UTC() r.sendFinished(ctx, jobID, finishedAt, err, nil) if err != nil { return fmt.Errorf("runner diff: %w", err) } return nil } // RunUnlock executes a `restic unlock` job. On success it ships a // repo.stats envelope with LockPresent=false so the UI banner clears. func (r *Runner) RunUnlock(ctx context.Context, jobID string) error { startedAt := time.Now().UTC() r.sendStarted(jobID, api.JobUnlock, startedAt) env := r.resticEnv() var seq atomic.Int64 err := env.RunUnlock(ctx, r.streamHandler(jobID, &seq)) finishedAt := time.Now().UTC() if err == nil { lockFalse := false patch := api.RepoStatsPayload{LockPresent: &lockFalse} if rerr := r.reportStats(ctx, env, patch); rerr != nil { slog.Warn("runner: stats.report after unlock failed", "job_id", jobID, "err", rerr) } } r.sendFinished(ctx, jobID, finishedAt, err, nil) if err != nil { return fmt.Errorf("runner unlock: %w", err) } return nil } // reportStats ships a repo.stats envelope. If the patch doesn't // already include size fields, fills them in by invoking env.RunStats. // Errors from RunStats are non-fatal — the patch is shipped anyway // with whatever the caller did populate. 
func (r *Runner) reportStats(ctx context.Context, env restic.Env, patch api.RepoStatsPayload) error { listCtx, cancel := context.WithTimeout(ctx, 60*time.Second) defer cancel() if patch.TotalSizeBytes == nil { if s, err := env.RunStats(listCtx, nil); err == nil { total := s.TotalSize raw := s.TotalUncompressed files := s.TotalFileCount snaps := s.SnapshotsCount patch.TotalSizeBytes = &total patch.RawSizeBytes = &raw patch.UniqueFiles = &files patch.SnapshotCount = &snaps } else { slog.Debug("runner: stats refresh failed (non-fatal)", "err", err) } } envOut, err := api.Marshal(api.MsgRepoStats, "", patch) if err != nil { return err } return r.tx.Send(envOut) } // reportSnapshots calls `restic snapshots --json`, translates the // payload into the wire shape, and ships it as a snapshots.report // envelope. Bounded by a separate timeout so a sluggish repo doesn't // hang the runner forever; restic snapshots is normally sub-second. func (r *Runner) reportSnapshots(ctx context.Context, env restic.Env) error { listCtx, cancel := context.WithTimeout(ctx, 60*time.Second) defer cancel() snaps, err := env.ListSnapshots(listCtx) if err != nil { return err } out := make([]api.Snapshot, len(snaps)) for i, s := range snaps { out[i] = api.Snapshot{ ID: s.ID, ShortID: s.ShortID, Time: s.Time.UTC(), Hostname: s.Hostname, Paths: s.Paths, Tags: s.Tags, } if s.Summary != nil { out[i].SizeBytes = s.Summary.TotalBytesProcessed out[i].FileCount = s.Summary.TotalFilesProcessed } } envOut, err := api.Marshal(api.MsgSnapshotsRpt, "", api.SnapshotsReportPayload{ Snapshots: out, }) if err != nil { return err } return r.tx.Send(envOut) } func throughput(bytesDone, secondsElapsed int64) int64 { if secondsElapsed <= 0 { return 0 } return bytesDone / secondsElapsed }