Files
restic-manager/internal/server/http/catchup_scheduler_test.go
T

247 lines
8.1 KiB
Go

// catchup_scheduler_test.go — integration tests for the catch-up scheduler.
package http
import (
"context"
"testing"
"time"
"github.com/oklog/ulid/v2"
"gitea.dcglab.co.uk/steve/restic-manager/internal/api"
"gitea.dcglab.co.uk/steve/restic-manager/internal/store"
)
// TestRunCatchupDispatchesOverdue verifies four properties of the catch-up
// scheduler. Each property is a parallel sub-test that builds its own server
// and store through rawTestServer, so the sub-tests share no state:
//
//  1. overdue_dispatch: last backup 8 days old and the agent connected;
//     at least one backup job must appear.
//  2. not_overdue_no_dispatch: last backup just now; no backup job may appear.
//  3. active_backup_blocks_dispatch: a backup job row already exists for the
//     host; no second one may be added.
//  4. disconnected_no_dispatch: overdue, but no agent ever dials in; no
//     backup job may appear.
//
// Every case seeds the same fixture (always_on=false, an init job, one source
// group, one enabled "0 2 * * *" schedule), arms the catch-up with a time two
// minutes in the past and then calls RunCatchupsDue once.
func TestRunCatchupDispatchesOverdue(t *testing.T) {
	t.Parallel()

	const (
		// settleDelay is how long the negative cases wait before asserting
		// that no job was written. Absence cannot be polled for, so a fixed
		// wait is the best available.
		settleDelay = 100 * time.Millisecond
		// dispatchTimeout bounds how long the positive case polls for the
		// dispatched job row. The loop exits as soon as the row is visible,
		// so the generous bound only costs time when the test is failing.
		dispatchTimeout = 2 * time.Second
		// pollInterval is the pause between job-count queries while polling.
		pollInterval = 20 * time.Millisecond
	)

	cases := []struct {
		name          string        // sub-test name
		hostname      string        // used for enrolment and for the agent hello
		lastBackupAge time.Duration // how far in the past the last backup is recorded
		seedBackupJob bool          // pre-create a backup job row before running the catch-up
		connectAgent  bool          // dial the agent WebSocket and complete the hello exchange
		awaitDispatch bool          // poll for a job row instead of sleeping settleDelay
		check         func(t *testing.T, n int)
	}{
		{
			// Overdue host with connected agent → backup dispatched.
			name:          "overdue_dispatch",
			hostname:      "catchup-overdue",
			lastBackupAge: 8 * 24 * time.Hour, // ~8 days ago → schedule overdue
			connectAgent:  true,
			awaitDispatch: true,
			check: func(t *testing.T, n int) {
				if n < 1 {
					t.Errorf("overdue host: want ≥1 backup job, got %d", n)
				}
			},
		},
		{
			// Not overdue → no dispatch.
			name:          "not_overdue_no_dispatch",
			hostname:      "catchup-notoverdue",
			lastBackupAge: 0, // last backup just now → not overdue
			connectAgent:  true,
			check: func(t *testing.T, n int) {
				if n != 0 {
					t.Errorf("not-overdue host: want 0 backup jobs, got %d", n)
				}
			},
		},
		{
			// Active backup in flight → no new dispatch.
			name:          "active_backup_blocks_dispatch",
			hostname:      "catchup-active",
			lastBackupAge: 8 * 24 * time.Hour,
			seedBackupJob: true,
			connectAgent:  true,
			check: func(t *testing.T, n int) {
				// Count must still be exactly 1 — no second job added.
				if n != 1 {
					t.Errorf("active backup guard: want 1 job (the seeded one), got %d", n)
				}
			},
		},
		{
			// Disconnected host → no dispatch.
			name:          "disconnected_no_dispatch",
			hostname:      "catchup-disconnected",
			lastBackupAge: 8 * 24 * time.Hour,
			connectAgent:  false, // host is NOT connected — no agentDial
			check: func(t *testing.T, n int) {
				if n != 0 {
					t.Errorf("disconnected host: want 0 backup jobs, got %d", n)
				}
			},
		},
	}

	for _, tc := range cases {
		tc := tc // per-iteration copy: the parallel closure outlives the iteration (needed before Go 1.22)
		t.Run(tc.name, func(t *testing.T) {
			t.Parallel()
			ctx := context.Background()

			srv, ts, st := rawTestServer(t)
			hostID, token := enrolHostForWS(t, srv, st, tc.hostname)

			if err := st.SetHostAlwaysOn(ctx, hostID, false); err != nil {
				t.Fatalf("set always_on: %v", err)
			}
			lastBackup := time.Now().UTC().Add(-tc.lastBackupAge)
			if err := st.SetHostLastBackup(ctx, hostID, "succeeded", lastBackup); err != nil {
				t.Fatalf("set last backup: %v", err)
			}
			if err := st.CreateJob(ctx, store.Job{
				ID: ulid.Make().String(), HostID: hostID, Kind: "init",
				ActorKind: "system", CreatedAt: time.Now().UTC(),
			}); err != nil {
				t.Fatalf("seed init: %v", err)
			}

			groupID := ulid.Make().String()
			if err := st.CreateSourceGroup(ctx, &store.SourceGroup{
				ID: groupID, HostID: hostID, Name: "home", Includes: []string{"/home"},
			}); err != nil {
				t.Fatalf("source group: %v", err)
			}
			if err := st.CreateSchedule(ctx, &store.Schedule{
				ID: ulid.Make().String(), HostID: hostID, CronExpr: "0 2 * * *", Enabled: true,
				SourceGroupIDs: []string{groupID},
			}); err != nil {
				t.Fatalf("schedule: %v", err)
			}

			if tc.seedBackupJob {
				// Seed a backup job — this is "already in flight".
				// NOTE(review): no status is set explicitly; presumably the
				// store defaults it to a queued/active state — confirm.
				if err := st.CreateJob(ctx, store.Job{
					ID: ulid.Make().String(), HostID: hostID, Kind: "backup",
					ActorKind: "schedule", CreatedAt: time.Now().UTC(),
				}); err != nil {
					t.Fatalf("seed queued backup: %v", err)
				}
			}

			if tc.connectAgent {
				c := agentDial(t, srv, ts, hostID, token)
				sendHello(t, c, tc.hostname)
				// Consume messages up to the schedule push before arming.
				_ = drainUntil(t, c, api.MsgScheduleSet)
			}

			// Arm with a past time so the settle window is already elapsed.
			srv.ArmCatchup(hostID, time.Now().UTC().Add(-2*time.Minute))
			srv.RunCatchupsDue(ctx)

			// countBackups returns the number of backup job rows for this host.
			countBackups := func() int {
				t.Helper()
				var n int
				if err := st.DB().QueryRow(
					`SELECT COUNT(*) FROM jobs WHERE host_id = ? AND kind = 'backup'`, hostID).Scan(&n); err != nil {
					t.Fatalf("count: %v", err)
				}
				return n
			}

			if tc.awaitDispatch {
				// The job row may be written on another goroutine after
				// RunCatchupsDue returns, so poll rather than rely on a
				// single fixed sleep that can lose the race on a slow machine.
				deadline := time.Now().Add(dispatchTimeout)
				for countBackups() < 1 && time.Now().Before(deadline) {
					time.Sleep(pollInterval)
				}
			} else {
				// Give any erroneous dispatch a moment to write its job row
				// before asserting that none exists.
				time.Sleep(settleDelay)
			}

			tc.check(t, countBackups())
		})
	}
}