Files
restic-manager/cmd/agent/main.go
T
steve 6a171596f1 P2-05: forget command with retention policy
End-to-end forget plumbing — operator can create a forget schedule
with keep-* values, agent runs restic forget --keep-* … on the
schedule's cron (or via per-row Run-now), snapshot list shrinks,
UI updates.

* api.CommandRunPayload gains retention_policy json.RawMessage so
  the agent doesn't need a typed copy of the server-side struct.
* restic.ForgetPolicy mirrors restic's --keep-* flags. Empty()
  reports zero dimensions; restic wrapper RunForget refuses to
  run an empty policy (would delete every snapshot). Does NOT
  pass --prune — pruning lives behind a separate admin-only
  credential (P2-06); forget just rewrites the snapshot index.
* runner.RunForget mirrors RunBackup's envelope shape so the
  live log viewer works without special-casing. On success
  triggers reportSnapshots (forget shrinks the index, the host's
  snapshot count almost certainly changed).
* cmd/agent dispatcher handles MsgCommandRun with kind=forget,
  decodes RetentionPolicy from the wire, builds restic.ForgetPolicy.
* Server dispatchScheduleNow marshals the schedule's
  RetentionPolicy into the wire payload for kind=forget jobs.
  Refuses to dispatch a forget schedule with empty retention.
* validateSchedule rejects kind=forget without at least one keep-*
  dimension (new error code: missing_retention).
* UI schedule edit form gains a Kind dropdown (backup or forget;
  immutable on edit). Paths block toggles by kind via inline
  data-kind attributes. Form help-text explains the prune
  separation.

Other kinds (prune, check, unlock) deferred to P2-06..08; the
Kind dropdown only offers backup and forget today.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-02 14:07:42 +01:00

346 lines
11 KiB
Go

package main
import (
"context"
"encoding/json"
"errors"
"flag"
"fmt"
"log/slog"
"os"
"os/signal"
"syscall"
"time"
"gitea.dcglab.co.uk/steve/restic-manager/internal/agent/config"
"gitea.dcglab.co.uk/steve/restic-manager/internal/agent/runner"
"gitea.dcglab.co.uk/steve/restic-manager/internal/agent/scheduler"
"gitea.dcglab.co.uk/steve/restic-manager/internal/agent/secrets"
"gitea.dcglab.co.uk/steve/restic-manager/internal/agent/sysinfo"
"gitea.dcglab.co.uk/steve/restic-manager/internal/agent/wsclient"
"gitea.dcglab.co.uk/steve/restic-manager/internal/api"
"gitea.dcglab.co.uk/steve/restic-manager/internal/restic"
)
// version is the agent's version string. It is printed by -version,
// logged at startup, and sent to the server in the hello payload and
// the enrollment request. Defaults to "dev"; presumably overridden at
// build time (e.g. via -ldflags -X) — TODO confirm against the build.
var version = "dev"
// main delegates to run and turns a returned error into a logged
// failure plus exit status 1.
func main() {
	err := run()
	if err == nil {
		return
	}
	slog.Error("agent fatal", "err", err)
	os.Exit(1)
}
// run is the agent's real entry point. It parses flags, loads the
// config, and then operates in one of three modes:
//
//   - -version: print the version and return.
//   - -enroll-token (with -enroll-server): perform one-shot enrollment
//     and return.
//   - otherwise: require an enrolled config, open the secrets store and
//     hand the WebSocket client a dispatcher, using a context that is
//     cancelled on SIGINT/SIGTERM.
//
// Every failure is returned wrapped with the stage that produced it.
func run() error {
	var (
		configPath   string
		enrollServer string
		enrollToken  string
		showVersion  bool
	)
	flag.StringVar(&configPath, "config", config.DefaultPath(), "path to agent.yaml")
	flag.StringVar(&enrollServer, "enroll-server", "", "server URL (used with -enroll-token to perform first-run enrollment)")
	flag.StringVar(&enrollToken, "enroll-token", "", "one-time enrollment token (operator copies this from the UI)")
	flag.BoolVar(&showVersion, "version", false, "print version and exit")
	flag.Parse()

	if showVersion {
		fmt.Println("restic-manager-agent", version)
		return nil
	}

	// Structured JSON logs on stdout for everything that follows.
	slog.SetDefault(slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo})))

	cfg, err := config.Load(configPath)
	if err != nil {
		return fmt.Errorf("config: %w", err)
	}

	// Mode selection: an enrollment token switches to one-shot
	// enrollment; without one the agent must already be enrolled.
	switch {
	case enrollToken != "" && enrollServer == "":
		return errors.New("enrollment: -enroll-server is required with -enroll-token")
	case enrollToken != "":
		return doEnroll(enrollServer, enrollToken, cfg, version)
	case !cfg.Enrolled():
		return fmt.Errorf("agent is not enrolled; run with -enroll-server and -enroll-token first (config %q)", configPath)
	}

	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	info, err := sysinfo.Collect(ctx, cfg.ResticPath)
	if err != nil {
		return fmt.Errorf("sysinfo: %w", err)
	}
	slog.Info("agent starting",
		"version", version,
		"host_id", cfg.HostID,
		"server", cfg.ServerURL,
		"restic_version", info.ResticVersion,
		"protocol_version", info.ProtocolVersion,
	)

	// The lookup error is dropped on purpose: an empty path is fine
	// here, and commands fail with a clear error later.
	resticPath, _ := restic.Locate(cfg.ResticPath)

	// Open the secrets store. If the agent is enrolled but has no
	// secrets key yet (legacy YAML), one is minted and any plaintext
	// repo fields are migrated into the encrypted blob.
	store, err := openSecretsStore(cfg)
	if err != nil {
		return fmt.Errorf("secrets: %w", err)
	}

	hello := api.HelloPayload{
		ProtocolVersion: info.ProtocolVersion,
		AgentVersion:    version,
		ResticVersion:   info.ResticVersion,
		Hostname:        info.Hostname,
		OS:              info.OS,
		Arch:            info.Arch,
	}
	connCfg := wsclient.Config{
		ServerURL:     cfg.ServerURL,
		AgentToken:    cfg.AgentToken,
		HostID:        cfg.HostID,
		CertPinSHA256: cfg.CertPinSHA256,
		HelloPayload:  hello,
	}
	disp := &dispatcher{
		resticBin: resticPath,
		secrets:   store,
		scheduler: scheduler.New(),
	}

	if err := wsclient.Run(ctx, connCfg, disp.handle); err != nil {
		return fmt.Errorf("ws run: %w", err)
	}
	slog.Info("agent shutting down")
	return nil
}
// openSecretsStore opens (or one-time migrates) the agent's encrypted
// secrets file. Side effects:
//   - mints SecretsKey if absent and persists agent.yaml.
//   - if legacy plaintext repo_url/repo_password sit in agent.yaml,
//     copies them into secrets.enc (without clobbering values already
//     stored there) and clears the YAML fields on the next save.
//
// It returns the opened store, or an error if the key cannot be
// ensured/decoded, the store cannot be opened, the existing secrets
// cannot be read ahead of a migration, or either file cannot be written.
func openSecretsStore(cfg *config.Config) (*secrets.Store, error) {
	if err := cfg.EnsureSecretsKey(); err != nil {
		return nil, err
	}
	keyBytes, err := cfg.SecretsKeyBytes()
	if err != nil {
		return nil, err
	}
	st, err := secrets.New(cfg.ResolvedSecretsPath(), keyBytes)
	if err != nil {
		return nil, err
	}

	migrated := false
	if cfg.LegacyRepoURL != "" || cfg.LegacyRepoPassword != "" {
		// Read what is already stored so the migration only fills
		// gaps. A missing file (first run) is fine and leaves cur
		// empty. Any other failure must abort: saving over a
		// secrets.enc we could not read would replace its contents
		// with only the legacy URL/password.
		cur, err := st.Load()
		if err != nil && !errors.Is(err, os.ErrNotExist) {
			return nil, fmt.Errorf("load secrets.enc before legacy migration: %w", err)
		}
		if cur.URL == "" {
			cur.URL = cfg.LegacyRepoURL
		}
		if cur.Password == "" {
			cur.Password = cfg.LegacyRepoPassword
		}
		if err := st.Save(cur); err != nil {
			return nil, fmt.Errorf("migrate legacy creds into secrets.enc: %w", err)
		}
		// Only clear the plaintext copies once the encrypted write
		// has succeeded.
		cfg.LegacyRepoURL = ""
		cfg.LegacyRepoPassword = ""
		migrated = true
		slog.Info("agent: migrated legacy plaintext repo creds into secrets.enc")
	}

	// Persist key (and the cleared legacy fields) regardless of
	// whether we migrated, in case we just minted SecretsKey.
	if migrated || cfg.SecretsKey != "" {
		if err := cfg.Save(); err != nil {
			return nil, fmt.Errorf("persist agent config: %w", err)
		}
	}
	return st, nil
}
// dispatcher closes over the long-lived agent settings (restic path
// + secrets handle) so handle() can spawn the runner without
// re-loading config every time. Repo creds are read fresh from the
// secrets store on each job — config.update writes through to disk,
// so a job dispatched in the same session sees the latest values.
type dispatcher struct {
// resticBin is the restic path resolved once at startup. It is empty
// when the lookup failed, in which case runJob rejects every job.
resticBin string
// secrets is the encrypted credential store: loaded on every job and
// merged/saved when a config.update arrives.
secrets *secrets.Store
// scheduler receives schedule.set payloads via Apply.
scheduler *scheduler.Scheduler
}
// handle routes one inbound envelope from the server by message type.
//
//   - command.run: decodes the payload and starts the job via runJob.
//   - command.cancel: logged only; cancellation is not implemented yet.
//   - schedule.set: decodes the payload and applies it asynchronously.
//   - config.update: merges non-empty repo fields into the secrets store.
//   - agent update available: logged only.
//   - anything else: ignored with a debug log.
//
// A payload that fails to decode is returned as an error for
// command.run, schedule.set and config.update. Local failures while
// loading or saving secrets are logged and swallowed (nil is returned).
// An undecodable update notice is logged and skipped because the
// message is purely advisory.
func (d *dispatcher) handle(ctx context.Context, env api.Envelope, tx wsclient.Sender) error {
	switch env.Type {
	case api.MsgCommandRun:
		var p api.CommandRunPayload
		if err := env.UnmarshalPayload(&p); err != nil {
			return fmt.Errorf("command.run: %w", err)
		}
		return d.runJob(ctx, p, tx)

	case api.MsgCommandCancel:
		// TODO(P2): cancellation requires keeping a job→cancelFunc map.
		slog.Info("ws agent: command.cancel received (cancellation lands in P2)", "id", env.ID)

	case api.MsgScheduleSet:
		var p api.ScheduleSetPayload
		if err := env.UnmarshalPayload(&p); err != nil {
			return fmt.Errorf("schedule.set: %w", err)
		}
		// scheduler.Apply rebuilds the local cron from scratch and
		// emits schedule.ack via tx. Async-safe: tx may have to wait
		// briefly on the connection's writeMu, but the read loop
		// keeps draining other messages.
		go d.scheduler.Apply(p, tx)

	case api.MsgConfigUpdate:
		var p api.ConfigUpdatePayload
		// A payload we cannot decode would otherwise look like an
		// "all fields empty" push and be silently dropped, so surface
		// it the same way the other payload-carrying messages do.
		if err := env.UnmarshalPayload(&p); err != nil {
			return fmt.Errorf("config.update: %w", err)
		}
		// Merge with whatever's already in secrets.enc — empty fields
		// in the push mean "leave alone." Atomic write underneath.
		cur, err := d.secrets.Load()
		if err != nil {
			slog.Error("ws agent: load secrets for merge", "err", err)
			return nil
		}
		changed := false
		if p.RepoURL != "" && p.RepoURL != cur.URL {
			cur.URL = p.RepoURL
			changed = true
		}
		if p.RepoUsername != "" && p.RepoUsername != cur.Username {
			cur.Username = p.RepoUsername
			changed = true
		}
		if p.RepoPassword != "" && p.RepoPassword != cur.Password {
			cur.Password = p.RepoPassword
			changed = true
		}
		// Skip the write entirely when the push carried nothing new.
		if changed {
			if err := d.secrets.Save(cur); err != nil {
				slog.Error("ws agent: persist secrets", "err", err)
				return nil
			}
			slog.Info("ws agent: repo credentials updated via config.update")
		}

	case api.MsgAgentUpdateAvail:
		var p api.AgentUpdateAvailablePayload
		if err := env.UnmarshalPayload(&p); err != nil {
			// Advisory message: note the problem rather than logging
			// an "update available" line with empty fields.
			slog.Warn("ws agent: undecodable update notice ignored", "err", err)
			return nil
		}
		slog.Info("ws agent: update available", "version", p.LatestVersion, "url", p.PackageURL)

	default:
		slog.Debug("ws agent: ignored message", "type", env.Type)
	}
	return nil
}
// runJob starts one restic job for a command.run payload. The job runs
// on its own goroutine so the WS read loop keeps draining messages
// while restic chugs along.
//
// Only problems found before the goroutine starts are returned: no
// restic binary, unreadable or unconfigured repo credentials, an
// undecodable retention policy, or an unsupported job kind. The job's
// own outcome is logged rather than returned.
func (d *dispatcher) runJob(ctx context.Context, p api.CommandRunPayload, tx wsclient.Sender) error {
	if d.resticBin == "" {
		return errors.New("restic binary not located on this agent")
	}
	creds, err := d.secrets.Load()
	if err != nil {
		return fmt.Errorf("load repo credentials: %w", err)
	}
	if creds.Empty() {
		return errors.New("repo credentials not configured (waiting for server config.update push)")
	}

	rn := runner.New(runner.Config{
		ResticBin:    d.resticBin,
		RepoURL:      creds.URL,
		RepoUsername: creds.Username,
		RepoPassword: creds.Password,
	}, tx, time.Second)

	// Each kind picks the work to run plus the log lines that report
	// how it ended; the goroutine launch itself is shared below.
	var (
		work             func() error
		failMsg, doneMsg string
	)
	switch p.Kind {
	case api.JobBackup:
		// Agent.Args carries [paths...]. Excludes/tags are not yet
		// surfaced over the wire; they come with P2 schedule support.
		slog.Info("agent: accepting backup job",
			"job_id", p.JobID, "paths", p.Args)
		work = func() error { return rn.RunBackup(ctx, p.JobID, p.Args, nil, nil) }
		failMsg, doneMsg = "agent: backup job failed", "agent: backup job complete"

	case api.JobInit:
		slog.Info("agent: accepting init job", "job_id", p.JobID)
		work = func() error { return rn.RunInit(ctx, p.JobID) }
		failMsg, doneMsg = "agent: init job failed", "agent: init job complete"

	case api.JobForget:
		// The retention policy arrives as raw JSON; decode it into a
		// local wire shape and copy the keep-* values across. An
		// absent policy leaves the zero-value ForgetPolicy in place.
		var keep restic.ForgetPolicy
		if len(p.RetentionPolicy) > 0 {
			var wire struct {
				KeepLast    *int `json:"keep_last,omitempty"`
				KeepHourly  *int `json:"keep_hourly,omitempty"`
				KeepDaily   *int `json:"keep_daily,omitempty"`
				KeepWeekly  *int `json:"keep_weekly,omitempty"`
				KeepMonthly *int `json:"keep_monthly,omitempty"`
				KeepYearly  *int `json:"keep_yearly,omitempty"`
			}
			if err := json.Unmarshal(p.RetentionPolicy, &wire); err != nil {
				return fmt.Errorf("forget: decode retention_policy: %w", err)
			}
			keep = restic.ForgetPolicy{
				KeepLast:    wire.KeepLast,
				KeepHourly:  wire.KeepHourly,
				KeepDaily:   wire.KeepDaily,
				KeepWeekly:  wire.KeepWeekly,
				KeepMonthly: wire.KeepMonthly,
				KeepYearly:  wire.KeepYearly,
			}
		}
		slog.Info("agent: accepting forget job", "job_id", p.JobID, "policy", p.RetentionPolicy)
		work = func() error { return rn.RunForget(ctx, p.JobID, keep) }
		failMsg, doneMsg = "agent: forget job failed", "agent: forget job complete"

	default:
		return fmt.Errorf("kind %q not implemented yet (Phase 2 lands the rest)", p.Kind)
	}

	go func() {
		if err := work(); err != nil {
			slog.Warn(failMsg, "job_id", p.JobID, "err", err)
			return
		}
		slog.Info(doneMsg, "job_id", p.JobID)
	}()
	return nil
}
// doEnroll performs first-run enrollment against serverURL using the
// one-time token, bounded by a 60-second timeout. It collects local
// system info, sends the enrollment request, stores the returned host
// ID, agent token and certificate pin in cfg, saves cfg, and prints a
// confirmation line to stderr.
func doEnroll(serverURL, token string, cfg *config.Config, agentVersion string) error {
	ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
	defer cancel()

	info, err := sysinfo.Collect(ctx, cfg.ResticPath)
	if err != nil {
		return fmt.Errorf("sysinfo: %w", err)
	}

	req := wsclient.EnrollRequest{
		Token:         token,
		HostName:      info.Hostname,
		OS:            info.OS,
		Arch:          info.Arch,
		AgentVersion:  agentVersion,
		ResticVersion: info.ResticVersion,
	}
	reply, err := wsclient.Enroll(ctx, serverURL, req)
	if err != nil {
		return fmt.Errorf("enroll: %w", err)
	}

	// Record the identity the server issued, then write it to disk.
	cfg.ServerURL = serverURL
	cfg.HostID = reply.HostID
	cfg.AgentToken = reply.AgentToken
	cfg.CertPinSHA256 = reply.CertPinSHA256
	if err = cfg.Save(); err != nil {
		return fmt.Errorf("save config: %w", err)
	}

	fmt.Fprintf(os.Stderr, "enrolled as host %s on %s\n", reply.HostID, serverURL)
	return nil
}