From dd35133459775710c235852086e9dcdfbe95ec0f Mon Sep 17 00:00:00 2001
From: Steve Cliff
Date: Sun, 3 May 2026 23:36:48 +0100
Subject: [PATCH] maintenance: pure-logic ticker decides forget/prune/check fires

---
 internal/server/maintenance/ticker.go      | 133 ++++++++
 internal/server/maintenance/ticker_test.go | 315 +++++++++++++++++++++
 internal/store/jobs.go                     |  67 +++++
 internal/store/jobs_test.go                | 115 ++++++++
 internal/store/maintenance.go              |  34 +++
 5 files changed, 664 insertions(+)
 create mode 100644 internal/server/maintenance/ticker.go
 create mode 100644 internal/server/maintenance/ticker_test.go
 create mode 100644 internal/store/jobs_test.go

diff --git a/internal/server/maintenance/ticker.go b/internal/server/maintenance/ticker.go
new file mode 100644
index 0000000..a29d8a2
--- /dev/null
+++ b/internal/server/maintenance/ticker.go
@@ -0,0 +1,133 @@
+// Package maintenance owns the server-side scheduler that fires
+// forget/prune/check on the cadences operators set on
+// host_repo_maintenance rows. Independent of the agent's local cron
+// (which now only handles backup schedules).
+//
+// The ticker is intentionally side-effect-free at the package
+// boundary: it asks an injected Backend for current state and emits
+// a list of Decisions for the caller to act on. Easy to unit-test
+// without a running server.
+package maintenance
+
+import (
+	"context"
+	"errors"
+	"time"
+
+	"github.com/robfig/cron/v3"
+
+	"gitea.dcglab.co.uk/steve/restic-manager/internal/store"
+)
+
+// Decision is one cadence-driven dispatch the ticker recommends.
+// SubsetPct is populated only when Kind == "check"; ignored for
+// "forget" and "prune".
+type Decision struct {
+	HostID    string
+	Kind      string // "forget" | "prune" | "check"
+	SubsetPct int
+}
+
+// Backend is the subset of *store.Store the ticker depends on.
+// Constrained interface so tests can pass a fake.
+type Backend interface {
+	ListAllMaintenance(ctx context.Context) ([]store.HostRepoMaintenance, error)
+	LatestJobByKind(ctx context.Context, hostID, kind string) (*store.Job, error)
+}
+
+// Ticker decides which cadence-driven jobs are due to fire at a
+// given instant. Stateless — the only state lives in the Backend.
+type Ticker struct {
+	backend Backend
+	parser  cron.Parser
+}
+
+// New builds a Ticker bound to the given Backend. Its parser accepts
+// five-field cron specs (minute, hour, day-of-month, month,
+// day-of-week) only.
+func New(b Backend) *Ticker {
+	return &Ticker{
+		backend: b,
+		parser:  cron.NewParser(cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow),
+	}
+}
+
+// Decide returns the set of jobs the ticker would dispatch at `now`.
+// The caller is responsible for: checking host online state,
+// persisting the job row, and shipping command.run. Returns nil
+// (not an error) when the maintenance table is empty — a fresh
+// install is the most common case.
+//
+// NOTE(review): store.LatestJobByKind only considers finished jobs,
+// so a kind whose previous dispatch is still queued or running
+// presumably remains due on every tick until that job finishes —
+// confirm the caller de-duplicates in-flight work.
+func (t *Ticker) Decide(ctx context.Context, now time.Time) ([]Decision, error) {
+	rows, err := t.backend.ListAllMaintenance(ctx)
+	if err != nil {
+		return nil, err
+	}
+	var out []Decision
+	for _, m := range rows {
+		if d, ok := t.dueFor(ctx, now, m.HostID, "forget", m.ForgetCron, m.ForgetEnabled, 0); ok {
+			out = append(out, d)
+		}
+		if d, ok := t.dueFor(ctx, now, m.HostID, "prune", m.PruneCron, m.PruneEnabled, 0); ok {
+			out = append(out, d)
+		}
+		if d, ok := t.dueFor(ctx, now, m.HostID, "check", m.CheckCron, m.CheckEnabled, m.CheckSubsetPct); ok {
+			out = append(out, d)
+		}
+	}
+	return out, nil
+}
+
+// dueFor reports whether kind is due for hostID at now: the cron
+// must have a fire-instant strictly after the anchor and
+// at-or-before now.
+//
+// Anchor selection:
+//   - When LatestJobByKind returns a job: anchor = j.CreatedAt.
+//   - When LatestJobByKind returns ErrNotFound (or a nil job with a
+//     nil error, which means the same thing): anchor = now - 24h
+//     (first-run case — cap the lookback so a brand-new host doesn't
+//     fire 30 days of missed monthly-checks on first tick).
+//   - When LatestJobByKind returns a hard error: skip this kind for
+//     this host on this tick.
+//
+// The anchor is converted to now's location before the schedule is
+// evaluated: a spec without a TZ= prefix is interpreted in the
+// location of the time passed to Next, and a persisted created_at
+// need not carry the same location as now.
+//
+// Disabled (`enabled == false`) or empty cron skips silently.
+// Cron parse failures skip silently — the schedule/maintenance
+// routes already validate cron at write time, so this is defensive.
+func (t *Ticker) dueFor(ctx context.Context, now time.Time, hostID, kind, expr string, enabled bool, subset int) (Decision, bool) {
+	if !enabled || expr == "" {
+		return Decision{}, false
+	}
+	sched, err := t.parser.Parse(expr)
+	if err != nil {
+		return Decision{}, false
+	}
+	j, err := t.backend.LatestJobByKind(ctx, hostID, kind)
+	var anchor time.Time
+	switch {
+	case err == nil && j != nil:
+		anchor = j.CreatedAt
+	case err == nil, errors.Is(err, store.ErrNotFound):
+		// No prior job: bounded first-run lookback.
+		anchor = now.Add(-24 * time.Hour)
+	default:
+		// Hard error — skip this kind on this tick.
+		return Decision{}, false
+	}
+	// Read the cron fields in now's zone no matter where the anchor
+	// came from.
+	next := sched.Next(anchor.In(now.Location()))
+	if next.IsZero() || next.After(now) {
+		return Decision{}, false
+	}
+	return Decision{HostID: hostID, Kind: kind, SubsetPct: subset}, true
+}
diff --git a/internal/server/maintenance/ticker_test.go b/internal/server/maintenance/ticker_test.go
new file mode 100644
index 0000000..c0631df
--- /dev/null
+++ b/internal/server/maintenance/ticker_test.go
@@ -0,0 +1,315 @@
+package maintenance
+
+import (
+	"context"
+	"errors"
+	"testing"
+	"time"
+
+	"gitea.dcglab.co.uk/steve/restic-manager/internal/store"
+)
+
+// fakeBackend implements Backend with table-driven canned responses.
+type fakeBackend struct {
+	rows []store.HostRepoMaintenance
+	// jobs[hostID][kind] -> job (if present, returned). If absent,
+	// fakeBackend returns ErrNotFound by default.
+	jobs map[string]map[string]*store.Job
+	// hardErr forces a non-ErrNotFound failure for a given (host, kind).
+	hardErr map[string]map[string]error
+	// listErr forces ListAllMaintenance to fail.
+	listErr error
+}
+
+func (f *fakeBackend) ListAllMaintenance(_ context.Context) ([]store.HostRepoMaintenance, error) {
+	if f.listErr != nil {
+		return nil, f.listErr
+	}
+	return f.rows, nil
+}
+
+func (f *fakeBackend) LatestJobByKind(_ context.Context, hostID, kind string) (*store.Job, error) {
+	if hostErrs, ok := f.hardErr[hostID]; ok {
+		if err := hostErrs[kind]; err != nil {
+			return nil, err
+		}
+	}
+	if hostJobs, ok := f.jobs[hostID]; ok {
+		if j := hostJobs[kind]; j != nil {
+			return j, nil
+		}
+	}
+	return nil, store.ErrNotFound // default: no canned error or job for this (host, kind)
+}
+
+// mustTime parses s as an RFC3339 timestamp and aborts the calling test (t.Fatalf) if s is malformed.
+func mustTime(t *testing.T, s string) time.Time {
+	t.Helper()
+	out, err := time.Parse(time.RFC3339, s)
+	if err != nil {
+		t.Fatalf("parse %q: %v", s, err)
+	}
+	return out
+}
+
+func TestTickerSkipsDisabled(t *testing.T) {
+	t.Parallel()
+	be := &fakeBackend{
+		rows: []store.HostRepoMaintenance{{
+			HostID:        "h1",
+			ForgetCron:    "0 3 * * *",
+			ForgetEnabled: false,
+			PruneCron:     "0 4 * * *",
+			PruneEnabled:  false,
+			CheckCron:     "0 5 * * *",
+			CheckEnabled:  false,
+		}},
+	}
+	tk := New(be)
+	now := mustTime(t, "2026-05-04T04:00:00Z") // every cron above has a fire inside the 24h first-run window; only the flags suppress it
+	got, err := tk.Decide(context.Background(), now)
+	if err != nil {
+		t.Fatalf("Decide: %v", err)
+	}
+	if len(got) != 0 {
+		t.Errorf("expected no decisions, got %+v", got)
+	}
+}
+
+func TestTickerSkipsEmptyCron(t *testing.T) {
+	t.Parallel()
+	be := &fakeBackend{
+		rows: []store.HostRepoMaintenance{{
+			HostID:        "h1",
+			ForgetCron:    "",
+			ForgetEnabled: true,
+			PruneCron:     "",
+			PruneEnabled:  true,
+			CheckCron:     "",
+			CheckEnabled:  true,
+		}},
+	}
+	tk := New(be)
+	got, err := tk.Decide(context.Background(), mustTime(t, "2026-05-04T04:00:00Z"))
+	if err != nil {
+		t.Fatalf("Decide: %v", err)
+	}
+	if len(got) != 0 {
+		t.Errorf("expected no decisions, got %+v", got)
+	}
+}
+
+func TestTickerFiresWhenOverdue(t *testing.T) {
+	t.Parallel()
+	now := mustTime(t, "2026-05-04T04:00:00Z")
+	// Latest forget job 25h ago (2026-05-03T03:00Z): the next fire after it is today 03:00, before now.
+	last := now.Add(-25 * time.Hour)
+	be := &fakeBackend{
+		rows: []store.HostRepoMaintenance{{
+			HostID:        "h1",
+			ForgetCron:    "0 3 * * *",
+			ForgetEnabled: true,
+		}},
+		jobs: map[string]map[string]*store.Job{
+			"h1": {"forget": &store.Job{ID: "j1", HostID: "h1", Kind: "forget", CreatedAt: last}},
+		},
+	}
+	tk := New(be)
+	got, err := tk.Decide(context.Background(), now)
+	if err != nil {
+		t.Fatalf("Decide: %v", err)
+	}
+	if len(got) != 1 || got[0].Kind != "forget" || got[0].HostID != "h1" {
+		t.Errorf("expected one forget decision, got %+v", got)
+	}
+}
+
+func TestTickerSuppressesWhenRecent(t *testing.T) {
+	t.Parallel()
+	now := mustTime(t, "2026-05-04T04:00:00Z")
+	last := mustTime(t, "2026-05-04T03:30:00Z") // after today's 03:00 fire, so the next fire is tomorrow
+	be := &fakeBackend{
+		rows: []store.HostRepoMaintenance{{
+			HostID:        "h1",
+			ForgetCron:    "0 3 * * *",
+			ForgetEnabled: true,
+		}},
+		jobs: map[string]map[string]*store.Job{
+			"h1": {"forget": &store.Job{ID: "j1", HostID: "h1", Kind: "forget", CreatedAt: last}},
+		},
+	}
+	tk := New(be)
+	got, err := tk.Decide(context.Background(), now)
+	if err != nil {
+		t.Fatalf("Decide: %v", err)
+	}
+	if len(got) != 0 {
+		t.Errorf("expected no decisions, got %+v", got)
+	}
+}
+
+func TestTickerFirstRunAnchorBoundedAt24h(t *testing.T) {
+	t.Parallel()
+	be := &fakeBackend{
+		rows: []store.HostRepoMaintenance{{
+			HostID:        "h1",
+			ForgetCron:    "0 3 * * *",
+			ForgetEnabled: true,
+		}},
+	}
+	tk := New(be)
+
+	// Case 1: now=04:00. Anchor=04:00 - 24h = previous-day 04:00. Next
+	// fire after that is today 03:00 — within window → fire.
+	now1 := mustTime(t, "2026-05-04T04:00:00Z")
+	got, err := tk.Decide(context.Background(), now1)
+	if err != nil {
+		t.Fatalf("Decide: %v", err)
+	}
+	if len(got) != 1 {
+		t.Errorf("case1: expected 1 decision, got %+v", got)
+	}
+
+	// Case 2: a cron firing less often than once per 24h with a
+	// no-prior-job anchor must not fire when the most recent fire is
+	// not strictly inside the 24h lookback. Use a weekly cron (Mondays at
+	// 03:00) and `now` on a Tuesday: anchor=now-24h lands on Monday 03:00,
+	// so cron.Next(Monday 03:00) = next-week Monday → after now → no fire.
+	// 2026-05-04 is a Monday, 2026-05-05 a Tuesday.
+	be2 := &fakeBackend{
+		rows: []store.HostRepoMaintenance{{
+			HostID:        "h2",
+			ForgetCron:    "0 3 * * 1",
+			ForgetEnabled: true,
+		}},
+	}
+	tk2 := New(be2)
+	now2 := mustTime(t, "2026-05-05T03:00:00Z")
+	got2, err := tk2.Decide(context.Background(), now2)
+	if err != nil {
+		t.Fatalf("Decide case2: %v", err)
+	}
+	if len(got2) != 0 {
+		t.Errorf("case2: expected no decisions (cron fires < once/24h, prior fire was Monday 03:00 which is exactly 24h ago and anchor=now-24h means next-after is next Monday), got %+v", got2)
+	}
+}
+
+func TestTickerCheckDecisionCarriesSubset(t *testing.T) {
+	t.Parallel()
+	now := mustTime(t, "2026-05-04T04:00:00Z")
+	last := now.Add(-30 * 24 * time.Hour) // long overdue, so the check must fire
+	be := &fakeBackend{
+		rows: []store.HostRepoMaintenance{{
+			HostID:         "h1",
+			CheckCron:      "0 3 * * *",
+			CheckEnabled:   true,
+			CheckSubsetPct: 25,
+		}},
+		jobs: map[string]map[string]*store.Job{
+			"h1": {"check": &store.Job{ID: "j1", HostID: "h1", Kind: "check", CreatedAt: last}},
+		},
+	}
+	tk := New(be)
+	got, err := tk.Decide(context.Background(), now)
+	if err != nil {
+		t.Fatalf("Decide: %v", err)
+	}
+	if len(got) != 1 || got[0].Kind != "check" || got[0].SubsetPct != 25 {
+		t.Errorf("expected check decision with SubsetPct=25, got %+v", got)
+	}
+}
+
+func TestTickerHardJobErrorSkipsKind(t *testing.T) {
+	t.Parallel()
+	now := mustTime(t, "2026-05-04T04:00:00Z")
+	last := now.Add(-25 * time.Hour)
+	hardErr := errors.New("synthetic db error") // not store.ErrNotFound, so dueFor treats it as a hard error
+	be := &fakeBackend{
+		rows: []store.HostRepoMaintenance{{
+			HostID:        "h1",
+			ForgetCron:    "0 3 * * *",
+			ForgetEnabled: true,
+			CheckCron:     "0 3 * * *",
+			CheckEnabled:  true,
+		}},
+		jobs: map[string]map[string]*store.Job{
+			// check has a normal latest-job; should still fire.
+			"h1": {"check": &store.Job{ID: "jc", HostID: "h1", Kind: "check", CreatedAt: last}},
+		},
+		hardErr: map[string]map[string]error{
+			"h1": {"forget": hardErr},
+		},
+	}
+	tk := New(be)
+	got, err := tk.Decide(context.Background(), now)
+	if err != nil {
+		t.Fatalf("Decide: %v", err)
+	}
+	// Only the check decision should land — forget is skipped.
+	if len(got) != 1 || got[0].Kind != "check" {
+		t.Errorf("expected only check decision, got %+v", got)
+	}
+}
+
+func TestTickerHandlesMultipleHosts(t *testing.T) {
+	t.Parallel()
+	now := mustTime(t, "2026-05-04T04:00:00Z")
+	last := now.Add(-25 * time.Hour)
+	be := &fakeBackend{
+		rows: []store.HostRepoMaintenance{
+			{
+				HostID:        "ha",
+				ForgetCron:    "0 3 * * *",
+				ForgetEnabled: true,
+			},
+			{
+				HostID:       "hb",
+				CheckCron:    "0 3 * * *",
+				CheckEnabled: true,
+				PruneCron:    "0 4 * * *",
+				PruneEnabled: false, // disabled — should not fire
+			},
+		},
+		jobs: map[string]map[string]*store.Job{
+			"ha": {"forget": &store.Job{ID: "j1", HostID: "ha", Kind: "forget", CreatedAt: last}},
+			"hb": {"check": &store.Job{ID: "j2", HostID: "hb", Kind: "check", CreatedAt: last}},
+		},
+	}
+	tk := New(be)
+	got, err := tk.Decide(context.Background(), now)
+	if err != nil {
+		t.Fatalf("Decide: %v", err)
+	}
+	if len(got) != 2 {
+		t.Fatalf("expected 2 decisions, got %d: %+v", len(got), got)
+	}
+	kinds := map[string]string{} // hostID → kind; this fixture yields one decision per host
+	for _, d := range got {
+		kinds[d.HostID] = d.Kind
+	}
+	if kinds["ha"] != "forget" {
+		t.Errorf("ha: expected forget, got %q", kinds["ha"])
+	}
+	if kinds["hb"] != "check" {
+		t.Errorf("hb: expected check, got %q", kinds["hb"])
+	}
+}
+
+func TestTickerInvalidCronSkipsSilently(t *testing.T) {
+	t.Parallel()
+	be := &fakeBackend{
+		rows: []store.HostRepoMaintenance{{
+			HostID:        "h1",
+			ForgetCron:    "not a cron",
+			ForgetEnabled: true,
+		}},
+	}
+	tk := New(be)
+	got, err := tk.Decide(context.Background(), mustTime(t, "2026-05-04T04:00:00Z"))
+	if err != nil {
+		t.Fatalf("Decide: %v", err)
+	}
+	if len(got) != 0 {
+		t.Errorf("expected no decisions for invalid cron, got %+v", got)
+	}
+}
diff --git a/internal/store/jobs.go b/internal/store/jobs.go
index 9438902..1cec113 100644
--- a/internal/store/jobs.go
+++ b/internal/store/jobs.go
@@ -193,6 +193,73 @@ func (s *Store) GetJob(ctx context.Context, id string) (*Job, error) {
 	return &j, nil
 }
 
+// LatestJobByKind returns the most recent terminal job (status in
+// 'succeeded','failed','cancelled' — UK spelling matches the wire/DB
+// literal, see api.JobCancelled) of the given kind for the host, or
+// (nil, ErrNotFound) if none exists. The maintenance ticker anchors its
+// cron-due check on the result; queued/running jobs are excluded so an
+// in-flight run doesn't suppress its own cron tick. An unparseable
+// created_at is returned as an error, never as a zero anchor. //nolint:misspell // wire format
+func (s *Store) LatestJobByKind(ctx context.Context, hostID, kind string) (*Job, error) {
+	row := s.db.QueryRowContext(ctx,
+		`SELECT id, host_id, kind, status, scheduled_id, actor_kind, actor_id,
+		        started_at, finished_at, exit_code, stats, error, created_at
+		   FROM jobs
+		  WHERE host_id = ? AND kind = ?
+		    AND status IN ('succeeded','failed','cancelled')
+		  ORDER BY created_at DESC
+		  LIMIT 1`, hostID, kind)
+	var (
+		j          Job
+		schedID    sql.NullString
+		actorID    sql.NullString
+		startedAt  sql.NullString
+		finishedAt sql.NullString
+		exitCode   sql.NullInt64
+		stats      sql.NullString
+		errMsg     sql.NullString
+		createdAt  string
+	)
+	if err := row.Scan(&j.ID, &j.HostID, &j.Kind, &j.Status, &schedID,
+		&j.ActorKind, &actorID, &startedAt, &finishedAt,
+		&exitCode, &stats, &errMsg, &createdAt); err != nil {
+		if errors.Is(err, sql.ErrNoRows) {
+			return nil, ErrNotFound
+		}
+		return nil, fmt.Errorf("store: scan latest job by kind: %w", err)
+	}
+	if schedID.Valid {
+		j.ScheduledID = &schedID.String
+	}
+	if actorID.Valid {
+		j.ActorID = &actorID.String
+	}
+	if startedAt.Valid {
+		t, _ := time.Parse(time.RFC3339Nano, startedAt.String) // best-effort: a bad value leaves the zero time
+		j.StartedAt = &t
+	}
+	if finishedAt.Valid {
+		t, _ := time.Parse(time.RFC3339Nano, finishedAt.String) // best-effort, as for started_at
+		j.FinishedAt = &t
+	}
+	if exitCode.Valid {
+		code := int(exitCode.Int64)
+		j.ExitCode = &code
+	}
+	if stats.Valid && stats.String != "" {
+		j.Stats = json.RawMessage(stats.String)
+	}
+	if errMsg.Valid {
+		j.Error = &errMsg.String
+	}
+	created, err := time.Parse(time.RFC3339Nano, createdAt)
+	if err != nil { // a zero CreatedAt would make every cron look overdue to the ticker
+		return nil, fmt.Errorf("store: parse created_at of job %s: %w", j.ID, err)
+	}
+	j.CreatedAt = created
+	return &j, nil
+}
+
 // HasJobOfKind reports whether any job of the given kind exists for
 // this host, regardless of status. Used by the auto-init path on
 // agent hello to decide whether to dispatch a fresh `restic init` —
diff --git a/internal/store/jobs_test.go b/internal/store/jobs_test.go
new file mode 100644
index 0000000..46af279
--- /dev/null
+++ b/internal/store/jobs_test.go
@@ -0,0 +1,115 @@
+package store
+
+import (
+	"context"
+	"errors"
+	"testing"
+	"time"
+)
+
+func TestLatestJobByKind(t *testing.T) {
+	t.Parallel()
+	s := openTestStore(t)
+	ctx := context.Background()
+	hostID := makeSchedHost(t, s)
+
+	// No jobs yet → ErrNotFound.
+	if _, err := s.LatestJobByKind(ctx, hostID, "forget"); !errors.Is(err, ErrNotFound) {
+		t.Fatalf("expected ErrNotFound on empty, got %v", err)
+	}
+
+	// Insert two finished jobs of kind=forget (one succeeded, one failed); the newer one should win.
+	older := time.Now().UTC().Add(-2 * time.Hour)
+	newer := time.Now().UTC().Add(-1 * time.Hour)
+
+	if err := s.CreateJob(ctx, Job{
+		ID: "j-old", HostID: hostID, Kind: "forget",
+		ActorKind: "system", CreatedAt: older,
+	}); err != nil {
+		t.Fatalf("create older: %v", err)
+	}
+	if err := s.MarkJobFinished(ctx, "j-old", "succeeded", 0, nil, "", older.Add(time.Minute)); err != nil {
+		t.Fatalf("finish older: %v", err)
+	}
+	if err := s.CreateJob(ctx, Job{
+		ID: "j-new", HostID: hostID, Kind: "forget",
+		ActorKind: "system", CreatedAt: newer,
+	}); err != nil {
+		t.Fatalf("create newer: %v", err)
+	}
+	if err := s.MarkJobFinished(ctx, "j-new", "failed", 1, nil, "boom", newer.Add(time.Minute)); err != nil {
+		t.Fatalf("finish newer: %v", err)
+	}
+
+	got, err := s.LatestJobByKind(ctx, hostID, "forget")
+	if err != nil {
+		t.Fatalf("LatestJobByKind: %v", err)
+	}
+	if got.ID != "j-new" {
+		t.Errorf("want j-new, got %q", got.ID)
+	}
+
+	// A job that was created but never finished must be ignored, even though it is the newest row.
+	queuedAt := time.Now().UTC()
+	if err := s.CreateJob(ctx, Job{
+		ID: "j-queued", HostID: hostID, Kind: "forget",
+		ActorKind: "system", CreatedAt: queuedAt,
+	}); err != nil {
+		t.Fatalf("create queued: %v", err)
+	}
+	got2, err := s.LatestJobByKind(ctx, hostID, "forget")
+	if err != nil {
+		t.Fatalf("LatestJobByKind 2: %v", err)
+	}
+	if got2.ID != "j-new" {
+		t.Errorf("queued job should be ignored; want j-new, got %q", got2.ID)
+	}
+
+	// Different kind on the same host → ErrNotFound (the lookup is per kind).
+	if _, err := s.LatestJobByKind(ctx, hostID, "prune"); !errors.Is(err, ErrNotFound) {
+		t.Fatalf("expected ErrNotFound for prune, got %v", err)
+	}
+}
+
+func TestListAllMaintenance(t *testing.T) {
+	t.Parallel()
+	s := openTestStore(t)
+	ctx := context.Background()
+
+	// Empty case: no maintenance rows have been seeded yet.
+	rows, err := s.ListAllMaintenance(ctx)
+	if err != nil {
+		t.Fatalf("empty list: %v", err)
+	}
+	if len(rows) != 0 {
+		t.Errorf("want empty, got %+v", rows)
+	}
+
+	// Seed two hosts, then give each one a default maintenance row.
+	h1 := "01HMAINTHOST00000000000A1"
+	h2 := "01HMAINTHOST00000000000A2"
+	for i, id := range []string{h1, h2} {
+		if err := s.CreateHost(ctx, Host{
+			ID: id, Name: "maint-host-" + string(rune('a'+i)),
+			OS: "linux", Arch: "amd64",
+			AgentVersion: "dev", ResticVersion: "0.16.0", ProtocolVersion: 1,
+			EnrolledAt: time.Now().UTC(),
+		}, "th-"+id, ""); err != nil {
+			t.Fatalf("create host %s: %v", id, err)
+		}
+	}
+	if err := s.CreateDefaultRepoMaintenance(ctx, h1); err != nil {
+		t.Fatalf("seed h1: %v", err)
+	}
+	if err := s.CreateDefaultRepoMaintenance(ctx, h2); err != nil {
+		t.Fatalf("seed h2: %v", err)
+	}
+
+	rows, err = s.ListAllMaintenance(ctx)
+	if err != nil {
+		t.Fatalf("list: %v", err)
+	}
+	if len(rows) != 2 {
+		t.Errorf("want 2 rows, got %d", len(rows))
+	}
+}
diff --git a/internal/store/maintenance.go b/internal/store/maintenance.go
index 795a6b0..ddeb5e0 100644
--- a/internal/store/maintenance.go
+++ b/internal/store/maintenance.go
@@ -50,6 +50,40 @@ func (st *Store) GetRepoMaintenance(ctx context.Context, hostID string) (*HostRe
 	return &m, nil
 }
 
+// ListAllMaintenance returns every host_repo_maintenance row, or a
+// nil slice when the table is empty. Used by the server-side
+// maintenance ticker to iterate every host on each tick. Order is
+// unspecified (the query has no ORDER BY; the ticker doesn't care).
+func (st *Store) ListAllMaintenance(ctx context.Context) ([]HostRepoMaintenance, error) {
+	rows, err := st.db.QueryContext(ctx,
+		`SELECT host_id, forget_cron, forget_enabled,
+		        prune_cron, prune_enabled,
+		        check_cron, check_enabled, check_subset_pct
+		   FROM host_repo_maintenance`)
+	if err != nil {
+		return nil, fmt.Errorf("store: list all maintenance: %w", err)
+	}
+	defer func() { _ = rows.Close() }()
+	var out []HostRepoMaintenance
+	for rows.Next() {
+		var m HostRepoMaintenance
+		var forgetEnabled, pruneEnabled, checkEnabled int // the enabled flags are scanned as integers; non-zero means true
+		if err := rows.Scan(&m.HostID, &m.ForgetCron, &forgetEnabled,
+			&m.PruneCron, &pruneEnabled,
+			&m.CheckCron, &checkEnabled, &m.CheckSubsetPct); err != nil {
+			return nil, fmt.Errorf("store: scan maintenance: %w", err)
+		}
+		m.ForgetEnabled = forgetEnabled != 0
+		m.PruneEnabled = pruneEnabled != 0
+		m.CheckEnabled = checkEnabled != 0
+		out = append(out, m)
+	}
+	if err := rows.Err(); err != nil { // iteration stopped early: don't hand back a partial list
+		return nil, fmt.Errorf("store: iterate maintenance: %w", err)
+	}
+	return out, nil
+}
+
 // UpdateRepoMaintenance replaces every editable field. Doesn't bump
 // the schedule version — these run on the server's own ticker, not
 // the agent's local cron, so the agent doesn't need to know.