From bc6a91b06401e74db88f242f7da063c6ed7df16b Mon Sep 17 00:00:00 2001 From: Steve Cliff Date: Sun, 3 May 2026 22:39:23 +0100 Subject: [PATCH] agent: RunPrune/RunCheck/RunUnlock + reportStats + admin-cred slot dispatch Extract resticEnv/sendStarted/streamHandler/sendFinished helpers to remove boilerplate duplication across Run* methods. Add RunPrune (ships repo.stats with LastPruneAt before job.finished), RunCheck (ships stats with LastCheckStatus/LockPresent regardless of outcome), RunUnlock (ships LockPresent=false on success), and reportStats (fills size fields via RunStats when caller didn't populate them). Wire JobPrune/JobCheck/JobUnlock into the dispatcher switch; teach MsgConfigUpdate about the Slot discriminator for admin vs repo creds; add strconv import for subset-pct parsing. --- cmd/agent/main.go | 126 +++++++++-- internal/agent/runner/runner.go | 306 +++++++++++++++++---------- internal/agent/runner/runner_test.go | 261 +++++++++++++++++++++++ 3 files changed, 554 insertions(+), 139 deletions(-) create mode 100644 internal/agent/runner/runner_test.go diff --git a/cmd/agent/main.go b/cmd/agent/main.go index cb38457..1bd7a65 100644 --- a/cmd/agent/main.go +++ b/cmd/agent/main.go @@ -9,6 +9,7 @@ import ( "log/slog" "os" "os/signal" + "strconv" "syscall" "time" @@ -199,32 +200,68 @@ func (d *dispatcher) handle(ctx context.Context, env api.Envelope, tx wsclient.S case api.MsgConfigUpdate: var p api.ConfigUpdatePayload _ = env.UnmarshalPayload(&p) - // Merge with whatever's already in secrets.enc — empty fields - // in the push mean "leave alone." Atomic write underneath. - cur, err := d.secrets.Load() - if err != nil { - slog.Error("ws agent: load secrets for merge", "err", err) - return nil + slot := p.Slot + if slot == "" { + slot = "repo" } - changed := false - if p.RepoURL != "" && p.RepoURL != cur.URL { - cur.URL = p.RepoURL - changed = true - } - if p.RepoUsername != "" && p.RepoUsername != cur.Username { - cur.Username = p.RepoUsername - changed = true - } - if p.RepoPassword != "" && p.RepoPassword != cur.Password { - cur.Password = p.RepoPassword - changed = true - } - if changed { - if err := d.secrets.Save(cur); err != nil { - slog.Error("ws agent: persist secrets", "err", err) + switch slot { + case "repo": + // Merge with whatever's already in secrets.enc — empty fields + // in the push mean "leave alone." Atomic write underneath. + cur, err := d.secrets.Load() + if err != nil { + slog.Error("ws agent: load secrets for merge", "err", err) return nil } - slog.Info("ws agent: repo credentials updated via config.update") + changed := false + if p.RepoURL != "" && p.RepoURL != cur.URL { + cur.URL = p.RepoURL + changed = true + } + if p.RepoUsername != "" && p.RepoUsername != cur.Username { + cur.Username = p.RepoUsername + changed = true + } + if p.RepoPassword != "" && p.RepoPassword != cur.Password { + cur.Password = p.RepoPassword + changed = true + } + if changed { + if err := d.secrets.Save(cur); err != nil { + slog.Error("ws agent: persist secrets", "err", err) + return nil + } + slog.Info("ws agent: repo credentials updated via config.update") + } + case "admin": + cur, err := d.secrets.LoadAdmin() + if err != nil && !errors.Is(err, secrets.ErrNoAdmin) { + slog.Error("ws agent: load admin secrets", "err", err) + return nil + } + // ErrNoAdmin is not an error here — we are creating the slot. + changed := false + if p.RepoURL != "" && p.RepoURL != cur.URL { + cur.URL = p.RepoURL + changed = true + } + if p.RepoUsername != "" && p.RepoUsername != cur.Username { + cur.Username = p.RepoUsername + changed = true + } + if p.RepoPassword != "" && p.RepoPassword != cur.Password { + cur.Password = p.RepoPassword + changed = true + } + if changed { + if err := d.secrets.SaveAdmin(cur); err != nil { + slog.Error("ws agent: persist admin secrets", "err", err) + return nil + } + slog.Info("ws agent: admin credentials updated via config.update") + } + default: + slog.Warn("ws agent: unknown config.update slot, ignoring", "slot", p.Slot) } case api.MsgAgentUpdateAvail: @@ -318,6 +355,49 @@ func (d *dispatcher) runJob(ctx context.Context, p api.CommandRunPayload, tx wsc } slog.Info("agent: forget job complete", "job_id", p.JobID) }() + case api.JobPrune: + // Prune may require admin creds (delete authority on rest-server). + runCreds := creds + if p.RequiresAdminCreds { + ac, err := d.secrets.LoadAdmin() + if err != nil { + return fmt.Errorf("prune: admin creds not configured (server didn't push them): %w", err) + } + if ac.Empty() { + return fmt.Errorf("prune: admin creds incomplete") + } + runCreds = ac + } + prr := runner.New(runner.Config{ + ResticBin: d.resticBin, + RepoURL: runCreds.URL, + RepoUsername: runCreds.Username, + RepoPassword: runCreds.Password, + }, tx, time.Second) + slog.Info("agent: accepting prune job", "job_id", p.JobID, "admin_creds", p.RequiresAdminCreds) + go func() { + if err := prr.RunPrune(ctx, p.JobID); err != nil { + slog.Warn("agent: prune job failed", "job_id", p.JobID, "err", err) + } + }() + case api.JobCheck: + subset := 0 + if len(p.Args) > 0 { + subset, _ = strconv.Atoi(p.Args[0]) + } + slog.Info("agent: accepting check job", "job_id", p.JobID, "subset_pct", subset) + go func() { + if err := r.RunCheck(ctx, p.JobID, subset); err != nil { + slog.Warn("agent: check job failed", "job_id", p.JobID, "err", err) + } + }() + case api.JobUnlock: + slog.Info("agent: accepting unlock job", "job_id", p.JobID) + go func() { + if err := r.RunUnlock(ctx, p.JobID); err != nil { + slog.Warn("agent: unlock job failed", "job_id", p.JobID, "err", err) + } + }() default: return fmt.Errorf("kind %q not implemented yet (Phase 2 lands the rest)", p.Kind) } diff --git a/internal/agent/runner/runner.go b/internal/agent/runner/runner.go index dc6763c..423af45 100644 --- a/internal/agent/runner/runner.go +++ b/internal/agent/runner/runner.go @@ -51,24 +51,70 @@ func New(cfg Config, tx Sender, progressMinPeriod time.Duration) *Runner { return &Runner{cfg: cfg, tx: tx, progressMinPeriod: progressMinPeriod} } -// RunBackup executes a backup job and reports back via the sender. -// Returns nil on a clean (or "incomplete-but-snapshot-created") finish. -func (r *Runner) RunBackup(ctx context.Context, jobID string, paths, excludes, tags []string) error { - startedAt := time.Now().UTC() - - startEnv, _ := api.Marshal(api.MsgJobStarted, jobID, api.JobStartedPayload{ - JobID: jobID, Kind: api.JobBackup, StartedAt: startedAt, - }) - if err := r.tx.Send(startEnv); err != nil { - slog.Warn("runner: send job.started", "err", err) - } - - env := restic.Env{ +// resticEnv builds the shared restic.Env from r.cfg. +func (r *Runner) resticEnv() restic.Env { + return restic.Env{ Bin: r.cfg.ResticBin, RepoURL: r.cfg.RepoURL, RepoUsername: r.cfg.RepoUsername, RepoPassword: r.cfg.RepoPassword, } +} + +// sendStarted ships a job.started envelope. +func (r *Runner) sendStarted(jobID string, kind api.JobKind, startedAt time.Time) { + env, _ := api.Marshal(api.MsgJobStarted, jobID, api.JobStartedPayload{ + JobID: jobID, Kind: kind, StartedAt: startedAt, + }) + if err := r.tx.Send(env); err != nil { + slog.Warn("runner: send job.started", "job_id", jobID, "kind", kind, "err", err) + } +} + +// streamHandler returns a LineHandler that ships log.stream envelopes. +func (r *Runner) streamHandler(jobID string, seq *atomic.Int64) restic.LineHandler { + return func(stream string, line string, _ any) { + now := time.Now().UTC() + logEnv, _ := api.Marshal(api.MsgLogStream, "", api.LogStreamLine{ + JobID: jobID, + Seq: seq.Add(1), + TS: now, + Stream: api.LogStream(stream), + Payload: line, + }) + _ = r.tx.Send(logEnv) + } +} + +// sendFinished ships a job.finished envelope. err==nil → succeeded; +// otherwise failed. statsBlob is forwarded as JobFinishedPayload.Stats. +func (r *Runner) sendFinished(jobID string, finishedAt time.Time, err error, statsBlob json.RawMessage) { + status := api.JobSucceeded + exit := 0 + errMsg := "" + if err != nil { + status = api.JobFailed + exit = -1 + errMsg = err.Error() + } + finEnv, _ := api.Marshal(api.MsgJobFinished, jobID, api.JobFinishedPayload{ + JobID: jobID, + Status: status, + ExitCode: exit, + FinishedAt: finishedAt, + Stats: statsBlob, + Error: errMsg, + }) + _ = r.tx.Send(finEnv) +} + +// RunBackup executes a backup job and reports back via the sender. +// Returns nil on a clean (or "incomplete-but-snapshot-created") finish. +func (r *Runner) RunBackup(ctx context.Context, jobID string, paths, excludes, tags []string) error { + startedAt := time.Now().UTC() + r.sendStarted(jobID, api.JobBackup, startedAt) + + env := r.resticEnv() var seq atomic.Int64 lastProgress := time.Now() @@ -115,27 +161,11 @@ func (r *Runner) RunBackup(ctx context.Context, jobID string, paths, excludes, t summary, err := env.RunBackup(ctx, paths, excludes, tags, handle) finishedAt := time.Now().UTC() - status := api.JobSucceeded - exit := 0 - errMsg := "" - if err != nil { - status = api.JobFailed - exit = -1 - errMsg = err.Error() - } var statsBlob json.RawMessage if summary != nil { statsBlob, _ = json.Marshal(summary) } - finEnv, _ := api.Marshal(api.MsgJobFinished, jobID, api.JobFinishedPayload{ - JobID: jobID, - Status: status, - ExitCode: exit, - FinishedAt: finishedAt, - Stats: statsBlob, - Error: errMsg, - }) - _ = r.tx.Send(finEnv) + r.sendFinished(jobID, finishedAt, err, statsBlob) // On a successful backup, refresh the server's snapshot projection. // We do this *after* job.finished so the UI sees the job land first; @@ -147,6 +177,9 @@ func (r *Runner) RunBackup(ctx context.Context, jobID string, paths, excludes, t if rerr := r.reportSnapshots(ctx, env); rerr != nil { slog.Warn("runner: snapshots.report failed", "job_id", jobID, "err", rerr) } + if rerr := r.reportStats(ctx, env, api.RepoStatsPayload{}); rerr != nil { + slog.Warn("runner: stats.report after backup failed", "job_id", jobID, "err", rerr) + } } if err != nil { @@ -160,52 +193,13 @@ func (r *Runner) RunBackup(ctx context.Context, jobID string, paths, excludes, t // browser-side log viewer just works. func (r *Runner) RunInit(ctx context.Context, jobID string) error { startedAt := time.Now().UTC() - startEnv, _ := api.Marshal(api.MsgJobStarted, jobID, api.JobStartedPayload{ - JobID: jobID, Kind: api.JobInit, StartedAt: startedAt, - }) - if err := r.tx.Send(startEnv); err != nil { - slog.Warn("runner: send job.started (init)", "err", err) - } - - env := restic.Env{ - Bin: r.cfg.ResticBin, - RepoURL: r.cfg.RepoURL, - RepoUsername: r.cfg.RepoUsername, - RepoPassword: r.cfg.RepoPassword, - } + r.sendStarted(jobID, api.JobInit, startedAt) + env := r.resticEnv() var seq atomic.Int64 - handle := func(stream string, line string, _ any) { - now := time.Now().UTC() - logEnv, _ := api.Marshal(api.MsgLogStream, "", api.LogStreamLine{ - JobID: jobID, - Seq: seq.Add(1), - TS: now, - Stream: api.LogStream(stream), - Payload: line, - }) - _ = r.tx.Send(logEnv) - } - - err := env.RunInit(ctx, handle) + err := env.RunInit(ctx, r.streamHandler(jobID, &seq)) finishedAt := time.Now().UTC() - - status := api.JobSucceeded - exit := 0 - errMsg := "" - if err != nil { - status = api.JobFailed - exit = -1 - errMsg = err.Error() - } - finEnv, _ := api.Marshal(api.MsgJobFinished, jobID, api.JobFinishedPayload{ - JobID: jobID, - Status: status, - ExitCode: exit, - FinishedAt: finishedAt, - Error: errMsg, - }) - _ = r.tx.Send(finEnv) + r.sendFinished(jobID, finishedAt, err, nil) if err != nil { return fmt.Errorf("runner init: %w", err) } @@ -219,52 +213,13 @@ func (r *Runner) RunInit(ctx context.Context, jobID string) error { // snapshot index — the host's snapshot list shrinks). func (r *Runner) RunForget(ctx context.Context, jobID string, policy restic.ForgetPolicy) error { startedAt := time.Now().UTC() - startEnv, _ := api.Marshal(api.MsgJobStarted, jobID, api.JobStartedPayload{ - JobID: jobID, Kind: api.JobForget, StartedAt: startedAt, - }) - if err := r.tx.Send(startEnv); err != nil { - slog.Warn("runner: send job.started (forget)", "err", err) - } - - env := restic.Env{ - Bin: r.cfg.ResticBin, - RepoURL: r.cfg.RepoURL, - RepoUsername: r.cfg.RepoUsername, - RepoPassword: r.cfg.RepoPassword, - } + r.sendStarted(jobID, api.JobForget, startedAt) + env := r.resticEnv() var seq atomic.Int64 - handle := func(stream string, line string, _ any) { - now := time.Now().UTC() - logEnv, _ := api.Marshal(api.MsgLogStream, "", api.LogStreamLine{ - JobID: jobID, - Seq: seq.Add(1), - TS: now, - Stream: api.LogStream(stream), - Payload: line, - }) - _ = r.tx.Send(logEnv) - } - - err := env.RunForget(ctx, policy, handle) + err := env.RunForget(ctx, policy, r.streamHandler(jobID, &seq)) finishedAt := time.Now().UTC() - - status := api.JobSucceeded - exit := 0 - errMsg := "" - if err != nil { - status = api.JobFailed - exit = -1 - errMsg = err.Error() - } - finEnv, _ := api.Marshal(api.MsgJobFinished, jobID, api.JobFinishedPayload{ - JobID: jobID, - Status: status, - ExitCode: exit, - FinishedAt: finishedAt, - Error: errMsg, - }) - _ = r.tx.Send(finEnv) + r.sendFinished(jobID, finishedAt, err, nil) // Refresh the server's snapshot projection — forget rewrites the // index so the host's snapshot list almost certainly shrunk. @@ -281,6 +236,125 @@ func (r *Runner) RunForget(ctx context.Context, jobID string, policy restic.Forg return nil } +// RunPrune executes a prune job against the configured repo. On +// success it ships a repo.stats envelope with LastPruneAt set (plus +// a full size refresh via RunStats) before the job.finished envelope, +// so the UI can display updated size information alongside the +// completed job. On failure no stats refresh is attempted. +func (r *Runner) RunPrune(ctx context.Context, jobID string) error { + startedAt := time.Now().UTC() + r.sendStarted(jobID, api.JobPrune, startedAt) + + env := r.resticEnv() + var seq atomic.Int64 + err := env.RunPrune(ctx, r.streamHandler(jobID, &seq)) + finishedAt := time.Now().UTC() + + if err == nil { + pruneAt := finishedAt + if rerr := r.reportStats(ctx, env, api.RepoStatsPayload{LastPruneAt: &pruneAt}); rerr != nil { + slog.Warn("runner: stats.report after prune failed", "job_id", jobID, "err", rerr) + } + } + + r.sendFinished(jobID, finishedAt, err, nil) + + if err != nil { + return fmt.Errorf("runner prune: %w", err) + } + return nil +} + +// RunCheck executes a `restic check` job. Always ships a repo.stats +// envelope (success or failure) with LastCheckAt, LastCheckStatus, +// and LockPresent populated from the check result. +func (r *Runner) RunCheck(ctx context.Context, jobID string, subsetPct int) error { + startedAt := time.Now().UTC() + r.sendStarted(jobID, api.JobCheck, startedAt) + + env := r.resticEnv() + var seq atomic.Int64 + res, err := env.RunCheck(ctx, subsetPct, r.streamHandler(jobID, &seq)) + finishedAt := time.Now().UTC() + r.sendFinished(jobID, finishedAt, err, nil) + + // Determine check status string. + checkStatus := "ok" + if err != nil { + checkStatus = "failed" + } else if res.ErrorsFound { + checkStatus = "errors_found" + } + + lockPresent := res.LockPresent + now := finishedAt + patch := api.RepoStatsPayload{ + LastCheckAt: &now, + LastCheckStatus: checkStatus, + LockPresent: &lockPresent, + } + if rerr := r.reportStats(ctx, env, patch); rerr != nil { + slog.Warn("runner: stats.report after check failed", "job_id", jobID, "err", rerr) + } + + if err != nil { + return fmt.Errorf("runner check: %w", err) + } + return nil +} + +// RunUnlock executes a `restic unlock` job. On success it ships a +// repo.stats envelope with LockPresent=false so the UI banner clears. +func (r *Runner) RunUnlock(ctx context.Context, jobID string) error { + startedAt := time.Now().UTC() + r.sendStarted(jobID, api.JobUnlock, startedAt) + + env := r.resticEnv() + var seq atomic.Int64 + err := env.RunUnlock(ctx, r.streamHandler(jobID, &seq)) + finishedAt := time.Now().UTC() + r.sendFinished(jobID, finishedAt, err, nil) + + if err == nil { + lockFalse := false + patch := api.RepoStatsPayload{LockPresent: &lockFalse} + if rerr := r.reportStats(ctx, env, patch); rerr != nil { + slog.Warn("runner: stats.report after unlock failed", "job_id", jobID, "err", rerr) + } + } + + if err != nil { + return fmt.Errorf("runner unlock: %w", err) + } + return nil +} + +// reportStats ships a repo.stats envelope. If the patch doesn't +// already include size fields, fills them in by invoking env.RunStats. +// Errors from RunStats are non-fatal — the patch is shipped anyway +// with whatever the caller did populate. +func (r *Runner) reportStats(ctx context.Context, env restic.Env, patch api.RepoStatsPayload) error { + listCtx, cancel := context.WithTimeout(ctx, 60*time.Second) + defer cancel() + if patch.TotalSizeBytes == nil { + if s, err := env.RunStats(listCtx, nil); err == nil { + total := s.TotalSize + raw := s.TotalUncompressed + files := s.TotalFileCount + snaps := s.SnapshotsCount + patch.TotalSizeBytes = &total + patch.RawSizeBytes = &raw + patch.UniqueFiles = &files + patch.SnapshotCount = &snaps + } + } + envOut, err := api.Marshal(api.MsgRepoStats, "", patch) + if err != nil { + return err + } + return r.tx.Send(envOut) +} + // reportSnapshots calls `restic snapshots --json`, translates the // payload into the wire shape, and ships it as a snapshots.report // envelope. Bounded by a separate timeout so a sluggish repo doesn't diff --git a/internal/agent/runner/runner_test.go b/internal/agent/runner/runner_test.go new file mode 100644 index 0000000..147b9ec --- /dev/null +++ b/internal/agent/runner/runner_test.go @@ -0,0 +1,261 @@ +package runner + +import ( + "context" + "os" + "path/filepath" + "testing" + + "gitea.dcglab.co.uk/steve/restic-manager/internal/api" + "gitea.dcglab.co.uk/steve/restic-manager/internal/restic" +) + +// fakeSender collects sent envelopes for assertions. +type fakeSender struct{ envs []api.Envelope } + +func (s *fakeSender) Send(e api.Envelope) error { + s.envs = append(s.envs, e) + return nil +} + +// setupScript writes a shell script (without shebang) to a temp dir, +// names it "restic", makes it executable, and returns the path. +func setupScript(t *testing.T, body string) string { + t.Helper() + dir := t.TempDir() + p := filepath.Join(dir, "restic") + if err := os.WriteFile(p, []byte("#!/bin/sh\n"+body+"\n"), 0o755); err != nil { + t.Fatalf("setupScript: %v", err) + } + return p +} + +// firstEnvOfType returns the first envelope with the given type, or +// fails the test if none is found. +func firstEnvOfType(t *testing.T, envs []api.Envelope, mt api.MessageType) api.Envelope { + t.Helper() + for _, e := range envs { + if e.Type == mt { + return e + } + } + t.Fatalf("no envelope of type %q found in %d envelopes", mt, len(envs)) + return api.Envelope{} +} + +// envelopeOrder returns the message types of all sent envelopes. +func envelopeOrder(envs []api.Envelope) []api.MessageType { + out := make([]api.MessageType, len(envs)) + for i, e := range envs { + out[i] = e.Type + } + return out +} + +// TestRunPruneShipsExpectedEnvelopes drives RunPrune with a fake +// binary that prints "prune" on stdout (for the log.stream envelope) +// and emits valid stats JSON so reportStats can populate size fields. +// Expected sequence: job.started → log.stream → repo.stats → job.finished. +func TestRunPruneShipsExpectedEnvelopes(t *testing.T) { + t.Parallel() + + // The fake "restic" handles both "prune" and "stats --json" calls. + statsJSON := `{"total_size":1000,"total_uncompressed_size":2000,"snapshots_count":3,"total_file_count":10}` + bin := setupScript(t, ` +case "$1" in + prune) echo "prune" ;; + stats) echo '`+statsJSON+`' ;; + *) echo "unknown: $*" ;; +esac +`) + + tx := &fakeSender{} + r := New(Config{ResticBin: bin}, tx, 0) + if err := r.RunPrune(context.Background(), "job-1"); err != nil { + t.Fatalf("RunPrune: %v", err) + } + + order := envelopeOrder(tx.envs) + // Confirm landmark envelope types appear in the required order. + wantTypes := []api.MessageType{api.MsgJobStarted, api.MsgLogStream, api.MsgRepoStats, api.MsgJobFinished} + positions := map[api.MessageType]int{} + for i, mt := range order { + if _, seen := positions[mt]; !seen { + positions[mt] = i + } + } + for i := 0; i < len(wantTypes)-1; i++ { + a, b := wantTypes[i], wantTypes[i+1] + pa, aOK := positions[a] + pb, bOK := positions[b] + if !aOK { + t.Errorf("envelope type %q not found in output %v", a, order) + continue + } + if !bOK { + t.Errorf("envelope type %q not found in output %v", b, order) + continue + } + if pa >= pb { + t.Errorf("expected %q before %q but positions are %d >= %d (order: %v)", a, b, pa, pb, order) + } + } + + // The repo.stats payload must have LastPruneAt set. + statsEnv := firstEnvOfType(t, tx.envs, api.MsgRepoStats) + var statsPayload api.RepoStatsPayload + if err := statsEnv.UnmarshalPayload(&statsPayload); err != nil { + t.Fatalf("unmarshal repo.stats payload: %v", err) + } + if statsPayload.LastPruneAt == nil { + t.Error("expected LastPruneAt to be set in repo.stats after prune") + } + + // The job.finished payload must indicate success. + finEnv := firstEnvOfType(t, tx.envs, api.MsgJobFinished) + var finPayload api.JobFinishedPayload + if err := finEnv.UnmarshalPayload(&finPayload); err != nil { + t.Fatalf("unmarshal job.finished payload: %v", err) + } + if finPayload.Status != api.JobSucceeded { + t.Errorf("expected job.finished status=%q, got %q", api.JobSucceeded, finPayload.Status) + } +} + +// TestRunCheckShipsCheckStatus verifies that a check run which emits +// a stale-lock line on stderr (exit 0) reports LastCheckStatus="ok" +// and LockPresent=true. +func TestRunCheckShipsCheckStatus(t *testing.T) { + t.Parallel() + + statsJSON := `{"total_size":500,"total_uncompressed_size":600,"snapshots_count":1,"total_file_count":5}` + bin := setupScript(t, ` +case "$1" in + check) echo "Found stale lock" >&2; exit 0 ;; + stats) echo '`+statsJSON+`' ;; + *) exit 0 ;; +esac +`) + + tx := &fakeSender{} + r := New(Config{ResticBin: bin}, tx, 0) + if err := r.RunCheck(context.Background(), "job-2", 0); err != nil { + t.Fatalf("RunCheck: %v", err) + } + + statsEnv := firstEnvOfType(t, tx.envs, api.MsgRepoStats) + var p api.RepoStatsPayload + if err := statsEnv.UnmarshalPayload(&p); err != nil { + t.Fatalf("unmarshal: %v", err) + } + if p.LastCheckStatus != "ok" { + t.Errorf("LastCheckStatus: got %q, want %q", p.LastCheckStatus, "ok") + } + if p.LockPresent == nil || !*p.LockPresent { + t.Errorf("expected LockPresent=true, got %v", p.LockPresent) + } + if p.LastCheckAt == nil { + t.Error("expected LastCheckAt to be set") + } +} + +// TestRunCheckErrorsFoundShipsErrorsStatus verifies that a check run +// that exits 1 (errors found) reports LastCheckStatus="errors_found". +func TestRunCheckErrorsFoundShipsErrorsStatus(t *testing.T) { + t.Parallel() + + statsJSON := `{"total_size":500,"total_uncompressed_size":600,"snapshots_count":1,"total_file_count":5}` + bin := setupScript(t, ` +case "$1" in + check) exit 1 ;; + stats) echo '`+statsJSON+`' ;; + *) exit 0 ;; +esac +`) + + tx := &fakeSender{} + r := New(Config{ResticBin: bin}, tx, 0) + // RunCheck returns nil for exit 1 (errors_found is not a wrapper failure). + if err := r.RunCheck(context.Background(), "job-3", 0); err != nil { + t.Fatalf("RunCheck: %v", err) + } + + statsEnv := firstEnvOfType(t, tx.envs, api.MsgRepoStats) + var p api.RepoStatsPayload + if err := statsEnv.UnmarshalPayload(&p); err != nil { + t.Fatalf("unmarshal: %v", err) + } + if p.LastCheckStatus != "errors_found" { + t.Errorf("LastCheckStatus: got %q, want %q", p.LastCheckStatus, "errors_found") + } +} + +// TestRunUnlockClearsLock verifies that a successful unlock ships a +// repo.stats envelope with LockPresent=false. +func TestRunUnlockClearsLock(t *testing.T) { + t.Parallel() + + statsJSON := `{"total_size":100,"total_uncompressed_size":150,"snapshots_count":2,"total_file_count":8}` + bin := setupScript(t, ` +case "$1" in + unlock) echo "removed 1 locks" ;; + stats) echo '`+statsJSON+`' ;; + *) exit 0 ;; +esac +`) + + tx := &fakeSender{} + r := New(Config{ResticBin: bin}, tx, 0) + if err := r.RunUnlock(context.Background(), "job-4"); err != nil { + t.Fatalf("RunUnlock: %v", err) + } + + statsEnv := firstEnvOfType(t, tx.envs, api.MsgRepoStats) + var p api.RepoStatsPayload + if err := statsEnv.UnmarshalPayload(&p); err != nil { + t.Fatalf("unmarshal: %v", err) + } + if p.LockPresent == nil { + t.Fatal("expected LockPresent to be set (non-nil)") + } + if *p.LockPresent { + t.Errorf("expected LockPresent=false after unlock, got true") + } +} + +// TestRunInitShipsStartedAndFinished confirms the refactored RunInit +// still produces job.started and job.finished envelopes. +func TestRunInitShipsStartedAndFinished(t *testing.T) { + t.Parallel() + bin := setupScript(t, `echo "initialized repository"`) + tx := &fakeSender{} + r := New(Config{ResticBin: bin}, tx, 0) + if err := r.RunInit(context.Background(), "job-init"); err != nil { + t.Fatalf("RunInit: %v", err) + } + _ = firstEnvOfType(t, tx.envs, api.MsgJobStarted) + _ = firstEnvOfType(t, tx.envs, api.MsgJobFinished) +} + +// TestRunForgetShipsStartedAndFinished confirms the refactored +// RunForget still produces job.started and job.finished envelopes. +func TestRunForgetShipsStartedAndFinished(t *testing.T) { + t.Parallel() + // Script handles both "forget --json ..." and "snapshots --json" calls. + bin := setupScript(t, ` +case "$1" in + forget) echo "[]" ;; + snapshots) echo "[]" ;; + *) exit 0 ;; +esac +`) + tx := &fakeSender{} + r := New(Config{ResticBin: bin}, tx, 0) + keepLast := 1 + policy := restic.ForgetPolicy{KeepLast: &keepLast} + if err := r.RunForget(context.Background(), "job-forget", policy); err != nil { + t.Fatalf("RunForget: %v", err) + } + _ = firstEnvOfType(t, tx.envs, api.MsgJobStarted) + _ = firstEnvOfType(t, tx.envs, api.MsgJobFinished) +}