From 9371b7b7773c83b8287e5c6c09b71de2ab62ec57 Mon Sep 17 00:00:00 2001 From: Steve Cliff Date: Mon, 15 Jun 2026 21:45:01 +0100 Subject: [PATCH] fix(catchup): guard on real in-flight backup check; add scheduler tests --- internal/alert/engine.go | 3 + internal/server/http/catchup.go | 12 +- .../server/http/catchup_scheduler_test.go | 246 ++++++++++++++++++ internal/store/jobs.go | 16 ++ 4 files changed, 275 insertions(+), 2 deletions(-) create mode 100644 internal/server/http/catchup_scheduler_test.go diff --git a/internal/alert/engine.go b/internal/alert/engine.go index 0f94b5a..ab6b27b 100644 --- a/internal/alert/engine.go +++ b/internal/alert/engine.go @@ -240,6 +240,9 @@ func (e *Engine) tick(ctx context.Context, now time.Time) { e.raiseAndNotify(ctx, h.ID, KindStaleSchedule, "", "warning", fmt.Sprintf("No backup in %s (threshold %s)", roundDur(now.Sub(*h.LastBackupAt)), staleBackupThreshold), now) + // Resolution is handled in handleJobFinished on a successful + // backup (and ResolveOnModeChange on toggle) — the tick only + // raises, it does not auto-resolve. continue } // Always-on hosts: existing agent_offline re-evaluation. diff --git a/internal/server/http/catchup.go b/internal/server/http/catchup.go index 9c67def..6bbaafa 100644 --- a/internal/server/http/catchup.go +++ b/internal/server/http/catchup.go @@ -89,8 +89,16 @@ func (s *Server) runCatchup(ctx context.Context, hostID string, now time.Time) { if host.AlwaysOn { return // mode flipped during settle window } - if host.CurrentJobID != nil { - return // a job is already running; don't pile on + // Skip if a backup is already queued or running for this host — + // don't pile a catch-up on top of in-flight work. (hosts.current_job_id + // is not maintained, so we check the jobs table directly.) + active, err := s.deps.Store.HasActiveBackupJob(ctx, hostID) + if err != nil { + slog.Warn("catchup: check active backup", "host_id", hostID, "err", err) + return + } + if active { + return } schedules, err := s.deps.Store.ListSchedulesByHost(ctx, hostID) if err != nil { diff --git a/internal/server/http/catchup_scheduler_test.go b/internal/server/http/catchup_scheduler_test.go new file mode 100644 index 0000000..f2f13c3 --- /dev/null +++ b/internal/server/http/catchup_scheduler_test.go @@ -0,0 +1,246 @@ +// catchup_scheduler_test.go — integration tests for the catch-up scheduler. +package http + +import ( + "context" + "testing" + "time" + + "github.com/oklog/ulid/v2" + + "gitea.dcglab.co.uk/steve/restic-manager/internal/api" + "gitea.dcglab.co.uk/steve/restic-manager/internal/store" +) + +// TestRunCatchupDispatchesOverdue verifies four properties of the +// catch-up scheduler in separate sub-tests sharing no state. +func TestRunCatchupDispatchesOverdue(t *testing.T) { + t.Parallel() + + // --- 1. Overdue host with connected agent → backup dispatched ------- + t.Run("overdue_dispatch", func(t *testing.T) { + t.Parallel() + srv, ts, st := rawTestServer(t) + hostID, token := enrolHostForWS(t, srv, st, "catchup-overdue") + + if err := st.SetHostAlwaysOn(context.Background(), hostID, false); err != nil { + t.Fatalf("set always_on: %v", err) + } + // Last backup ~8 days ago → schedule overdue. + eightDaysAgo := time.Now().UTC().Add(-8 * 24 * time.Hour) + if err := st.SetHostLastBackup(context.Background(), hostID, "succeeded", eightDaysAgo); err != nil { + t.Fatalf("set last backup: %v", err) + } + + if err := st.CreateJob(context.Background(), store.Job{ + ID: ulid.Make().String(), HostID: hostID, Kind: "init", + ActorKind: "system", CreatedAt: time.Now().UTC(), + }); err != nil { + t.Fatalf("seed init: %v", err) + } + + gid := ulid.Make().String() + if err := st.CreateSourceGroup(context.Background(), &store.SourceGroup{ + ID: gid, HostID: hostID, Name: "home", Includes: []string{"/home"}, + }); err != nil { + t.Fatalf("source group: %v", err) + } + sid := ulid.Make().String() + if err := st.CreateSchedule(context.Background(), &store.Schedule{ + ID: sid, HostID: hostID, CronExpr: "0 2 * * *", Enabled: true, + SourceGroupIDs: []string{gid}, + }); err != nil { + t.Fatalf("schedule: %v", err) + } + + c := agentDial(t, srv, ts, hostID, token) + sendHello(t, c, "catchup-overdue") + _ = drainUntil(t, c, api.MsgScheduleSet) + + // Arm with a past time so the settle window is already elapsed. + srv.ArmCatchup(hostID, time.Now().UTC().Add(-2*time.Minute)) + srv.RunCatchupsDue(context.Background()) + + // Give the dispatch goroutine a moment to write the job row. + time.Sleep(100 * time.Millisecond) + + var n int + if err := st.DB().QueryRow( + `SELECT COUNT(*) FROM jobs WHERE host_id = ? AND kind = 'backup'`, hostID).Scan(&n); err != nil { + t.Fatalf("count: %v", err) + } + if n < 1 { + t.Errorf("overdue host: want ≥1 backup job, got %d", n) + } + }) + + // --- 2. Not overdue → no dispatch ----------------------------------- + t.Run("not_overdue_no_dispatch", func(t *testing.T) { + t.Parallel() + srv, ts, st := rawTestServer(t) + hostID, token := enrolHostForWS(t, srv, st, "catchup-notoverdue") + + if err := st.SetHostAlwaysOn(context.Background(), hostID, false); err != nil { + t.Fatalf("set always_on: %v", err) + } + // Last backup just now → not overdue. + now := time.Now().UTC() + if err := st.SetHostLastBackup(context.Background(), hostID, "succeeded", now); err != nil { + t.Fatalf("set last backup: %v", err) + } + + if err := st.CreateJob(context.Background(), store.Job{ + ID: ulid.Make().String(), HostID: hostID, Kind: "init", + ActorKind: "system", CreatedAt: now, + }); err != nil { + t.Fatalf("seed init: %v", err) + } + + gid := ulid.Make().String() + if err := st.CreateSourceGroup(context.Background(), &store.SourceGroup{ + ID: gid, HostID: hostID, Name: "home", Includes: []string{"/home"}, + }); err != nil { + t.Fatalf("source group: %v", err) + } + sid := ulid.Make().String() + if err := st.CreateSchedule(context.Background(), &store.Schedule{ + ID: sid, HostID: hostID, CronExpr: "0 2 * * *", Enabled: true, + SourceGroupIDs: []string{gid}, + }); err != nil { + t.Fatalf("schedule: %v", err) + } + + c := agentDial(t, srv, ts, hostID, token) + sendHello(t, c, "catchup-notoverdue") + _ = drainUntil(t, c, api.MsgScheduleSet) + + srv.ArmCatchup(hostID, time.Now().UTC().Add(-2*time.Minute)) + srv.RunCatchupsDue(context.Background()) + + time.Sleep(100 * time.Millisecond) + + var n int + if err := st.DB().QueryRow( + `SELECT COUNT(*) FROM jobs WHERE host_id = ? AND kind = 'backup'`, hostID).Scan(&n); err != nil { + t.Fatalf("count: %v", err) + } + if n != 0 { + t.Errorf("not-overdue host: want 0 backup jobs, got %d", n) + } + }) + + // --- 3. Active backup in flight → no new dispatch ------------------- + t.Run("active_backup_blocks_dispatch", func(t *testing.T) { + t.Parallel() + srv, ts, st := rawTestServer(t) + hostID, token := enrolHostForWS(t, srv, st, "catchup-active") + + if err := st.SetHostAlwaysOn(context.Background(), hostID, false); err != nil { + t.Fatalf("set always_on: %v", err) + } + eightDaysAgo := time.Now().UTC().Add(-8 * 24 * time.Hour) + if err := st.SetHostLastBackup(context.Background(), hostID, "succeeded", eightDaysAgo); err != nil { + t.Fatalf("set last backup: %v", err) + } + + if err := st.CreateJob(context.Background(), store.Job{ + ID: ulid.Make().String(), HostID: hostID, Kind: "init", + ActorKind: "system", CreatedAt: time.Now().UTC(), + }); err != nil { + t.Fatalf("seed init: %v", err) + } + + gid := ulid.Make().String() + if err := st.CreateSourceGroup(context.Background(), &store.SourceGroup{ + ID: gid, HostID: hostID, Name: "home", Includes: []string{"/home"}, + }); err != nil { + t.Fatalf("source group: %v", err) + } + sid := ulid.Make().String() + if err := st.CreateSchedule(context.Background(), &store.Schedule{ + ID: sid, HostID: hostID, CronExpr: "0 2 * * *", Enabled: true, + SourceGroupIDs: []string{gid}, + }); err != nil { + t.Fatalf("schedule: %v", err) + } + + // Seed a queued backup job — this is "already in flight". + if err := st.CreateJob(context.Background(), store.Job{ + ID: ulid.Make().String(), HostID: hostID, Kind: "backup", + ActorKind: "schedule", CreatedAt: time.Now().UTC(), + }); err != nil { + t.Fatalf("seed queued backup: %v", err) + } + + c := agentDial(t, srv, ts, hostID, token) + sendHello(t, c, "catchup-active") + _ = drainUntil(t, c, api.MsgScheduleSet) + + srv.ArmCatchup(hostID, time.Now().UTC().Add(-2*time.Minute)) + srv.RunCatchupsDue(context.Background()) + + time.Sleep(100 * time.Millisecond) + + var n int + if err := st.DB().QueryRow( + `SELECT COUNT(*) FROM jobs WHERE host_id = ? AND kind = 'backup'`, hostID).Scan(&n); err != nil { + t.Fatalf("count: %v", err) + } + // Count must still be exactly 1 — no second job added. + if n != 1 { + t.Errorf("active backup guard: want 1 job (the seeded one), got %d", n) + } + }) + + // --- 4. Disconnected host → no dispatch ----------------------------- + t.Run("disconnected_no_dispatch", func(t *testing.T) { + t.Parallel() + srv, _, st := rawTestServer(t) + hostID, _ := enrolHostForWS(t, srv, st, "catchup-disconnected") + + if err := st.SetHostAlwaysOn(context.Background(), hostID, false); err != nil { + t.Fatalf("set always_on: %v", err) + } + eightDaysAgo := time.Now().UTC().Add(-8 * 24 * time.Hour) + if err := st.SetHostLastBackup(context.Background(), hostID, "succeeded", eightDaysAgo); err != nil { + t.Fatalf("set last backup: %v", err) + } + + if err := st.CreateJob(context.Background(), store.Job{ + ID: ulid.Make().String(), HostID: hostID, Kind: "init", + ActorKind: "system", CreatedAt: time.Now().UTC(), + }); err != nil { + t.Fatalf("seed init: %v", err) + } + + gid := ulid.Make().String() + if err := st.CreateSourceGroup(context.Background(), &store.SourceGroup{ + ID: gid, HostID: hostID, Name: "home", Includes: []string{"/home"}, + }); err != nil { + t.Fatalf("source group: %v", err) + } + sid := ulid.Make().String() + if err := st.CreateSchedule(context.Background(), &store.Schedule{ + ID: sid, HostID: hostID, CronExpr: "0 2 * * *", Enabled: true, + SourceGroupIDs: []string{gid}, + }); err != nil { + t.Fatalf("schedule: %v", err) + } + + // Host is NOT connected — no agentDial. + + srv.ArmCatchup(hostID, time.Now().UTC().Add(-2*time.Minute)) + srv.RunCatchupsDue(context.Background()) + + time.Sleep(100 * time.Millisecond) + + var n int + if err := st.DB().QueryRow( + `SELECT COUNT(*) FROM jobs WHERE host_id = ? AND kind = 'backup'`, hostID).Scan(&n); err != nil { + t.Fatalf("count: %v", err) + } + if n != 0 { + t.Errorf("disconnected host: want 0 backup jobs, got %d", n) + } + }) +} diff --git a/internal/store/jobs.go b/internal/store/jobs.go index 8493e5d..15b71e2 100644 --- a/internal/store/jobs.go +++ b/internal/store/jobs.go @@ -270,6 +270,22 @@ func (s *Store) LatestJobByKind(ctx context.Context, hostID, kind string) (*Job, return &j, nil } +// HasActiveBackupJob reports whether the host has a backup job that is +// still queued or running. The catch-up scheduler uses this to avoid +// dispatching a duplicate backup alongside one already in flight +// (hosts.current_job_id is not maintained, so this is the authoritative +// in-flight check). +func (s *Store) HasActiveBackupJob(ctx context.Context, hostID string) (bool, error) { + var exists bool + err := s.db.QueryRowContext(ctx, + `SELECT EXISTS(SELECT 1 FROM jobs WHERE host_id = ? AND kind = 'backup' AND status IN ('queued','running'))`, + hostID).Scan(&exists) + if err != nil { + return false, fmt.Errorf("store: has active backup job: %w", err) + } + return exists, nil +} + // HasJobOfKind reports whether any job of the given kind exists for // this host, regardless of status. Used by the auto-init path on // agent hello to decide whether to dispatch a fresh `restic init` —