fix(catchup): guard on real in-flight backup check; add scheduler tests
This commit is contained in:
@@ -240,6 +240,9 @@ func (e *Engine) tick(ctx context.Context, now time.Time) {
|
|||||||
e.raiseAndNotify(ctx, h.ID, KindStaleSchedule, "", "warning",
|
e.raiseAndNotify(ctx, h.ID, KindStaleSchedule, "", "warning",
|
||||||
fmt.Sprintf("No backup in %s (threshold %s)",
|
fmt.Sprintf("No backup in %s (threshold %s)",
|
||||||
roundDur(now.Sub(*h.LastBackupAt)), staleBackupThreshold), now)
|
roundDur(now.Sub(*h.LastBackupAt)), staleBackupThreshold), now)
|
||||||
|
// Resolution is handled in handleJobFinished on a successful
|
||||||
|
// backup (and ResolveOnModeChange on toggle) — the tick only
|
||||||
|
// raises, it does not auto-resolve.
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
// Always-on hosts: existing agent_offline re-evaluation.
|
// Always-on hosts: existing agent_offline re-evaluation.
|
||||||
|
|||||||
@@ -89,8 +89,16 @@ func (s *Server) runCatchup(ctx context.Context, hostID string, now time.Time) {
|
|||||||
if host.AlwaysOn {
|
if host.AlwaysOn {
|
||||||
return // mode flipped during settle window
|
return // mode flipped during settle window
|
||||||
}
|
}
|
||||||
if host.CurrentJobID != nil {
|
// Skip if a backup is already queued or running for this host —
|
||||||
return // a job is already running; don't pile on
|
// don't pile a catch-up on top of in-flight work. (hosts.current_job_id
|
||||||
|
// is not maintained, so we check the jobs table directly.)
|
||||||
|
active, err := s.deps.Store.HasActiveBackupJob(ctx, hostID)
|
||||||
|
if err != nil {
|
||||||
|
slog.Warn("catchup: check active backup", "host_id", hostID, "err", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if active {
|
||||||
|
return
|
||||||
}
|
}
|
||||||
schedules, err := s.deps.Store.ListSchedulesByHost(ctx, hostID)
|
schedules, err := s.deps.Store.ListSchedulesByHost(ctx, hostID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
@@ -0,0 +1,246 @@
|
|||||||
|
// catchup_scheduler_test.go — integration tests for the catch-up scheduler.
|
||||||
|
package http
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/oklog/ulid/v2"
|
||||||
|
|
||||||
|
"gitea.dcglab.co.uk/steve/restic-manager/internal/api"
|
||||||
|
"gitea.dcglab.co.uk/steve/restic-manager/internal/store"
|
||||||
|
)
|
||||||
|
|
||||||
|
// TestRunCatchupDispatchesOverdue verifies four properties of the
|
||||||
|
// catch-up scheduler in separate sub-tests sharing no state.
|
||||||
|
func TestRunCatchupDispatchesOverdue(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
// --- 1. Overdue host with connected agent → backup dispatched -------
|
||||||
|
t.Run("overdue_dispatch", func(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
srv, ts, st := rawTestServer(t)
|
||||||
|
hostID, token := enrolHostForWS(t, srv, st, "catchup-overdue")
|
||||||
|
|
||||||
|
if err := st.SetHostAlwaysOn(context.Background(), hostID, false); err != nil {
|
||||||
|
t.Fatalf("set always_on: %v", err)
|
||||||
|
}
|
||||||
|
// Last backup ~8 days ago → schedule overdue.
|
||||||
|
eightDaysAgo := time.Now().UTC().Add(-8 * 24 * time.Hour)
|
||||||
|
if err := st.SetHostLastBackup(context.Background(), hostID, "succeeded", eightDaysAgo); err != nil {
|
||||||
|
t.Fatalf("set last backup: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := st.CreateJob(context.Background(), store.Job{
|
||||||
|
ID: ulid.Make().String(), HostID: hostID, Kind: "init",
|
||||||
|
ActorKind: "system", CreatedAt: time.Now().UTC(),
|
||||||
|
}); err != nil {
|
||||||
|
t.Fatalf("seed init: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
gid := ulid.Make().String()
|
||||||
|
if err := st.CreateSourceGroup(context.Background(), &store.SourceGroup{
|
||||||
|
ID: gid, HostID: hostID, Name: "home", Includes: []string{"/home"},
|
||||||
|
}); err != nil {
|
||||||
|
t.Fatalf("source group: %v", err)
|
||||||
|
}
|
||||||
|
sid := ulid.Make().String()
|
||||||
|
if err := st.CreateSchedule(context.Background(), &store.Schedule{
|
||||||
|
ID: sid, HostID: hostID, CronExpr: "0 2 * * *", Enabled: true,
|
||||||
|
SourceGroupIDs: []string{gid},
|
||||||
|
}); err != nil {
|
||||||
|
t.Fatalf("schedule: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
c := agentDial(t, srv, ts, hostID, token)
|
||||||
|
sendHello(t, c, "catchup-overdue")
|
||||||
|
_ = drainUntil(t, c, api.MsgScheduleSet)
|
||||||
|
|
||||||
|
// Arm with a past time so the settle window is already elapsed.
|
||||||
|
srv.ArmCatchup(hostID, time.Now().UTC().Add(-2*time.Minute))
|
||||||
|
srv.RunCatchupsDue(context.Background())
|
||||||
|
|
||||||
|
// Give the dispatch goroutine a moment to write the job row.
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
|
||||||
|
var n int
|
||||||
|
if err := st.DB().QueryRow(
|
||||||
|
`SELECT COUNT(*) FROM jobs WHERE host_id = ? AND kind = 'backup'`, hostID).Scan(&n); err != nil {
|
||||||
|
t.Fatalf("count: %v", err)
|
||||||
|
}
|
||||||
|
if n < 1 {
|
||||||
|
t.Errorf("overdue host: want ≥1 backup job, got %d", n)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
// --- 2. Not overdue → no dispatch -----------------------------------
|
||||||
|
t.Run("not_overdue_no_dispatch", func(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
srv, ts, st := rawTestServer(t)
|
||||||
|
hostID, token := enrolHostForWS(t, srv, st, "catchup-notoverdue")
|
||||||
|
|
||||||
|
if err := st.SetHostAlwaysOn(context.Background(), hostID, false); err != nil {
|
||||||
|
t.Fatalf("set always_on: %v", err)
|
||||||
|
}
|
||||||
|
// Last backup just now → not overdue.
|
||||||
|
now := time.Now().UTC()
|
||||||
|
if err := st.SetHostLastBackup(context.Background(), hostID, "succeeded", now); err != nil {
|
||||||
|
t.Fatalf("set last backup: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := st.CreateJob(context.Background(), store.Job{
|
||||||
|
ID: ulid.Make().String(), HostID: hostID, Kind: "init",
|
||||||
|
ActorKind: "system", CreatedAt: now,
|
||||||
|
}); err != nil {
|
||||||
|
t.Fatalf("seed init: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
gid := ulid.Make().String()
|
||||||
|
if err := st.CreateSourceGroup(context.Background(), &store.SourceGroup{
|
||||||
|
ID: gid, HostID: hostID, Name: "home", Includes: []string{"/home"},
|
||||||
|
}); err != nil {
|
||||||
|
t.Fatalf("source group: %v", err)
|
||||||
|
}
|
||||||
|
sid := ulid.Make().String()
|
||||||
|
if err := st.CreateSchedule(context.Background(), &store.Schedule{
|
||||||
|
ID: sid, HostID: hostID, CronExpr: "0 2 * * *", Enabled: true,
|
||||||
|
SourceGroupIDs: []string{gid},
|
||||||
|
}); err != nil {
|
||||||
|
t.Fatalf("schedule: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
c := agentDial(t, srv, ts, hostID, token)
|
||||||
|
sendHello(t, c, "catchup-notoverdue")
|
||||||
|
_ = drainUntil(t, c, api.MsgScheduleSet)
|
||||||
|
|
||||||
|
srv.ArmCatchup(hostID, time.Now().UTC().Add(-2*time.Minute))
|
||||||
|
srv.RunCatchupsDue(context.Background())
|
||||||
|
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
|
||||||
|
var n int
|
||||||
|
if err := st.DB().QueryRow(
|
||||||
|
`SELECT COUNT(*) FROM jobs WHERE host_id = ? AND kind = 'backup'`, hostID).Scan(&n); err != nil {
|
||||||
|
t.Fatalf("count: %v", err)
|
||||||
|
}
|
||||||
|
if n != 0 {
|
||||||
|
t.Errorf("not-overdue host: want 0 backup jobs, got %d", n)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
// --- 3. Active backup in flight → no new dispatch -------------------
|
||||||
|
t.Run("active_backup_blocks_dispatch", func(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
srv, ts, st := rawTestServer(t)
|
||||||
|
hostID, token := enrolHostForWS(t, srv, st, "catchup-active")
|
||||||
|
|
||||||
|
if err := st.SetHostAlwaysOn(context.Background(), hostID, false); err != nil {
|
||||||
|
t.Fatalf("set always_on: %v", err)
|
||||||
|
}
|
||||||
|
eightDaysAgo := time.Now().UTC().Add(-8 * 24 * time.Hour)
|
||||||
|
if err := st.SetHostLastBackup(context.Background(), hostID, "succeeded", eightDaysAgo); err != nil {
|
||||||
|
t.Fatalf("set last backup: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := st.CreateJob(context.Background(), store.Job{
|
||||||
|
ID: ulid.Make().String(), HostID: hostID, Kind: "init",
|
||||||
|
ActorKind: "system", CreatedAt: time.Now().UTC(),
|
||||||
|
}); err != nil {
|
||||||
|
t.Fatalf("seed init: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
gid := ulid.Make().String()
|
||||||
|
if err := st.CreateSourceGroup(context.Background(), &store.SourceGroup{
|
||||||
|
ID: gid, HostID: hostID, Name: "home", Includes: []string{"/home"},
|
||||||
|
}); err != nil {
|
||||||
|
t.Fatalf("source group: %v", err)
|
||||||
|
}
|
||||||
|
sid := ulid.Make().String()
|
||||||
|
if err := st.CreateSchedule(context.Background(), &store.Schedule{
|
||||||
|
ID: sid, HostID: hostID, CronExpr: "0 2 * * *", Enabled: true,
|
||||||
|
SourceGroupIDs: []string{gid},
|
||||||
|
}); err != nil {
|
||||||
|
t.Fatalf("schedule: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Seed a queued backup job — this is "already in flight".
|
||||||
|
if err := st.CreateJob(context.Background(), store.Job{
|
||||||
|
ID: ulid.Make().String(), HostID: hostID, Kind: "backup",
|
||||||
|
ActorKind: "schedule", CreatedAt: time.Now().UTC(),
|
||||||
|
}); err != nil {
|
||||||
|
t.Fatalf("seed queued backup: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
c := agentDial(t, srv, ts, hostID, token)
|
||||||
|
sendHello(t, c, "catchup-active")
|
||||||
|
_ = drainUntil(t, c, api.MsgScheduleSet)
|
||||||
|
|
||||||
|
srv.ArmCatchup(hostID, time.Now().UTC().Add(-2*time.Minute))
|
||||||
|
srv.RunCatchupsDue(context.Background())
|
||||||
|
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
|
||||||
|
var n int
|
||||||
|
if err := st.DB().QueryRow(
|
||||||
|
`SELECT COUNT(*) FROM jobs WHERE host_id = ? AND kind = 'backup'`, hostID).Scan(&n); err != nil {
|
||||||
|
t.Fatalf("count: %v", err)
|
||||||
|
}
|
||||||
|
// Count must still be exactly 1 — no second job added.
|
||||||
|
if n != 1 {
|
||||||
|
t.Errorf("active backup guard: want 1 job (the seeded one), got %d", n)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
// --- 4. Disconnected host → no dispatch -----------------------------
|
||||||
|
t.Run("disconnected_no_dispatch", func(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
srv, _, st := rawTestServer(t)
|
||||||
|
hostID, _ := enrolHostForWS(t, srv, st, "catchup-disconnected")
|
||||||
|
|
||||||
|
if err := st.SetHostAlwaysOn(context.Background(), hostID, false); err != nil {
|
||||||
|
t.Fatalf("set always_on: %v", err)
|
||||||
|
}
|
||||||
|
eightDaysAgo := time.Now().UTC().Add(-8 * 24 * time.Hour)
|
||||||
|
if err := st.SetHostLastBackup(context.Background(), hostID, "succeeded", eightDaysAgo); err != nil {
|
||||||
|
t.Fatalf("set last backup: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := st.CreateJob(context.Background(), store.Job{
|
||||||
|
ID: ulid.Make().String(), HostID: hostID, Kind: "init",
|
||||||
|
ActorKind: "system", CreatedAt: time.Now().UTC(),
|
||||||
|
}); err != nil {
|
||||||
|
t.Fatalf("seed init: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
gid := ulid.Make().String()
|
||||||
|
if err := st.CreateSourceGroup(context.Background(), &store.SourceGroup{
|
||||||
|
ID: gid, HostID: hostID, Name: "home", Includes: []string{"/home"},
|
||||||
|
}); err != nil {
|
||||||
|
t.Fatalf("source group: %v", err)
|
||||||
|
}
|
||||||
|
sid := ulid.Make().String()
|
||||||
|
if err := st.CreateSchedule(context.Background(), &store.Schedule{
|
||||||
|
ID: sid, HostID: hostID, CronExpr: "0 2 * * *", Enabled: true,
|
||||||
|
SourceGroupIDs: []string{gid},
|
||||||
|
}); err != nil {
|
||||||
|
t.Fatalf("schedule: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Host is NOT connected — no agentDial.
|
||||||
|
|
||||||
|
srv.ArmCatchup(hostID, time.Now().UTC().Add(-2*time.Minute))
|
||||||
|
srv.RunCatchupsDue(context.Background())
|
||||||
|
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
|
||||||
|
var n int
|
||||||
|
if err := st.DB().QueryRow(
|
||||||
|
`SELECT COUNT(*) FROM jobs WHERE host_id = ? AND kind = 'backup'`, hostID).Scan(&n); err != nil {
|
||||||
|
t.Fatalf("count: %v", err)
|
||||||
|
}
|
||||||
|
if n != 0 {
|
||||||
|
t.Errorf("disconnected host: want 0 backup jobs, got %d", n)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
@@ -270,6 +270,22 @@ func (s *Store) LatestJobByKind(ctx context.Context, hostID, kind string) (*Job,
|
|||||||
return &j, nil
|
return &j, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// HasActiveBackupJob reports whether the host has a backup job that is
|
||||||
|
// still queued or running. The catch-up scheduler uses this to avoid
|
||||||
|
// dispatching a duplicate backup alongside one already in flight
|
||||||
|
// (hosts.current_job_id is not maintained, so this is the authoritative
|
||||||
|
// in-flight check).
|
||||||
|
func (s *Store) HasActiveBackupJob(ctx context.Context, hostID string) (bool, error) {
|
||||||
|
var exists bool
|
||||||
|
err := s.db.QueryRowContext(ctx,
|
||||||
|
`SELECT EXISTS(SELECT 1 FROM jobs WHERE host_id = ? AND kind = 'backup' AND status IN ('queued','running'))`,
|
||||||
|
hostID).Scan(&exists)
|
||||||
|
if err != nil {
|
||||||
|
return false, fmt.Errorf("store: has active backup job: %w", err)
|
||||||
|
}
|
||||||
|
return exists, nil
|
||||||
|
}
|
||||||
|
|
||||||
// HasJobOfKind reports whether any job of the given kind exists for
|
// HasJobOfKind reports whether any job of the given kind exists for
|
||||||
// this host, regardless of status. Used by the auto-init path on
|
// this host, regardless of status. Used by the auto-init path on
|
||||||
// agent hello to decide whether to dispatch a fresh `restic init` —
|
// agent hello to decide whether to dispatch a fresh `restic init` —
|
||||||
|
|||||||
Reference in New Issue
Block a user