package main

import (
	"context"
	"errors"
	"flag"
	"fmt"
	"log/slog"
	"os"
	"os/signal"
	"strconv"
	"sync"
	"syscall"
	"time"

	"gitea.dcglab.co.uk/steve/restic-manager/internal/agent/config"
	"gitea.dcglab.co.uk/steve/restic-manager/internal/agent/runner"
	"gitea.dcglab.co.uk/steve/restic-manager/internal/agent/scheduler"
	"gitea.dcglab.co.uk/steve/restic-manager/internal/agent/secrets"
	"gitea.dcglab.co.uk/steve/restic-manager/internal/agent/service"
	"gitea.dcglab.co.uk/steve/restic-manager/internal/agent/sysinfo"
	"gitea.dcglab.co.uk/steve/restic-manager/internal/agent/wsclient"
	"gitea.dcglab.co.uk/steve/restic-manager/internal/api"
	"gitea.dcglab.co.uk/steve/restic-manager/internal/restic"
)

// version is the agent version string used in logs, the hello payload
// and enrollment. It defaults to "dev" (presumably replaced at build
// time — TODO confirm how release builds set it).
var version = "dev"

// main runs the agent; any error from run is logged and turned into
// exit status 1.
func main() {
	err := run()
	if err == nil {
		return
	}
	slog.Error("agent fatal", "err", err)
	os.Exit(1)
}

// run is the real entry point. In order it:
//   - routes an optional leading verb (install/uninstall/start/stop)
//     to the service package, or strips a leading "run" verb;
//   - parses flags and handles -version;
//   - loads the config and performs token enrollment or the announce
//     flow when requested;
//   - collects system info, opens the secrets store and runs the
//     websocket client until the signal-bound context ends.
//
// It returns nil on a clean exit and a wrapped error otherwise.
func run() error {
	// The first positional argument may be an SCM control verb for
	// Windows: `restic-manager-agent install|uninstall|start|stop`
	// hands off to the service package. Anything else falls through
	// to the flag-driven default (the path systemd and interactive
	// runs take). On non-Windows builds these verbs return a clear
	// error.
	if len(os.Args) > 1 {
		switch verb := os.Args[1]; verb {
		case "install":
			return service.Install()
		case "uninstall":
			return service.Uninstall()
		case "start":
			return service.Start()
		case "stop":
			return service.Stop()
		case "run":
			// Drop the verb so flag.Parse sees the remaining
			// arguments unchanged.
			rest := os.Args[2:]
			os.Args = append([]string{os.Args[0]}, rest...)
		}
	}

	var (
		configPath   string
		enrollServer string
		enrollToken  string
		showVersion  bool
	)
	flag.StringVar(&configPath, "config", config.DefaultPath(), "path to agent.yaml")
	flag.StringVar(&enrollServer, "enroll-server", "", "server URL (used with -enroll-token to perform first-run enrollment)")
	flag.StringVar(&enrollToken, "enroll-token", "", "one-time enrollment token (operator copies this from the UI)")
	flag.BoolVar(&showVersion, "version", false, "print version and exit")
	flag.Parse()

	if showVersion {
		fmt.Println("restic-manager-agent", version)
		return nil
	}

	// From here on everything logs as JSON on stdout at info level.
	slog.SetDefault(slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo})))

	cfg, err := config.Load(configPath)
	if err != nil {
		return fmt.Errorf("config: %w", err)
	}

	// Token enrollment is a one-shot: enroll, persist, exit.
	if enrollToken != "" {
		if enrollServer == "" {
			return errors.New("enrollment: -enroll-server is required with -enroll-token")
		}
		return doEnroll(enrollServer, enrollToken, cfg, version)
	}

	// Announce-and-approve: -enroll-server given, no token, agent not
	// enrolled yet. The announce flow runs inline; once it succeeds
	// cfg carries the bearer + host_id and we continue into the
	// normal run loop.
	if !cfg.Enrolled() && enrollServer != "" {
		if err := doAnnounce(enrollServer, cfg, version); err != nil {
			return fmt.Errorf("announce: %w", err)
		}
	}
	if !cfg.Enrolled() {
		return fmt.Errorf("agent is not enrolled; run with -enroll-server (and either -enroll-token or wait for admin to accept the announce) first (config %q)", configPath)
	}

	// SIGINT/SIGTERM cancel ctx, which ends the websocket run loop.
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	snap, err := sysinfo.Collect(ctx, cfg.ResticPath)
	if err != nil {
		return fmt.Errorf("sysinfo: %w", err)
	}
	slog.Info("agent starting",
		"version", version,
		"host_id", cfg.HostID,
		"server", cfg.ServerURL,
		"restic_version", snap.ResticVersion,
		"protocol_version", snap.ProtocolVersion,
	)

	// An empty path is acceptable here; commands fail with a clear
	// error later.
	resticBin, _ := restic.Locate(cfg.ResticPath)

	// Open the secrets store. An enrolled agent that has no secrets
	// key yet (legacy YAML) gets one minted, and any plaintext repo
	// fields are migrated into the encrypted blob.
	sec, err := openSecretsStore(cfg)
	if err != nil {
		return fmt.Errorf("secrets: %w", err)
	}

	hello := api.HelloPayload{
		ProtocolVersion: snap.ProtocolVersion,
		AgentVersion:    version,
		ResticVersion:   snap.ResticVersion,
		Hostname:        snap.Hostname,
		OS:              snap.OS,
		Arch:            snap.Arch,
	}
	wsCfg := wsclient.Config{
		ServerURL:     cfg.ServerURL,
		AgentToken:    cfg.AgentToken,
		HostID:        cfg.HostID,
		CertPinSHA256: cfg.CertPinSHA256,
		HelloPayload:  hello,
	}

	d := &dispatcher{
		resticBin: resticBin,
		resticVer: snap.ResticVersion,
		secrets:   sec,
		scheduler: scheduler.New(),
	}

	if err := wsclient.Run(ctx, wsCfg, d.handle); err != nil {
		return fmt.Errorf("ws run: %w", err)
	}
	slog.Info("agent shutting down")
	return nil
}

// openSecretsStore opens (or one-time migrates) the agent's encrypted
// secrets file. Side effects:
//   - mints SecretsKey if absent and persists agent.yaml.
//   - if legacy plaintext repo_url/repo_password sit in agent.yaml,
//     copies them into secrets.enc and clears the YAML fields on
//     the next save.
func openSecretsStore(cfg *config.Config) (*secrets.Store, error) { if err := cfg.EnsureSecretsKey(); err != nil { return nil, err } keyBytes, err := cfg.SecretsKeyBytes() if err != nil { return nil, err } st, err := secrets.New(cfg.ResolvedSecretsPath(), keyBytes) if err != nil { return nil, err } migrated := false if cfg.LegacyRepoURL != "" || cfg.LegacyRepoPassword != "" { cur, _ := st.Load() // empty Repo on first run is fine if cur.URL == "" { cur.URL = cfg.LegacyRepoURL } if cur.Password == "" { cur.Password = cfg.LegacyRepoPassword } if err := st.Save(cur); err != nil { return nil, fmt.Errorf("migrate legacy creds into secrets.enc: %w", err) } cfg.LegacyRepoURL = "" cfg.LegacyRepoPassword = "" migrated = true slog.Info("agent: migrated legacy plaintext repo creds into secrets.enc") } // Persist key (and the cleared legacy fields) regardless of // whether we migrated, in case we just minted SecretsKey. if migrated || cfg.SecretsKey != "" { if err := cfg.Save(); err != nil { return nil, fmt.Errorf("persist agent config: %w", err) } } return st, nil } // dispatcher closes over the long-lived agent settings (restic path // + secrets handle) so handle() can spawn the runner without // re-loading config every time. Repo creds are read fresh from the // secrets store on each job — config.update writes through to disk, // so a job dispatched in the same session sees the latest values. type dispatcher struct { resticBin string resticVer string // e.g. "0.17.1"; empty if restic isn't installed yet secrets *secrets.Store scheduler *scheduler.Scheduler // Bandwidth caps in KB/s pushed via config.update. Mutated under // bwMu by the config.update handler; read by runJob when building // the runner. <=0 means "no cap" (do not pass --limit-* to restic). // Per-job overrides on CommandRunPayload take precedence. bwMu sync.Mutex bwUpKBps int bwDownKBps int // Per-running-job cancellation handles. Populated when runJob // spawns the goroutine, removed when it returns. 
Looked up by // the command.cancel handler (server → agent) to abort an // in-flight restic invocation. cancelMu sync.Mutex cancels map[string]context.CancelFunc } // trackJob registers a cancel func for an in-flight job and returns a // cleanup that removes it. Call cleanup when the job goroutine exits // regardless of outcome — runs even on panic. func (d *dispatcher) trackJob(jobID string, cancel context.CancelFunc) func() { d.cancelMu.Lock() if d.cancels == nil { d.cancels = make(map[string]context.CancelFunc) } d.cancels[jobID] = cancel d.cancelMu.Unlock() return func() { d.cancelMu.Lock() delete(d.cancels, jobID) d.cancelMu.Unlock() } } // cancelJob fires the cancel func for jobID if there is one and // returns whether the job was actually known. The runner is expected // to surface the resulting context.Canceled as a JobCancelled status // in its job.finished envelope (see runner.sendFinished). func (d *dispatcher) cancelJob(jobID string) bool { d.cancelMu.Lock() cancel, ok := d.cancels[jobID] d.cancelMu.Unlock() if !ok { return false } cancel() return true } func (d *dispatcher) handle(ctx context.Context, env api.Envelope, tx wsclient.Sender) error { switch env.Type { case api.MsgCommandRun: var p api.CommandRunPayload if err := env.UnmarshalPayload(&p); err != nil { return fmt.Errorf("command.run: %w", err) } return d.runJob(ctx, p, tx) case api.MsgCommandCancel: var p api.CommandCancelPayload if err := env.UnmarshalPayload(&p); err != nil { return fmt.Errorf("command.cancel: %w", err) } if d.cancelJob(p.JobID) { slog.Info("ws agent: command.cancel applied", "job_id", p.JobID) } else { // Job already finished or was never seen on this agent. // Not an error — operator may have raced cancel against // natural completion. Server-side state is authoritative. slog.Info("ws agent: command.cancel for unknown job (already finished?)", "job_id", p.JobID) } case api.MsgTreeList: // Synchronous RPC for the restore wizard's tree browser. 
The // server has serialised access; we just run restic ls and reply // with the same envelope ID. Run in a goroutine so the WS read // loop keeps draining. var p api.TreeListRequestPayload if err := env.UnmarshalPayload(&p); err != nil { return fmt.Errorf("tree.list: %w", err) } go d.handleTreeList(ctx, env.ID, p, tx) case api.MsgScheduleSet: var p api.ScheduleSetPayload if err := env.UnmarshalPayload(&p); err != nil { return fmt.Errorf("schedule.set: %w", err) } // scheduler.Apply rebuilds the local cron from scratch and // emits schedule.ack via tx. Async-safe: tx may have to wait // briefly on the connection's writeMu, but the read loop // keeps draining other messages. go d.scheduler.Apply(p, tx) case api.MsgConfigUpdate: var p api.ConfigUpdatePayload _ = env.UnmarshalPayload(&p) slot := p.Slot if slot == "" { slot = "repo" } switch slot { case "repo": // Merge with whatever's already in secrets.enc — empty fields // in the push mean "leave alone." Atomic write underneath. cur, err := d.secrets.Load() if err != nil { slog.Error("ws agent: load secrets for merge", "err", err) return nil } changed := false if p.RepoURL != "" && p.RepoURL != cur.URL { cur.URL = p.RepoURL changed = true } if p.RepoUsername != "" && p.RepoUsername != cur.Username { cur.Username = p.RepoUsername changed = true } if p.RepoPassword != "" && p.RepoPassword != cur.Password { cur.Password = p.RepoPassword changed = true } if changed { if err := d.secrets.Save(cur); err != nil { slog.Error("ws agent: persist secrets", "err", err) return nil } slog.Info("ws agent: repo credentials updated via config.update") } case "admin": cur, err := d.secrets.LoadAdmin() if err != nil && !errors.Is(err, secrets.ErrNoAdmin) { slog.Error("ws agent: load admin secrets", "err", err) return nil } // ErrNoAdmin is not an error here — we are creating the slot. 
changed := false if p.RepoURL != "" && p.RepoURL != cur.URL { cur.URL = p.RepoURL changed = true } if p.RepoUsername != "" && p.RepoUsername != cur.Username { cur.Username = p.RepoUsername changed = true } if p.RepoPassword != "" && p.RepoPassword != cur.Password { cur.Password = p.RepoPassword changed = true } if changed { if err := d.secrets.SaveAdmin(cur); err != nil { slog.Error("ws agent: persist admin secrets", "err", err) return nil } slog.Info("ws agent: admin credentials updated via config.update") } default: slog.Warn("ws agent: unknown config.update slot, ignoring", "slot", p.Slot) } // Bandwidth caps ride independently of the slot — they're host- // wide and apply to every restic invocation regardless of which // credentials slot the job uses. nil pointer = no change in this // push; non-nil = set to that value (≤0 clears the cap). if p.BandwidthUpKBps != nil || p.BandwidthDownKBps != nil { d.bwMu.Lock() if p.BandwidthUpKBps != nil { d.bwUpKBps = *p.BandwidthUpKBps } if p.BandwidthDownKBps != nil { d.bwDownKBps = *p.BandwidthDownKBps } up, down := d.bwUpKBps, d.bwDownKBps d.bwMu.Unlock() slog.Info("ws agent: bandwidth caps updated", "up_kbps", up, "down_kbps", down) } case api.MsgAgentUpdateAvail: var p api.AgentUpdateAvailablePayload _ = env.UnmarshalPayload(&p) slog.Info("ws agent: update available", "version", p.LatestVersion, "url", p.PackageURL) default: slog.Debug("ws agent: ignored message", "type", env.Type) } return nil } // handleTreeList runs `restic ls --json ` and ships // the matching tree.list.result envelope back, correlated by the // request envelope's ID. Errors (missing creds, restic failure) // surface in the result's Error field rather than as transport-level // failures so the server-side waiter can render a sensible message. 
func (d *dispatcher) handleTreeList(ctx context.Context, reqID string, p api.TreeListRequestPayload, tx wsclient.Sender) { reply := func(result api.TreeListResultPayload) { result.SnapshotID = p.SnapshotID result.Path = p.Path env, err := api.Marshal(api.MsgTreeListResult, reqID, result) if err != nil { slog.Warn("ws agent: marshal tree.list.result", "err", err) return } _ = tx.Send(env) } if d.resticBin == "" { reply(api.TreeListResultPayload{Error: "restic binary not located on this agent"}) return } creds, err := d.secrets.Load() if err != nil { reply(api.TreeListResultPayload{Error: "load credentials: " + err.Error()}) return } if creds.Empty() { reply(api.TreeListResultPayload{Error: "repo credentials not configured"}) return } d.bwMu.Lock() upKBps, downKBps := d.bwUpKBps, d.bwDownKBps d.bwMu.Unlock() env := restic.Env{ Bin: d.resticBin, RepoURL: creds.URL, RepoUsername: creds.Username, RepoPassword: creds.Password, LimitUploadKBps: upKBps, LimitDownloadKBps: downKBps, } // 60s ceiling matches snapshots/stats — restic ls on a single // directory is normally sub-second; if the repo is unreachable we // want to surface the failure rather than block the wizard. listCtx, cancel := context.WithTimeout(ctx, 60*time.Second) defer cancel() entries, err := env.ListTreeChildren(listCtx, p.SnapshotID, p.Path) if err != nil { reply(api.TreeListResultPayload{Error: err.Error()}) return } apiEntries := make([]api.TreeListEntry, 0, len(entries)) for _, e := range entries { apiEntries = append(apiEntries, api.TreeListEntry{ Name: e.Name, Type: e.Type, Size: e.Size, }) } reply(api.TreeListResultPayload{Entries: apiEntries}) } // runJob spawns a runner for one job. We launch a goroutine so the // WS read loop keeps draining messages while restic chugs along. 
func (d *dispatcher) runJob(ctx context.Context, p api.CommandRunPayload, tx wsclient.Sender) error { if d.resticBin == "" { return fmt.Errorf("restic binary not located on this agent") } creds, err := d.secrets.Load() if err != nil { return fmt.Errorf("load repo credentials: %w", err) } if creds.Empty() { return fmt.Errorf("repo credentials not configured (waiting for server config.update push)") } // r is the everyday runner — bound to the host's repo // (append-only) credentials. Reused by every kind except // JobPrune, which builds its own runner against the // admin-credentials slot when p.RequiresAdminCreds is set // (admin creds are not loaded for any other kind, so they're // not on r). If you find yourself adding a new JobKind that // needs delete authority, mirror the JobPrune pattern below // — don't try to overload r. // Resolve bandwidth caps: per-job override (if set) wins over the // host-wide caps last pushed via config.update. <=0 means no cap. d.bwMu.Lock() upKBps, downKBps := d.bwUpKBps, d.bwDownKBps d.bwMu.Unlock() if p.BandwidthUpKBps != nil { upKBps = *p.BandwidthUpKBps } if p.BandwidthDownKBps != nil { downKBps = *p.BandwidthDownKBps } r := runner.New(runner.Config{ ResticBin: d.resticBin, ResticVersion: d.resticVer, RepoURL: creds.URL, RepoUsername: creds.Username, RepoPassword: creds.Password, LimitUploadKBps: upKBps, LimitDownloadKBps: downKBps, }, tx, time.Second) // spawn wraps the kind-specific goroutine: derives a per-job // cancellable context from the connection-scoped ctx, registers // the cancel func so command.cancel can fire it, deregisters on // completion. Per-job ctx means canceling one job doesn't kill // any other in-flight invocations. 
spawn := func(name string, fn func(ctx context.Context) error) { jobCtx, cancel := context.WithCancel(ctx) cleanup := d.trackJob(p.JobID, cancel) go func() { defer cleanup() defer cancel() // release ctx resources on goroutine exit if err := fn(jobCtx); err != nil { slog.Warn("agent: "+name+" job failed", "job_id", p.JobID, "err", err) return } slog.Info("agent: "+name+" job complete", "job_id", p.JobID) }() } switch p.Kind { case api.JobBackup: // Includes/Excludes/Tag come from the source group resolved // server-side. Args is preserved for backwards compatibility: // if the server sends only Args (older shape) we fall back to // treating it as the paths list with no tag. paths := p.Includes if len(paths) == 0 { paths = p.Args } var tags []string if p.Tag != "" { tags = []string{p.Tag} } slog.Info("agent: accepting backup job", "job_id", p.JobID, "paths", paths, "excludes", p.Excludes, "tag", p.Tag) hooks := runner.BackupHooks{Pre: p.PreHook, Post: p.PostHook} spawn("backup", func(jobCtx context.Context) error { return r.RunBackup(jobCtx, p.JobID, paths, p.Excludes, tags, hooks) }) case api.JobInit: slog.Info("agent: accepting init job", "job_id", p.JobID) spawn("init", func(jobCtx context.Context) error { return r.RunInit(jobCtx, p.JobID) }) case api.JobForget: if len(p.ForgetGroups) == 0 { // Hard-error rather than fall back to a single-policy form: // the server-side dispatch path (maintenance ticker) is the // only writer of forget command.run today, and it always // populates ForgetGroups. A backwards-compatible single- // policy fallback was specced but skipped — see the // Phase 5 plan rationale and version.go's lockstep-deploy // note for why. 
return fmt.Errorf("forget: command.run carried no forget_groups (server didn't populate them)") } groups := make([]restic.ForgetGroup, 0, len(p.ForgetGroups)) for _, g := range p.ForgetGroups { groups = append(groups, restic.ForgetGroup{ Tag: g.Tag, Policy: restic.ForgetPolicy{ KeepLast: g.Policy.KeepLast, KeepHourly: g.Policy.KeepHourly, KeepDaily: g.Policy.KeepDaily, KeepWeekly: g.Policy.KeepWeekly, KeepMonthly: g.Policy.KeepMonthly, KeepYearly: g.Policy.KeepYearly, }, }) } slog.Info("agent: accepting forget job", "job_id", p.JobID, "groups", len(groups)) spawn("forget", func(jobCtx context.Context) error { return r.RunForget(jobCtx, p.JobID, groups) }) case api.JobPrune: // Prune may require admin creds (delete authority on rest-server). runCreds := creds if p.RequiresAdminCreds { ac, err := d.secrets.LoadAdmin() if err != nil { return fmt.Errorf("prune: admin creds not configured (server didn't push them): %w", err) } if ac.Empty() { return fmt.Errorf("prune: admin creds incomplete") } runCreds = ac } prr := runner.New(runner.Config{ ResticBin: d.resticBin, ResticVersion: d.resticVer, RepoURL: runCreds.URL, RepoUsername: runCreds.Username, RepoPassword: runCreds.Password, LimitUploadKBps: upKBps, LimitDownloadKBps: downKBps, }, tx, time.Second) slog.Info("agent: accepting prune job", "job_id", p.JobID, "admin_creds", p.RequiresAdminCreds) spawn("prune", func(jobCtx context.Context) error { return prr.RunPrune(jobCtx, p.JobID) }) case api.JobCheck: subset := 0 if len(p.Args) > 0 { subset, _ = strconv.Atoi(p.Args[0]) } slog.Info("agent: accepting check job", "job_id", p.JobID, "subset_pct", subset) spawn("check", func(jobCtx context.Context) error { return r.RunCheck(jobCtx, p.JobID, subset) }) case api.JobUnlock: slog.Info("agent: accepting unlock job", "job_id", p.JobID) spawn("unlock", func(jobCtx context.Context) error { return r.RunUnlock(jobCtx, p.JobID) }) case api.JobRestore: if p.Restore == nil { return fmt.Errorf("restore: command.run carried no restore 
payload") } rp := *p.Restore if rp.SnapshotID == "" { return fmt.Errorf("restore: snapshot_id is required") } if !rp.InPlace && rp.TargetDir == "" { return fmt.Errorf("restore: target_dir required for non-in-place restore") } slog.Info("agent: accepting restore job", "job_id", p.JobID, "snapshot_id", rp.SnapshotID, "paths", rp.Paths, "in_place", rp.InPlace, "target", rp.TargetDir) spawn("restore", func(jobCtx context.Context) error { return r.RunRestore(jobCtx, p.JobID, rp.SnapshotID, rp.Paths, rp.InPlace, rp.TargetDir) }) case api.JobDiff: if p.Diff == nil || p.Diff.SnapshotA == "" || p.Diff.SnapshotB == "" { return fmt.Errorf("diff: command.run carried incomplete diff payload") } dp := *p.Diff slog.Info("agent: accepting diff job", "job_id", p.JobID, "a", dp.SnapshotA, "b", dp.SnapshotB) spawn("diff", func(jobCtx context.Context) error { return r.RunDiff(jobCtx, p.JobID, dp.SnapshotA, dp.SnapshotB) }) default: return fmt.Errorf("kind %q not implemented yet (Phase 2 lands the rest)", p.Kind) } return nil } func doEnroll(serverURL, token string, cfg *config.Config, agentVersion string) error { ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) defer cancel() snap, err := sysinfo.Collect(ctx, cfg.ResticPath) if err != nil { return fmt.Errorf("sysinfo: %w", err) } res, err := wsclient.Enroll(ctx, serverURL, wsclient.EnrollRequest{ Token: token, HostName: snap.Hostname, OS: snap.OS, Arch: snap.Arch, AgentVersion: agentVersion, ResticVersion: snap.ResticVersion, }) if err != nil { return fmt.Errorf("enroll: %w", err) } cfg.ServerURL = serverURL cfg.HostID = res.HostID cfg.AgentToken = res.AgentToken cfg.CertPinSHA256 = res.CertPinSHA256 if err := cfg.Save(); err != nil { return fmt.Errorf("save config: %w", err) } fmt.Fprintf(os.Stderr, "enrolled as host %s on %s\n", res.HostID, serverURL) return nil }