maintenance: pure-logic ticker decides forget/prune/check fires

This commit is contained in:
2026-05-03 23:36:48 +01:00
parent 6f204a6877
commit ae96983877
5 changed files with 647 additions and 0 deletions
+67
View File
@@ -193,6 +193,73 @@ func (s *Store) GetJob(ctx context.Context, id string) (*Job, error) {
return &j, nil
}
// LatestJobByKind returns the most recently created terminal job of the
// given kind for the host, or (nil, ErrNotFound) if no such job exists.
//
// "Terminal" means status in 'succeeded', 'failed' or 'cancelled' (UK
// spelling matches the wire/DB literal, see api.JobCancelled). Queued
// and running jobs are excluded so an in-flight run doesn't suppress
// its own cron tick from firing. Used by the maintenance ticker to
// compute "last fire" anchors for the cron-due check.
//
// Rows are ranked by created_at. Any stored timestamp that fails to
// parse as RFC 3339 is returned as an error instead of being silently
// replaced by the zero time, so callers never receive a job whose
// timestamps are fabricated.
//
//nolint:misspell // wire format
func (s *Store) LatestJobByKind(ctx context.Context, hostID, kind string) (*Job, error) {
	row := s.db.QueryRowContext(ctx,
		`SELECT id, host_id, kind, status, scheduled_id, actor_kind, actor_id,
		        started_at, finished_at, exit_code, stats, error, created_at
		 FROM jobs
		 WHERE host_id = ? AND kind = ?
		   AND status IN ('succeeded','failed','cancelled')
		 ORDER BY created_at DESC
		 LIMIT 1`, hostID, kind)

	// Nullable columns are scanned into sql.Null* holders and copied
	// onto the Job's optional (pointer) fields below.
	var (
		j          Job
		schedID    sql.NullString
		actorID    sql.NullString
		startedAt  sql.NullString
		finishedAt sql.NullString
		exitCode   sql.NullInt64
		stats      sql.NullString
		errMsg     sql.NullString
		createdAt  string
	)
	if err := row.Scan(&j.ID, &j.HostID, &j.Kind, &j.Status, &schedID,
		&j.ActorKind, &actorID, &startedAt, &finishedAt,
		&exitCode, &stats, &errMsg, &createdAt); err != nil {
		if errors.Is(err, sql.ErrNoRows) {
			return nil, ErrNotFound
		}
		return nil, fmt.Errorf("store: scan latest job by kind: %w", err)
	}

	// parseTS decodes one timestamp column, naming the column in the
	// error so a corrupt row can be tracked down.
	parseTS := func(column, value string) (time.Time, error) {
		ts, err := time.Parse(time.RFC3339Nano, value)
		if err != nil {
			return time.Time{}, fmt.Errorf("store: parse latest job %s %q: %w", column, value, err)
		}
		return ts, nil
	}

	if schedID.Valid {
		j.ScheduledID = &schedID.String
	}
	if actorID.Valid {
		j.ActorID = &actorID.String
	}
	if startedAt.Valid {
		ts, err := parseTS("started_at", startedAt.String)
		if err != nil {
			return nil, err
		}
		j.StartedAt = &ts
	}
	if finishedAt.Valid {
		ts, err := parseTS("finished_at", finishedAt.String)
		if err != nil {
			return nil, err
		}
		j.FinishedAt = &ts
	}
	if exitCode.Valid {
		code := int(exitCode.Int64)
		j.ExitCode = &code
	}
	// An empty stats string is treated the same as NULL: Stats stays nil.
	if stats.Valid && stats.String != "" {
		j.Stats = json.RawMessage(stats.String)
	}
	if errMsg.Valid {
		j.Error = &errMsg.String
	}

	created, err := parseTS("created_at", createdAt)
	if err != nil {
		return nil, err
	}
	j.CreatedAt = created
	return &j, nil
}
// HasJobOfKind reports whether any job of the given kind exists for
// this host, regardless of status. Used by the auto-init path on
// agent hello to decide whether to dispatch a fresh `restic init` —
+115
View File
@@ -0,0 +1,115 @@
package store
import (
"context"
"errors"
"testing"
"time"
)
// TestLatestJobByKind checks Store.LatestJobByKind against a single
// host: an empty table yields ErrNotFound, the newer of two finished
// jobs is returned, a job that was only created (never finished) is
// ignored, and a kind with no jobs yields ErrNotFound.
func TestLatestJobByKind(t *testing.T) {
	t.Parallel()

	var (
		st   = openTestStore(t)
		ctx  = context.Background()
		host = makeSchedHost(t, st)
	)
	const kind = "forget"

	// No jobs yet → ErrNotFound.
	_, err := st.LatestJobByKind(ctx, host, kind)
	if !errors.Is(err, ErrNotFound) {
		t.Fatalf("expected ErrNotFound on empty, got %v", err)
	}

	// Two finished jobs of the same kind, one hour apart; the later
	// one is expected to be returned.
	twoHoursAgo := time.Now().UTC().Add(-2 * time.Hour)
	oneHourAgo := time.Now().UTC().Add(-1 * time.Hour)

	err = st.CreateJob(ctx, Job{
		ID:        "j-old",
		HostID:    host,
		Kind:      kind,
		ActorKind: "system",
		CreatedAt: twoHoursAgo,
	})
	if err != nil {
		t.Fatalf("create older: %v", err)
	}
	err = st.MarkJobFinished(ctx, "j-old", "succeeded", 0, nil, "", twoHoursAgo.Add(time.Minute))
	if err != nil {
		t.Fatalf("finish older: %v", err)
	}

	err = st.CreateJob(ctx, Job{
		ID:        "j-new",
		HostID:    host,
		Kind:      kind,
		ActorKind: "system",
		CreatedAt: oneHourAgo,
	})
	if err != nil {
		t.Fatalf("create newer: %v", err)
	}
	err = st.MarkJobFinished(ctx, "j-new", "failed", 1, nil, "boom", oneHourAgo.Add(time.Minute))
	if err != nil {
		t.Fatalf("finish newer: %v", err)
	}

	latest, err := st.LatestJobByKind(ctx, host, kind)
	if err != nil {
		t.Fatalf("LatestJobByKind: %v", err)
	}
	if latest.ID != "j-new" {
		t.Errorf("want j-new, got %q", latest.ID)
	}

	// A job that has only been created is newer than both finished
	// jobs but must not be returned — terminal-status filter.
	err = st.CreateJob(ctx, Job{
		ID:        "j-queued",
		HostID:    host,
		Kind:      kind,
		ActorKind: "system",
		CreatedAt: time.Now().UTC(),
	})
	if err != nil {
		t.Fatalf("create queued: %v", err)
	}

	afterQueued, err := st.LatestJobByKind(ctx, host, kind)
	if err != nil {
		t.Fatalf("LatestJobByKind 2: %v", err)
	}
	if afterQueued.ID != "j-new" {
		t.Errorf("queued job should be ignored; want j-new, got %q", afterQueued.ID)
	}

	// Different kind → ErrNotFound.
	_, err = st.LatestJobByKind(ctx, host, "prune")
	if !errors.Is(err, ErrNotFound) {
		t.Fatalf("expected ErrNotFound for prune, got %v", err)
	}
}
// TestListAllMaintenance checks Store.ListAllMaintenance on an empty
// store and again after two hosts have each been given a default
// maintenance row.
func TestListAllMaintenance(t *testing.T) {
	t.Parallel()

	st := openTestStore(t)
	ctx := context.Background()

	// Empty case.
	got, err := st.ListAllMaintenance(ctx)
	if err != nil {
		t.Fatalf("empty list: %v", err)
	}
	if len(got) != 0 {
		t.Errorf("want empty, got %+v", got)
	}

	// Seed two hosts with maintenance rows.
	hostIDs := []string{
		"01HMAINTHOST00000000000A1",
		"01HMAINTHOST00000000000A2",
	}
	for idx, hostID := range hostIDs {
		// Names get a distinct letter suffix per host: a, b, ...
		suffix := string(rune('a' + idx))
		host := Host{
			ID:              hostID,
			Name:            "maint-host-" + suffix,
			OS:              "linux",
			Arch:            "amd64",
			AgentVersion:    "dev",
			ResticVersion:   "0.16.0",
			ProtocolVersion: 1,
			EnrolledAt:      time.Now().UTC(),
		}
		err = st.CreateHost(ctx, host, "th-"+hostID, "")
		if err != nil {
			t.Fatalf("create host %s: %v", hostID, err)
		}
	}

	err = st.CreateDefaultRepoMaintenance(ctx, hostIDs[0])
	if err != nil {
		t.Fatalf("seed h1: %v", err)
	}
	err = st.CreateDefaultRepoMaintenance(ctx, hostIDs[1])
	if err != nil {
		t.Fatalf("seed h2: %v", err)
	}

	got, err = st.ListAllMaintenance(ctx)
	if err != nil {
		t.Fatalf("list: %v", err)
	}
	if len(got) != 2 {
		t.Errorf("want 2 rows, got %d", len(got))
	}
}
+34
View File
@@ -50,6 +50,40 @@ func (st *Store) GetRepoMaintenance(ctx context.Context, hostID string) (*HostRe
return &m, nil
}
// ListAllMaintenance returns every host_repo_maintenance row.
// Used by the server-side maintenance ticker to iterate every
// host on each tick. Order is unspecified (the ticker doesn't
// care).
//
// With no rows the result is a nil slice and a nil error. On any
// failure — query, scan, or an error surfaced while iterating — the
// returned slice is nil and the error is wrapped with a "store:"
// prefix, so callers never see a partially filled result.
func (st *Store) ListAllMaintenance(ctx context.Context) ([]HostRepoMaintenance, error) {
	rows, err := st.db.QueryContext(ctx,
		`SELECT host_id, forget_cron, forget_enabled,
		        prune_cron, prune_enabled,
		        check_cron, check_enabled, check_subset_pct
		 FROM host_repo_maintenance`)
	if err != nil {
		return nil, fmt.Errorf("store: list all maintenance: %w", err)
	}
	// Close's error is deliberately dropped; iteration failures are
	// reported through rows.Err() below.
	defer func() { _ = rows.Close() }()

	var out []HostRepoMaintenance
	for rows.Next() {
		var (
			m HostRepoMaintenance
			// The *_enabled columns are scanned as integers and
			// converted to bool (non-zero means enabled).
			forgetEnabled, pruneEnabled, checkEnabled int
		)
		if err := rows.Scan(&m.HostID,
			&m.ForgetCron, &forgetEnabled,
			&m.PruneCron, &pruneEnabled,
			&m.CheckCron, &checkEnabled, &m.CheckSubsetPct); err != nil {
			return nil, fmt.Errorf("store: scan maintenance: %w", err)
		}
		m.ForgetEnabled = forgetEnabled != 0
		m.PruneEnabled = pruneEnabled != 0
		m.CheckEnabled = checkEnabled != 0
		out = append(out, m)
	}
	// rows.Next returning false can mean "done" or "failed"; tell them
	// apart here and discard any rows gathered before a failure.
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("store: iterate maintenance: %w", err)
	}
	return out, nil
}
// UpdateRepoMaintenance replaces every editable field. Doesn't bump
// the schedule version — these run on the server's own ticker, not
// the agent's local cron, so the agent doesn't need to know.