agent: RunPrune/RunCheck/RunUnlock + reportStats + admin-cred slot dispatch
Extract resticEnv/sendStarted/streamHandler/sendFinished helpers to remove boilerplate duplication across Run* methods. Add RunPrune (ships repo.stats with LastPruneAt before job.finished), RunCheck (ships stats with LastCheckStatus/LockPresent regardless of outcome), RunUnlock (ships LockPresent=false on success), and reportStats (fills size fields via RunStats when caller didn't populate them). Wire JobPrune/JobCheck/JobUnlock into the dispatcher switch; teach MsgConfigUpdate about the Slot discriminator for admin vs repo creds; add strconv import for subset-pct parsing.
This commit is contained in:
+190
-116
@@ -51,24 +51,70 @@ func New(cfg Config, tx Sender, progressMinPeriod time.Duration) *Runner {
|
||||
return &Runner{cfg: cfg, tx: tx, progressMinPeriod: progressMinPeriod}
|
||||
}
|
||||
|
||||
// RunBackup executes a backup job and reports back via the sender.
|
||||
// Returns nil on a clean (or "incomplete-but-snapshot-created") finish.
|
||||
func (r *Runner) RunBackup(ctx context.Context, jobID string, paths, excludes, tags []string) error {
|
||||
startedAt := time.Now().UTC()
|
||||
|
||||
startEnv, _ := api.Marshal(api.MsgJobStarted, jobID, api.JobStartedPayload{
|
||||
JobID: jobID, Kind: api.JobBackup, StartedAt: startedAt,
|
||||
})
|
||||
if err := r.tx.Send(startEnv); err != nil {
|
||||
slog.Warn("runner: send job.started", "err", err)
|
||||
}
|
||||
|
||||
env := restic.Env{
|
||||
// resticEnv builds the shared restic.Env from r.cfg.
|
||||
func (r *Runner) resticEnv() restic.Env {
|
||||
return restic.Env{
|
||||
Bin: r.cfg.ResticBin,
|
||||
RepoURL: r.cfg.RepoURL,
|
||||
RepoUsername: r.cfg.RepoUsername,
|
||||
RepoPassword: r.cfg.RepoPassword,
|
||||
}
|
||||
}
|
||||
|
||||
// sendStarted ships a job.started envelope.
|
||||
func (r *Runner) sendStarted(jobID string, kind api.JobKind, startedAt time.Time) {
|
||||
env, _ := api.Marshal(api.MsgJobStarted, jobID, api.JobStartedPayload{
|
||||
JobID: jobID, Kind: kind, StartedAt: startedAt,
|
||||
})
|
||||
if err := r.tx.Send(env); err != nil {
|
||||
slog.Warn("runner: send job.started", "job_id", jobID, "kind", kind, "err", err)
|
||||
}
|
||||
}
|
||||
|
||||
// streamHandler returns a LineHandler that ships log.stream envelopes.
|
||||
func (r *Runner) streamHandler(jobID string, seq *atomic.Int64) restic.LineHandler {
|
||||
return func(stream string, line string, _ any) {
|
||||
now := time.Now().UTC()
|
||||
logEnv, _ := api.Marshal(api.MsgLogStream, "", api.LogStreamLine{
|
||||
JobID: jobID,
|
||||
Seq: seq.Add(1),
|
||||
TS: now,
|
||||
Stream: api.LogStream(stream),
|
||||
Payload: line,
|
||||
})
|
||||
_ = r.tx.Send(logEnv)
|
||||
}
|
||||
}
|
||||
|
||||
// sendFinished ships a job.finished envelope. err==nil → succeeded;
|
||||
// otherwise failed. statsBlob is forwarded as JobFinishedPayload.Stats.
|
||||
func (r *Runner) sendFinished(jobID string, finishedAt time.Time, err error, statsBlob json.RawMessage) {
|
||||
status := api.JobSucceeded
|
||||
exit := 0
|
||||
errMsg := ""
|
||||
if err != nil {
|
||||
status = api.JobFailed
|
||||
exit = -1
|
||||
errMsg = err.Error()
|
||||
}
|
||||
finEnv, _ := api.Marshal(api.MsgJobFinished, jobID, api.JobFinishedPayload{
|
||||
JobID: jobID,
|
||||
Status: status,
|
||||
ExitCode: exit,
|
||||
FinishedAt: finishedAt,
|
||||
Stats: statsBlob,
|
||||
Error: errMsg,
|
||||
})
|
||||
_ = r.tx.Send(finEnv)
|
||||
}
|
||||
|
||||
// RunBackup executes a backup job and reports back via the sender.
|
||||
// Returns nil on a clean (or "incomplete-but-snapshot-created") finish.
|
||||
func (r *Runner) RunBackup(ctx context.Context, jobID string, paths, excludes, tags []string) error {
|
||||
startedAt := time.Now().UTC()
|
||||
r.sendStarted(jobID, api.JobBackup, startedAt)
|
||||
|
||||
env := r.resticEnv()
|
||||
|
||||
var seq atomic.Int64
|
||||
lastProgress := time.Now()
|
||||
@@ -115,27 +161,11 @@ func (r *Runner) RunBackup(ctx context.Context, jobID string, paths, excludes, t
|
||||
summary, err := env.RunBackup(ctx, paths, excludes, tags, handle)
|
||||
finishedAt := time.Now().UTC()
|
||||
|
||||
status := api.JobSucceeded
|
||||
exit := 0
|
||||
errMsg := ""
|
||||
if err != nil {
|
||||
status = api.JobFailed
|
||||
exit = -1
|
||||
errMsg = err.Error()
|
||||
}
|
||||
var statsBlob json.RawMessage
|
||||
if summary != nil {
|
||||
statsBlob, _ = json.Marshal(summary)
|
||||
}
|
||||
finEnv, _ := api.Marshal(api.MsgJobFinished, jobID, api.JobFinishedPayload{
|
||||
JobID: jobID,
|
||||
Status: status,
|
||||
ExitCode: exit,
|
||||
FinishedAt: finishedAt,
|
||||
Stats: statsBlob,
|
||||
Error: errMsg,
|
||||
})
|
||||
_ = r.tx.Send(finEnv)
|
||||
r.sendFinished(jobID, finishedAt, err, statsBlob)
|
||||
|
||||
// On a successful backup, refresh the server's snapshot projection.
|
||||
// We do this *after* job.finished so the UI sees the job land first;
|
||||
@@ -147,6 +177,9 @@ func (r *Runner) RunBackup(ctx context.Context, jobID string, paths, excludes, t
|
||||
if rerr := r.reportSnapshots(ctx, env); rerr != nil {
|
||||
slog.Warn("runner: snapshots.report failed", "job_id", jobID, "err", rerr)
|
||||
}
|
||||
if rerr := r.reportStats(ctx, env, api.RepoStatsPayload{}); rerr != nil {
|
||||
slog.Warn("runner: stats.report after backup failed", "job_id", jobID, "err", rerr)
|
||||
}
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
@@ -160,52 +193,13 @@ func (r *Runner) RunBackup(ctx context.Context, jobID string, paths, excludes, t
|
||||
// browser-side log viewer just works.
|
||||
func (r *Runner) RunInit(ctx context.Context, jobID string) error {
|
||||
startedAt := time.Now().UTC()
|
||||
startEnv, _ := api.Marshal(api.MsgJobStarted, jobID, api.JobStartedPayload{
|
||||
JobID: jobID, Kind: api.JobInit, StartedAt: startedAt,
|
||||
})
|
||||
if err := r.tx.Send(startEnv); err != nil {
|
||||
slog.Warn("runner: send job.started (init)", "err", err)
|
||||
}
|
||||
|
||||
env := restic.Env{
|
||||
Bin: r.cfg.ResticBin,
|
||||
RepoURL: r.cfg.RepoURL,
|
||||
RepoUsername: r.cfg.RepoUsername,
|
||||
RepoPassword: r.cfg.RepoPassword,
|
||||
}
|
||||
r.sendStarted(jobID, api.JobInit, startedAt)
|
||||
|
||||
env := r.resticEnv()
|
||||
var seq atomic.Int64
|
||||
handle := func(stream string, line string, _ any) {
|
||||
now := time.Now().UTC()
|
||||
logEnv, _ := api.Marshal(api.MsgLogStream, "", api.LogStreamLine{
|
||||
JobID: jobID,
|
||||
Seq: seq.Add(1),
|
||||
TS: now,
|
||||
Stream: api.LogStream(stream),
|
||||
Payload: line,
|
||||
})
|
||||
_ = r.tx.Send(logEnv)
|
||||
}
|
||||
|
||||
err := env.RunInit(ctx, handle)
|
||||
err := env.RunInit(ctx, r.streamHandler(jobID, &seq))
|
||||
finishedAt := time.Now().UTC()
|
||||
|
||||
status := api.JobSucceeded
|
||||
exit := 0
|
||||
errMsg := ""
|
||||
if err != nil {
|
||||
status = api.JobFailed
|
||||
exit = -1
|
||||
errMsg = err.Error()
|
||||
}
|
||||
finEnv, _ := api.Marshal(api.MsgJobFinished, jobID, api.JobFinishedPayload{
|
||||
JobID: jobID,
|
||||
Status: status,
|
||||
ExitCode: exit,
|
||||
FinishedAt: finishedAt,
|
||||
Error: errMsg,
|
||||
})
|
||||
_ = r.tx.Send(finEnv)
|
||||
r.sendFinished(jobID, finishedAt, err, nil)
|
||||
if err != nil {
|
||||
return fmt.Errorf("runner init: %w", err)
|
||||
}
|
||||
@@ -219,52 +213,13 @@ func (r *Runner) RunInit(ctx context.Context, jobID string) error {
|
||||
// snapshot index — the host's snapshot list shrinks).
|
||||
func (r *Runner) RunForget(ctx context.Context, jobID string, policy restic.ForgetPolicy) error {
|
||||
startedAt := time.Now().UTC()
|
||||
startEnv, _ := api.Marshal(api.MsgJobStarted, jobID, api.JobStartedPayload{
|
||||
JobID: jobID, Kind: api.JobForget, StartedAt: startedAt,
|
||||
})
|
||||
if err := r.tx.Send(startEnv); err != nil {
|
||||
slog.Warn("runner: send job.started (forget)", "err", err)
|
||||
}
|
||||
|
||||
env := restic.Env{
|
||||
Bin: r.cfg.ResticBin,
|
||||
RepoURL: r.cfg.RepoURL,
|
||||
RepoUsername: r.cfg.RepoUsername,
|
||||
RepoPassword: r.cfg.RepoPassword,
|
||||
}
|
||||
r.sendStarted(jobID, api.JobForget, startedAt)
|
||||
|
||||
env := r.resticEnv()
|
||||
var seq atomic.Int64
|
||||
handle := func(stream string, line string, _ any) {
|
||||
now := time.Now().UTC()
|
||||
logEnv, _ := api.Marshal(api.MsgLogStream, "", api.LogStreamLine{
|
||||
JobID: jobID,
|
||||
Seq: seq.Add(1),
|
||||
TS: now,
|
||||
Stream: api.LogStream(stream),
|
||||
Payload: line,
|
||||
})
|
||||
_ = r.tx.Send(logEnv)
|
||||
}
|
||||
|
||||
err := env.RunForget(ctx, policy, handle)
|
||||
err := env.RunForget(ctx, policy, r.streamHandler(jobID, &seq))
|
||||
finishedAt := time.Now().UTC()
|
||||
|
||||
status := api.JobSucceeded
|
||||
exit := 0
|
||||
errMsg := ""
|
||||
if err != nil {
|
||||
status = api.JobFailed
|
||||
exit = -1
|
||||
errMsg = err.Error()
|
||||
}
|
||||
finEnv, _ := api.Marshal(api.MsgJobFinished, jobID, api.JobFinishedPayload{
|
||||
JobID: jobID,
|
||||
Status: status,
|
||||
ExitCode: exit,
|
||||
FinishedAt: finishedAt,
|
||||
Error: errMsg,
|
||||
})
|
||||
_ = r.tx.Send(finEnv)
|
||||
r.sendFinished(jobID, finishedAt, err, nil)
|
||||
|
||||
// Refresh the server's snapshot projection — forget rewrites the
|
||||
// index so the host's snapshot list almost certainly shrunk.
|
||||
@@ -281,6 +236,125 @@ func (r *Runner) RunForget(ctx context.Context, jobID string, policy restic.Forg
|
||||
return nil
|
||||
}
|
||||
|
||||
// RunPrune executes a prune job against the configured repo. On
|
||||
// success it ships a repo.stats envelope with LastPruneAt set (plus
|
||||
// a full size refresh via RunStats) before the job.finished envelope,
|
||||
// so the UI can display updated size information alongside the
|
||||
// completed job. On failure no stats refresh is attempted.
|
||||
func (r *Runner) RunPrune(ctx context.Context, jobID string) error {
|
||||
startedAt := time.Now().UTC()
|
||||
r.sendStarted(jobID, api.JobPrune, startedAt)
|
||||
|
||||
env := r.resticEnv()
|
||||
var seq atomic.Int64
|
||||
err := env.RunPrune(ctx, r.streamHandler(jobID, &seq))
|
||||
finishedAt := time.Now().UTC()
|
||||
|
||||
if err == nil {
|
||||
pruneAt := finishedAt
|
||||
if rerr := r.reportStats(ctx, env, api.RepoStatsPayload{LastPruneAt: &pruneAt}); rerr != nil {
|
||||
slog.Warn("runner: stats.report after prune failed", "job_id", jobID, "err", rerr)
|
||||
}
|
||||
}
|
||||
|
||||
r.sendFinished(jobID, finishedAt, err, nil)
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("runner prune: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// RunCheck executes a `restic check` job. Always ships a repo.stats
|
||||
// envelope (success or failure) with LastCheckAt, LastCheckStatus,
|
||||
// and LockPresent populated from the check result.
|
||||
func (r *Runner) RunCheck(ctx context.Context, jobID string, subsetPct int) error {
|
||||
startedAt := time.Now().UTC()
|
||||
r.sendStarted(jobID, api.JobCheck, startedAt)
|
||||
|
||||
env := r.resticEnv()
|
||||
var seq atomic.Int64
|
||||
res, err := env.RunCheck(ctx, subsetPct, r.streamHandler(jobID, &seq))
|
||||
finishedAt := time.Now().UTC()
|
||||
r.sendFinished(jobID, finishedAt, err, nil)
|
||||
|
||||
// Determine check status string.
|
||||
checkStatus := "ok"
|
||||
if err != nil {
|
||||
checkStatus = "failed"
|
||||
} else if res.ErrorsFound {
|
||||
checkStatus = "errors_found"
|
||||
}
|
||||
|
||||
lockPresent := res.LockPresent
|
||||
now := finishedAt
|
||||
patch := api.RepoStatsPayload{
|
||||
LastCheckAt: &now,
|
||||
LastCheckStatus: checkStatus,
|
||||
LockPresent: &lockPresent,
|
||||
}
|
||||
if rerr := r.reportStats(ctx, env, patch); rerr != nil {
|
||||
slog.Warn("runner: stats.report after check failed", "job_id", jobID, "err", rerr)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("runner check: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// RunUnlock executes a `restic unlock` job. On success it ships a
|
||||
// repo.stats envelope with LockPresent=false so the UI banner clears.
|
||||
func (r *Runner) RunUnlock(ctx context.Context, jobID string) error {
|
||||
startedAt := time.Now().UTC()
|
||||
r.sendStarted(jobID, api.JobUnlock, startedAt)
|
||||
|
||||
env := r.resticEnv()
|
||||
var seq atomic.Int64
|
||||
err := env.RunUnlock(ctx, r.streamHandler(jobID, &seq))
|
||||
finishedAt := time.Now().UTC()
|
||||
r.sendFinished(jobID, finishedAt, err, nil)
|
||||
|
||||
if err == nil {
|
||||
lockFalse := false
|
||||
patch := api.RepoStatsPayload{LockPresent: &lockFalse}
|
||||
if rerr := r.reportStats(ctx, env, patch); rerr != nil {
|
||||
slog.Warn("runner: stats.report after unlock failed", "job_id", jobID, "err", rerr)
|
||||
}
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("runner unlock: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// reportStats ships a repo.stats envelope. If the patch doesn't
|
||||
// already include size fields, fills them in by invoking env.RunStats.
|
||||
// Errors from RunStats are non-fatal — the patch is shipped anyway
|
||||
// with whatever the caller did populate.
|
||||
func (r *Runner) reportStats(ctx context.Context, env restic.Env, patch api.RepoStatsPayload) error {
|
||||
listCtx, cancel := context.WithTimeout(ctx, 60*time.Second)
|
||||
defer cancel()
|
||||
if patch.TotalSizeBytes == nil {
|
||||
if s, err := env.RunStats(listCtx, nil); err == nil {
|
||||
total := s.TotalSize
|
||||
raw := s.TotalUncompressed
|
||||
files := s.TotalFileCount
|
||||
snaps := s.SnapshotsCount
|
||||
patch.TotalSizeBytes = &total
|
||||
patch.RawSizeBytes = &raw
|
||||
patch.UniqueFiles = &files
|
||||
patch.SnapshotCount = &snaps
|
||||
}
|
||||
}
|
||||
envOut, err := api.Marshal(api.MsgRepoStats, "", patch)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return r.tx.Send(envOut)
|
||||
}
|
||||
|
||||
// reportSnapshots calls `restic snapshots --json`, translates the
|
||||
// payload into the wire shape, and ships it as a snapshots.report
|
||||
// envelope. Bounded by a separate timeout so a sluggish repo doesn't
|
||||
|
||||
Reference in New Issue
Block a user