Files
restic-manager/cmd/agent/main.go
T
steve 9fa2ef48f0 P3-X1: cancel-job feature
Wires the existing job_detail Cancel button (which was a UI stub) into
real backend behaviour:

- internal/api already declared MsgCommandCancel + CommandCancelPayload;
  promote those from forward-declarations to a working envelope. Agent
  side: cmd/agent/main.go drops the TODO-stub and gains a per-job
  context.CancelFunc map. runJob's switch is refactored around a small
  spawn() helper so each kind's goroutine derives a per-job context,
  registers the cancel, and removes itself on completion regardless of
  outcome. command.cancel looks up the func and fires it.
- internal/agent/runner.sendFinished now takes ctx and rebadges
  context.Canceled errors as JobCancelled (exit 130) rather than
  JobFailed. All Run* call sites updated.
- internal/restic.resticCmd sets cmd.Cancel to send SIGTERM (via
  build-tagged sigterm constant; os.Kill on Windows since SIGTERM
  isn't deliverable there) and cmd.WaitDelay=5s for the SIGKILL
  fallback. SIGTERM lets restic remove its lock file before exiting.
- New POST /api/jobs/{id}/cancel server endpoint validates the job
  is non-terminal and the host is online, sends command.cancel via
  the hub, writes a job.cancel audit row, returns 202. The agent's
  resulting job.finished (status=cancelled) is what actually
  transitions the row.

Tests:
- internal/server/http/cancel_test.go covers happy path (envelope
  shape + audit row), 409 for terminal jobs, 404 for missing jobs,
  503 for offline hosts.
- internal/agent/runner/cancel_test.go covers cancel mid-run: a fake
  restic that exec'd into 'sleep 30' is canceled 150ms after start
  and the resulting job.finished reports JobCancelled with exit 130
  in well under the WaitDelay.

Foundational for P3 restore (operator needs to be able to cancel a
running backup if they need to restore urgently). Independently useful
for prune/check/backup that are stuck.
2026-05-04 15:11:49 +01:00

573 lines
18 KiB
Go

package main
import (
"context"
"errors"
"flag"
"fmt"
"log/slog"
"os"
"os/signal"
"strconv"
"sync"
"syscall"
"time"
"gitea.dcglab.co.uk/steve/restic-manager/internal/agent/config"
"gitea.dcglab.co.uk/steve/restic-manager/internal/agent/runner"
"gitea.dcglab.co.uk/steve/restic-manager/internal/agent/scheduler"
"gitea.dcglab.co.uk/steve/restic-manager/internal/agent/secrets"
"gitea.dcglab.co.uk/steve/restic-manager/internal/agent/service"
"gitea.dcglab.co.uk/steve/restic-manager/internal/agent/sysinfo"
"gitea.dcglab.co.uk/steve/restic-manager/internal/agent/wsclient"
"gitea.dcglab.co.uk/steve/restic-manager/internal/api"
"gitea.dcglab.co.uk/steve/restic-manager/internal/restic"
)
// version is the agent's version string. It is printed by -version,
// logged at startup, and sent to the server in the hello payload and
// during enrollment/announce. The "dev" default is presumably replaced
// at build time (e.g. via -ldflags "-X main.version=...") — TODO confirm
// against the build scripts.
var version = "dev"
// main delegates all work to run. A nil result means a clean exit; any
// error is logged once as "agent fatal" and mapped to exit status 1.
func main() {
	err := run()
	if err == nil {
		return
	}
	slog.Error("agent fatal", "err", err)
	os.Exit(1)
}
// run is the agent's real entry point; main only maps its error to an
// exit status. In order, it:
//
//  1. routes an optional leading verb (install / uninstall / start /
//     stop) into the service package, or strips a leading "run" so flag
//     parsing sees the remaining arguments unchanged;
//  2. parses flags and answers -version immediately;
//  3. installs a JSON slog logger on stdout and loads the agent config;
//  4. performs token enrollment (and returns), or runs the announce
//     flow inline, depending on the -enroll-* flags;
//  5. refuses to continue while the agent is still not enrolled;
//  6. collects system info, locates restic, opens the secrets store and
//     blocks in wsclient.Run until it returns.
//
// The context handed to wsclient.Run is cancelled on SIGINT/SIGTERM.
func run() error {
	// SCM-style control verbs. Everything that is not one of these falls
	// through to the flag-driven default used by systemd / interactive
	// runs. The service package is expected to reject the control verbs
	// on non-Windows builds.
	if len(os.Args) > 1 {
		switch verb := os.Args[1]; verb {
		case "install":
			return service.Install()
		case "uninstall":
			return service.Uninstall()
		case "start":
			return service.Start()
		case "stop":
			return service.Stop()
		case "run":
			// Drop the verb so flag.Parse sees the rest unchanged.
			rest := os.Args[2:]
			os.Args = append([]string{os.Args[0]}, rest...)
		}
	}

	var (
		cfgPath      string
		enrollServer string
		enrollToken  string
		printVersion bool
	)
	flag.StringVar(&cfgPath, "config", config.DefaultPath(), "path to agent.yaml")
	flag.StringVar(&enrollServer, "enroll-server", "", "server URL (used with -enroll-token to perform first-run enrollment)")
	flag.StringVar(&enrollToken, "enroll-token", "", "one-time enrollment token (operator copies this from the UI)")
	flag.BoolVar(&printVersion, "version", false, "print version and exit")
	flag.Parse()

	if printVersion {
		fmt.Println("restic-manager-agent", version)
		return nil
	}

	handler := slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo})
	slog.SetDefault(slog.New(handler))

	cfg, err := config.Load(cfgPath)
	if err != nil {
		return fmt.Errorf("config: %w", err)
	}

	switch {
	case enrollToken != "" && enrollServer == "":
		return errors.New("enrollment: -enroll-server is required with -enroll-token")
	case enrollToken != "":
		// Token enrollment is a one-shot operation: enroll, then exit.
		return doEnroll(enrollServer, enrollToken, cfg, version)
	case !cfg.Enrolled() && enrollServer != "":
		// Announce-and-approve: server given, no token, agent not yet
		// enrolled. On success cfg carries the bearer + host_id and we
		// continue into the normal run loop below.
		if err := doAnnounce(enrollServer, cfg, version); err != nil {
			return fmt.Errorf("announce: %w", err)
		}
	}
	if !cfg.Enrolled() {
		return fmt.Errorf("agent is not enrolled; run with -enroll-server (and either -enroll-token or wait for admin to accept the announce) first (config %q)", cfgPath)
	}

	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	snap, err := sysinfo.Collect(ctx, cfg.ResticPath)
	if err != nil {
		return fmt.Errorf("sysinfo: %w", err)
	}
	slog.Info("agent starting",
		"version", version,
		"host_id", cfg.HostID,
		"server", cfg.ServerURL,
		"restic_version", snap.ResticVersion,
		"protocol_version", snap.ProtocolVersion,
	)

	// An empty result is fine here; commands fail with a clear error later.
	resticBin, _ := restic.Locate(cfg.ResticPath)

	// Open the secrets store. If the agent is enrolled but has no
	// secrets key yet (legacy YAML), this mints one and migrates any
	// plaintext repo fields into the encrypted blob.
	sec, err := openSecretsStore(cfg)
	if err != nil {
		return fmt.Errorf("secrets: %w", err)
	}

	hello := api.HelloPayload{
		ProtocolVersion: snap.ProtocolVersion,
		AgentVersion:    version,
		ResticVersion:   snap.ResticVersion,
		Hostname:        snap.Hostname,
		OS:              snap.OS,
		Arch:            snap.Arch,
	}
	wsCfg := wsclient.Config{
		ServerURL:     cfg.ServerURL,
		AgentToken:    cfg.AgentToken,
		HostID:        cfg.HostID,
		CertPinSHA256: cfg.CertPinSHA256,
		HelloPayload:  hello,
	}
	d := &dispatcher{
		resticBin: resticBin,
		secrets:   sec,
		scheduler: scheduler.New(),
	}

	if err := wsclient.Run(ctx, wsCfg, d.handle); err != nil {
		return fmt.Errorf("ws run: %w", err)
	}
	slog.Info("agent shutting down")
	return nil
}
// openSecretsStore returns the agent's encrypted secrets store, doing a
// one-time migration of legacy settings on the way. Side effects:
//   - asks cfg to ensure a SecretsKey exists, and persists agent.yaml
//     whenever a key is present afterwards;
//   - if legacy plaintext repo URL/password are still set on cfg, copies
//     them into the store (without overwriting values the store already
//     has), clears them on cfg, and persists agent.yaml.
//
// Any failure is returned with a nil store.
func openSecretsStore(cfg *config.Config) (*secrets.Store, error) {
	if err := cfg.EnsureSecretsKey(); err != nil {
		return nil, err
	}
	key, err := cfg.SecretsKeyBytes()
	if err != nil {
		return nil, err
	}
	store, err := secrets.New(cfg.ResolvedSecretsPath(), key)
	if err != nil {
		return nil, err
	}

	hasLegacy := cfg.LegacyRepoURL != "" || cfg.LegacyRepoPassword != ""
	if hasLegacy {
		// The load error is deliberately ignored: an empty Repo on
		// first run is fine and simply receives the legacy values.
		repo, _ := store.Load()
		if repo.URL == "" {
			repo.URL = cfg.LegacyRepoURL
		}
		if repo.Password == "" {
			repo.Password = cfg.LegacyRepoPassword
		}
		if saveErr := store.Save(repo); saveErr != nil {
			return nil, fmt.Errorf("migrate legacy creds into secrets.enc: %w", saveErr)
		}
		cfg.LegacyRepoURL, cfg.LegacyRepoPassword = "", ""
		slog.Info("agent: migrated legacy plaintext repo creds into secrets.enc")
	}

	// Nothing migrated and no key to persist: leave agent.yaml untouched.
	if !hasLegacy && cfg.SecretsKey == "" {
		return store, nil
	}
	// Persist the key (and any cleared legacy fields). This runs even
	// without a migration in case SecretsKey was only just minted.
	if err := cfg.Save(); err != nil {
		return nil, fmt.Errorf("persist agent config: %w", err)
	}
	return store, nil
}
// dispatcher carries the agent's long-lived state (restic path, secrets
// handle, scheduler) so that handle can start jobs without re-reading
// configuration. Repo credentials are not cached here: runJob loads
// them from the secrets store for every job, so credentials saved by a
// config.update are picked up by the next job in the same session.
type dispatcher struct {
	resticBin string
	secrets   *secrets.Store
	scheduler *scheduler.Scheduler

	// Host-wide bandwidth caps in KB/s, guarded by bwMu. Written by the
	// config.update handler and read by runJob when it builds a runner.
	// A value <= 0 means "no cap" (no --limit-* flag for restic). A
	// per-job override on CommandRunPayload takes precedence.
	bwMu                 sync.Mutex
	bwUpKBps, bwDownKBps int

	// Cancel funcs of the jobs currently in flight, keyed by job ID and
	// guarded by cancelMu. An entry is added when runJob starts a job's
	// goroutine and removed when that goroutine returns; the
	// command.cancel handler (server → agent) looks entries up to abort
	// a running restic invocation.
	cancelMu sync.Mutex
	cancels  map[string]context.CancelFunc
}
// trackJob records cancel as the cancellation handle for jobID and
// returns a function that removes that record again. The cancels map is
// created lazily on first use. Callers should defer the returned
// function in the job goroutine so the entry is removed whatever the
// outcome — a deferred call also runs if the goroutine panics.
func (d *dispatcher) trackJob(jobID string, cancel context.CancelFunc) func() {
	// Built up front; it only touches the map when invoked later.
	untrack := func() {
		d.cancelMu.Lock()
		defer d.cancelMu.Unlock()
		delete(d.cancels, jobID)
	}

	d.cancelMu.Lock()
	defer d.cancelMu.Unlock()
	if d.cancels == nil {
		d.cancels = map[string]context.CancelFunc{}
	}
	d.cancels[jobID] = cancel
	return untrack
}
// cancelJob invokes the cancel func registered for jobID, if any, and
// reports whether one was found. The entry itself is left in place;
// removal is the job goroutine's responsibility via trackJob's cleanup.
// The runner is expected to turn the resulting context.Canceled into a
// JobCancelled status in its job.finished envelope (see
// runner.sendFinished).
func (d *dispatcher) cancelJob(jobID string) bool {
	d.cancelMu.Lock()
	cancel, known := d.cancels[jobID]
	d.cancelMu.Unlock()

	if known {
		// Called after releasing the lock so cancelMu is never held
		// while the cancel func runs.
		cancel()
	}
	return known
}
// handle is the per-message callback handed to wsclient.Run. It decodes
// the envelope payload according to env.Type and routes it:
//
//   - command.run:    runJob (the job itself continues on a goroutine)
//   - command.cancel: cancelJob; an unknown job ID is not an error
//   - schedule.set:   scheduler.Apply, run asynchronously
//   - config.update:  credential merge, then bandwidth caps
//   - update-available notice: logged only
//
// Any other message type is logged at debug level and ignored.
//
// A payload that fails to decode is returned as a wrapped error for
// command.run, command.cancel, schedule.set and config.update. The
// update-available notice is advisory, so a malformed one is logged and
// dropped. Storage failures while applying a config.update are logged,
// not returned.
func (d *dispatcher) handle(ctx context.Context, env api.Envelope, tx wsclient.Sender) error {
	switch env.Type {
	case api.MsgCommandRun:
		var p api.CommandRunPayload
		if err := env.UnmarshalPayload(&p); err != nil {
			return fmt.Errorf("command.run: %w", err)
		}
		return d.runJob(ctx, p, tx)

	case api.MsgCommandCancel:
		var p api.CommandCancelPayload
		if err := env.UnmarshalPayload(&p); err != nil {
			return fmt.Errorf("command.cancel: %w", err)
		}
		if d.cancelJob(p.JobID) {
			slog.Info("ws agent: command.cancel applied", "job_id", p.JobID)
		} else {
			// Job already finished or was never seen on this agent.
			// Not an error — operator may have raced cancel against
			// natural completion. Server-side state is authoritative.
			slog.Info("ws agent: command.cancel for unknown job (already finished?)", "job_id", p.JobID)
		}

	case api.MsgScheduleSet:
		var p api.ScheduleSetPayload
		if err := env.UnmarshalPayload(&p); err != nil {
			return fmt.Errorf("schedule.set: %w", err)
		}
		// scheduler.Apply rebuilds the local cron from scratch and
		// emits schedule.ack via tx. Async-safe: tx may have to wait
		// briefly on the connection's writeMu, but the read loop
		// keeps draining other messages.
		go d.scheduler.Apply(p, tx)

	case api.MsgConfigUpdate:
		var p api.ConfigUpdatePayload
		// Previously the decode error was discarded and the push was
		// applied from a zero or partially-filled payload. Reject it
		// instead, like every other command payload.
		if err := env.UnmarshalPayload(&p); err != nil {
			return fmt.Errorf("config.update: %w", err)
		}
		d.applyConfigCreds(p)
		// Bandwidth caps ride independently of the slot — they're host-
		// wide and apply to every restic invocation regardless of which
		// credentials slot the job uses — so they are applied even when
		// the credential half of the push could not be stored.
		d.applyConfigBandwidth(p)

	case api.MsgAgentUpdateAvail:
		var p api.AgentUpdateAvailablePayload
		if err := env.UnmarshalPayload(&p); err != nil {
			slog.Warn("ws agent: malformed update-available notice, ignoring", "err", err)
			return nil
		}
		slog.Info("ws agent: update available", "version", p.LatestVersion, "url", p.PackageURL)

	default:
		slog.Debug("ws agent: ignored message", "type", env.Type)
	}
	return nil
}

// applyConfigCreds merges the credential fields of a config.update push
// into the slot it names ("repo" when the slot is empty, or "admin").
// Empty fields in the push mean "leave alone", and the store is only
// written when at least one field actually changed. Failures are logged
// rather than returned; an unknown slot is logged and ignored.
func (d *dispatcher) applyConfigCreds(p api.ConfigUpdatePayload) {
	slot := p.Slot
	if slot == "" {
		slot = "repo"
	}
	switch slot {
	case "repo":
		cur, err := d.secrets.Load()
		if err != nil {
			slog.Error("ws agent: load secrets for merge", "err", err)
			return
		}
		changed := mergeCredField(&cur.URL, p.RepoURL)
		changed = mergeCredField(&cur.Username, p.RepoUsername) || changed
		changed = mergeCredField(&cur.Password, p.RepoPassword) || changed
		if !changed {
			return
		}
		if err := d.secrets.Save(cur); err != nil {
			slog.Error("ws agent: persist secrets", "err", err)
			return
		}
		slog.Info("ws agent: repo credentials updated via config.update")

	case "admin":
		cur, err := d.secrets.LoadAdmin()
		// ErrNoAdmin is not an error here — we are creating the slot.
		if err != nil && !errors.Is(err, secrets.ErrNoAdmin) {
			slog.Error("ws agent: load admin secrets", "err", err)
			return
		}
		changed := mergeCredField(&cur.URL, p.RepoURL)
		changed = mergeCredField(&cur.Username, p.RepoUsername) || changed
		changed = mergeCredField(&cur.Password, p.RepoPassword) || changed
		if !changed {
			return
		}
		if err := d.secrets.SaveAdmin(cur); err != nil {
			slog.Error("ws agent: persist admin secrets", "err", err)
			return
		}
		slog.Info("ws agent: admin credentials updated via config.update")

	default:
		slog.Warn("ws agent: unknown config.update slot, ignoring", "slot", p.Slot)
	}
}

// applyConfigBandwidth stores the host-wide bandwidth caps carried by a
// config.update push. A nil pointer means "no change in this push";
// non-nil sets the cap to that value (<=0 clears it). Nothing is logged
// when the push carries neither cap.
func (d *dispatcher) applyConfigBandwidth(p api.ConfigUpdatePayload) {
	if p.BandwidthUpKBps == nil && p.BandwidthDownKBps == nil {
		return
	}
	d.bwMu.Lock()
	if p.BandwidthUpKBps != nil {
		d.bwUpKBps = *p.BandwidthUpKBps
	}
	if p.BandwidthDownKBps != nil {
		d.bwDownKBps = *p.BandwidthDownKBps
	}
	up, down := d.bwUpKBps, d.bwDownKBps
	d.bwMu.Unlock()
	slog.Info("ws agent: bandwidth caps updated",
		"up_kbps", up, "down_kbps", down)
}

// mergeCredField overwrites *dst with src when src is non-empty and
// differs from the current value, and reports whether it did so.
func mergeCredField(dst *string, src string) bool {
	if src == "" || src == *dst {
		return false
	}
	*dst = src
	return true
}
// runJob validates a command.run payload and starts the matching restic
// operation on its own goroutine, so the WS read loop keeps draining
// messages while restic chugs along.
//
// It returns an error (and starts nothing) when the restic binary was
// not located, repo credentials cannot be loaded or are empty, a forget
// job carries no groups, a prune job needs admin credentials that are
// missing or incomplete, or the job kind is not handled here. Once the
// goroutine has been started it returns nil; the job's own outcome is
// only logged by that goroutine.
func (d *dispatcher) runJob(ctx context.Context, p api.CommandRunPayload, tx wsclient.Sender) error {
	if d.resticBin == "" {
		return errors.New("restic binary not located on this agent")
	}
	creds, err := d.secrets.Load()
	if err != nil {
		return fmt.Errorf("load repo credentials: %w", err)
	}
	if creds.Empty() {
		return errors.New("repo credentials not configured (waiting for server config.update push)")
	}

	// Resolve bandwidth caps: per-job override (if set) wins over the
	// host-wide caps last pushed via config.update. <=0 means no cap.
	d.bwMu.Lock()
	upKBps, downKBps := d.bwUpKBps, d.bwDownKBps
	d.bwMu.Unlock()
	if p.BandwidthUpKBps != nil {
		upKBps = *p.BandwidthUpKBps
	}
	if p.BandwidthDownKBps != nil {
		downKBps = *p.BandwidthDownKBps
	}

	// r is the everyday runner — bound to the host's repo
	// (append-only) credentials. Reused by every kind except
	// JobPrune, which builds its own runner against the
	// admin-credentials slot when p.RequiresAdminCreds is set
	// (admin creds are not loaded for any other kind, so they're
	// not on r). If you find yourself adding a new JobKind that
	// needs delete authority, mirror the JobPrune pattern below
	// — don't try to overload r.
	r := runner.New(runner.Config{
		ResticBin:         d.resticBin,
		RepoURL:           creds.URL,
		RepoUsername:      creds.Username,
		RepoPassword:      creds.Password,
		LimitUploadKBps:   upKBps,
		LimitDownloadKBps: downKBps,
	}, tx, time.Second)

	// spawn wraps the kind-specific goroutine: derives a per-job
	// cancellable context from the connection-scoped ctx, registers
	// the cancel func so command.cancel can fire it, deregisters on
	// completion. Per-job ctx means canceling one job doesn't kill
	// any other in-flight invocations.
	spawn := func(name string, fn func(ctx context.Context) error) {
		jobCtx, cancel := context.WithCancel(ctx)
		cleanup := d.trackJob(p.JobID, cancel)
		go func() {
			defer cleanup()
			defer cancel() // release ctx resources on goroutine exit
			if err := fn(jobCtx); err != nil {
				slog.Warn("agent: "+name+" job failed", "job_id", p.JobID, "err", err)
				return
			}
			slog.Info("agent: "+name+" job complete", "job_id", p.JobID)
		}()
	}

	switch p.Kind {
	case api.JobBackup:
		// Includes/Excludes/Tag come from the source group resolved
		// server-side. Args is preserved for backwards compatibility:
		// if the server sends only Args (older shape) we fall back to
		// treating it as the paths list with no tag.
		paths := p.Includes
		if len(paths) == 0 {
			paths = p.Args
		}
		var tags []string
		if p.Tag != "" {
			tags = []string{p.Tag}
		}
		slog.Info("agent: accepting backup job",
			"job_id", p.JobID, "paths", paths, "excludes", p.Excludes, "tag", p.Tag)
		hooks := runner.BackupHooks{Pre: p.PreHook, Post: p.PostHook}
		spawn("backup", func(jobCtx context.Context) error {
			return r.RunBackup(jobCtx, p.JobID, paths, p.Excludes, tags, hooks)
		})

	case api.JobInit:
		slog.Info("agent: accepting init job", "job_id", p.JobID)
		spawn("init", func(jobCtx context.Context) error {
			return r.RunInit(jobCtx, p.JobID)
		})

	case api.JobForget:
		if len(p.ForgetGroups) == 0 {
			// Hard-error rather than fall back to a single-policy form:
			// the server-side dispatch path (maintenance ticker) is the
			// only writer of forget command.run today, and it always
			// populates ForgetGroups. A backwards-compatible single-
			// policy fallback was specced but skipped — see the
			// Phase 5 plan rationale and version.go's lockstep-deploy
			// note for why.
			return errors.New("forget: command.run carried no forget_groups (server didn't populate them)")
		}
		groups := make([]restic.ForgetGroup, 0, len(p.ForgetGroups))
		for _, g := range p.ForgetGroups {
			groups = append(groups, restic.ForgetGroup{
				Tag: g.Tag,
				Policy: restic.ForgetPolicy{
					KeepLast:    g.Policy.KeepLast,
					KeepHourly:  g.Policy.KeepHourly,
					KeepDaily:   g.Policy.KeepDaily,
					KeepWeekly:  g.Policy.KeepWeekly,
					KeepMonthly: g.Policy.KeepMonthly,
					KeepYearly:  g.Policy.KeepYearly,
				},
			})
		}
		slog.Info("agent: accepting forget job", "job_id", p.JobID, "groups", len(groups))
		spawn("forget", func(jobCtx context.Context) error {
			return r.RunForget(jobCtx, p.JobID, groups)
		})

	case api.JobPrune:
		// Prune may require admin creds (delete authority on rest-server).
		runCreds := creds
		if p.RequiresAdminCreds {
			ac, err := d.secrets.LoadAdmin()
			if err != nil {
				return fmt.Errorf("prune: admin creds not configured (server didn't push them): %w", err)
			}
			if ac.Empty() {
				return errors.New("prune: admin creds incomplete")
			}
			runCreds = ac
		}
		prr := runner.New(runner.Config{
			ResticBin:         d.resticBin,
			RepoURL:           runCreds.URL,
			RepoUsername:      runCreds.Username,
			RepoPassword:      runCreds.Password,
			LimitUploadKBps:   upKBps,
			LimitDownloadKBps: downKBps,
		}, tx, time.Second)
		slog.Info("agent: accepting prune job", "job_id", p.JobID, "admin_creds", p.RequiresAdminCreds)
		spawn("prune", func(jobCtx context.Context) error {
			return prr.RunPrune(jobCtx, p.JobID)
		})

	case api.JobCheck:
		// Args[0], when present, carries the subset percentage. A value
		// that does not parse used to be dropped silently; it still
		// falls back to 0, but the bad input is now visible in the log.
		subset := 0
		if len(p.Args) > 0 {
			n, convErr := strconv.Atoi(p.Args[0])
			if convErr != nil {
				slog.Warn("agent: check job subset arg is not a valid integer, using 0",
					"job_id", p.JobID, "arg", p.Args[0], "err", convErr)
			} else {
				subset = n
			}
		}
		slog.Info("agent: accepting check job", "job_id", p.JobID, "subset_pct", subset)
		spawn("check", func(jobCtx context.Context) error {
			return r.RunCheck(jobCtx, p.JobID, subset)
		})

	case api.JobUnlock:
		slog.Info("agent: accepting unlock job", "job_id", p.JobID)
		spawn("unlock", func(jobCtx context.Context) error {
			return r.RunUnlock(jobCtx, p.JobID)
		})

	default:
		return fmt.Errorf("kind %q not implemented yet (Phase 2 lands the rest)", p.Kind)
	}
	return nil
}
// doEnroll performs token-based enrollment against serverURL. It
// collects local system info, submits it with the one-time token, then
// writes the returned host ID, agent token and certificate pin
// (together with serverURL) into cfg and saves it. The whole exchange
// shares a single 60-second deadline. On success a one-line
// confirmation is printed to stderr.
func doEnroll(serverURL, token string, cfg *config.Config, agentVersion string) error {
	const enrollTimeout = 60 * time.Second

	ctx, cancel := context.WithTimeout(context.Background(), enrollTimeout)
	defer cancel()

	info, err := sysinfo.Collect(ctx, cfg.ResticPath)
	if err != nil {
		return fmt.Errorf("sysinfo: %w", err)
	}

	req := wsclient.EnrollRequest{
		Token:         token,
		HostName:      info.Hostname,
		OS:            info.OS,
		Arch:          info.Arch,
		AgentVersion:  agentVersion,
		ResticVersion: info.ResticVersion,
	}
	enrolled, err := wsclient.Enroll(ctx, serverURL, req)
	if err != nil {
		return fmt.Errorf("enroll: %w", err)
	}

	// Record the server-issued identity before persisting.
	cfg.ServerURL = serverURL
	cfg.HostID = enrolled.HostID
	cfg.AgentToken = enrolled.AgentToken
	cfg.CertPinSHA256 = enrolled.CertPinSHA256
	if err = cfg.Save(); err != nil {
		return fmt.Errorf("save config: %w", err)
	}

	fmt.Fprintf(os.Stderr, "enrolled as host %s on %s\n", enrolled.HostID, serverURL)
	return nil
}