// Package restic wraps the restic CLI: locate the binary, run it // with --json, parse streamed events. The agent calls this; the // control-plane never invokes restic. package restic import ( "bufio" "context" "encoding/json" "errors" "fmt" "io" "os" "os/exec" "strings" "time" ) // Locate resolves the path to the restic binary. Honour an explicit // override if provided, else fall back to PATH. func Locate(override string) (string, error) { if override != "" { if _, err := exec.LookPath(override); err == nil { return override, nil } return "", fmt.Errorf("restic: configured path %q not executable", override) } bin, err := exec.LookPath("restic") if err != nil { return "", fmt.Errorf("restic: not on PATH: %w", err) } return bin, nil } // Env is the per-invocation context for a restic command. // // RepoURL is the bare URL as the operator typed it — no embedded // credentials. RepoUsername (optional) carries the HTTP basic-auth // user for `rest:` repos. The merged URL (with `user:pass@host` // embedded) is built once inside envSlice() at the moment of exec // and fed straight to the subprocess via RESTIC_REPOSITORY; we // never assign it back to Env, never pass it to slog. If anything // in this package ever needs to *log* a URL, use RedactURL. type Env struct { Bin string // path to restic binary RepoURL string // RESTIC_REPOSITORY (no embedded creds) RepoUsername string // optional HTTP basic-auth user for rest: URLs RepoPassword string // doubles as RESTIC_PASSWORD and (for rest:) HTTP basic-auth password ExtraEnv map[string]string // any other RESTIC_* / passthrough WorkDir string // CWD; default = current } // EventKind enumerates what we care about in restic's --json output // for `backup`. Restic's other commands emit different shapes; we // switch on message_type. type EventKind string const ( EventStatus EventKind = "status" // periodic progress EventVerbose EventKind = "verbose_status" EventSummary EventKind = "summary" // emitted once at end of backup EventErrorEvent EventKind = "error" ) // BackupStatus mirrors the JSON status emitted by `restic backup`. type BackupStatus struct { MessageType string `json:"message_type"` PercentDone float64 `json:"percent_done"` TotalFiles int64 `json:"total_files"` FilesDone int64 `json:"files_done"` TotalBytes int64 `json:"total_bytes"` BytesDone int64 `json:"bytes_done"` SecondsElapsed int64 `json:"seconds_elapsed"` SecondsRem int64 `json:"seconds_remaining"` } // BackupSummary mirrors the JSON summary block. type BackupSummary struct { MessageType string `json:"message_type"` FilesNew int64 `json:"files_new"` FilesChanged int64 `json:"files_changed"` FilesUnmodified int64 `json:"files_unmodified"` DirsNew int64 `json:"dirs_new"` DirsChanged int64 `json:"dirs_changed"` DirsUnmodified int64 `json:"dirs_unmodified"` DataAdded int64 `json:"data_added"` TotalFilesProcessed int64 `json:"total_files_processed"` TotalBytesProcessed int64 `json:"total_bytes_processed"` TotalDuration float64 `json:"total_duration"` SnapshotID string `json:"snapshot_id"` } // LineHandler receives every stdout/stderr line. event is non-nil // when the line is a recognised JSON status; raw always carries the // original text (so we can also tee to job_logs as `stdout`). type LineHandler func(stream string, raw string, event any) // RunBackup executes `restic backup [paths...]` with --json and pumps // status/summary into handle. Returns nil on success (exit code 0 // or 3 — 3 means "completed but had issues"; restic considers it a // success). Other exit codes propagate as an error. func (e Env) RunBackup(ctx context.Context, paths, excludes, tags []string, handle LineHandler) (*BackupSummary, error) { args := []string{"backup", "--json"} for _, ex := range excludes { args = append(args, "--exclude", ex) } for _, tag := range tags { args = append(args, "--tag", tag) } args = append(args, paths...) cmd := exec.CommandContext(ctx, e.Bin, args...) cmd.Env = e.envSlice() cmd.Dir = e.WorkDir stdout, err := cmd.StdoutPipe() if err != nil { return nil, fmt.Errorf("restic backup: stdout pipe: %w", err) } stderr, err := cmd.StderrPipe() if err != nil { return nil, fmt.Errorf("restic backup: stderr pipe: %w", err) } if err := cmd.Start(); err != nil { return nil, fmt.Errorf("restic backup: start: %w", err) } var summary *BackupSummary done := make(chan error, 2) go func() { done <- pumpStdout(stdout, handle, &summary) }() go func() { done <- pumpStderr(stderr, handle) }() // Wait for both pumps + the process. for i := 0; i < 2; i++ { if err := <-done; err != nil && handle != nil { handle("event", fmt.Sprintf("pump error: %v", err), nil) } } werr := cmd.Wait() if werr != nil { var ee *exec.ExitError if errors.As(werr, &ee) && ee.ExitCode() == 3 { // "incomplete backup" — restic still produced a snapshot. return summary, nil } return summary, fmt.Errorf("restic backup: %w", werr) } return summary, nil } // RunInit executes `restic init` against the configured repo. Returns // nil on success. Restic init's output is small and not JSON-rich; // we tee stdout/stderr verbatim through handle so the operator sees // the same lines they'd see at the CLI ("created restic repository // at " on success, "config file already exists" on a // re-init attempt, etc.). func (e Env) RunInit(ctx context.Context, handle LineHandler) error { cmd := exec.CommandContext(ctx, e.Bin, "init") cmd.Env = e.envSlice() cmd.Dir = e.WorkDir stdout, err := cmd.StdoutPipe() if err != nil { return fmt.Errorf("restic init: stdout pipe: %w", err) } stderr, err := cmd.StderrPipe() if err != nil { return fmt.Errorf("restic init: stderr pipe: %w", err) } if err := cmd.Start(); err != nil { return fmt.Errorf("restic init: start: %w", err) } // Sniff for "config file already exists" on stderr; if we see it // we'll treat the non-zero exit as a soft success — running init // against an already-initialised repo is a no-op semantically, // not a failure. Wraps the caller's handle so the line still // gets streamed verbatim to the operator-facing log. alreadyInited := false sniff := func(stream, line string, ev any) { if stream == "stderr" && strings.Contains(line, "config file already exists") { alreadyInited = true } if handle != nil { handle(stream, line, ev) } } done := make(chan error, 2) go func() { done <- pumpPlain(stdout, "stdout", sniff) }() go func() { done <- pumpPlain(stderr, "stderr", sniff) }() for i := 0; i < 2; i++ { if err := <-done; err != nil && handle != nil { handle("event", fmt.Sprintf("pump error: %v", err), nil) } } if werr := cmd.Wait(); werr != nil { if alreadyInited { if handle != nil { handle("event", "repo already initialised — treating as success", nil) } return nil } return fmt.Errorf("restic init: %w", werr) } return nil } func pumpPlain(r io.Reader, stream string, handle LineHandler) error { scanner := bufio.NewScanner(r) scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024) for scanner.Scan() { if handle != nil { handle(stream, scanner.Text(), nil) } } return scanner.Err() } // envSlice converts Env's typed fields into the os/exec env shape. // // Deliberately does NOT inherit the parent process's environment: // any RESTIC_* / AWS_* / B2_* vars in the operator's shell or the // systemd unit's Environment= clause are filtered out so the // control-plane is the unambiguous source of truth. // // HOME / XDG_CACHE_HOME are set explicitly because restic insists // on one or the other for its cache dir; without it the command // fails before ever talking to the repo. func (e Env) envSlice() []string { home := "/var/lib/restic-manager" if h, ok := e.ExtraEnv["HOME"]; ok && h != "" { home = h } else if h := os.Getenv("HOME"); h != "" { home = h } xdg := home + "/.cache" if x, ok := e.ExtraEnv["XDG_CACHE_HOME"]; ok && x != "" { xdg = x } else if x := os.Getenv("XDG_CACHE_HOME"); x != "" { xdg = x } out := []string{ "RESTIC_REPOSITORY=" + mergeRestCreds(e.RepoURL, e.RepoUsername, e.RepoPassword), "RESTIC_PASSWORD=" + e.RepoPassword, // Feed restic via env-only — keeps creds off ps(1). "PATH=/usr/local/bin:/usr/bin:/bin", "HOME=" + home, "XDG_CACHE_HOME=" + xdg, } for k, v := range e.ExtraEnv { // HOME / XDG_CACHE_HOME already merged in above. if k == "HOME" || k == "XDG_CACHE_HOME" { continue } out = append(out, k+"="+v) } return out } func pumpStdout(r io.Reader, handle LineHandler, summary **BackupSummary) error { scanner := bufio.NewScanner(r) scanner.Buffer(make([]byte, 0, 64*1024), 4*1024*1024) // status lines can get long for scanner.Scan() { line := scanner.Text() if handle == nil { continue } // Sniff message_type without a full Unmarshal so non-JSON // lines (very rare on stdout, but possible) survive. if !strings.HasPrefix(line, "{") { handle("stdout", line, nil) continue } var probe struct { MessageType string `json:"message_type"` } if err := json.Unmarshal([]byte(line), &probe); err != nil { handle("stdout", line, nil) continue } switch EventKind(probe.MessageType) { case EventStatus, EventVerbose: var ev BackupStatus if json.Unmarshal([]byte(line), &ev) == nil { handle("event", line, ev) continue } case EventSummary: var ev BackupSummary if json.Unmarshal([]byte(line), &ev) == nil { if summary != nil { s := ev *summary = &s } handle("event", line, ev) continue } case EventErrorEvent: handle("event", line, nil) continue } handle("stdout", line, nil) } return scanner.Err() } func pumpStderr(r io.Reader, handle LineHandler) error { scanner := bufio.NewScanner(r) scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024) for scanner.Scan() { if handle != nil { handle("stderr", scanner.Text(), nil) } } return scanner.Err() } // suppress unused-time false-positive when nothing else in this file // uses time but the file is part of a package that grows over time var _ = time.Now