server: maintenance ticker drives forget/prune/check on cadence

Wires a 60s server-side ticker to the pure-logic maintenance.Decide
introduced in the previous commit. Decisions flow through a new
DispatchMaintenance method on *Server, which:

  - skips offline hosts (no pending_runs queueing — maintenance is
    not a backup, missed fires shouldn't pile up)
  - skips prune (info log only, no error) when admin creds aren't bound
  - pushes admin creds before prune, then dispatches with
    RequiresAdminCreds=true (same as operator-driven prune)
  - persists job rows with actor_kind="system"

Reshapes the forget wire payload from a single RetentionPolicy to a
ForgetGroups list (one tag + per-group keep-* per source group). The
agent walks the groups and runs `restic forget --tag <name> --keep-*`
once per group. Dead-code removed: CommandRunPayload.RetentionPolicy,
the old forget JSON-decode in cmd/agent, and the single-policy form of
restic.RunForget.
This commit is contained in:
2026-05-03 23:40:35 +01:00
parent ae96983877
commit 14b703be58
8 changed files with 559 additions and 62 deletions
@@ -0,0 +1,132 @@
// maintenance_dispatch.go bridges the pure-logic maintenance.Ticker
// (internal/server/maintenance) to the side-effecting world: checks
// online state, builds the per-kind command.run payload, and calls
// dispatchJobWithPayload — the same path operator-triggered Run-now
// uses. Cadence-driven jobs are persisted with actor_kind="system"
// (dispatchJobWithPayload tags it that way when user==nil).
//
// Maintenance fires deliberately do NOT queue to pending_runs when
// the host is offline — five missed prunes on a laptop returning
// from a week away is not what the operator wants. Skip + log; the
// next 60s tick will re-evaluate.
package http
import (
"context"
"errors"
"log/slog"
"strconv"
"gitea.dcglab.co.uk/steve/restic-manager/internal/api"
"gitea.dcglab.co.uk/steve/restic-manager/internal/server/maintenance"
"gitea.dcglab.co.uk/steve/restic-manager/internal/store"
)
// DispatchMaintenance acts on each Decision from the ticker, in order.
// Nothing is returned: every skip or failure is logged and the loop
// moves on to the next decision.
//
//   - Offline hosts (per Hub.Connected) are skipped with an info log.
//   - "forget" ships the ForgetGroups payload built from the host's
//     source groups, and is skipped when no payload could be built.
//   - "prune" is skipped with an info log when no admin credentials are
//     stored for the host, and with a warning on any other lookup or
//     push error. The operator hasn't completed the admin-creds setup
//     yet, so this is not treated as a failure. (Operator-triggered
//     prune via the run-now endpoint returns a clear error instead —
//     different path, different UX.) Otherwise admin creds are pushed
//     to the agent first and the job goes out with
//     RequiresAdminCreds=true.
//   - "check" carries Decision.SubsetPct as a single decimal string in
//     Args.
//   - Any other kind is logged as a warning and ignored.
//
// All dispatches pass a nil user to dispatchJobWithPayload. Once ctx is
// done the remaining decisions are dropped rather than attempted.
func (s *Server) DispatchMaintenance(ctx context.Context, decisions []maintenance.Decision) {
	for i, d := range decisions {
		// Stop as soon as the caller's context is done (e.g. shutdown).
		// The store calls and dispatches below all take ctx and would
		// presumably just fail one by one, logging a warning each.
		if err := ctx.Err(); err != nil {
			slog.Info("maintenance: dispatch cancelled, dropping remaining decisions",
				"remaining", len(decisions)-i, "err", err)
			return
		}
		if !s.deps.Hub.Connected(d.HostID) {
			slog.Info("maintenance: host offline, skipping",
				"host_id", d.HostID, "kind", d.Kind)
			continue
		}
		switch d.Kind {
		case "forget":
			payload, ok := s.buildForgetPayloadForHost(ctx, d.HostID)
			if !ok {
				slog.Info("maintenance: forget skipped — no source groups with retention",
					"host_id", d.HostID)
				continue
			}
			_, _, code, msg := s.dispatchJobWithPayload(ctx, nil, d.HostID, api.JobForget, payload)
			if code != "" {
				slog.Warn("maintenance: forget dispatch failed",
					"host_id", d.HostID, "code", code, "msg", msg)
			}
		case "prune":
			// Only the existence of the admin credentials matters here;
			// the value itself is shipped by pushAdminCredsToAgent.
			if _, err := s.deps.Store.GetHostCredentials(ctx, d.HostID, store.CredKindAdmin); err != nil {
				if errors.Is(err, store.ErrNotFound) {
					slog.Info("maintenance: prune skipped — no admin creds",
						"host_id", d.HostID)
					continue
				}
				slog.Warn("maintenance: prune skipped — admin creds error",
					"host_id", d.HostID, "err", err)
				continue
			}
			// The agent must hold the admin creds before the prune
			// command arrives, so push first and bail if that fails.
			if err := s.pushAdminCredsToAgent(ctx, d.HostID); err != nil {
				slog.Warn("maintenance: prune push admin creds failed",
					"host_id", d.HostID, "err", err)
				continue
			}
			payload := api.CommandRunPayload{RequiresAdminCreds: true}
			_, _, code, msg := s.dispatchJobWithPayload(ctx, nil, d.HostID, api.JobPrune, payload)
			if code != "" {
				slog.Warn("maintenance: prune dispatch failed",
					"host_id", d.HostID, "code", code, "msg", msg)
			}
		case "check":
			payload := api.CommandRunPayload{Args: []string{strconv.Itoa(d.SubsetPct)}}
			_, _, code, msg := s.dispatchJobWithPayload(ctx, nil, d.HostID, api.JobCheck, payload)
			if code != "" {
				slog.Warn("maintenance: check dispatch failed",
					"host_id", d.HostID, "code", code, "msg", msg)
			}
		default:
			slog.Warn("maintenance: unknown decision kind",
				"host_id", d.HostID, "kind", d.Kind)
		}
	}
}
// buildForgetPayloadForHost assembles the forget payload for one host:
// one ForgetGroup (tag = group name, policy = the group's retention)
// for every source group whose retention policy is non-empty. ok is
// false — and the payload is the zero value — when listing the groups
// fails (logged as a warning) or when no group qualifies; the
// dispatcher then skips this kind.
func (s *Server) buildForgetPayloadForHost(ctx context.Context, hostID string) (api.CommandRunPayload, bool) {
	var none api.CommandRunPayload

	sourceGroups, err := s.deps.Store.ListSourceGroupsByHost(ctx, hostID)
	if err != nil {
		slog.Warn("maintenance: list source groups failed", "host_id", hostID, "err", err)
		return none, false
	}

	// Sized for the worst case where every group has a retention policy.
	entries := make([]api.ForgetGroup, 0, len(sourceGroups))
	for _, sg := range sourceGroups {
		if !isEmptyRetention(sg.RetentionPolicy) {
			entry := api.ForgetGroup{
				Tag:    sg.Name,
				Policy: forgetPolicyJSONFromStore(sg.RetentionPolicy),
			}
			entries = append(entries, entry)
		}
	}

	if len(entries) > 0 {
		return api.CommandRunPayload{ForgetGroups: entries}, true
	}
	return none, false
}
// isEmptyRetention reports whether p has no keep-* value set, i.e.
// every one of its six keep pointers is nil.
func isEmptyRetention(p store.RetentionPolicy) bool {
	switch {
	case p.KeepLast != nil,
		p.KeepHourly != nil,
		p.KeepDaily != nil,
		p.KeepWeekly != nil,
		p.KeepMonthly != nil,
		p.KeepYearly != nil:
		return false
	}
	return true
}
// forgetPolicyJSONFromStore translates the store's retention policy
// into its wire counterpart by copying the six keep-* pointers across
// unchanged (the pointers are shared, not deep-copied). Doing the copy
// here keeps internal/api from having to import store, which would
// invert the dependency direction.
func forgetPolicyJSONFromStore(p store.RetentionPolicy) api.ForgetPolicyJSON {
	var wire api.ForgetPolicyJSON
	wire.KeepLast = p.KeepLast
	wire.KeepHourly = p.KeepHourly
	wire.KeepDaily = p.KeepDaily
	wire.KeepWeekly = p.KeepWeekly
	wire.KeepMonthly = p.KeepMonthly
	wire.KeepYearly = p.KeepYearly
	return wire
}
@@ -0,0 +1,300 @@
// maintenance_dispatch_test.go — exercises Server.DispatchMaintenance
// directly (one Decision at a time). Reuses the same fake-agent
// harness as p2r01_ws_test / repo_ops_test: a real Server with a
// real Hub, plus a websocket connected as the host. We then push
// Decisions through DispatchMaintenance and assert the envelopes
// the agent receives + the job rows that land.
package http
import (
"context"
"encoding/json"
"testing"
"time"
"github.com/coder/websocket"
"github.com/oklog/ulid/v2"
"gitea.dcglab.co.uk/steve/restic-manager/internal/api"
"gitea.dcglab.co.uk/steve/restic-manager/internal/server/maintenance"
"gitea.dcglab.co.uk/steve/restic-manager/internal/store"
)
// readNextCommandRun reads frames from c until it finds a text frame
// holding a command.run envelope whose payload decodes, and returns
// that payload. Non-text frames, undecodable envelopes, other message
// types and undecodable payloads are skipped. It returns nil once
// deadline has passed, or as soon as a single read fails — each read
// is capped at 600ms, so an idle connection also yields nil.
func readNextCommandRun(t *testing.T, c *websocket.Conn, deadline time.Time) *api.CommandRunPayload {
	t.Helper()
	for {
		if !time.Now().Before(deadline) {
			return nil
		}

		readCtx, stop := context.WithTimeout(context.Background(), 600*time.Millisecond)
		msgType, data, readErr := c.Read(readCtx)
		stop()
		if readErr != nil {
			return nil
		}
		if msgType != websocket.MessageText {
			continue
		}

		var env api.Envelope
		if json.Unmarshal(data, &env) != nil || env.Type != api.MsgCommandRun {
			continue
		}

		payload := new(api.CommandRunPayload)
		if env.UnmarshalPayload(payload) != nil {
			continue
		}
		return payload
	}
}
// TestDispatchMaintenanceSkipsOfflineHosts enrols a host but never
// connects it, dispatches a check decision for it, and expects the
// jobs table to hold no row for that host afterwards.
func TestDispatchMaintenanceSkipsOfflineHosts(t *testing.T) {
	t.Parallel()
	ctx := context.Background()

	srv, _, st := rawTestServer(t)
	hostID, _ := enrolHostForWS(t, srv, st, "offline-host")

	decision := maintenance.Decision{HostID: hostID, Kind: "check", SubsetPct: 10}
	srv.DispatchMaintenance(ctx, []maintenance.Decision{decision})

	const countJobs = `SELECT COUNT(*) FROM jobs WHERE host_id = ?`
	var jobRows int
	err := st.DB().QueryRow(countJobs, hostID).Scan(&jobRows)
	if err != nil {
		t.Fatalf("count: %v", err)
	}
	if jobRows != 0 {
		t.Errorf("offline host produced %d job rows; want 0", jobRows)
	}
}
// TestDispatchMaintenanceForgetShipsForgetGroups seeds a connected host
// with two source groups — "documents" (KeepLast=7) and "ephemeral"
// (no retention) — then dispatches a forget decision. The agent must
// receive a command.run of kind forget whose ForgetGroups holds only
// the "documents" entry with KeepLast=7, and the persisted job row
// must carry actor_kind=system.
func TestDispatchMaintenanceForgetShipsForgetGroups(t *testing.T) {
	t.Parallel()
	ctx := context.Background()

	srv, ts, st := rawTestServer(t)
	hostID, token := enrolHostForWS(t, srv, st, "forget-host")
	seedInitJob(t, st, hostID)

	retained := 7
	seeds := []struct {
		label string
		group *store.SourceGroup
	}{
		{"docs", &store.SourceGroup{
			ID: ulid.Make().String(), HostID: hostID, Name: "documents",
			Includes:        []string{"/home/documents"},
			RetentionPolicy: store.RetentionPolicy{KeepLast: &retained},
		}},
		{"eph", &store.SourceGroup{
			ID: ulid.Make().String(), HostID: hostID, Name: "ephemeral",
			Includes: []string{"/tmp"},
		}},
	}
	for _, seed := range seeds {
		if err := st.CreateSourceGroup(ctx, seed.group); err != nil {
			t.Fatalf("group %s: %v", seed.label, err)
		}
	}

	// Connect as the agent and drain frames up to schedule.set before
	// triggering the dispatch under test.
	conn := agentDial(t, srv, ts, hostID, token)
	sendHello(t, conn, "forget-host")
	_ = drainUntil(t, conn, api.MsgScheduleSet)

	srv.DispatchMaintenance(ctx, []maintenance.Decision{
		{HostID: hostID, Kind: "forget"},
	})

	got := readNextCommandRun(t, conn, time.Now().Add(2*time.Second))
	if got == nil {
		t.Fatal("no command.run received")
	}
	if got.Kind != api.JobForget {
		t.Errorf("kind: got %q, want %q", got.Kind, api.JobForget)
	}
	if count := len(got.ForgetGroups); count != 1 {
		t.Fatalf("ForgetGroups: got %d entries (%+v), want 1", count, got.ForgetGroups)
	}
	only := got.ForgetGroups[0]
	if only.Tag != "documents" {
		t.Errorf("forget group tag: got %q, want %q", only.Tag, "documents")
	}
	if kl := only.Policy.KeepLast; kl == nil || *kl != 7 {
		t.Errorf("forget group policy: got %+v", only.Policy)
	}

	// Job row must be persisted with actor_kind=system.
	const actorQuery = `SELECT actor_kind FROM jobs WHERE host_id = ? AND kind = 'forget'`
	var actorKind string
	if err := st.DB().QueryRow(actorQuery, hostID).Scan(&actorKind); err != nil {
		t.Fatalf("query actor_kind: %v", err)
	}
	if actorKind != "system" {
		t.Errorf("actor_kind: got %q, want system", actorKind)
	}
}
// TestDispatchMaintenanceForgetSkipsHostWithNoRetention: connected
// host whose only source group has an empty retention policy. A forget
// decision must produce no command.run envelope and no forget job row.
func TestDispatchMaintenanceForgetSkipsHostWithNoRetention(t *testing.T) {
	t.Parallel()
	srv, ts, st := rawTestServer(t)
	hostID, token := enrolHostForWS(t, srv, st, "no-ret-host")
	seedInitJob(t, st, hostID)
	if err := st.CreateSourceGroup(context.Background(), &store.SourceGroup{
		ID: ulid.Make().String(), HostID: hostID, Name: "ephemeral",
		Includes: []string{"/tmp"},
	}); err != nil {
		t.Fatalf("group: %v", err)
	}
	c := agentDial(t, srv, ts, hostID, token)
	sendHello(t, c, "no-ret-host")
	_ = drainUntil(t, c, api.MsgScheduleSet)
	srv.DispatchMaintenance(context.Background(), []maintenance.Decision{
		{HostID: hostID, Kind: "forget"},
	})
	if got := readNextCommandRun(t, c, time.Now().Add(800*time.Millisecond)); got != nil {
		t.Errorf("unexpected command.run: %+v", got)
	}
	// Check the Scan error: if the count query itself failed, n would
	// stay 0 and the "no rows" assertion below would pass vacuously.
	var n int
	if err := st.DB().QueryRow(
		`SELECT COUNT(*) FROM jobs WHERE host_id = ? AND kind = 'forget'`, hostID).Scan(&n); err != nil {
		t.Fatalf("count: %v", err)
	}
	if n != 0 {
		t.Errorf("forget job rows: got %d, want 0", n)
	}
}
// TestDispatchMaintenancePruneSkipsWithoutAdminCreds: connected host
// with no admin creds set. A prune decision must produce no
// command.run envelope and no prune job row.
func TestDispatchMaintenancePruneSkipsWithoutAdminCreds(t *testing.T) {
	t.Parallel()
	srv, ts, st := rawTestServer(t)
	hostID, token := enrolHostForWS(t, srv, st, "no-admin-host")
	seedInitJob(t, st, hostID)
	c := agentDial(t, srv, ts, hostID, token)
	sendHello(t, c, "no-admin-host")
	_ = drainUntil(t, c, api.MsgScheduleSet)
	srv.DispatchMaintenance(context.Background(), []maintenance.Decision{
		{HostID: hostID, Kind: "prune"},
	})
	if got := readNextCommandRun(t, c, time.Now().Add(800*time.Millisecond)); got != nil {
		t.Errorf("unexpected command.run: %+v", got)
	}
	// Check the Scan error: if the count query itself failed, n would
	// stay 0 and the "no rows" assertion below would pass vacuously.
	var n int
	if err := st.DB().QueryRow(
		`SELECT COUNT(*) FROM jobs WHERE host_id = ? AND kind = 'prune'`, hostID).Scan(&n); err != nil {
		t.Fatalf("count: %v", err)
	}
	if n != 0 {
		t.Errorf("prune job rows: got %d, want 0", n)
	}
}
// TestDispatchMaintenancePruneShipsConfigUpdateThenCommandRun: with
// admin creds set, a prune decision must deliver config.update with
// slot=admin and then command.run(prune, RequiresAdminCreds=true).
// Reading stops at the first prune command.run, so the admin push only
// counts if it arrived before it. The job row must be actor_kind=system.
func TestDispatchMaintenancePruneShipsConfigUpdateThenCommandRun(t *testing.T) {
	t.Parallel()
	bg := context.Background()

	srv, ts, st := rawTestServer(t)
	hostID, token := enrolHostForWS(t, srv, st, "prune-mt-host")
	setAdminCreds(t, srv, st, hostID)
	seedInitJob(t, st, hostID)

	conn := agentDial(t, srv, ts, hostID, token)
	sendHello(t, conn, "prune-mt-host")
	_ = drainUntil(t, conn, api.MsgScheduleSet)

	srv.DispatchMaintenance(bg, []maintenance.Decision{
		{HostID: hostID, Kind: "prune"},
	})

	// Read until the prune command.run shows up, noting on the way
	// whether config.update(slot=admin) was seen.
	var (
		adminPushed bool
		pruneCmd    *api.CommandRunPayload
	)
	stopAt := time.Now().Add(3 * time.Second)
	for pruneCmd == nil {
		if !time.Now().Before(stopAt) {
			break
		}
		readCtx, stop := context.WithTimeout(bg, 600*time.Millisecond)
		msgType, data, readErr := conn.Read(readCtx)
		stop()
		if readErr != nil {
			break
		}
		if msgType != websocket.MessageText {
			continue
		}
		var env api.Envelope
		if json.Unmarshal(data, &env) != nil {
			continue
		}
		if env.Type == api.MsgConfigUpdate {
			var update api.ConfigUpdatePayload
			if env.UnmarshalPayload(&update) == nil && update.Slot == "admin" {
				adminPushed = true
			}
		} else if env.Type == api.MsgCommandRun {
			var run api.CommandRunPayload
			if env.UnmarshalPayload(&run) == nil && run.Kind == api.JobPrune {
				pruneCmd = &run
			}
		}
	}

	if !adminPushed {
		t.Error("expected config.update(slot=admin) before prune dispatch")
	}
	if pruneCmd == nil {
		t.Fatal("timed out waiting for command.run(prune)")
	}
	if !pruneCmd.RequiresAdminCreds {
		t.Error("prune command.run must have RequiresAdminCreds=true")
	}

	// Persisted job must be system actor.
	const actorQuery = `SELECT actor_kind FROM jobs WHERE host_id = ? AND kind = 'prune'`
	var actorKind string
	if err := st.DB().QueryRow(actorQuery, hostID).Scan(&actorKind); err != nil {
		t.Fatalf("query actor_kind: %v", err)
	}
	if actorKind != "system" {
		t.Errorf("actor_kind: got %q, want system", actorKind)
	}
}
// TestDispatchMaintenanceCheckCarriesSubset dispatches a check decision
// with SubsetPct=15 to a connected host and expects a command.run of
// kind check whose Args is exactly ["15"], plus a persisted check job
// row with actor_kind=system.
func TestDispatchMaintenanceCheckCarriesSubset(t *testing.T) {
	t.Parallel()
	ctx := context.Background()

	srv, ts, st := rawTestServer(t)
	hostID, token := enrolHostForWS(t, srv, st, "check-mt-host")
	seedInitJob(t, st, hostID)

	conn := agentDial(t, srv, ts, hostID, token)
	sendHello(t, conn, "check-mt-host")
	_ = drainUntil(t, conn, api.MsgScheduleSet)

	decision := maintenance.Decision{HostID: hostID, Kind: "check", SubsetPct: 15}
	srv.DispatchMaintenance(ctx, []maintenance.Decision{decision})

	got := readNextCommandRun(t, conn, time.Now().Add(2*time.Second))
	if got == nil {
		t.Fatal("no command.run received")
	}
	if got.Kind != api.JobCheck {
		t.Errorf("kind: got %q, want %q", got.Kind, api.JobCheck)
	}
	argsOK := len(got.Args) == 1 && got.Args[0] == "15"
	if !argsOK {
		t.Errorf("Args: got %+v, want [15]", got.Args)
	}

	const actorQuery = `SELECT actor_kind FROM jobs WHERE host_id = ? AND kind = 'check'`
	var actorKind string
	if err := st.DB().QueryRow(actorQuery, hostID).Scan(&actorKind); err != nil {
		t.Fatalf("query actor_kind: %v", err)
	}
	if actorKind != "system" {
		t.Errorf("actor_kind: got %q, want system", actorKind)
	}
}