P3-03: restic restore + diff execution path
Wires JobRestore and JobDiff end-to-end at the agent layer (the wizard
backend that drives this lands in the next slice).
- internal/api: JobRestore + JobDiff JobKind constants. CommandRunPayload
grows nullable Restore + Diff sub-payloads. RestorePayload carries
snapshot_id, paths, in_place, target_dir; DiffPayload carries
snapshot_a + snapshot_b.
- internal/restic.RunRestore wraps 'restic restore <sid> --target ...
[--no-ownership] [--include p]...' with --json. New pumpRestoreStdout
parses the per-line status / summary objects (drops raw status from
log.stream — the throttled job.progress envelope covers it). New
RestoreStatus + RestoreSummary types mirror restic's wire shape.
- internal/restic.RunDiff wraps 'restic diff --json <a> <b>'.
- internal/agent/runner: RunRestore translates RestoreStatus into
job.progress (mapping FilesRestored → FilesDone etc) with a small
estimateETA helper since restic doesn't provide ETA for restore.
RunDiff is a thin streamHandler wrapper.
- cmd/agent dispatcher gains JobRestore + JobDiff cases. Both reuse
the spawn() helper from P3-X1 so cancel just works.
- Drive-by fix: lastProgress was initialised to time.Now() so the
very first status event was suppressed by the 1s throttle if the
agent reported quickly. Initialise to time.Time{} (zero) so the
first event always emits. Affects backup + restore.
Tests:
- restore_test covers restore happy path (started → progress →
finished, kind=restore on the started envelope), in-place argv
asserts no --no-ownership, new-dir argv asserts --no-ownership +
--target + --include, diff produces the expected log.stream lines.
Restage block (CLAUDE.md) is deferred to the end of the restore
sub-phase so we restage once with all changes.
This commit is contained in:
@@ -0,0 +1,233 @@
|
||||
package runner
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"gitea.dcglab.co.uk/steve/restic-manager/internal/api"
|
||||
)
|
||||
|
||||
// TestRunRestoreShipsExpectedEnvelopes: a fake restic emits a couple
|
||||
// of restore status lines and a summary; the runner translates them
|
||||
// into job.progress envelopes and finishes the job successfully.
|
||||
func TestRunRestoreShipsExpectedEnvelopes(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
bin := setupScript(t, `
|
||||
case "$1" in
|
||||
restore)
|
||||
echo '{"message_type":"status","seconds_elapsed":1,"percent_done":0.5,"total_files":10,"files_restored":5,"total_bytes":1000,"bytes_restored":500}'
|
||||
echo '{"message_type":"status","seconds_elapsed":2,"percent_done":1.0,"total_files":10,"files_restored":10,"total_bytes":1000,"bytes_restored":1000}'
|
||||
echo '{"message_type":"summary","seconds_elapsed":2,"total_files":10,"files_restored":10,"total_bytes":1000,"bytes_restored":1000}'
|
||||
;;
|
||||
*)
|
||||
echo "unknown: $*" ;;
|
||||
esac
|
||||
`)
|
||||
|
||||
tx := &fakeSender{}
|
||||
r := New(Config{ResticBin: bin}, tx, 0)
|
||||
|
||||
if err := r.RunRestore(context.Background(), "job-r1", "f3a7b2c1",
|
||||
[]string{"/etc/nginx/sites-available/alfa.conf"},
|
||||
false, "/tmp/restore-out"); err != nil {
|
||||
t.Fatalf("RunRestore: %v", err)
|
||||
}
|
||||
|
||||
// Confirm landmarks: started → progress → finished.
|
||||
order := envelopeOrder(tx.envs)
|
||||
wants := []api.MessageType{api.MsgJobStarted, api.MsgJobProgress, api.MsgJobFinished}
|
||||
positions := map[api.MessageType]int{}
|
||||
for i, mt := range order {
|
||||
if _, seen := positions[mt]; !seen {
|
||||
positions[mt] = i
|
||||
}
|
||||
}
|
||||
for i := 0; i < len(wants)-1; i++ {
|
||||
a, b := wants[i], wants[i+1]
|
||||
pa, aOK := positions[a]
|
||||
pb, bOK := positions[b]
|
||||
if !aOK {
|
||||
t.Fatalf("envelope %q not found in %v", a, order)
|
||||
}
|
||||
if !bOK {
|
||||
t.Fatalf("envelope %q not found in %v", b, order)
|
||||
}
|
||||
if pa >= pb {
|
||||
t.Fatalf("expected %q before %q (positions %d, %d)", a, b, pa, pb)
|
||||
}
|
||||
}
|
||||
|
||||
// Started carries the right kind.
|
||||
startEnv := firstEnvOfType(t, tx.envs, api.MsgJobStarted)
|
||||
var startP api.JobStartedPayload
|
||||
if err := startEnv.UnmarshalPayload(&startP); err != nil {
|
||||
t.Fatalf("unmarshal started: %v", err)
|
||||
}
|
||||
if startP.Kind != api.JobRestore {
|
||||
t.Fatalf("kind: got %q want %q", startP.Kind, api.JobRestore)
|
||||
}
|
||||
|
||||
// Finished is succeeded.
|
||||
finEnv := firstEnvOfType(t, tx.envs, api.MsgJobFinished)
|
||||
var finP api.JobFinishedPayload
|
||||
if err := finEnv.UnmarshalPayload(&finP); err != nil {
|
||||
t.Fatalf("unmarshal finished: %v", err)
|
||||
}
|
||||
if finP.Status != api.JobSucceeded {
|
||||
t.Fatalf("status: got %q want %q", finP.Status, api.JobSucceeded)
|
||||
}
|
||||
// Progress envelope reflects the last status line: 100% with 10 files.
|
||||
progEnv := firstEnvOfType(t, tx.envs, api.MsgJobProgress)
|
||||
var progP api.JobProgressPayload
|
||||
if err := progEnv.UnmarshalPayload(&progP); err != nil {
|
||||
t.Fatalf("unmarshal progress: %v", err)
|
||||
}
|
||||
// First progress will be from line 1 (50%) since we send first status
|
||||
// immediately. Verify we at least see a sensible value.
|
||||
if progP.PercentDone <= 0 {
|
||||
t.Fatalf("expected non-zero progress, got %v", progP.PercentDone)
|
||||
}
|
||||
if progP.FilesDone <= 0 || progP.TotalFiles <= 0 {
|
||||
t.Fatalf("expected file counters set, got %+v", progP)
|
||||
}
|
||||
}
|
||||
|
||||
// TestRunRestoreInPlaceArgvHasNoNoOwnership: indirectly verifies that
|
||||
// in-place mode doesn't pass --no-ownership. We can't see the actual
|
||||
// argv without a custom test harness, so we use a fake restic that
|
||||
// echoes its args and check the captured log.stream.
|
||||
func TestRunRestoreInPlaceArgvHasNoNoOwnership(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
bin := setupScript(t, `
|
||||
case "$1" in
|
||||
restore)
|
||||
# Print all args on stderr so they're forwarded as log.stream.
|
||||
echo "argv: $*" 1>&2
|
||||
echo '{"message_type":"summary","seconds_elapsed":0,"total_files":0,"files_restored":0,"total_bytes":0,"bytes_restored":0}'
|
||||
;;
|
||||
esac
|
||||
`)
|
||||
|
||||
tx := &fakeSender{}
|
||||
r := New(Config{ResticBin: bin}, tx, 0)
|
||||
if err := r.RunRestore(context.Background(), "job-r2", "abc",
|
||||
nil, true, ""); err != nil {
|
||||
t.Fatalf("RunRestore: %v", err)
|
||||
}
|
||||
|
||||
// Reconstruct the argv from the captured stderr log line.
|
||||
var argv string
|
||||
for _, e := range tx.envs {
|
||||
if e.Type == api.MsgLogStream {
|
||||
var p api.LogStreamLine
|
||||
_ = e.UnmarshalPayload(&p)
|
||||
if p.Stream == api.LogStderr && strings.HasPrefix(p.Payload, "argv:") {
|
||||
argv = p.Payload
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
if argv == "" {
|
||||
t.Fatal("never captured argv echo from fake restic")
|
||||
}
|
||||
if strings.Contains(argv, "--no-ownership") {
|
||||
t.Errorf("in-place restore should NOT pass --no-ownership; got argv=%q", argv)
|
||||
}
|
||||
if !strings.Contains(argv, "--target /") {
|
||||
t.Errorf("in-place restore should pass --target /; got argv=%q", argv)
|
||||
}
|
||||
}
|
||||
|
||||
// TestRunRestoreNewDirArgvHasNoOwnership: complement of the above —
|
||||
// non-in-place restore must include --no-ownership.
|
||||
func TestRunRestoreNewDirArgvHasNoOwnership(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
bin := setupScript(t, `
|
||||
case "$1" in
|
||||
restore)
|
||||
echo "argv: $*" 1>&2
|
||||
echo '{"message_type":"summary","seconds_elapsed":0,"total_files":0,"files_restored":0,"total_bytes":0,"bytes_restored":0}'
|
||||
;;
|
||||
esac
|
||||
`)
|
||||
tx := &fakeSender{}
|
||||
r := New(Config{ResticBin: bin}, tx, 0)
|
||||
if err := r.RunRestore(context.Background(), "job-r3", "abc",
|
||||
[]string{"/etc/foo"}, false, "/tmp/restore-out"); err != nil {
|
||||
t.Fatalf("RunRestore: %v", err)
|
||||
}
|
||||
|
||||
var argv string
|
||||
for _, e := range tx.envs {
|
||||
if e.Type == api.MsgLogStream {
|
||||
var p api.LogStreamLine
|
||||
_ = e.UnmarshalPayload(&p)
|
||||
if p.Stream == api.LogStderr && strings.HasPrefix(p.Payload, "argv:") {
|
||||
argv = p.Payload
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
if argv == "" {
|
||||
t.Fatal("no argv echo")
|
||||
}
|
||||
if !strings.Contains(argv, "--no-ownership") {
|
||||
t.Errorf("new-dir restore should pass --no-ownership; got argv=%q", argv)
|
||||
}
|
||||
if !strings.Contains(argv, "--target /tmp/restore-out") {
|
||||
t.Errorf("expected --target /tmp/restore-out; got argv=%q", argv)
|
||||
}
|
||||
if !strings.Contains(argv, "--include /etc/foo") {
|
||||
t.Errorf("expected --include /etc/foo; got argv=%q", argv)
|
||||
}
|
||||
}
|
||||
|
||||
// TestRunDiffShipsLogLines: diff output is forwarded as log.stream.
|
||||
func TestRunDiffShipsLogLines(t *testing.T) {
|
||||
t.Parallel()
|
||||
bin := setupScript(t, `
|
||||
case "$1" in
|
||||
diff)
|
||||
echo '{"message_type":"change","path":"/etc/nginx/nginx.conf","modifier":"M"}'
|
||||
echo '{"message_type":"statistics","added":{"files":0,"dirs":0}}'
|
||||
;;
|
||||
esac
|
||||
`)
|
||||
tx := &fakeSender{}
|
||||
r := New(Config{ResticBin: bin}, tx, 0)
|
||||
if err := r.RunDiff(context.Background(), "job-d1", "snap-a", "snap-b"); err != nil {
|
||||
t.Fatalf("RunDiff: %v", err)
|
||||
}
|
||||
|
||||
startEnv := firstEnvOfType(t, tx.envs, api.MsgJobStarted)
|
||||
var startP api.JobStartedPayload
|
||||
_ = startEnv.UnmarshalPayload(&startP)
|
||||
if startP.Kind != api.JobDiff {
|
||||
t.Fatalf("kind: got %q want %q", startP.Kind, api.JobDiff)
|
||||
}
|
||||
finEnv := firstEnvOfType(t, tx.envs, api.MsgJobFinished)
|
||||
var finP api.JobFinishedPayload
|
||||
_ = finEnv.UnmarshalPayload(&finP)
|
||||
if finP.Status != api.JobSucceeded {
|
||||
t.Fatalf("status: %q", finP.Status)
|
||||
}
|
||||
// At least one log line should carry the change payload.
|
||||
var sawChange bool
|
||||
for _, e := range tx.envs {
|
||||
if e.Type != api.MsgLogStream {
|
||||
continue
|
||||
}
|
||||
var p api.LogStreamLine
|
||||
_ = e.UnmarshalPayload(&p)
|
||||
if strings.Contains(p.Payload, `"message_type":"change"`) {
|
||||
sawChange = true
|
||||
}
|
||||
}
|
||||
if !sawChange {
|
||||
t.Fatal("never saw a change log line in diff output")
|
||||
}
|
||||
}
|
||||
@@ -156,7 +156,7 @@ func (r *Runner) RunBackup(ctx context.Context, jobID string, paths, excludes, t
|
||||
}
|
||||
|
||||
env := r.resticEnv()
|
||||
lastProgress := time.Now()
|
||||
lastProgress := time.Time{} // zero time → first status event always emits
|
||||
|
||||
handle := func(stream string, line string, ev any) {
|
||||
// Throttled progress events come from restic's `status` JSON.
|
||||
@@ -359,6 +359,102 @@ func (r *Runner) RunCheck(ctx context.Context, jobID string, subsetPct int) erro
|
||||
return nil
|
||||
}
|
||||
|
||||
// RunRestore executes a restic restore job and reports back via the
|
||||
// sender. paths is the operator-selected file/dir list to restore.
|
||||
// inPlace=true preserves uid/gid/mode and writes at "/"; inPlace=false
|
||||
// writes at targetDir with --no-ownership.
|
||||
//
|
||||
// Status events from restic are throttled into job.progress in the
|
||||
// same shape as backup; raw status lines are dropped from log.stream
|
||||
// (they would drown the log on a fast restore — the progress widget
|
||||
// already covers them).
|
||||
func (r *Runner) RunRestore(ctx context.Context, jobID, snapshotID string, paths []string, inPlace bool, targetDir string) error {
|
||||
startedAt := time.Now().UTC()
|
||||
r.sendStarted(jobID, api.JobRestore, startedAt)
|
||||
|
||||
env := r.resticEnv()
|
||||
var seq atomic.Int64
|
||||
lastProgress := time.Time{} // zero time → first status event always emits
|
||||
|
||||
handle := func(stream string, line string, ev any) {
|
||||
status, isStatus := ev.(restic.RestoreStatus)
|
||||
if !isStatus {
|
||||
now := time.Now().UTC()
|
||||
logEnv, _ := api.Marshal(api.MsgLogStream, "", api.LogStreamLine{
|
||||
JobID: jobID,
|
||||
Seq: seq.Add(1),
|
||||
TS: now,
|
||||
Stream: api.LogStream(stream),
|
||||
Payload: line,
|
||||
})
|
||||
_ = r.tx.Send(logEnv)
|
||||
}
|
||||
if isStatus {
|
||||
if time.Since(lastProgress) < r.progressMinPeriod {
|
||||
return
|
||||
}
|
||||
lastProgress = time.Now()
|
||||
progEnv, _ := api.Marshal(api.MsgJobProgress, jobID, api.JobProgressPayload{
|
||||
JobID: jobID,
|
||||
PercentDone: status.PercentDone,
|
||||
FilesDone: status.FilesRestored,
|
||||
TotalFiles: status.TotalFiles,
|
||||
BytesDone: status.BytesRestored,
|
||||
TotalBytes: status.TotalBytes,
|
||||
ETASeconds: estimateETA(status.BytesRestored, status.TotalBytes, status.SecondsElapsed),
|
||||
ThroughputBps: throughput(status.BytesRestored, status.SecondsElapsed),
|
||||
})
|
||||
_ = r.tx.Send(progEnv)
|
||||
}
|
||||
}
|
||||
|
||||
summary, err := env.RunRestore(ctx, snapshotID, paths, inPlace, targetDir, handle)
|
||||
finishedAt := time.Now().UTC()
|
||||
|
||||
var statsBlob json.RawMessage
|
||||
if summary != nil {
|
||||
statsBlob, _ = json.Marshal(summary)
|
||||
}
|
||||
r.sendFinished(ctx, jobID, finishedAt, err, statsBlob)
|
||||
if err != nil {
|
||||
return fmt.Errorf("runner restore: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// estimateETA computes an ETA in seconds based on current bytes
|
||||
// progress + elapsed seconds. Restic restore's --json doesn't emit an
|
||||
// ETA field of its own (unlike backup), so we approximate by linear
|
||||
// extrapolation. Returns 0 when we don't have enough data.
|
||||
func estimateETA(bytesDone, totalBytes, secondsElapsed int64) int64 {
|
||||
if bytesDone <= 0 || totalBytes <= 0 || secondsElapsed <= 0 || bytesDone >= totalBytes {
|
||||
return 0
|
||||
}
|
||||
rate := float64(bytesDone) / float64(secondsElapsed)
|
||||
if rate <= 0 {
|
||||
return 0
|
||||
}
|
||||
return int64(float64(totalBytes-bytesDone) / rate)
|
||||
}
|
||||
|
||||
// RunDiff executes `restic diff --json <a> <b>` and forwards output
|
||||
// as log.stream lines. No snapshot-list refresh, no stats update —
|
||||
// diff is purely informational.
|
||||
func (r *Runner) RunDiff(ctx context.Context, jobID, snapshotA, snapshotB string) error {
|
||||
startedAt := time.Now().UTC()
|
||||
r.sendStarted(jobID, api.JobDiff, startedAt)
|
||||
|
||||
env := r.resticEnv()
|
||||
var seq atomic.Int64
|
||||
err := env.RunDiff(ctx, snapshotA, snapshotB, r.streamHandler(jobID, &seq))
|
||||
finishedAt := time.Now().UTC()
|
||||
r.sendFinished(ctx, jobID, finishedAt, err, nil)
|
||||
if err != nil {
|
||||
return fmt.Errorf("runner diff: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// RunUnlock executes a `restic unlock` job. On success it ships a
|
||||
// repo.stats envelope with LockPresent=false so the UI banner clears.
|
||||
func (r *Runner) RunUnlock(ctx context.Context, jobID string) error {
|
||||
|
||||
@@ -52,14 +52,17 @@ type JobKind string
|
||||
|
||||
// Allowed JobKind values. backup is operator/cron driven; init runs
|
||||
// once per host on first connect; forget/prune/check fire from the
|
||||
// server-side maintenance ticker; unlock is operator-only.
|
||||
// server-side maintenance ticker; unlock and restore are operator-
|
||||
// only; diff is operator-only and read-only.
|
||||
const (
|
||||
JobBackup JobKind = "backup"
|
||||
JobInit JobKind = "init"
|
||||
JobForget JobKind = "forget"
|
||||
JobPrune JobKind = "prune"
|
||||
JobCheck JobKind = "check"
|
||||
JobUnlock JobKind = "unlock"
|
||||
JobBackup JobKind = "backup"
|
||||
JobInit JobKind = "init"
|
||||
JobForget JobKind = "forget"
|
||||
JobPrune JobKind = "prune"
|
||||
JobCheck JobKind = "check"
|
||||
JobUnlock JobKind = "unlock"
|
||||
JobRestore JobKind = "restore"
|
||||
JobDiff JobKind = "diff"
|
||||
)
|
||||
|
||||
// JobStatus is the lifecycle state of a job.
|
||||
@@ -143,6 +146,35 @@ type CommandRunPayload struct {
|
||||
// just executes whatever is here.
|
||||
PreHook string `json:"pre_hook,omitempty"`
|
||||
PostHook string `json:"post_hook,omitempty"`
|
||||
|
||||
// Restore is populated only for kind=restore. See RestorePayload
|
||||
// for the shape; nil for every other kind.
|
||||
Restore *RestorePayload `json:"restore,omitempty"`
|
||||
|
||||
// Diff is populated only for kind=diff. See DiffPayload for
|
||||
// shape; nil for every other kind.
|
||||
Diff *DiffPayload `json:"diff,omitempty"`
|
||||
}
|
||||
|
||||
// RestorePayload carries restore-specific arguments on a JobRestore
|
||||
// command.run. Paths are absolute paths inside the snapshot (same
|
||||
// shape restic accepts as positional args). When InPlace is true the
|
||||
// agent restores at root (`--target /`) and preserves uid/gid/mode;
|
||||
// otherwise it restores into TargetDir with --no-ownership so the
|
||||
// operator can inspect the files as the agent user.
|
||||
type RestorePayload struct {
|
||||
SnapshotID string `json:"snapshot_id"`
|
||||
Paths []string `json:"paths"`
|
||||
InPlace bool `json:"in_place"`
|
||||
TargetDir string `json:"target_dir,omitempty"` // ignored when in_place=true
|
||||
}
|
||||
|
||||
// DiffPayload carries snapshot-diff arguments on a JobDiff command.run.
|
||||
// SnapshotA / SnapshotB may be either short or long IDs; restic
|
||||
// accepts both.
|
||||
type DiffPayload struct {
|
||||
SnapshotA string `json:"snapshot_a"`
|
||||
SnapshotB string `json:"snapshot_b"`
|
||||
}
|
||||
|
||||
// CommandCancelPayload is the server → agent cancel signal.
|
||||
|
||||
@@ -0,0 +1,213 @@
|
||||
package restic
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"os/exec"
|
||||
"strings"
|
||||
)
|
||||
|
||||
// RestoreStatus mirrors the JSON `status` lines `restic restore --json`
|
||||
// emits while restoring. Field names track restic's wire format; we
|
||||
// project a subset (the rest are cosmetic).
|
||||
type RestoreStatus struct {
|
||||
MessageType string `json:"message_type"`
|
||||
SecondsElapsed int64 `json:"seconds_elapsed"`
|
||||
PercentDone float64 `json:"percent_done"`
|
||||
TotalFiles int64 `json:"total_files"`
|
||||
FilesRestored int64 `json:"files_restored"`
|
||||
FilesSkipped int64 `json:"files_skipped"`
|
||||
TotalBytes int64 `json:"total_bytes"`
|
||||
BytesRestored int64 `json:"bytes_restored"`
|
||||
BytesSkipped int64 `json:"bytes_skipped"`
|
||||
}
|
||||
|
||||
// RestoreSummary is the final summary line emitted after a successful
|
||||
// restore. Newer restic prints it; older clients leave us with no
|
||||
// summary, in which case the agent skips the stats and the live UI
|
||||
// just sees percent reach 100%.
|
||||
type RestoreSummary struct {
|
||||
MessageType string `json:"message_type"`
|
||||
SecondsElapsed int64 `json:"seconds_elapsed"`
|
||||
TotalFiles int64 `json:"total_files"`
|
||||
FilesRestored int64 `json:"files_restored"`
|
||||
FilesSkipped int64 `json:"files_skipped"`
|
||||
TotalBytes int64 `json:"total_bytes"`
|
||||
BytesRestored int64 `json:"bytes_restored"`
|
||||
BytesSkipped int64 `json:"bytes_skipped"`
|
||||
}
|
||||
|
||||
// RunRestore executes `restic restore <snapshotID> --target <dir>
|
||||
// [--include <p>...]` with --json and pumps progress events into
|
||||
// handle. paths is the operator-selected list (each becomes an
|
||||
// `--include` flag); preserveOwner controls --no-ownership.
|
||||
//
|
||||
// inPlace toggles target semantics:
|
||||
// - true → target is "/" and ownership is preserved
|
||||
// - false → target is targetDir and --no-ownership is passed
|
||||
//
|
||||
// targetDir is created on demand by restic itself.
|
||||
func (e Env) RunRestore(ctx context.Context, snapshotID string, paths []string, inPlace bool, targetDir string, handle LineHandler) (*RestoreSummary, error) {
|
||||
if snapshotID == "" {
|
||||
return nil, fmt.Errorf("restic restore: snapshot id required")
|
||||
}
|
||||
if !inPlace && targetDir == "" {
|
||||
return nil, fmt.Errorf("restic restore: target dir required for non-in-place restore")
|
||||
}
|
||||
|
||||
args := []string{"restore", "--json", snapshotID}
|
||||
target := targetDir
|
||||
if inPlace {
|
||||
target = "/"
|
||||
}
|
||||
args = append(args, "--target", target)
|
||||
if !inPlace {
|
||||
args = append(args, "--no-ownership")
|
||||
}
|
||||
for _, p := range paths {
|
||||
args = append(args, "--include", p)
|
||||
}
|
||||
|
||||
cmd := e.resticCmd(ctx, args...)
|
||||
|
||||
stdout, err := cmd.StdoutPipe()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("restic restore: stdout pipe: %w", err)
|
||||
}
|
||||
stderr, err := cmd.StderrPipe()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("restic restore: stderr pipe: %w", err)
|
||||
}
|
||||
|
||||
if err := cmd.Start(); err != nil {
|
||||
return nil, fmt.Errorf("restic restore: start: %w", err)
|
||||
}
|
||||
|
||||
var summary *RestoreSummary
|
||||
done := make(chan error, 2)
|
||||
go func() { done <- pumpRestoreStdout(stdout, handle, &summary) }()
|
||||
go func() { done <- pumpStderr(stderr, handle) }()
|
||||
for i := 0; i < 2; i++ {
|
||||
if err := <-done; err != nil && handle != nil {
|
||||
handle("event", fmt.Sprintf("pump error: %v", err), nil)
|
||||
}
|
||||
}
|
||||
werr := cmd.Wait()
|
||||
if werr != nil {
|
||||
var ee *exec.ExitError
|
||||
if errors.As(werr, &ee) {
|
||||
return summary, fmt.Errorf("restic restore: exit %d", ee.ExitCode())
|
||||
}
|
||||
return summary, fmt.Errorf("restic restore: %w", werr)
|
||||
}
|
||||
return summary, nil
|
||||
}
|
||||
|
||||
// pumpRestoreStdout is the restore variant of pumpStdout: it emits
|
||||
// `event` lines for the parsed status/summary objects (so the runner
|
||||
// can shape them into job.progress) and forwards everything else as
|
||||
// stdout — but unlike backup we include the raw status JSON in
|
||||
// log.stream too because restore is short and the live log audience
|
||||
// genuinely benefits from the per-file traffic. Actually — we mirror
|
||||
// backup's behavior and DROP raw status lines from log.stream
|
||||
// (they'd drown the log on a fast restore); the progress envelope
|
||||
// covers them.
|
||||
func pumpRestoreStdout(r io.Reader, handle LineHandler, summary **RestoreSummary) error {
|
||||
scanner := bufio.NewScanner(r)
|
||||
scanner.Buffer(make([]byte, 0, 64*1024), 4*1024*1024)
|
||||
for scanner.Scan() {
|
||||
line := scanner.Text()
|
||||
if handle == nil {
|
||||
continue
|
||||
}
|
||||
if !strings.HasPrefix(line, "{") {
|
||||
handle("stdout", line, nil)
|
||||
continue
|
||||
}
|
||||
var probe struct {
|
||||
MessageType string `json:"message_type"`
|
||||
}
|
||||
if err := json.Unmarshal([]byte(line), &probe); err != nil {
|
||||
handle("stdout", line, nil)
|
||||
continue
|
||||
}
|
||||
switch probe.MessageType {
|
||||
case "status":
|
||||
var ev RestoreStatus
|
||||
if json.Unmarshal([]byte(line), &ev) == nil {
|
||||
// Don't tee status lines to log.stream — too chatty.
|
||||
handle("event", line, ev)
|
||||
continue
|
||||
}
|
||||
case "summary":
|
||||
var ev RestoreSummary
|
||||
if json.Unmarshal([]byte(line), &ev) == nil {
|
||||
if summary != nil {
|
||||
s := ev
|
||||
*summary = &s
|
||||
}
|
||||
handle("event", line, ev)
|
||||
continue
|
||||
}
|
||||
case "verbose_status":
|
||||
handle("event", line, nil)
|
||||
continue
|
||||
}
|
||||
handle("stdout", line, nil)
|
||||
}
|
||||
return scanner.Err()
|
||||
}
|
||||
|
||||
// RunDiff executes `restic diff --json <a> <b>` and forwards every
|
||||
// line to handle as stdout. Restic emits per-line "change" objects
|
||||
// plus a final "statistics" object; we don't parse them server-side —
|
||||
// the operator reads the raw output on the live job log page.
|
||||
func (e Env) RunDiff(ctx context.Context, snapshotA, snapshotB string, handle LineHandler) error {
|
||||
if snapshotA == "" || snapshotB == "" {
|
||||
return fmt.Errorf("restic diff: two snapshot ids required")
|
||||
}
|
||||
cmd := e.resticCmd(ctx, "diff", "--json", snapshotA, snapshotB)
|
||||
stdout, err := cmd.StdoutPipe()
|
||||
if err != nil {
|
||||
return fmt.Errorf("restic diff: stdout pipe: %w", err)
|
||||
}
|
||||
stderr, err := cmd.StderrPipe()
|
||||
if err != nil {
|
||||
return fmt.Errorf("restic diff: stderr pipe: %w", err)
|
||||
}
|
||||
if err := cmd.Start(); err != nil {
|
||||
return fmt.Errorf("restic diff: start: %w", err)
|
||||
}
|
||||
done := make(chan error, 2)
|
||||
// diff output isn't huge; pumpStderr-ish line-by-line forwarding
|
||||
// is fine.
|
||||
go func() {
|
||||
s := bufio.NewScanner(stdout)
|
||||
s.Buffer(make([]byte, 0, 64*1024), 1024*1024)
|
||||
for s.Scan() {
|
||||
if handle != nil {
|
||||
handle("stdout", s.Text(), nil)
|
||||
}
|
||||
}
|
||||
done <- s.Err()
|
||||
}()
|
||||
go func() { done <- pumpStderr(stderr, handle) }()
|
||||
for i := 0; i < 2; i++ {
|
||||
if err := <-done; err != nil && handle != nil {
|
||||
handle("event", fmt.Sprintf("pump error: %v", err), nil)
|
||||
}
|
||||
}
|
||||
werr := cmd.Wait()
|
||||
if werr != nil {
|
||||
var ee *exec.ExitError
|
||||
if errors.As(werr, &ee) {
|
||||
return fmt.Errorf("restic diff: exit %d", ee.ExitCode())
|
||||
}
|
||||
return fmt.Errorf("restic diff: %w", werr)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -152,7 +152,8 @@ func (s *Server) requireUser(r *stdhttp.Request) (*store.User, bool) {
|
||||
|
||||
func validJobKind(k api.JobKind) bool {
|
||||
switch k {
|
||||
case api.JobBackup, api.JobInit, api.JobForget, api.JobPrune, api.JobCheck, api.JobUnlock:
|
||||
case api.JobBackup, api.JobInit, api.JobForget, api.JobPrune,
|
||||
api.JobCheck, api.JobUnlock, api.JobRestore, api.JobDiff:
|
||||
return true
|
||||
}
|
||||
return false
|
||||
|
||||
Reference in New Issue
Block a user