server: maintenance ticker drives forget/prune/check on cadence
Wires a 60s server-side ticker to the pure-logic maintenance.Decide
introduced in the previous commit. Decisions flow through a new
DispatchMaintenance method on *Server, which:
- skips offline hosts (no pending_runs queueing — maintenance is
not a backup, missed fires shouldn't pile up)
- silently skips prune when admin creds aren't bound
- pushes admin creds before prune, then dispatches with
RequiresAdminCreds=true (same as operator-driven prune)
- persists job rows with actor_kind="system"
Reshapes the forget wire payload from a single RetentionPolicy to a
ForgetGroups list (one tag + per-group keep-* per source group). The
agent walks the groups and runs `restic forget --tag <name> --keep-*`
once per group. Dead-code removed: CommandRunPayload.RetentionPolicy,
the old forget JSON-decode in cmd/agent, and the single-policy form of
restic.RunForget.
This commit is contained in:
+18
-21
@@ -2,7 +2,6 @@ package main
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
|
||||||
"errors"
|
"errors"
|
||||||
"flag"
|
"flag"
|
||||||
"fmt"
|
"fmt"
|
||||||
@@ -336,28 +335,26 @@ func (d *dispatcher) runJob(ctx context.Context, p api.CommandRunPayload, tx wsc
|
|||||||
slog.Info("agent: init job complete", "job_id", p.JobID)
|
slog.Info("agent: init job complete", "job_id", p.JobID)
|
||||||
}()
|
}()
|
||||||
case api.JobForget:
|
case api.JobForget:
|
||||||
var policy restic.ForgetPolicy
|
if len(p.ForgetGroups) == 0 {
|
||||||
if len(p.RetentionPolicy) > 0 {
|
return fmt.Errorf("forget: command.run carried no forget_groups (server didn't populate them)")
|
||||||
var raw struct {
|
|
||||||
KeepLast *int `json:"keep_last,omitempty"`
|
|
||||||
KeepHourly *int `json:"keep_hourly,omitempty"`
|
|
||||||
KeepDaily *int `json:"keep_daily,omitempty"`
|
|
||||||
KeepWeekly *int `json:"keep_weekly,omitempty"`
|
|
||||||
KeepMonthly *int `json:"keep_monthly,omitempty"`
|
|
||||||
KeepYearly *int `json:"keep_yearly,omitempty"`
|
|
||||||
}
|
|
||||||
if err := json.Unmarshal(p.RetentionPolicy, &raw); err != nil {
|
|
||||||
return fmt.Errorf("forget: decode retention_policy: %w", err)
|
|
||||||
}
|
|
||||||
policy = restic.ForgetPolicy{
|
|
||||||
KeepLast: raw.KeepLast, KeepHourly: raw.KeepHourly,
|
|
||||||
KeepDaily: raw.KeepDaily, KeepWeekly: raw.KeepWeekly,
|
|
||||||
KeepMonthly: raw.KeepMonthly, KeepYearly: raw.KeepYearly,
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
slog.Info("agent: accepting forget job", "job_id", p.JobID, "policy", p.RetentionPolicy)
|
groups := make([]restic.ForgetGroup, 0, len(p.ForgetGroups))
|
||||||
|
for _, g := range p.ForgetGroups {
|
||||||
|
groups = append(groups, restic.ForgetGroup{
|
||||||
|
Tag: g.Tag,
|
||||||
|
Policy: restic.ForgetPolicy{
|
||||||
|
KeepLast: g.Policy.KeepLast,
|
||||||
|
KeepHourly: g.Policy.KeepHourly,
|
||||||
|
KeepDaily: g.Policy.KeepDaily,
|
||||||
|
KeepWeekly: g.Policy.KeepWeekly,
|
||||||
|
KeepMonthly: g.Policy.KeepMonthly,
|
||||||
|
KeepYearly: g.Policy.KeepYearly,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
slog.Info("agent: accepting forget job", "job_id", p.JobID, "groups", len(groups))
|
||||||
go func() {
|
go func() {
|
||||||
if err := r.RunForget(ctx, p.JobID, policy); err != nil {
|
if err := r.RunForget(ctx, p.JobID, groups); err != nil {
|
||||||
slog.Warn("agent: forget job failed", "job_id", p.JobID, "err", err)
|
slog.Warn("agent: forget job failed", "job_id", p.JobID, "err", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -16,6 +16,7 @@ import (
|
|||||||
"gitea.dcglab.co.uk/steve/restic-manager/internal/crypto"
|
"gitea.dcglab.co.uk/steve/restic-manager/internal/crypto"
|
||||||
"gitea.dcglab.co.uk/steve/restic-manager/internal/server/config"
|
"gitea.dcglab.co.uk/steve/restic-manager/internal/server/config"
|
||||||
rmhttp "gitea.dcglab.co.uk/steve/restic-manager/internal/server/http"
|
rmhttp "gitea.dcglab.co.uk/steve/restic-manager/internal/server/http"
|
||||||
|
"gitea.dcglab.co.uk/steve/restic-manager/internal/server/maintenance"
|
||||||
"gitea.dcglab.co.uk/steve/restic-manager/internal/server/ui"
|
"gitea.dcglab.co.uk/steve/restic-manager/internal/server/ui"
|
||||||
"gitea.dcglab.co.uk/steve/restic-manager/internal/server/ws"
|
"gitea.dcglab.co.uk/steve/restic-manager/internal/server/ws"
|
||||||
"gitea.dcglab.co.uk/steve/restic-manager/internal/store"
|
"gitea.dcglab.co.uk/steve/restic-manager/internal/store"
|
||||||
@@ -139,6 +140,14 @@ func run() error {
|
|||||||
defer purgeTick.Stop()
|
defer purgeTick.Stop()
|
||||||
offlineTick := time.NewTicker(30 * time.Second)
|
offlineTick := time.NewTicker(30 * time.Second)
|
||||||
defer offlineTick.Stop()
|
defer offlineTick.Stop()
|
||||||
|
// Maintenance ticker: drives forget/prune/check on the cadences
|
||||||
|
// operators set per-host. Independent of the agent's local cron
|
||||||
|
// (which only handles backup schedules). 60s cadence — the cron
|
||||||
|
// expressions are minute-grained, so anything finer is wasted
|
||||||
|
// work.
|
||||||
|
maintenanceTick := time.NewTicker(60 * time.Second)
|
||||||
|
defer maintenanceTick.Stop()
|
||||||
|
mt := maintenance.New(st)
|
||||||
go func() {
|
go func() {
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
@@ -156,6 +165,16 @@ func run() error {
|
|||||||
if n, err := st.MarkHostsOfflineStale(ctx, cutoff); err == nil && n > 0 {
|
if n, err := st.MarkHostsOfflineStale(ctx, cutoff); err == nil && n > 0 {
|
||||||
slog.Info("marked hosts offline (stale heartbeat)", "n", n)
|
slog.Info("marked hosts offline (stale heartbeat)", "n", n)
|
||||||
}
|
}
|
||||||
|
case <-maintenanceTick.C:
|
||||||
|
decisions, err := mt.Decide(ctx, time.Now().UTC())
|
||||||
|
if err != nil {
|
||||||
|
slog.Warn("maintenance ticker: decide", "err", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if len(decisions) > 0 {
|
||||||
|
slog.Info("maintenance ticker: dispatching", "n", len(decisions))
|
||||||
|
srv.DispatchMaintenance(ctx, decisions)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|||||||
@@ -206,18 +206,20 @@ func (r *Runner) RunInit(ctx context.Context, jobID string) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// RunForget executes a forget job against the configured repo with
|
// RunForget executes a forget job against the configured repo by
|
||||||
// the given retention policy. Same envelope shape as RunBackup so
|
// invoking `restic forget --tag <Tag> --keep-* …` once per group.
|
||||||
// the live log viewer + job lifecycle work without special-casing.
|
// Same envelope shape as RunBackup so the live log viewer + job
|
||||||
// On success refreshes the snapshot projection (forget rewrites the
|
// lifecycle work without special-casing. On success refreshes the
|
||||||
// snapshot index — the host's snapshot list shrinks).
|
// snapshot projection (forget rewrites the snapshot index — the
|
||||||
func (r *Runner) RunForget(ctx context.Context, jobID string, policy restic.ForgetPolicy) error {
|
// host's snapshot list shrinks). Snapshot refresh runs once after
|
||||||
|
// every group completes, not per-group.
|
||||||
|
func (r *Runner) RunForget(ctx context.Context, jobID string, groups []restic.ForgetGroup) error {
|
||||||
startedAt := time.Now().UTC()
|
startedAt := time.Now().UTC()
|
||||||
r.sendStarted(jobID, api.JobForget, startedAt)
|
r.sendStarted(jobID, api.JobForget, startedAt)
|
||||||
|
|
||||||
env := r.resticEnv()
|
env := r.resticEnv()
|
||||||
var seq atomic.Int64
|
var seq atomic.Int64
|
||||||
err := env.RunForget(ctx, policy, r.streamHandler(jobID, &seq))
|
err := env.RunForget(ctx, groups, r.streamHandler(jobID, &seq))
|
||||||
finishedAt := time.Now().UTC()
|
finishedAt := time.Now().UTC()
|
||||||
r.sendFinished(jobID, finishedAt, err, nil)
|
r.sendFinished(jobID, finishedAt, err, nil)
|
||||||
|
|
||||||
|
|||||||
@@ -333,8 +333,11 @@ esac
|
|||||||
tx := &fakeSender{}
|
tx := &fakeSender{}
|
||||||
r := New(Config{ResticBin: bin}, tx, 0)
|
r := New(Config{ResticBin: bin}, tx, 0)
|
||||||
keepLast := 1
|
keepLast := 1
|
||||||
policy := restic.ForgetPolicy{KeepLast: &keepLast}
|
groups := []restic.ForgetGroup{{
|
||||||
if err := r.RunForget(context.Background(), "job-forget", policy); err != nil {
|
Tag: "documents",
|
||||||
|
Policy: restic.ForgetPolicy{KeepLast: &keepLast},
|
||||||
|
}}
|
||||||
|
if err := r.RunForget(context.Background(), "job-forget", groups); err != nil {
|
||||||
t.Fatalf("RunForget: %v", err)
|
t.Fatalf("RunForget: %v", err)
|
||||||
}
|
}
|
||||||
_ = firstEnvOfType(t, tx.envs, api.MsgJobStarted)
|
_ = firstEnvOfType(t, tx.envs, api.MsgJobStarted)
|
||||||
|
|||||||
+39
-13
@@ -77,6 +77,30 @@ const (
|
|||||||
JobCancelled JobStatus = "cancelled" //nolint:misspell // wire format
|
JobCancelled JobStatus = "cancelled" //nolint:misspell // wire format
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// ForgetPolicyJSON is the wire shape of a per-group retention policy
|
||||||
|
// shipped with a forget command.run. Mirrors store.RetentionPolicy
|
||||||
|
// JSON tags exactly so a future caller could json-roundtrip between
|
||||||
|
// the two without reshaping. All fields nullable; an empty struct is
|
||||||
|
// rejected by the agent (restic refuses to forget without --keep-*).
|
||||||
|
type ForgetPolicyJSON struct {
|
||||||
|
KeepLast *int `json:"keep_last,omitempty"`
|
||||||
|
KeepHourly *int `json:"keep_hourly,omitempty"`
|
||||||
|
KeepDaily *int `json:"keep_daily,omitempty"`
|
||||||
|
KeepWeekly *int `json:"keep_weekly,omitempty"`
|
||||||
|
KeepMonthly *int `json:"keep_monthly,omitempty"`
|
||||||
|
KeepYearly *int `json:"keep_yearly,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// ForgetGroup is one (tag, retention) pair shipped to the agent in a
|
||||||
|
// forget command.run. The agent invokes
|
||||||
|
// `restic forget --tag <Tag> --keep-* …` once per group, with each
|
||||||
|
// group's own policy. The Tag is the source-group name (which is
|
||||||
|
// also the snapshot tag carried at backup time).
|
||||||
|
type ForgetGroup struct {
|
||||||
|
Tag string `json:"tag"`
|
||||||
|
Policy ForgetPolicyJSON `json:"policy"`
|
||||||
|
}
|
||||||
|
|
||||||
// CommandRunPayload is the server → agent dispatch for a run-now job.
|
// CommandRunPayload is the server → agent dispatch for a run-now job.
|
||||||
//
|
//
|
||||||
// For kind=backup, Includes/Excludes/Tag are populated from the source
|
// For kind=backup, Includes/Excludes/Tag are populated from the source
|
||||||
@@ -85,25 +109,27 @@ const (
|
|||||||
// the source group's name) so retention can target it later via
|
// the source group's name) so retention can target it later via
|
||||||
// `restic forget --tag`.
|
// `restic forget --tag`.
|
||||||
//
|
//
|
||||||
// For kind=forget, RetentionPolicy is the typed keep-* set as raw JSON
|
// For kind=forget, ForgetGroups carries one entry per source-group on
|
||||||
// (the agent doesn't share the store package's typed struct).
|
// the host that has a non-empty retention policy. The agent walks the
|
||||||
|
// list and runs `restic forget --tag <Tag> --keep-* …` per group.
|
||||||
//
|
//
|
||||||
// Args is preserved as a generic free-form slice for kinds that don't
|
// Args is preserved as a generic free-form slice for kinds that don't
|
||||||
// fit the structured fields (e.g. unlock takes none; init takes none).
|
// fit the structured fields (e.g. unlock takes none; init takes none;
|
||||||
|
// check carries the subset% as Args[0]).
|
||||||
//
|
//
|
||||||
// RequiresAdminCreds tells the agent to load the admin slot of its
|
// RequiresAdminCreds tells the agent to load the admin slot of its
|
||||||
// secrets store rather than the everyday repo slot. Set by the server
|
// secrets store rather than the everyday repo slot. Set by the server
|
||||||
// only for prune and operator-triggered unlock (kinds that need delete
|
// only for prune (the only kind that needs delete authority on a
|
||||||
// authority on a rest-server repo).
|
// rest-server repo today).
|
||||||
type CommandRunPayload struct {
|
type CommandRunPayload struct {
|
||||||
JobID string `json:"job_id"`
|
JobID string `json:"job_id"`
|
||||||
Kind JobKind `json:"kind"`
|
Kind JobKind `json:"kind"`
|
||||||
Args []string `json:"args,omitempty"`
|
Args []string `json:"args,omitempty"`
|
||||||
Includes []string `json:"includes,omitempty"`
|
Includes []string `json:"includes,omitempty"`
|
||||||
Excludes []string `json:"excludes,omitempty"`
|
Excludes []string `json:"excludes,omitempty"`
|
||||||
Tag string `json:"tag,omitempty"`
|
Tag string `json:"tag,omitempty"`
|
||||||
RetentionPolicy json.RawMessage `json:"retention_policy,omitempty"`
|
ForgetGroups []ForgetGroup `json:"forget_groups,omitempty"`
|
||||||
RequiresAdminCreds bool `json:"requires_admin_creds,omitempty"`
|
RequiresAdminCreds bool `json:"requires_admin_creds,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// CommandCancelPayload is the server → agent cancel signal.
|
// CommandCancelPayload is the server → agent cancel signal.
|
||||||
|
|||||||
+37
-19
@@ -151,8 +151,7 @@ func (e Env) RunBackup(ctx context.Context, paths, excludes, tags []string, hand
|
|||||||
}
|
}
|
||||||
|
|
||||||
// ForgetPolicy mirrors restic forget's --keep-* flags. All optional;
|
// ForgetPolicy mirrors restic forget's --keep-* flags. All optional;
|
||||||
// nil/zero means "don't pass that flag." Caller passes whatever the
|
// nil/zero means "don't pass that flag."
|
||||||
// schedule's RetentionPolicy carries.
|
|
||||||
type ForgetPolicy struct {
|
type ForgetPolicy struct {
|
||||||
KeepLast *int
|
KeepLast *int
|
||||||
KeepHourly *int
|
KeepHourly *int
|
||||||
@@ -181,30 +180,49 @@ func (p ForgetPolicy) args() []string {
|
|||||||
return out
|
return out
|
||||||
}
|
}
|
||||||
|
|
||||||
// Empty reports whether no retention dimensions are set. restic
|
// Empty reports whether no retention dimensions are set.
|
||||||
// forget refuses to run without at least one keep-* flag (it would
|
|
||||||
// delete every snapshot), so the agent rejects empty policies before
|
|
||||||
// invoking restic.
|
|
||||||
func (p ForgetPolicy) Empty() bool {
|
func (p ForgetPolicy) Empty() bool {
|
||||||
return p.KeepLast == nil && p.KeepHourly == nil &&
|
return p.KeepLast == nil && p.KeepHourly == nil &&
|
||||||
p.KeepDaily == nil && p.KeepWeekly == nil &&
|
p.KeepDaily == nil && p.KeepWeekly == nil &&
|
||||||
p.KeepMonthly == nil && p.KeepYearly == nil
|
p.KeepMonthly == nil && p.KeepYearly == nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// RunForget executes `restic forget --keep-* … --json` against the
|
// ForgetGroup is one (tag, retention-policy) pair fed to RunForget.
|
||||||
// configured repo. Does NOT pass --prune — pruning lives behind a
|
// The wrapper invokes `restic forget --tag <Tag> --keep-* …` per
|
||||||
// separate, admin-only credential (see spec §4.3 / P2-06). Restic
|
// group so retention can be targeted at a single source-group's
|
||||||
// just rewrites the snapshot index; the actual data deletion waits
|
// snapshots without disturbing snapshots tagged for other groups.
|
||||||
// for the next prune. Returns nil on a clean exit.
|
type ForgetGroup struct {
|
||||||
func (e Env) RunForget(ctx context.Context, policy ForgetPolicy, handle LineHandler) error {
|
Tag string
|
||||||
if policy.Empty() {
|
Policy ForgetPolicy
|
||||||
return fmt.Errorf("restic forget: refusing to run with empty retention policy (would delete every snapshot)")
|
}
|
||||||
|
|
||||||
|
// RunForget executes one `restic forget --tag <Tag> --keep-* …`
|
||||||
|
// invocation per group. Does NOT pass --prune — pruning lives behind
|
||||||
|
// a separate admin-only credential (see spec §4.3 / P2-06). Restic
|
||||||
|
// rewrites the snapshot index; the actual data deletion waits for
|
||||||
|
// the next prune. Empty groups slice is rejected (would be a no-op);
|
||||||
|
// any group with an empty policy is rejected (restic forget without
|
||||||
|
// any keep-* would delete every snapshot in the tagged set).
|
||||||
|
// Returns the first error encountered, or nil when every group runs
|
||||||
|
// to a clean exit.
|
||||||
|
func (e Env) RunForget(ctx context.Context, groups []ForgetGroup, handle LineHandler) error {
|
||||||
|
if len(groups) == 0 {
|
||||||
|
return fmt.Errorf("restic forget: refusing to run with no groups (would be a no-op)")
|
||||||
}
|
}
|
||||||
args := append([]string{"forget", "--json"}, policy.args()...)
|
for _, g := range groups {
|
||||||
cmd := exec.CommandContext(ctx, e.Bin, args...)
|
if g.Policy.Empty() {
|
||||||
cmd.Env = e.envSlice()
|
return fmt.Errorf("restic forget: group %q has empty retention policy (would delete every snapshot)", g.Tag)
|
||||||
cmd.Dir = e.WorkDir
|
}
|
||||||
return runWithPump(cmd, handle)
|
args := []string{"forget", "--json", "--tag", g.Tag}
|
||||||
|
args = append(args, g.Policy.args()...)
|
||||||
|
cmd := exec.CommandContext(ctx, e.Bin, args...)
|
||||||
|
cmd.Env = e.envSlice()
|
||||||
|
cmd.Dir = e.WorkDir
|
||||||
|
if err := runWithPump(cmd, handle); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// RunInit executes `restic init` against the configured repo. Returns
|
// RunInit executes `restic init` against the configured repo. Returns
|
||||||
|
|||||||
@@ -0,0 +1,132 @@
|
|||||||
|
// maintenance_dispatch.go bridges the pure-logic maintenance.Ticker
|
||||||
|
// (internal/server/maintenance) to the side-effecting world: checks
|
||||||
|
// online state, builds the per-kind command.run payload, and calls
|
||||||
|
// dispatchJobWithPayload — the same path operator-triggered Run-now
|
||||||
|
// uses. Cadence-driven jobs are persisted with actor_kind="system"
|
||||||
|
// (dispatchJobWithPayload tags it that way when user==nil).
|
||||||
|
//
|
||||||
|
// Maintenance fires deliberately do NOT queue to pending_runs when
|
||||||
|
// the host is offline — five missed prunes on a laptop returning
|
||||||
|
// from a week away is not what the operator wants. Skip + log; the
|
||||||
|
// next 60s tick will re-evaluate.
|
||||||
|
package http
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"log/slog"
|
||||||
|
"strconv"
|
||||||
|
|
||||||
|
"gitea.dcglab.co.uk/steve/restic-manager/internal/api"
|
||||||
|
"gitea.dcglab.co.uk/steve/restic-manager/internal/server/maintenance"
|
||||||
|
"gitea.dcglab.co.uk/steve/restic-manager/internal/store"
|
||||||
|
)
|
||||||
|
|
||||||
|
// DispatchMaintenance acts on each Decision from the ticker. Offline
|
||||||
|
// hosts are skipped (logged); prune dispatches without admin creds
|
||||||
|
// are skipped silently (logged) — the operator hasn't completed the
|
||||||
|
// admin-creds setup yet, and re-trying every minute would just spam
|
||||||
|
// the logs. (Operator-triggered prune via the run-now endpoint
|
||||||
|
// returns a clear error instead — different path, different UX.)
|
||||||
|
func (s *Server) DispatchMaintenance(ctx context.Context, decisions []maintenance.Decision) {
|
||||||
|
for _, d := range decisions {
|
||||||
|
if !s.deps.Hub.Connected(d.HostID) {
|
||||||
|
slog.Info("maintenance: host offline, skipping",
|
||||||
|
"host_id", d.HostID, "kind", d.Kind)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
switch d.Kind {
|
||||||
|
case "forget":
|
||||||
|
payload, ok := s.buildForgetPayloadForHost(ctx, d.HostID)
|
||||||
|
if !ok {
|
||||||
|
slog.Info("maintenance: forget skipped — no source groups with retention",
|
||||||
|
"host_id", d.HostID)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
_, _, code, msg := s.dispatchJobWithPayload(ctx, nil, d.HostID, api.JobForget, payload)
|
||||||
|
if code != "" {
|
||||||
|
slog.Warn("maintenance: forget dispatch failed",
|
||||||
|
"host_id", d.HostID, "code", code, "msg", msg)
|
||||||
|
}
|
||||||
|
case "prune":
|
||||||
|
if _, err := s.deps.Store.GetHostCredentials(ctx, d.HostID, store.CredKindAdmin); err != nil {
|
||||||
|
if errors.Is(err, store.ErrNotFound) {
|
||||||
|
slog.Info("maintenance: prune skipped — no admin creds",
|
||||||
|
"host_id", d.HostID)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
slog.Warn("maintenance: prune skipped — admin creds error",
|
||||||
|
"host_id", d.HostID, "err", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if err := s.pushAdminCredsToAgent(ctx, d.HostID); err != nil {
|
||||||
|
slog.Warn("maintenance: prune push admin creds failed",
|
||||||
|
"host_id", d.HostID, "err", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
payload := api.CommandRunPayload{RequiresAdminCreds: true}
|
||||||
|
_, _, code, msg := s.dispatchJobWithPayload(ctx, nil, d.HostID, api.JobPrune, payload)
|
||||||
|
if code != "" {
|
||||||
|
slog.Warn("maintenance: prune dispatch failed",
|
||||||
|
"host_id", d.HostID, "code", code, "msg", msg)
|
||||||
|
}
|
||||||
|
case "check":
|
||||||
|
payload := api.CommandRunPayload{Args: []string{strconv.Itoa(d.SubsetPct)}}
|
||||||
|
_, _, code, msg := s.dispatchJobWithPayload(ctx, nil, d.HostID, api.JobCheck, payload)
|
||||||
|
if code != "" {
|
||||||
|
slog.Warn("maintenance: check dispatch failed",
|
||||||
|
"host_id", d.HostID, "code", code, "msg", msg)
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
slog.Warn("maintenance: unknown decision kind",
|
||||||
|
"host_id", d.HostID, "kind", d.Kind)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// buildForgetPayloadForHost collects every source group on the host
|
||||||
|
// that has a non-empty retention policy and builds a CommandRunPayload
|
||||||
|
// with ForgetGroups populated. Returns ok=false if the host has no
|
||||||
|
// such groups (the dispatcher then skips this kind).
|
||||||
|
func (s *Server) buildForgetPayloadForHost(ctx context.Context, hostID string) (api.CommandRunPayload, bool) {
|
||||||
|
groups, err := s.deps.Store.ListSourceGroupsByHost(ctx, hostID)
|
||||||
|
if err != nil {
|
||||||
|
slog.Warn("maintenance: list source groups failed", "host_id", hostID, "err", err)
|
||||||
|
return api.CommandRunPayload{}, false
|
||||||
|
}
|
||||||
|
fg := make([]api.ForgetGroup, 0, len(groups))
|
||||||
|
for _, g := range groups {
|
||||||
|
if isEmptyRetention(g.RetentionPolicy) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
fg = append(fg, api.ForgetGroup{
|
||||||
|
Tag: g.Name,
|
||||||
|
Policy: forgetPolicyJSONFromStore(g.RetentionPolicy),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
if len(fg) == 0 {
|
||||||
|
return api.CommandRunPayload{}, false
|
||||||
|
}
|
||||||
|
return api.CommandRunPayload{ForgetGroups: fg}, true
|
||||||
|
}
|
||||||
|
|
||||||
|
func isEmptyRetention(p store.RetentionPolicy) bool {
|
||||||
|
return p.KeepLast == nil && p.KeepHourly == nil &&
|
||||||
|
p.KeepDaily == nil && p.KeepWeekly == nil &&
|
||||||
|
p.KeepMonthly == nil && p.KeepYearly == nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// forgetPolicyJSONFromStore copies retention pointers from the store
|
||||||
|
// view to the wire view. Both shapes are field-for-field identical;
|
||||||
|
// this avoids importing store from internal/api (which would invert
|
||||||
|
// the dependency direction).
|
||||||
|
func forgetPolicyJSONFromStore(p store.RetentionPolicy) api.ForgetPolicyJSON {
|
||||||
|
return api.ForgetPolicyJSON{
|
||||||
|
KeepLast: p.KeepLast,
|
||||||
|
KeepHourly: p.KeepHourly,
|
||||||
|
KeepDaily: p.KeepDaily,
|
||||||
|
KeepWeekly: p.KeepWeekly,
|
||||||
|
KeepMonthly: p.KeepMonthly,
|
||||||
|
KeepYearly: p.KeepYearly,
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,300 @@
|
|||||||
|
// maintenance_dispatch_test.go — exercises Server.DispatchMaintenance
|
||||||
|
// directly (one Decision at a time). Reuses the same fake-agent
|
||||||
|
// harness as p2r01_ws_test / repo_ops_test: a real Server with a
|
||||||
|
// real Hub, plus a websocket connected as the host. We then push
|
||||||
|
// Decisions through DispatchMaintenance and assert the envelopes
|
||||||
|
// the agent receives + the job rows that land.
|
||||||
|
package http
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/coder/websocket"
|
||||||
|
"github.com/oklog/ulid/v2"
|
||||||
|
|
||||||
|
"gitea.dcglab.co.uk/steve/restic-manager/internal/api"
|
||||||
|
"gitea.dcglab.co.uk/steve/restic-manager/internal/server/maintenance"
|
||||||
|
"gitea.dcglab.co.uk/steve/restic-manager/internal/store"
|
||||||
|
)
|
||||||
|
|
||||||
|
// readNextCommandRun pulls envelopes until a command.run lands or the
|
||||||
|
// deadline passes. Returns nil if the deadline is hit.
|
||||||
|
func readNextCommandRun(t *testing.T, c *websocket.Conn, deadline time.Time) *api.CommandRunPayload {
|
||||||
|
t.Helper()
|
||||||
|
for time.Now().Before(deadline) {
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 600*time.Millisecond)
|
||||||
|
mt, raw, err := c.Read(ctx)
|
||||||
|
cancel()
|
||||||
|
if err != nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if mt != websocket.MessageText {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
var env api.Envelope
|
||||||
|
if err := json.Unmarshal(raw, &env); err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if env.Type != api.MsgCommandRun {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
var p api.CommandRunPayload
|
||||||
|
if err := env.UnmarshalPayload(&p); err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
return &p
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestDispatchMaintenanceSkipsOfflineHosts: host not connected → no
|
||||||
|
// envelope, no job row.
|
||||||
|
func TestDispatchMaintenanceSkipsOfflineHosts(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
srv, _, st := rawTestServer(t)
|
||||||
|
hostID, _ := enrolHostForWS(t, srv, st, "offline-host")
|
||||||
|
|
||||||
|
srv.DispatchMaintenance(context.Background(), []maintenance.Decision{
|
||||||
|
{HostID: hostID, Kind: "check", SubsetPct: 10},
|
||||||
|
})
|
||||||
|
|
||||||
|
var n int
|
||||||
|
if err := st.DB().QueryRow(
|
||||||
|
`SELECT COUNT(*) FROM jobs WHERE host_id = ?`, hostID).Scan(&n); err != nil {
|
||||||
|
t.Fatalf("count: %v", err)
|
||||||
|
}
|
||||||
|
if n != 0 {
|
||||||
|
t.Errorf("offline host produced %d job rows; want 0", n)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestDispatchMaintenanceForgetShipsForgetGroups: connected host with
|
||||||
|
// two source groups (one with retention, one without). Decision of
|
||||||
|
// kind=forget → command.run with ForgetGroups containing only the
|
||||||
|
// group that had retention.
|
||||||
|
func TestDispatchMaintenanceForgetShipsForgetGroups(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
srv, ts, st := rawTestServer(t)
|
||||||
|
hostID, token := enrolHostForWS(t, srv, st, "forget-host")
|
||||||
|
seedInitJob(t, st, hostID)
|
||||||
|
|
||||||
|
keep := 7
|
||||||
|
if err := st.CreateSourceGroup(context.Background(), &store.SourceGroup{
|
||||||
|
ID: ulid.Make().String(), HostID: hostID, Name: "documents",
|
||||||
|
Includes: []string{"/home/documents"},
|
||||||
|
RetentionPolicy: store.RetentionPolicy{KeepLast: &keep},
|
||||||
|
}); err != nil {
|
||||||
|
t.Fatalf("group docs: %v", err)
|
||||||
|
}
|
||||||
|
if err := st.CreateSourceGroup(context.Background(), &store.SourceGroup{
|
||||||
|
ID: ulid.Make().String(), HostID: hostID, Name: "ephemeral",
|
||||||
|
Includes: []string{"/tmp"},
|
||||||
|
}); err != nil {
|
||||||
|
t.Fatalf("group eph: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
c := agentDial(t, srv, ts, hostID, token)
|
||||||
|
sendHello(t, c, "forget-host")
|
||||||
|
_ = drainUntil(t, c, api.MsgScheduleSet)
|
||||||
|
|
||||||
|
srv.DispatchMaintenance(context.Background(), []maintenance.Decision{
|
||||||
|
{HostID: hostID, Kind: "forget"},
|
||||||
|
})
|
||||||
|
|
||||||
|
got := readNextCommandRun(t, c, time.Now().Add(2*time.Second))
|
||||||
|
if got == nil {
|
||||||
|
t.Fatal("no command.run received")
|
||||||
|
}
|
||||||
|
if got.Kind != api.JobForget {
|
||||||
|
t.Errorf("kind: got %q, want %q", got.Kind, api.JobForget)
|
||||||
|
}
|
||||||
|
if len(got.ForgetGroups) != 1 {
|
||||||
|
t.Fatalf("ForgetGroups: got %d entries (%+v), want 1", len(got.ForgetGroups), got.ForgetGroups)
|
||||||
|
}
|
||||||
|
if got.ForgetGroups[0].Tag != "documents" {
|
||||||
|
t.Errorf("forget group tag: got %q, want %q", got.ForgetGroups[0].Tag, "documents")
|
||||||
|
}
|
||||||
|
if got.ForgetGroups[0].Policy.KeepLast == nil || *got.ForgetGroups[0].Policy.KeepLast != 7 {
|
||||||
|
t.Errorf("forget group policy: got %+v", got.ForgetGroups[0].Policy)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Job row must be persisted with actor_kind=system.
|
||||||
|
var actor string
|
||||||
|
if err := st.DB().QueryRow(
|
||||||
|
`SELECT actor_kind FROM jobs WHERE host_id = ? AND kind = 'forget'`, hostID).Scan(&actor); err != nil {
|
||||||
|
t.Fatalf("query actor_kind: %v", err)
|
||||||
|
}
|
||||||
|
if actor != "system" {
|
||||||
|
t.Errorf("actor_kind: got %q, want system", actor)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestDispatchMaintenanceForgetSkipsHostWithNoRetention: connected
|
||||||
|
// host, but every source group has empty retention → no envelope.
|
||||||
|
func TestDispatchMaintenanceForgetSkipsHostWithNoRetention(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
srv, ts, st := rawTestServer(t)
|
||||||
|
hostID, token := enrolHostForWS(t, srv, st, "no-ret-host")
|
||||||
|
seedInitJob(t, st, hostID)
|
||||||
|
if err := st.CreateSourceGroup(context.Background(), &store.SourceGroup{
|
||||||
|
ID: ulid.Make().String(), HostID: hostID, Name: "ephemeral",
|
||||||
|
Includes: []string{"/tmp"},
|
||||||
|
}); err != nil {
|
||||||
|
t.Fatalf("group: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
c := agentDial(t, srv, ts, hostID, token)
|
||||||
|
sendHello(t, c, "no-ret-host")
|
||||||
|
_ = drainUntil(t, c, api.MsgScheduleSet)
|
||||||
|
|
||||||
|
srv.DispatchMaintenance(context.Background(), []maintenance.Decision{
|
||||||
|
{HostID: hostID, Kind: "forget"},
|
||||||
|
})
|
||||||
|
|
||||||
|
if got := readNextCommandRun(t, c, time.Now().Add(800*time.Millisecond)); got != nil {
|
||||||
|
t.Errorf("unexpected command.run: %+v", got)
|
||||||
|
}
|
||||||
|
var n int
|
||||||
|
_ = st.DB().QueryRow(`SELECT COUNT(*) FROM jobs WHERE host_id = ? AND kind = 'forget'`, hostID).Scan(&n)
|
||||||
|
if n != 0 {
|
||||||
|
t.Errorf("forget job rows: got %d, want 0", n)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestDispatchMaintenancePruneSkipsWithoutAdminCreds: no admin creds
|
||||||
|
// row → no envelope, no job row, silent skip.
|
||||||
|
func TestDispatchMaintenancePruneSkipsWithoutAdminCreds(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
srv, ts, st := rawTestServer(t)
|
||||||
|
hostID, token := enrolHostForWS(t, srv, st, "no-admin-host")
|
||||||
|
seedInitJob(t, st, hostID)
|
||||||
|
|
||||||
|
c := agentDial(t, srv, ts, hostID, token)
|
||||||
|
sendHello(t, c, "no-admin-host")
|
||||||
|
_ = drainUntil(t, c, api.MsgScheduleSet)
|
||||||
|
|
||||||
|
srv.DispatchMaintenance(context.Background(), []maintenance.Decision{
|
||||||
|
{HostID: hostID, Kind: "prune"},
|
||||||
|
})
|
||||||
|
|
||||||
|
if got := readNextCommandRun(t, c, time.Now().Add(800*time.Millisecond)); got != nil {
|
||||||
|
t.Errorf("unexpected command.run: %+v", got)
|
||||||
|
}
|
||||||
|
var n int
|
||||||
|
_ = st.DB().QueryRow(`SELECT COUNT(*) FROM jobs WHERE host_id = ? AND kind = 'prune'`, hostID).Scan(&n)
|
||||||
|
if n != 0 {
|
||||||
|
t.Errorf("prune job rows: got %d, want 0", n)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestDispatchMaintenancePruneShipsConfigUpdateThenCommandRun: with
|
||||||
|
// admin creds set, prune dispatch must push admin config.update first
|
||||||
|
// then command.run(prune, RequiresAdminCreds=true).
|
||||||
|
func TestDispatchMaintenancePruneShipsConfigUpdateThenCommandRun(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
srv, ts, st := rawTestServer(t)
|
||||||
|
hostID, token := enrolHostForWS(t, srv, st, "prune-mt-host")
|
||||||
|
setAdminCreds(t, srv, st, hostID)
|
||||||
|
seedInitJob(t, st, hostID)
|
||||||
|
|
||||||
|
c := agentDial(t, srv, ts, hostID, token)
|
||||||
|
sendHello(t, c, "prune-mt-host")
|
||||||
|
_ = drainUntil(t, c, api.MsgScheduleSet)
|
||||||
|
|
||||||
|
srv.DispatchMaintenance(context.Background(), []maintenance.Decision{
|
||||||
|
{HostID: hostID, Kind: "prune"},
|
||||||
|
})
|
||||||
|
|
||||||
|
// Read until we've seen both config.update(slot=admin) and the
|
||||||
|
// prune command.run.
|
||||||
|
deadline := time.Now().Add(3 * time.Second)
|
||||||
|
var sawAdminPush bool
|
||||||
|
var prunePayload *api.CommandRunPayload
|
||||||
|
for prunePayload == nil && time.Now().Before(deadline) {
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 600*time.Millisecond)
|
||||||
|
mt, raw, err := c.Read(ctx)
|
||||||
|
cancel()
|
||||||
|
if err != nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if mt != websocket.MessageText {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
var env api.Envelope
|
||||||
|
if err := json.Unmarshal(raw, &env); err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
switch env.Type {
|
||||||
|
case api.MsgConfigUpdate:
|
||||||
|
var p api.ConfigUpdatePayload
|
||||||
|
if err := env.UnmarshalPayload(&p); err == nil && p.Slot == "admin" {
|
||||||
|
sawAdminPush = true
|
||||||
|
}
|
||||||
|
case api.MsgCommandRun:
|
||||||
|
var p api.CommandRunPayload
|
||||||
|
if err := env.UnmarshalPayload(&p); err == nil && p.Kind == api.JobPrune {
|
||||||
|
cp := p
|
||||||
|
prunePayload = &cp
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !sawAdminPush {
|
||||||
|
t.Error("expected config.update(slot=admin) before prune dispatch")
|
||||||
|
}
|
||||||
|
if prunePayload == nil {
|
||||||
|
t.Fatal("timed out waiting for command.run(prune)")
|
||||||
|
}
|
||||||
|
if !prunePayload.RequiresAdminCreds {
|
||||||
|
t.Error("prune command.run must have RequiresAdminCreds=true")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Persisted job must be system actor.
|
||||||
|
var actor string
|
||||||
|
if err := st.DB().QueryRow(
|
||||||
|
`SELECT actor_kind FROM jobs WHERE host_id = ? AND kind = 'prune'`, hostID).Scan(&actor); err != nil {
|
||||||
|
t.Fatalf("query actor_kind: %v", err)
|
||||||
|
}
|
||||||
|
if actor != "system" {
|
||||||
|
t.Errorf("actor_kind: got %q, want system", actor)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestDispatchMaintenanceCheckCarriesSubset: Decision SubsetPct=15 →
|
||||||
|
// command.run.Args == ["15"]. Job row actor_kind=system.
|
||||||
|
func TestDispatchMaintenanceCheckCarriesSubset(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
srv, ts, st := rawTestServer(t)
|
||||||
|
hostID, token := enrolHostForWS(t, srv, st, "check-mt-host")
|
||||||
|
seedInitJob(t, st, hostID)
|
||||||
|
|
||||||
|
c := agentDial(t, srv, ts, hostID, token)
|
||||||
|
sendHello(t, c, "check-mt-host")
|
||||||
|
_ = drainUntil(t, c, api.MsgScheduleSet)
|
||||||
|
|
||||||
|
srv.DispatchMaintenance(context.Background(), []maintenance.Decision{
|
||||||
|
{HostID: hostID, Kind: "check", SubsetPct: 15},
|
||||||
|
})
|
||||||
|
|
||||||
|
got := readNextCommandRun(t, c, time.Now().Add(2*time.Second))
|
||||||
|
if got == nil {
|
||||||
|
t.Fatal("no command.run received")
|
||||||
|
}
|
||||||
|
if got.Kind != api.JobCheck {
|
||||||
|
t.Errorf("kind: got %q, want %q", got.Kind, api.JobCheck)
|
||||||
|
}
|
||||||
|
if len(got.Args) != 1 || got.Args[0] != "15" {
|
||||||
|
t.Errorf("Args: got %+v, want [15]", got.Args)
|
||||||
|
}
|
||||||
|
|
||||||
|
var actor string
|
||||||
|
if err := st.DB().QueryRow(
|
||||||
|
`SELECT actor_kind FROM jobs WHERE host_id = ? AND kind = 'check'`, hostID).Scan(&actor); err != nil {
|
||||||
|
t.Fatalf("query actor_kind: %v", err)
|
||||||
|
}
|
||||||
|
if actor != "system" {
|
||||||
|
t.Errorf("actor_kind: got %q, want system", actor)
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user