e871b05b38
CI / Test (linux/amd64) (pull_request) Successful in 34s
CI / Lint (pull_request) Failing after 16s
CI / Build (windows/amd64) (pull_request) Successful in 22s
CI / Build (linux/amd64) (pull_request) Successful in 20s
CI / Build (linux/arm64) (pull_request) Successful in 21s
Cleanup pass over the repo so CI can enforce lint going forward
without the only-new-issues escape hatch:
* gofumpt -w across the tree (31 hits, all formatting)
* misspell --fix (25 hits, US-locale spelling), with one deliberate
revert: api.JobCancelled stays "cancelled" because that literal is the
wire and DB CHECK constraint value. The matching literal in
store/fleet.go was restored to "cancelled" too, and both now carry
//nolint:misspell so the next auto-fix run leaves them alone
* Wrap every `defer rows.Close()` / `defer stmt.Close()` /
`defer res.Body.Close()` in `defer func() { _ = .Close() }()`
to satisfy errcheck without losing the close itself
* websocket.Dial callers (1 prod, 4 tests) now capture and close the
upgrade response Body — coder/websocket can return a res whose Body is
nil on success, so the deferred closes in the tests guard against a nil Body
* Annotate the two genuine-by-design nilerr cases with //nolint
comments explaining why nil-on-error is the contract (cookie
missing = no session; ctx cancelled mid-backoff = clean shutdown)
* Add brief godoc on the 10 exported const groups + types that
revive flagged (api.HostOS/HostArch/JobKind/JobStatus/LogStream/
ErrorCode, restic.EventKind, store.Role, web.FS)
* Drop the unused (*Server).userByID method
* Inline the unparam baseView(active) — every UI page is under
the dashboard primary nav today
Result: `golangci-lint run ./...` reports 0 issues. CI lint job
no longer needs only-new-issues: true; X-06 follow-up entry in
tasks.md removed.
325 lines
9.1 KiB
Go
325 lines
9.1 KiB
Go
// Package runner spawns restic processes for the agent. It owns one
// restic invocation (RunBackup / RunInit / RunForget) per command.run;
// concurrency limits live a layer up (the WS handler).
|
|
package runner
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"log/slog"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"gitea.dcglab.co.uk/steve/restic-manager/internal/api"
|
|
"gitea.dcglab.co.uk/steve/restic-manager/internal/restic"
|
|
)
|
|
|
|
// Sender is the agent's outbound message channel. Provided by
|
|
// wsclient so the runner can push job.started / job.progress /
|
|
// job.finished / log.stream back to the server.
|
|
type Sender interface {
|
|
Send(env api.Envelope) error
|
|
}
|
|
|
|
// Config carries the long-lived settings a Runner copies into every
// restic.Env it builds. Values originate in the agent's config file;
// server-pushed config.update payloads override them in memory.
type Config struct {
	// ResticBin is handed to restic.Env as the binary to execute.
	ResticBin string
	// RepoURL is the repository every job targets.
	RepoURL string
	// RepoUsername and RepoPassword are forwarded to restic.Env
	// alongside RepoURL.
	RepoUsername string
	RepoPassword string
}
|
|
|
|
// Runner owns the restic invocations.
|
|
type Runner struct {
|
|
cfg Config
|
|
tx Sender
|
|
|
|
// progress throttling: we receive a status event from restic
|
|
// every ~100ms; the UI doesn't need anywhere near that rate.
|
|
// Cap WS sends to one per N (configurable; default 1s).
|
|
progressMinPeriod time.Duration
|
|
}
|
|
|
|
// New builds a Runner. progressMinPeriod = 0 uses the default 1s.
|
|
func New(cfg Config, tx Sender, progressMinPeriod time.Duration) *Runner {
|
|
if progressMinPeriod <= 0 {
|
|
progressMinPeriod = time.Second
|
|
}
|
|
return &Runner{cfg: cfg, tx: tx, progressMinPeriod: progressMinPeriod}
|
|
}
|
|
|
|
// RunBackup executes a backup job and reports back via the sender.
|
|
// Returns nil on a clean (or "incomplete-but-snapshot-created") finish.
|
|
func (r *Runner) RunBackup(ctx context.Context, jobID string, paths, excludes, tags []string) error {
|
|
startedAt := time.Now().UTC()
|
|
|
|
startEnv, _ := api.Marshal(api.MsgJobStarted, jobID, api.JobStartedPayload{
|
|
JobID: jobID, Kind: api.JobBackup, StartedAt: startedAt,
|
|
})
|
|
if err := r.tx.Send(startEnv); err != nil {
|
|
slog.Warn("runner: send job.started", "err", err)
|
|
}
|
|
|
|
env := restic.Env{
|
|
Bin: r.cfg.ResticBin,
|
|
RepoURL: r.cfg.RepoURL,
|
|
RepoUsername: r.cfg.RepoUsername,
|
|
RepoPassword: r.cfg.RepoPassword,
|
|
}
|
|
|
|
var seq atomic.Int64
|
|
lastProgress := time.Now()
|
|
|
|
handle := func(stream string, line string, ev any) {
|
|
// Throttled progress events come from restic's `status` JSON.
|
|
// We deliberately do NOT forward the raw status line to
|
|
// log.stream — it's emitted ~every 16ms by restic --json and
|
|
// would drown the live log in dupes for any short backup. The
|
|
// progress widget already covers the same information at a
|
|
// sane sample rate.
|
|
status, isStatus := ev.(restic.BackupStatus)
|
|
if !isStatus {
|
|
now := time.Now().UTC()
|
|
logEnv, _ := api.Marshal(api.MsgLogStream, "", api.LogStreamLine{
|
|
JobID: jobID,
|
|
Seq: seq.Add(1),
|
|
TS: now,
|
|
Stream: api.LogStream(stream),
|
|
Payload: line,
|
|
})
|
|
_ = r.tx.Send(logEnv)
|
|
}
|
|
|
|
if isStatus {
|
|
if time.Since(lastProgress) < r.progressMinPeriod {
|
|
return
|
|
}
|
|
lastProgress = time.Now()
|
|
progEnv, _ := api.Marshal(api.MsgJobProgress, jobID, api.JobProgressPayload{
|
|
JobID: jobID,
|
|
PercentDone: status.PercentDone,
|
|
FilesDone: status.FilesDone,
|
|
TotalFiles: status.TotalFiles,
|
|
BytesDone: status.BytesDone,
|
|
TotalBytes: status.TotalBytes,
|
|
ETASeconds: status.SecondsRem,
|
|
ThroughputBps: throughput(status.BytesDone, status.SecondsElapsed),
|
|
})
|
|
_ = r.tx.Send(progEnv)
|
|
}
|
|
}
|
|
|
|
summary, err := env.RunBackup(ctx, paths, excludes, tags, handle)
|
|
finishedAt := time.Now().UTC()
|
|
|
|
status := api.JobSucceeded
|
|
exit := 0
|
|
errMsg := ""
|
|
if err != nil {
|
|
status = api.JobFailed
|
|
exit = -1
|
|
errMsg = err.Error()
|
|
}
|
|
var statsBlob json.RawMessage
|
|
if summary != nil {
|
|
statsBlob, _ = json.Marshal(summary)
|
|
}
|
|
finEnv, _ := api.Marshal(api.MsgJobFinished, jobID, api.JobFinishedPayload{
|
|
JobID: jobID,
|
|
Status: status,
|
|
ExitCode: exit,
|
|
FinishedAt: finishedAt,
|
|
Stats: statsBlob,
|
|
Error: errMsg,
|
|
})
|
|
_ = r.tx.Send(finEnv)
|
|
|
|
// On a successful backup, refresh the server's snapshot projection.
|
|
// We do this *after* job.finished so the UI sees the job land first;
|
|
// the snapshot list is a follow-up that the host detail page polls
|
|
// or the dashboard sees on its next refresh. A failure here is
|
|
// logged but doesn't fail the job — the next successful backup will
|
|
// catch the projection up.
|
|
if err == nil {
|
|
if rerr := r.reportSnapshots(ctx, env); rerr != nil {
|
|
slog.Warn("runner: snapshots.report failed", "job_id", jobID, "err", rerr)
|
|
}
|
|
}
|
|
|
|
if err != nil {
|
|
return fmt.Errorf("runner backup: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// RunInit executes a repo-init job and reports back via the sender.
|
|
// Returns nil on success. Same envelope shape as RunBackup so the
|
|
// browser-side log viewer just works.
|
|
func (r *Runner) RunInit(ctx context.Context, jobID string) error {
|
|
startedAt := time.Now().UTC()
|
|
startEnv, _ := api.Marshal(api.MsgJobStarted, jobID, api.JobStartedPayload{
|
|
JobID: jobID, Kind: api.JobInit, StartedAt: startedAt,
|
|
})
|
|
if err := r.tx.Send(startEnv); err != nil {
|
|
slog.Warn("runner: send job.started (init)", "err", err)
|
|
}
|
|
|
|
env := restic.Env{
|
|
Bin: r.cfg.ResticBin,
|
|
RepoURL: r.cfg.RepoURL,
|
|
RepoUsername: r.cfg.RepoUsername,
|
|
RepoPassword: r.cfg.RepoPassword,
|
|
}
|
|
|
|
var seq atomic.Int64
|
|
handle := func(stream string, line string, _ any) {
|
|
now := time.Now().UTC()
|
|
logEnv, _ := api.Marshal(api.MsgLogStream, "", api.LogStreamLine{
|
|
JobID: jobID,
|
|
Seq: seq.Add(1),
|
|
TS: now,
|
|
Stream: api.LogStream(stream),
|
|
Payload: line,
|
|
})
|
|
_ = r.tx.Send(logEnv)
|
|
}
|
|
|
|
err := env.RunInit(ctx, handle)
|
|
finishedAt := time.Now().UTC()
|
|
|
|
status := api.JobSucceeded
|
|
exit := 0
|
|
errMsg := ""
|
|
if err != nil {
|
|
status = api.JobFailed
|
|
exit = -1
|
|
errMsg = err.Error()
|
|
}
|
|
finEnv, _ := api.Marshal(api.MsgJobFinished, jobID, api.JobFinishedPayload{
|
|
JobID: jobID,
|
|
Status: status,
|
|
ExitCode: exit,
|
|
FinishedAt: finishedAt,
|
|
Error: errMsg,
|
|
})
|
|
_ = r.tx.Send(finEnv)
|
|
if err != nil {
|
|
return fmt.Errorf("runner init: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// RunForget executes a forget job against the configured repo with
|
|
// the given retention policy. Same envelope shape as RunBackup so
|
|
// the live log viewer + job lifecycle work without special-casing.
|
|
// On success refreshes the snapshot projection (forget rewrites the
|
|
// snapshot index — the host's snapshot list shrinks).
|
|
func (r *Runner) RunForget(ctx context.Context, jobID string, policy restic.ForgetPolicy) error {
|
|
startedAt := time.Now().UTC()
|
|
startEnv, _ := api.Marshal(api.MsgJobStarted, jobID, api.JobStartedPayload{
|
|
JobID: jobID, Kind: api.JobForget, StartedAt: startedAt,
|
|
})
|
|
if err := r.tx.Send(startEnv); err != nil {
|
|
slog.Warn("runner: send job.started (forget)", "err", err)
|
|
}
|
|
|
|
env := restic.Env{
|
|
Bin: r.cfg.ResticBin,
|
|
RepoURL: r.cfg.RepoURL,
|
|
RepoUsername: r.cfg.RepoUsername,
|
|
RepoPassword: r.cfg.RepoPassword,
|
|
}
|
|
|
|
var seq atomic.Int64
|
|
handle := func(stream string, line string, _ any) {
|
|
now := time.Now().UTC()
|
|
logEnv, _ := api.Marshal(api.MsgLogStream, "", api.LogStreamLine{
|
|
JobID: jobID,
|
|
Seq: seq.Add(1),
|
|
TS: now,
|
|
Stream: api.LogStream(stream),
|
|
Payload: line,
|
|
})
|
|
_ = r.tx.Send(logEnv)
|
|
}
|
|
|
|
err := env.RunForget(ctx, policy, handle)
|
|
finishedAt := time.Now().UTC()
|
|
|
|
status := api.JobSucceeded
|
|
exit := 0
|
|
errMsg := ""
|
|
if err != nil {
|
|
status = api.JobFailed
|
|
exit = -1
|
|
errMsg = err.Error()
|
|
}
|
|
finEnv, _ := api.Marshal(api.MsgJobFinished, jobID, api.JobFinishedPayload{
|
|
JobID: jobID,
|
|
Status: status,
|
|
ExitCode: exit,
|
|
FinishedAt: finishedAt,
|
|
Error: errMsg,
|
|
})
|
|
_ = r.tx.Send(finEnv)
|
|
|
|
// Refresh the server's snapshot projection — forget rewrites the
|
|
// index so the host's snapshot list almost certainly shrunk.
|
|
if err == nil {
|
|
if rerr := r.reportSnapshots(ctx, env); rerr != nil {
|
|
slog.Warn("runner: snapshots.report after forget failed",
|
|
"job_id", jobID, "err", rerr)
|
|
}
|
|
}
|
|
|
|
if err != nil {
|
|
return fmt.Errorf("runner forget: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// reportSnapshots calls `restic snapshots --json`, translates the
|
|
// payload into the wire shape, and ships it as a snapshots.report
|
|
// envelope. Bounded by a separate timeout so a sluggish repo doesn't
|
|
// hang the runner forever; restic snapshots is normally sub-second.
|
|
func (r *Runner) reportSnapshots(ctx context.Context, env restic.Env) error {
|
|
listCtx, cancel := context.WithTimeout(ctx, 60*time.Second)
|
|
defer cancel()
|
|
snaps, err := env.ListSnapshots(listCtx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
out := make([]api.Snapshot, len(snaps))
|
|
for i, s := range snaps {
|
|
out[i] = api.Snapshot{
|
|
ID: s.ID,
|
|
ShortID: s.ShortID,
|
|
Time: s.Time.UTC(),
|
|
Hostname: s.Hostname,
|
|
Paths: s.Paths,
|
|
Tags: s.Tags,
|
|
}
|
|
if s.Summary != nil {
|
|
out[i].SizeBytes = s.Summary.TotalBytesProcessed
|
|
out[i].FileCount = s.Summary.TotalFilesProcessed
|
|
}
|
|
}
|
|
envOut, err := api.Marshal(api.MsgSnapshotsRpt, "", api.SnapshotsReportPayload{
|
|
Snapshots: out,
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return r.tx.Send(envOut)
|
|
}
|
|
|
|
// throughput returns the mean transfer rate in whole bytes per second
// (integer division). It reports 0 while no time has elapsed yet, and
// also when secondsElapsed is negative.
func throughput(bytesDone, secondsElapsed int64) int64 {
	if secondsElapsed > 0 {
		return bytesDone / secondsElapsed
	}
	return 0
}
|