8afda7cd8c
CI / Test (store) (pull_request) Successful in 5s
CI / Test (rest) (pull_request) Successful in 9s
CI / Build (windows/amd64) (pull_request) Successful in 7s
CI / Test (server-http) (pull_request) Successful in 17s
CI / Build (linux/amd64) (pull_request) Successful in 7s
CI / Lint (pull_request) Successful in 19s
CI / Build (linux/arm64) (pull_request) Successful in 14s
e2e / Playwright vs docker-compose (pull_request) Successful in 1m27s
The Dockerfile only set `-X main.version=...`, so docker-built binaries
left `internal/version.Version` at its default "dev". The update logic
(host_update.go:61, hosts.go:94, fleet_update.go:101 et al.) compares
against `internal/version.Version`, so a v1.0.0 host always looked
out-of-date to a v1.0.0 server, the chip never cleared, and pressing
"update" re-downloaded the same bundled binary on a loop.
Collapse the two version sources: drop the `var version/commit/date`
locals in cmd/{server,agent}/main.go, route everything through
internal/version (now also carrying Date), and have both the Dockerfile
and the Makefile set the same single set of -X flags. Verified
end-to-end: make build and docker build both emit binaries whose
--version reflects the build VERSION.
730 lines
25 KiB
Go
730 lines
25 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"flag"
|
|
"fmt"
|
|
"log/slog"
|
|
"os"
|
|
"os/signal"
|
|
"strconv"
|
|
"sync"
|
|
"syscall"
|
|
"time"
|
|
|
|
"gitea.dcglab.co.uk/steve/restic-manager/internal/agent/config"
|
|
"gitea.dcglab.co.uk/steve/restic-manager/internal/agent/runner"
|
|
"gitea.dcglab.co.uk/steve/restic-manager/internal/agent/scheduler"
|
|
"gitea.dcglab.co.uk/steve/restic-manager/internal/agent/secrets"
|
|
"gitea.dcglab.co.uk/steve/restic-manager/internal/agent/service"
|
|
"gitea.dcglab.co.uk/steve/restic-manager/internal/agent/sysinfo"
|
|
"gitea.dcglab.co.uk/steve/restic-manager/internal/agent/wsclient"
|
|
"gitea.dcglab.co.uk/steve/restic-manager/internal/api"
|
|
"gitea.dcglab.co.uk/steve/restic-manager/internal/restic"
|
|
"gitea.dcglab.co.uk/steve/restic-manager/internal/version"
|
|
)
|
|
|
|
func main() {
|
|
if err := run(); err != nil {
|
|
slog.Error("agent fatal", "err", err)
|
|
os.Exit(1)
|
|
}
|
|
}
|
|
|
|
func run() error {
|
|
// Optional first positional verb for SCM control on Windows.
|
|
// `restic-manager-agent install|uninstall|start|stop` route into
|
|
// the service package; everything else falls through to the
|
|
// flag-driven default (which is what systemd / interactive runs
|
|
// hit). On non-Windows builds these verbs return a clear error.
|
|
if len(os.Args) > 1 {
|
|
switch os.Args[1] {
|
|
case "install":
|
|
return service.Install()
|
|
case "uninstall":
|
|
return service.Uninstall()
|
|
case "start":
|
|
return service.Start()
|
|
case "stop":
|
|
return service.Stop()
|
|
case "run":
|
|
// Strip the verb so flag.Parse sees the rest unchanged.
|
|
os.Args = append([]string{os.Args[0]}, os.Args[2:]...)
|
|
}
|
|
}
|
|
|
|
configPath := flag.String("config", config.DefaultPath(), "path to agent.yaml")
|
|
enrollServer := flag.String("enroll-server", "", "server URL (used with -enroll-token to perform first-run enrollment)")
|
|
enrollToken := flag.String("enroll-token", "", "one-time enrollment token (operator copies this from the UI)")
|
|
showVersion := flag.Bool("version", false, "print version and exit")
|
|
flag.Parse()
|
|
|
|
if *showVersion {
|
|
fmt.Printf("restic-manager-agent %s (commit %s, built %s)\n", version.Version, version.Commit, version.Date)
|
|
return nil
|
|
}
|
|
|
|
logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo}))
|
|
slog.SetDefault(logger)
|
|
|
|
cfg, err := config.Load(*configPath)
|
|
if err != nil {
|
|
return fmt.Errorf("config: %w", err)
|
|
}
|
|
|
|
if *enrollToken != "" {
|
|
if *enrollServer == "" {
|
|
return errors.New("enrollment: -enroll-server is required with -enroll-token")
|
|
}
|
|
return doEnroll(*enrollServer, *enrollToken, cfg, version.Version)
|
|
}
|
|
|
|
// Announce-and-approve: -enroll-server set, no token, agent not
|
|
// yet enrolled. Run the announce flow inline; on success the cfg
|
|
// has the bearer + host_id and we drop into the normal run loop.
|
|
if !cfg.Enrolled() && *enrollServer != "" {
|
|
if err := doAnnounce(*enrollServer, cfg, version.Version); err != nil {
|
|
return fmt.Errorf("announce: %w", err)
|
|
}
|
|
}
|
|
|
|
if !cfg.Enrolled() {
|
|
return fmt.Errorf("agent is not enrolled; run with -enroll-server (and either -enroll-token or wait for admin to accept the announce) first (config %q)", *configPath)
|
|
}
|
|
|
|
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
|
|
defer stop()
|
|
|
|
snap, err := sysinfo.Collect(ctx, cfg.ResticPath)
|
|
if err != nil {
|
|
return fmt.Errorf("sysinfo: %w", err)
|
|
}
|
|
slog.Info("agent starting",
|
|
"version", version.Version,
|
|
"host_id", cfg.HostID,
|
|
"server", cfg.ServerURL,
|
|
"restic_version", snap.ResticVersion,
|
|
"protocol_version", snap.ProtocolVersion,
|
|
)
|
|
|
|
resticBin, _ := restic.Locate(cfg.ResticPath) // empty is fine; commands fail with a clear error later
|
|
|
|
// Probe the actual restic binary for restore-flag support. We used
|
|
// to gate --no-ownership on a SemVer comparison (added in 0.17),
|
|
// but a restic 0.18.1 build was observed in the wild that still
|
|
// rejects the flag. The help text is the only reliable signal.
|
|
resticSupportsNoOwnership := restic.SupportsRestoreNoOwnership(ctx, resticBin)
|
|
|
|
// Open the secrets store. If the agent is enrolled but has no
|
|
// secrets key yet (legacy YAML), mint one and migrate any
|
|
// plaintext repo fields into the encrypted blob.
|
|
sec, err := openSecretsStore(cfg)
|
|
if err != nil {
|
|
return fmt.Errorf("secrets: %w", err)
|
|
}
|
|
|
|
wsCfg := wsclient.Config{
|
|
ServerURL: cfg.ServerURL,
|
|
AgentToken: cfg.AgentToken,
|
|
HostID: cfg.HostID,
|
|
CertPinSHA256: cfg.CertPinSHA256,
|
|
HelloPayload: api.HelloPayload{
|
|
ProtocolVersion: snap.ProtocolVersion,
|
|
AgentVersion: version.Version,
|
|
ResticVersion: snap.ResticVersion,
|
|
Hostname: snap.Hostname,
|
|
OS: snap.OS,
|
|
Arch: snap.Arch,
|
|
},
|
|
}
|
|
|
|
d := &dispatcher{
|
|
resticBin: resticBin,
|
|
resticVer: snap.ResticVersion,
|
|
resticSupportsNoOwnership: resticSupportsNoOwnership,
|
|
serverURL: cfg.ServerURL,
|
|
secrets: sec,
|
|
scheduler: scheduler.New(),
|
|
}
|
|
if err := wsclient.Run(ctx, wsCfg, d.handle); err != nil {
|
|
return fmt.Errorf("ws run: %w", err)
|
|
}
|
|
slog.Info("agent shutting down")
|
|
return nil
|
|
}
|
|
|
|
// openSecretsStore opens (or one-time migrates) the agent's encrypted
|
|
// secrets file. Side effects:
|
|
// - mints SecretsKey if absent and persists agent.yaml.
|
|
// - if legacy plaintext repo_url/repo_password sit in agent.yaml,
|
|
// copies them into secrets.enc and clears the YAML fields on
|
|
// the next save.
|
|
func openSecretsStore(cfg *config.Config) (*secrets.Store, error) {
|
|
if err := cfg.EnsureSecretsKey(); err != nil {
|
|
return nil, err
|
|
}
|
|
keyBytes, err := cfg.SecretsKeyBytes()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
st, err := secrets.New(cfg.ResolvedSecretsPath(), keyBytes)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
migrated := false
|
|
if cfg.LegacyRepoURL != "" || cfg.LegacyRepoPassword != "" {
|
|
cur, _ := st.Load() // empty Repo on first run is fine
|
|
if cur.URL == "" {
|
|
cur.URL = cfg.LegacyRepoURL
|
|
}
|
|
if cur.Password == "" {
|
|
cur.Password = cfg.LegacyRepoPassword
|
|
}
|
|
if err := st.Save(cur); err != nil {
|
|
return nil, fmt.Errorf("migrate legacy creds into secrets.enc: %w", err)
|
|
}
|
|
cfg.LegacyRepoURL = ""
|
|
cfg.LegacyRepoPassword = ""
|
|
migrated = true
|
|
slog.Info("agent: migrated legacy plaintext repo creds into secrets.enc")
|
|
}
|
|
|
|
// Persist key (and the cleared legacy fields) regardless of
|
|
// whether we migrated, in case we just minted SecretsKey.
|
|
if migrated || cfg.SecretsKey != "" {
|
|
if err := cfg.Save(); err != nil {
|
|
return nil, fmt.Errorf("persist agent config: %w", err)
|
|
}
|
|
}
|
|
return st, nil
|
|
}
|
|
|
|
// dispatcher closes over the long-lived agent settings (restic path
|
|
// + secrets handle) so handle() can spawn the runner without
|
|
// re-loading config every time. Repo creds are read fresh from the
|
|
// secrets store on each job — config.update writes through to disk,
|
|
// so a job dispatched in the same session sees the latest values.
|
|
type dispatcher struct {
|
|
resticBin string
|
|
resticVer string // e.g. "0.17.1"; empty if restic isn't installed yet
|
|
resticSupportsNoOwnership bool // captured at startup from `restic restore --help`
|
|
serverURL string // base URL of the server (used by the self-update fetch)
|
|
secrets *secrets.Store
|
|
scheduler *scheduler.Scheduler
|
|
|
|
// Bandwidth caps in KB/s pushed via config.update. Mutated under
|
|
// bwMu by the config.update handler; read by runJob when building
|
|
// the runner. <=0 means "no cap" (do not pass --limit-* to restic).
|
|
// Per-job overrides on CommandRunPayload take precedence.
|
|
bwMu sync.Mutex
|
|
bwUpKBps int
|
|
bwDownKBps int
|
|
|
|
// Per-running-job cancellation handles. Populated when runJob
|
|
// spawns the goroutine, removed when it returns. Looked up by
|
|
// the command.cancel handler (server → agent) to abort an
|
|
// in-flight restic invocation.
|
|
cancelMu sync.Mutex
|
|
cancels map[string]context.CancelFunc
|
|
}
|
|
|
|
// trackJob registers a cancel func for an in-flight job and returns a
|
|
// cleanup that removes it. Call cleanup when the job goroutine exits
|
|
// regardless of outcome — runs even on panic.
|
|
func (d *dispatcher) trackJob(jobID string, cancel context.CancelFunc) func() {
|
|
d.cancelMu.Lock()
|
|
if d.cancels == nil {
|
|
d.cancels = make(map[string]context.CancelFunc)
|
|
}
|
|
d.cancels[jobID] = cancel
|
|
d.cancelMu.Unlock()
|
|
return func() {
|
|
d.cancelMu.Lock()
|
|
delete(d.cancels, jobID)
|
|
d.cancelMu.Unlock()
|
|
}
|
|
}
|
|
|
|
// cancelJob fires the cancel func for jobID if there is one and
|
|
// returns whether the job was actually known. The runner is expected
|
|
// to surface the resulting context.Canceled as a JobCancelled status
|
|
// in its job.finished envelope (see runner.sendFinished).
|
|
func (d *dispatcher) cancelJob(jobID string) bool {
|
|
d.cancelMu.Lock()
|
|
cancel, ok := d.cancels[jobID]
|
|
d.cancelMu.Unlock()
|
|
if !ok {
|
|
return false
|
|
}
|
|
cancel()
|
|
return true
|
|
}
|
|
|
|
func (d *dispatcher) handle(ctx context.Context, env api.Envelope, tx wsclient.Sender) error {
|
|
switch env.Type {
|
|
case api.MsgCommandRun:
|
|
var p api.CommandRunPayload
|
|
if err := env.UnmarshalPayload(&p); err != nil {
|
|
return fmt.Errorf("command.run: %w", err)
|
|
}
|
|
return d.runJob(ctx, p, tx)
|
|
|
|
case api.MsgCommandCancel:
|
|
var p api.CommandCancelPayload
|
|
if err := env.UnmarshalPayload(&p); err != nil {
|
|
return fmt.Errorf("command.cancel: %w", err)
|
|
}
|
|
if d.cancelJob(p.JobID) {
|
|
slog.Info("ws agent: command.cancel applied", "job_id", p.JobID)
|
|
} else {
|
|
// Job already finished or was never seen on this agent.
|
|
// Not an error — operator may have raced cancel against
|
|
// natural completion. Server-side state is authoritative.
|
|
slog.Info("ws agent: command.cancel for unknown job (already finished?)", "job_id", p.JobID)
|
|
}
|
|
|
|
case api.MsgTreeList:
|
|
// Synchronous RPC for the restore wizard's tree browser. The
|
|
// server has serialised access; we just run restic ls and reply
|
|
// with the same envelope ID. Run in a goroutine so the WS read
|
|
// loop keeps draining.
|
|
var p api.TreeListRequestPayload
|
|
if err := env.UnmarshalPayload(&p); err != nil {
|
|
return fmt.Errorf("tree.list: %w", err)
|
|
}
|
|
go d.handleTreeList(ctx, env.ID, p, tx)
|
|
|
|
case api.MsgScheduleSet:
|
|
var p api.ScheduleSetPayload
|
|
if err := env.UnmarshalPayload(&p); err != nil {
|
|
return fmt.Errorf("schedule.set: %w", err)
|
|
}
|
|
// scheduler.Apply rebuilds the local cron from scratch and
|
|
// emits schedule.ack via tx. Async-safe: tx may have to wait
|
|
// briefly on the connection's writeMu, but the read loop
|
|
// keeps draining other messages.
|
|
go d.scheduler.Apply(p, tx)
|
|
|
|
case api.MsgConfigUpdate:
|
|
var p api.ConfigUpdatePayload
|
|
_ = env.UnmarshalPayload(&p)
|
|
slot := p.Slot
|
|
if slot == "" {
|
|
slot = "repo"
|
|
}
|
|
switch slot {
|
|
case "repo":
|
|
// Merge with whatever's already in secrets.enc — empty fields
|
|
// in the push mean "leave alone." Atomic write underneath.
|
|
cur, err := d.secrets.Load()
|
|
if err != nil {
|
|
slog.Error("ws agent: load secrets for merge", "err", err)
|
|
return nil
|
|
}
|
|
changed := false
|
|
if p.RepoURL != "" && p.RepoURL != cur.URL {
|
|
cur.URL = p.RepoURL
|
|
changed = true
|
|
}
|
|
if p.RepoUsername != "" && p.RepoUsername != cur.Username {
|
|
cur.Username = p.RepoUsername
|
|
changed = true
|
|
}
|
|
if p.RepoPassword != "" && p.RepoPassword != cur.Password {
|
|
cur.Password = p.RepoPassword
|
|
changed = true
|
|
}
|
|
if changed {
|
|
if err := d.secrets.Save(cur); err != nil {
|
|
slog.Error("ws agent: persist secrets", "err", err)
|
|
return nil
|
|
}
|
|
slog.Info("ws agent: repo credentials updated via config.update")
|
|
}
|
|
case "admin":
|
|
cur, err := d.secrets.LoadAdmin()
|
|
if err != nil && !errors.Is(err, secrets.ErrNoAdmin) {
|
|
slog.Error("ws agent: load admin secrets", "err", err)
|
|
return nil
|
|
}
|
|
// ErrNoAdmin is not an error here — we are creating the slot.
|
|
changed := false
|
|
if p.RepoURL != "" && p.RepoURL != cur.URL {
|
|
cur.URL = p.RepoURL
|
|
changed = true
|
|
}
|
|
if p.RepoUsername != "" && p.RepoUsername != cur.Username {
|
|
cur.Username = p.RepoUsername
|
|
changed = true
|
|
}
|
|
if p.RepoPassword != "" && p.RepoPassword != cur.Password {
|
|
cur.Password = p.RepoPassword
|
|
changed = true
|
|
}
|
|
if changed {
|
|
if err := d.secrets.SaveAdmin(cur); err != nil {
|
|
slog.Error("ws agent: persist admin secrets", "err", err)
|
|
return nil
|
|
}
|
|
slog.Info("ws agent: admin credentials updated via config.update")
|
|
}
|
|
default:
|
|
slog.Warn("ws agent: unknown config.update slot, ignoring", "slot", p.Slot)
|
|
}
|
|
|
|
// Bandwidth caps ride independently of the slot — they're host-
|
|
// wide and apply to every restic invocation regardless of which
|
|
// credentials slot the job uses. nil pointer = no change in this
|
|
// push; non-nil = set to that value (≤0 clears the cap).
|
|
if p.BandwidthUpKBps != nil || p.BandwidthDownKBps != nil {
|
|
d.bwMu.Lock()
|
|
if p.BandwidthUpKBps != nil {
|
|
d.bwUpKBps = *p.BandwidthUpKBps
|
|
}
|
|
if p.BandwidthDownKBps != nil {
|
|
d.bwDownKBps = *p.BandwidthDownKBps
|
|
}
|
|
up, down := d.bwUpKBps, d.bwDownKBps
|
|
d.bwMu.Unlock()
|
|
slog.Info("ws agent: bandwidth caps updated",
|
|
"up_kbps", up, "down_kbps", down)
|
|
}
|
|
|
|
case api.MsgCommandUpdate:
|
|
var p api.CommandUpdatePayload
|
|
if err := env.UnmarshalPayload(&p); err != nil {
|
|
return fmt.Errorf("command.update: %w", err)
|
|
}
|
|
go d.runUpdate(ctx, p, tx)
|
|
|
|
default:
|
|
slog.Debug("ws agent: ignored message", "type", env.Type)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// handleTreeList runs `restic ls --json <snapshot> <path>` and ships
|
|
// the matching tree.list.result envelope back, correlated by the
|
|
// request envelope's ID. Errors (missing creds, restic failure)
|
|
// surface in the result's Error field rather than as transport-level
|
|
// failures so the server-side waiter can render a sensible message.
|
|
func (d *dispatcher) handleTreeList(ctx context.Context, reqID string, p api.TreeListRequestPayload, tx wsclient.Sender) {
|
|
reply := func(result api.TreeListResultPayload) {
|
|
result.SnapshotID = p.SnapshotID
|
|
result.Path = p.Path
|
|
env, err := api.Marshal(api.MsgTreeListResult, reqID, result)
|
|
if err != nil {
|
|
slog.Warn("ws agent: marshal tree.list.result", "err", err)
|
|
return
|
|
}
|
|
_ = tx.Send(env)
|
|
}
|
|
|
|
if d.resticBin == "" {
|
|
reply(api.TreeListResultPayload{Error: "restic binary not located on this agent"})
|
|
return
|
|
}
|
|
creds, err := d.secrets.Load()
|
|
if err != nil {
|
|
reply(api.TreeListResultPayload{Error: "load credentials: " + err.Error()})
|
|
return
|
|
}
|
|
if creds.Empty() {
|
|
reply(api.TreeListResultPayload{Error: "repo credentials not configured"})
|
|
return
|
|
}
|
|
|
|
d.bwMu.Lock()
|
|
upKBps, downKBps := d.bwUpKBps, d.bwDownKBps
|
|
d.bwMu.Unlock()
|
|
|
|
env := restic.Env{
|
|
Bin: d.resticBin,
|
|
RepoURL: creds.URL,
|
|
RepoUsername: creds.Username,
|
|
RepoPassword: creds.Password,
|
|
LimitUploadKBps: upKBps,
|
|
LimitDownloadKBps: downKBps,
|
|
}
|
|
|
|
// 60s ceiling matches snapshots/stats — restic ls on a single
|
|
// directory is normally sub-second; if the repo is unreachable we
|
|
// want to surface the failure rather than block the wizard.
|
|
listCtx, cancel := context.WithTimeout(ctx, 60*time.Second)
|
|
defer cancel()
|
|
|
|
entries, err := env.ListTreeChildren(listCtx, p.SnapshotID, p.Path)
|
|
if err != nil {
|
|
reply(api.TreeListResultPayload{Error: err.Error()})
|
|
return
|
|
}
|
|
apiEntries := make([]api.TreeListEntry, 0, len(entries))
|
|
for _, e := range entries {
|
|
apiEntries = append(apiEntries, api.TreeListEntry{
|
|
Name: e.Name,
|
|
Type: e.Type,
|
|
Size: e.Size,
|
|
})
|
|
}
|
|
reply(api.TreeListResultPayload{Entries: apiEntries})
|
|
}
|
|
|
|
// failJob ships a synthetic job.started + job.finished(failed) pair
|
|
// for a command.run we couldn't even spawn locally — missing restic
|
|
// binary, missing credentials, or a malformed payload. Without these
|
|
// envelopes the server has no way to know the job will never produce
|
|
// output: the row sits in "running", the live stream stays stuck on
|
|
// "awaiting agent output," and a subsequent command.cancel arrives
|
|
// for a job_id the agent never registered (we log "unknown job"
|
|
// because trackJob was never called). Sending a terminal envelope
|
|
// here closes the loop on both fronts.
|
|
func failJob(p api.CommandRunPayload, tx wsclient.Sender, errMsg string) {
|
|
now := time.Now().UTC()
|
|
if startedEnv, err := api.Marshal(api.MsgJobStarted, p.JobID, api.JobStartedPayload{
|
|
JobID: p.JobID, Kind: p.Kind, StartedAt: now,
|
|
}); err == nil {
|
|
_ = tx.Send(startedEnv)
|
|
}
|
|
if finEnv, err := api.Marshal(api.MsgJobFinished, p.JobID, api.JobFinishedPayload{
|
|
JobID: p.JobID,
|
|
Status: api.JobFailed,
|
|
ExitCode: -1,
|
|
FinishedAt: now,
|
|
Error: errMsg,
|
|
}); err == nil {
|
|
_ = tx.Send(finEnv)
|
|
}
|
|
}
|
|
|
|
// runJob spawns a runner for one job. We launch a goroutine so the
|
|
// WS read loop keeps draining messages while restic chugs along.
|
|
func (d *dispatcher) runJob(ctx context.Context, p api.CommandRunPayload, tx wsclient.Sender) error {
|
|
if d.resticBin == "" {
|
|
failJob(p, tx, "restic binary not located on this agent")
|
|
return fmt.Errorf("restic binary not located on this agent")
|
|
}
|
|
creds, err := d.secrets.Load()
|
|
if err != nil {
|
|
failJob(p, tx, "load repo credentials: "+err.Error())
|
|
return fmt.Errorf("load repo credentials: %w", err)
|
|
}
|
|
if creds.Empty() {
|
|
failJob(p, tx, "repo credentials not configured (waiting for server config.update push)")
|
|
return fmt.Errorf("repo credentials not configured (waiting for server config.update push)")
|
|
}
|
|
// r is the everyday runner — bound to the host's repo
|
|
// (append-only) credentials. Reused by every kind except
|
|
// JobPrune, which builds its own runner against the
|
|
// admin-credentials slot when p.RequiresAdminCreds is set
|
|
// (admin creds are not loaded for any other kind, so they're
|
|
// not on r). If you find yourself adding a new JobKind that
|
|
// needs delete authority, mirror the JobPrune pattern below
|
|
// — don't try to overload r.
|
|
// Resolve bandwidth caps: per-job override (if set) wins over the
|
|
// host-wide caps last pushed via config.update. <=0 means no cap.
|
|
d.bwMu.Lock()
|
|
upKBps, downKBps := d.bwUpKBps, d.bwDownKBps
|
|
d.bwMu.Unlock()
|
|
if p.BandwidthUpKBps != nil {
|
|
upKBps = *p.BandwidthUpKBps
|
|
}
|
|
if p.BandwidthDownKBps != nil {
|
|
downKBps = *p.BandwidthDownKBps
|
|
}
|
|
|
|
r := runner.New(runner.Config{
|
|
ResticBin: d.resticBin,
|
|
ResticVersion: d.resticVer,
|
|
RepoURL: creds.URL,
|
|
RepoUsername: creds.Username,
|
|
RepoPassword: creds.Password,
|
|
SupportsRestoreNoOwnership: d.resticSupportsNoOwnership,
|
|
LimitUploadKBps: upKBps,
|
|
LimitDownloadKBps: downKBps,
|
|
}, tx, time.Second)
|
|
|
|
// spawn wraps the kind-specific goroutine: derives a per-job
|
|
// cancellable context from the connection-scoped ctx, registers
|
|
// the cancel func so command.cancel can fire it, deregisters on
|
|
// completion. Per-job ctx means canceling one job doesn't kill
|
|
// any other in-flight invocations.
|
|
spawn := func(name string, fn func(ctx context.Context) error) {
|
|
jobCtx, cancel := context.WithCancel(ctx)
|
|
cleanup := d.trackJob(p.JobID, cancel)
|
|
go func() {
|
|
defer cleanup()
|
|
defer cancel() // release ctx resources on goroutine exit
|
|
if err := fn(jobCtx); err != nil {
|
|
slog.Warn("agent: "+name+" job failed", "job_id", p.JobID, "err", err)
|
|
return
|
|
}
|
|
slog.Info("agent: "+name+" job complete", "job_id", p.JobID)
|
|
}()
|
|
}
|
|
|
|
switch p.Kind {
|
|
case api.JobBackup:
|
|
// Includes/Excludes/Tag come from the source group resolved
|
|
// server-side. Args is preserved for backwards compatibility:
|
|
// if the server sends only Args (older shape) we fall back to
|
|
// treating it as the paths list with no tag.
|
|
paths := p.Includes
|
|
if len(paths) == 0 {
|
|
paths = p.Args
|
|
}
|
|
var tags []string
|
|
if p.Tag != "" {
|
|
tags = []string{p.Tag}
|
|
}
|
|
slog.Info("agent: accepting backup job",
|
|
"job_id", p.JobID, "paths", paths, "excludes", p.Excludes, "tag", p.Tag)
|
|
hooks := runner.BackupHooks{Pre: p.PreHook, Post: p.PostHook}
|
|
spawn("backup", func(jobCtx context.Context) error {
|
|
return r.RunBackup(jobCtx, p.JobID, paths, p.Excludes, tags, hooks)
|
|
})
|
|
case api.JobInit:
|
|
slog.Info("agent: accepting init job", "job_id", p.JobID)
|
|
spawn("init", func(jobCtx context.Context) error {
|
|
return r.RunInit(jobCtx, p.JobID)
|
|
})
|
|
case api.JobForget:
|
|
if len(p.ForgetGroups) == 0 {
|
|
// Hard-error rather than fall back to a single-policy form:
|
|
// the server-side dispatch path (maintenance ticker) is the
|
|
// only writer of forget command.run today, and it always
|
|
// populates ForgetGroups. A backwards-compatible single-
|
|
// policy fallback was specced but skipped — see the
|
|
// Phase 5 plan rationale and version.go's lockstep-deploy
|
|
// note for why.
|
|
failJob(p, tx, "forget: command.run carried no forget_groups (server didn't populate them)")
|
|
return fmt.Errorf("forget: command.run carried no forget_groups (server didn't populate them)")
|
|
}
|
|
groups := make([]restic.ForgetGroup, 0, len(p.ForgetGroups))
|
|
for _, g := range p.ForgetGroups {
|
|
groups = append(groups, restic.ForgetGroup{
|
|
Tag: g.Tag,
|
|
Policy: restic.ForgetPolicy{
|
|
KeepLast: g.Policy.KeepLast,
|
|
KeepHourly: g.Policy.KeepHourly,
|
|
KeepDaily: g.Policy.KeepDaily,
|
|
KeepWeekly: g.Policy.KeepWeekly,
|
|
KeepMonthly: g.Policy.KeepMonthly,
|
|
KeepYearly: g.Policy.KeepYearly,
|
|
},
|
|
})
|
|
}
|
|
slog.Info("agent: accepting forget job", "job_id", p.JobID, "groups", len(groups))
|
|
spawn("forget", func(jobCtx context.Context) error {
|
|
return r.RunForget(jobCtx, p.JobID, groups)
|
|
})
|
|
case api.JobPrune:
|
|
// Prune may require admin creds (delete authority on rest-server).
|
|
runCreds := creds
|
|
if p.RequiresAdminCreds {
|
|
ac, err := d.secrets.LoadAdmin()
|
|
if err != nil {
|
|
return fmt.Errorf("prune: admin creds not configured (server didn't push them): %w", err)
|
|
}
|
|
if ac.Empty() {
|
|
return fmt.Errorf("prune: admin creds incomplete")
|
|
}
|
|
runCreds = ac
|
|
}
|
|
prr := runner.New(runner.Config{
|
|
ResticBin: d.resticBin,
|
|
ResticVersion: d.resticVer,
|
|
RepoURL: runCreds.URL,
|
|
RepoUsername: runCreds.Username,
|
|
RepoPassword: runCreds.Password,
|
|
SupportsRestoreNoOwnership: d.resticSupportsNoOwnership,
|
|
LimitUploadKBps: upKBps,
|
|
LimitDownloadKBps: downKBps,
|
|
}, tx, time.Second)
|
|
slog.Info("agent: accepting prune job", "job_id", p.JobID, "admin_creds", p.RequiresAdminCreds)
|
|
spawn("prune", func(jobCtx context.Context) error {
|
|
return prr.RunPrune(jobCtx, p.JobID)
|
|
})
|
|
case api.JobCheck:
|
|
subset := 0
|
|
if len(p.Args) > 0 {
|
|
subset, _ = strconv.Atoi(p.Args[0])
|
|
}
|
|
slog.Info("agent: accepting check job", "job_id", p.JobID, "subset_pct", subset)
|
|
spawn("check", func(jobCtx context.Context) error {
|
|
return r.RunCheck(jobCtx, p.JobID, subset)
|
|
})
|
|
case api.JobUnlock:
|
|
slog.Info("agent: accepting unlock job", "job_id", p.JobID)
|
|
spawn("unlock", func(jobCtx context.Context) error {
|
|
return r.RunUnlock(jobCtx, p.JobID)
|
|
})
|
|
case api.JobRestore:
|
|
if p.Restore == nil {
|
|
failJob(p, tx, "restore: command.run carried no restore payload")
|
|
return fmt.Errorf("restore: command.run carried no restore payload")
|
|
}
|
|
rp := *p.Restore
|
|
if rp.SnapshotID == "" {
|
|
failJob(p, tx, "restore: snapshot_id is required")
|
|
return fmt.Errorf("restore: snapshot_id is required")
|
|
}
|
|
if !rp.InPlace && rp.TargetDir == "" {
|
|
failJob(p, tx, "restore: target_dir required for non-in-place restore")
|
|
return fmt.Errorf("restore: target_dir required for non-in-place restore")
|
|
}
|
|
slog.Info("agent: accepting restore job",
|
|
"job_id", p.JobID, "snapshot_id", rp.SnapshotID,
|
|
"paths", rp.Paths, "in_place", rp.InPlace, "target", rp.TargetDir)
|
|
spawn("restore", func(jobCtx context.Context) error {
|
|
return r.RunRestore(jobCtx, p.JobID, rp.SnapshotID, rp.Paths, rp.InPlace, rp.TargetDir)
|
|
})
|
|
case api.JobDiff:
|
|
if p.Diff == nil || p.Diff.SnapshotA == "" || p.Diff.SnapshotB == "" {
|
|
failJob(p, tx, "diff: command.run carried incomplete diff payload")
|
|
return fmt.Errorf("diff: command.run carried incomplete diff payload")
|
|
}
|
|
dp := *p.Diff
|
|
slog.Info("agent: accepting diff job",
|
|
"job_id", p.JobID, "a", dp.SnapshotA, "b", dp.SnapshotB)
|
|
spawn("diff", func(jobCtx context.Context) error {
|
|
return r.RunDiff(jobCtx, p.JobID, dp.SnapshotA, dp.SnapshotB)
|
|
})
|
|
default:
|
|
failJob(p, tx, fmt.Sprintf("kind %q not implemented on this agent", p.Kind))
|
|
return fmt.Errorf("kind %q not implemented yet (Phase 2 lands the rest)", p.Kind)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func doEnroll(serverURL, token string, cfg *config.Config, agentVersion string) error {
|
|
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
|
|
defer cancel()
|
|
|
|
snap, err := sysinfo.Collect(ctx, cfg.ResticPath)
|
|
if err != nil {
|
|
return fmt.Errorf("sysinfo: %w", err)
|
|
}
|
|
res, err := wsclient.Enroll(ctx, serverURL, wsclient.EnrollRequest{
|
|
Token: token,
|
|
HostName: snap.Hostname,
|
|
OS: snap.OS,
|
|
Arch: snap.Arch,
|
|
AgentVersion: agentVersion,
|
|
ResticVersion: snap.ResticVersion,
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("enroll: %w", err)
|
|
}
|
|
cfg.ServerURL = serverURL
|
|
cfg.HostID = res.HostID
|
|
cfg.AgentToken = res.AgentToken
|
|
cfg.CertPinSHA256 = res.CertPinSHA256
|
|
if err := cfg.Save(); err != nil {
|
|
return fmt.Errorf("save config: %w", err)
|
|
}
|
|
fmt.Fprintf(os.Stderr, "enrolled as host %s on %s\n", res.HostID, serverURL)
|
|
return nil
|
|
}
|