Files
restic-manager/internal/server/http/pending_drain_test.go
T
steve 3e337dfb3c server: drain pending_runs on tick + on agent reconnect
Two trigger paths land here:

- A 30s ticker in cmd/server calls Server.DrainAllDue(ctx). It
  walks pending_runs rows whose next_attempt_at <= now, dedupes by
  host, skips offline hosts, and per online host runs DrainPending.

- onAgentHello spawns a background DrainPending(hostID). When a
  host comes back, every pending row for it is dispatchable now —
  due-ness becomes irrelevant once the wire is back.

Each row's schedule + group are reloaded; ErrNotFound or
disabled-schedule or gone-group abandons the row with a
pending_run.abandoned audit. attempt >= retry_max also abandons.
Otherwise dispatchBackupForGroup is invoked; success deletes the
row, failure bumps attempt with exponential backoff capped at
30m.
2026-05-04 10:19:15 +01:00

420 lines
14 KiB
Go

// pending_drain_test.go — covers DrainPending / DrainAllDue and the
// onAgentHello goroutine spawn that drains a freshly-reconnected
// host's queue.
package http
import (
"context"
"encoding/json"
"testing"
"time"
"github.com/coder/websocket"
"github.com/oklog/ulid/v2"
"gitea.dcglab.co.uk/steve/restic-manager/internal/api"
"gitea.dcglab.co.uk/steve/restic-manager/internal/store"
)
// seedSchedAndGroup provisions the fixtures a drain test needs for hostID:
// one source group (including "/etc", with the given retryMax and a 60s
// retry backoff), one enabled schedule pointing at that group, and an init
// job row. It returns (groupID, scheduleID) and fails the test on any store
// error.
func seedSchedAndGroup(t *testing.T, st *store.Store, hostID string, retryMax int) (string, string) {
	t.Helper()
	ctx := context.Background()

	groupID := ulid.Make().String()
	group := &store.SourceGroup{
		ID:                  groupID,
		HostID:              hostID,
		Name:                "default",
		Includes:            []string{"/etc"},
		RetryMax:            retryMax,
		RetryBackoffSeconds: 60,
	}
	if err := st.CreateSourceGroup(ctx, group); err != nil {
		t.Fatalf("create group: %v", err)
	}

	scheduleID := ulid.Make().String()
	schedule := &store.Schedule{
		ID:             scheduleID,
		HostID:         hostID,
		CronExpr:       "0 3 * * *",
		Enabled:        true,
		SourceGroupIDs: []string{groupID},
	}
	if err := st.CreateSchedule(ctx, schedule); err != nil {
		t.Fatalf("create schedule: %v", err)
	}

	// Seed an init job row so auto-init doesn't pollute later reads.
	initJob := store.Job{
		ID:        ulid.Make().String(),
		HostID:    hostID,
		Kind:      "init",
		ActorKind: "system",
		CreatedAt: time.Now().UTC(),
	}
	if err := st.CreateJob(ctx, initJob); err != nil {
		t.Fatalf("seed init: %v", err)
	}

	return groupID, scheduleID
}
// countPendingForHost reports how many pending_runs rows currently exist
// for hostID. A query or scan failure aborts the test.
func countPendingForHost(t *testing.T, st *store.Store, hostID string) int {
	t.Helper()

	const query = `SELECT COUNT(*) FROM pending_runs WHERE host_id = ?`

	var count int
	row := st.DB().QueryRow(query, hostID)
	if err := row.Scan(&count); err != nil {
		t.Fatalf("count pending: %v", err)
	}
	return count
}
// countAuditAction reports how many audit_log rows carry the given action.
// A query or scan failure aborts the test.
func countAuditAction(t *testing.T, st *store.Store, action string) int {
	t.Helper()

	var total int
	err := st.DB().
		QueryRow(`SELECT COUNT(*) FROM audit_log WHERE action = ?`, action).
		Scan(&total)
	if err != nil {
		t.Fatalf("count audit: %v", err)
	}
	return total
}
// TestDrainPendingDispatchesOnReconnect verifies that a pending row that is
// already due gets dispatched as a backup command.run once the agent
// connects and says hello, that the row is then removed, and that exactly
// one schedule-attributed backup job row exists for the host.
func TestDrainPendingDispatchesOnReconnect(t *testing.T) {
	t.Parallel()
	srv, ts, st := rawTestServer(t)
	hostID, token := enrolHostForWS(t, srv, st, "drain-host")
	gid, sid := seedSchedAndGroup(t, st, hostID, 5)

	// Pre-insert a pending row that's already due. The on-hello
	// goroutine should drain it after we connect.
	pendingID := ulid.Make().String()
	now := time.Now().UTC()
	if err := st.EnqueuePendingRun(context.Background(), &store.PendingRun{
		ID: pendingID, ScheduleID: sid, SourceGroupID: gid, HostID: hostID,
		Attempt: 1, NextAttemptAt: now.Add(-time.Second),
		ScheduledAt: now.Add(-time.Minute),
	}); err != nil {
		t.Fatalf("enqueue: %v", err)
	}

	c := agentDial(t, srv, ts, hostID, token)
	sendHello(t, c, "drain-host")

	// Walk envelopes looking for a backup command.run carrying the
	// group's includes.
	var got *api.CommandRunPayload
	deadline := time.Now().Add(3 * time.Second)
	for time.Now().Before(deadline) {
		ctx, cancel := context.WithTimeout(context.Background(), 800*time.Millisecond)
		mt, raw, err := c.Read(ctx)
		cancel()
		if err != nil {
			break
		}
		if mt != websocket.MessageText {
			continue
		}
		var env api.Envelope
		if err := json.Unmarshal(raw, &env); err != nil {
			continue
		}
		if env.Type != api.MsgCommandRun {
			continue
		}
		var p api.CommandRunPayload
		if err := env.UnmarshalPayload(&p); err != nil {
			// An undecodable payload cannot be the backup we are
			// waiting for; log it instead of silently dropping it.
			t.Logf("skipping undecodable command.run payload: %v", err)
			continue
		}
		if p.Kind == api.JobBackup {
			got = &p
			break
		}
	}
	if got == nil {
		t.Fatalf("no backup command.run dispatched after reconnect drain")
	}
	if !equalStrings(got.Includes, []string{"/etc"}) {
		t.Errorf("backup includes: %v", got.Includes)
	}
	if got.Tag != "default" {
		t.Errorf("backup tag: %q", got.Tag)
	}

	// Pending row should be gone. The drain runs on a server-side
	// goroutine and deletes the row only after the send succeeds, so the
	// delete can land slightly after we have read the envelope. Poll
	// briefly instead of asserting on a single racy read.
	remaining := countPendingForHost(t, st, hostID)
	for settle := time.Now().Add(2 * time.Second); remaining != 0 && time.Now().Before(settle); {
		time.Sleep(20 * time.Millisecond)
		remaining = countPendingForHost(t, st, hostID)
	}
	if remaining != 0 {
		t.Errorf("pending rows after drain: got %d, want 0", remaining)
	}

	// One backup job row landed (in addition to the seeded init).
	var n int
	if err := st.DB().QueryRow(
		`SELECT COUNT(*) FROM jobs WHERE host_id = ? AND kind = 'backup' AND actor_kind = 'schedule'`,
		hostID).Scan(&n); err != nil {
		t.Fatalf("count backup jobs: %v", err)
	}
	if n != 1 {
		t.Errorf("backup job rows: got %d, want 1", n)
	}
}
// TestDrainPendingAbandonsOnRetryMax verifies that a pending row whose
// attempt count has reached the group's retry_max (2 here) is abandoned:
// the row is removed, exactly one pending_run.abandoned audit entry is
// written, and no backup is dispatched or recorded as a job.
func TestDrainPendingAbandonsOnRetryMax(t *testing.T) {
	t.Parallel()
	srv, ts, st := rawTestServer(t)
	hostID, token := enrolHostForWS(t, srv, st, "abandon-retry-host")
	gid, sid := seedSchedAndGroup(t, st, hostID, 2)

	pendingID := ulid.Make().String()
	now := time.Now().UTC()
	if err := st.EnqueuePendingRun(context.Background(), &store.PendingRun{
		ID: pendingID, ScheduleID: sid, SourceGroupID: gid, HostID: hostID,
		Attempt: 2, NextAttemptAt: now.Add(-time.Second),
		ScheduledAt: now.Add(-time.Minute),
	}); err != nil {
		t.Fatalf("enqueue: %v", err)
	}
	auditBefore := countAuditAction(t, st, "pending_run.abandoned")

	c := agentDial(t, srv, ts, hostID, token)
	sendHello(t, c, "abandon-retry-host")
	_ = drainUntil(t, c, api.MsgScheduleSet)

	// Looked up only to ensure the conn was registered; the value itself
	// is not needed here.
	_ = connFromHub(t, srv, hostID)

	// Call DrainPending directly — gives us deterministic completion.
	// NOTE(review): the on-hello drain goroutine may also process this row;
	// confirm the two drains cannot both emit an abandoned audit entry.
	srv.DrainPending(context.Background(), hostID)

	if n := countPendingForHost(t, st, hostID); n != 0 {
		t.Errorf("pending rows after abandon: got %d, want 0", n)
	}
	if d := countAuditAction(t, st, "pending_run.abandoned") - auditBefore; d != 1 {
		t.Errorf("audit pending_run.abandoned delta: got %d, want 1", d)
	}

	// No backup command.run should have been sent.
	deadline := time.Now().Add(400 * time.Millisecond)
	for time.Now().Before(deadline) {
		ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
		mt, raw, err := c.Read(ctx)
		cancel()
		if err != nil {
			break
		}
		if mt != websocket.MessageText {
			continue
		}
		// Decode errors are deliberately ignored: a frame we cannot parse
		// leaves the zero value behind and so never matches a backup
		// command.run below.
		var env api.Envelope
		_ = json.Unmarshal(raw, &env)
		if env.Type == api.MsgCommandRun {
			var p api.CommandRunPayload
			_ = env.UnmarshalPayload(&p)
			if p.Kind == api.JobBackup {
				t.Fatalf("abandoned row still dispatched a backup: %+v", p)
			}
		}
	}

	// No backup job row. The scan error must be checked: if the query
	// failed, n would stay 0 and the assertion would pass vacuously.
	var n int
	if err := st.DB().QueryRow(
		`SELECT COUNT(*) FROM jobs WHERE host_id = ? AND kind = 'backup'`,
		hostID).Scan(&n); err != nil {
		t.Fatalf("count backup jobs: %v", err)
	}
	if n != 0 {
		t.Errorf("abandon path created a backup job: %d rows", n)
	}
}
// TestDrainPendingBumpsOnSendFailure verifies that when dispatch fails
// because the agent socket has already been closed, drainOne leaves the
// pending row in place, bumps its attempt from 1 to 2, and records a
// non-empty last_error.
func TestDrainPendingBumpsOnSendFailure(t *testing.T) {
	t.Parallel()
	bg := context.Background()

	srv, ts, st := rawTestServer(t)
	hostID, token := enrolHostForWS(t, srv, st, "bump-host")
	gid, sid := seedSchedAndGroup(t, st, hostID, 5)

	ws := agentDial(t, srv, ts, hostID, token)
	sendHello(t, ws, "bump-host")
	_ = drainUntil(t, ws, api.MsgScheduleSet)

	// Grab the server-side conn while the client is still attached.
	// Hub.Conn keeps handing it out after the client-side close — the
	// server's Unregister fires once its read loop notices the close,
	// but the pointer stays valid and later Sends simply fail.
	serverConn := connFromHub(t, srv, hostID)
	if serverConn == nil {
		t.Fatal("conn never registered")
	}

	pendingID := ulid.Make().String()
	now := time.Now().UTC()

	closeErr := ws.Close(websocket.StatusNormalClosure, "test")
	if closeErr != nil {
		t.Fatalf("close: %v", closeErr)
	}
	// Brief settle so the close is observed by the server's read loop.
	time.Sleep(150 * time.Millisecond)

	// The pending row goes in only AFTER the on-hello drain goroutine has
	// already scanned (and found nothing) — otherwise we would race that
	// drain dispatching the row over the still-live socket.
	enqueueErr := st.EnqueuePendingRun(bg, &store.PendingRun{
		ID:            pendingID,
		ScheduleID:    sid,
		SourceGroupID: gid,
		HostID:        hostID,
		Attempt:       1,
		NextAttemptAt: now.Add(-time.Second),
		ScheduledAt:   now.Add(-time.Minute),
	})
	if enqueueErr != nil {
		t.Fatalf("enqueue: %v", enqueueErr)
	}

	// DrainPending resolves the conn through Hub.Conn(hostID), and after
	// the client close the server may already have unregistered it. Going
	// through drainOne with the captured conn exercises the "Send fails"
	// branch deterministically instead of the "host gone" branch.
	srv.drainOne(bg, serverConn, store.PendingRun{
		ID:            pendingID,
		ScheduleID:    sid,
		SourceGroupID: gid,
		HostID:        hostID,
		Attempt:       1,
		NextAttemptAt: now.Add(-time.Second),
		ScheduledAt:   now.Add(-time.Minute),
	})

	// The original row must have been bumped to attempt=2. The G1.1 path
	// may additionally have enqueued a duplicate row from inside
	// dispatchBackupForGroup's failed Send, so the total pending count is
	// not asserted — only that the original row survives with the bump.
	var (
		gotAttempt int
		gotLastErr string
	)
	row := st.DB().QueryRow(
		`SELECT attempt, COALESCE(last_error,'') FROM pending_runs WHERE id = ?`,
		pendingID)
	if err := row.Scan(&gotAttempt, &gotLastErr); err != nil {
		t.Fatalf("scan original row: %v", err)
	}
	if gotAttempt != 2 {
		t.Errorf("attempt after bump: got %d, want 2", gotAttempt)
	}
	if gotLastErr == "" {
		t.Errorf("last_error empty after bump")
	}

	// No assertion on job rows: dispatchBackupForGroup creates a job row
	// before attempting the send and leaves it behind on send failure, so
	// that row exists. The attempt bump above is the cleaner signal.
}
// TestDrainPendingDropsRowsForGoneSchedule verifies that a pending row whose
// schedule has been disabled is abandoned by DrainPending: the row is
// removed, one pending_run.abandoned audit entry is added, and no backup
// command.run reaches the agent.
func TestDrainPendingDropsRowsForGoneSchedule(t *testing.T) {
	t.Parallel()
	bg := context.Background()

	srv, ts, st := rawTestServer(t)
	hostID, token := enrolHostForWS(t, srv, st, "gone-sched-host")
	gid, sid := seedSchedAndGroup(t, st, hostID, 5)

	pendingID := ulid.Make().String()
	now := time.Now().UTC()
	enqueueErr := st.EnqueuePendingRun(bg, &store.PendingRun{
		ID:            pendingID,
		ScheduleID:    sid,
		SourceGroupID: gid,
		HostID:        hostID,
		Attempt:       1,
		NextAttemptAt: now.Add(-time.Second),
		ScheduledAt:   now.Add(-time.Minute),
	})
	if enqueueErr != nil {
		t.Fatalf("enqueue: %v", enqueueErr)
	}

	// Disable the schedule rather than deleting it. A delete would
	// FK-cascade the pending_runs row away before the drainer sees it —
	// fine in production, but it defeats the point of the test. The
	// disabled-schedule path goes through the same abandonPending code,
	// so the assertion is equivalent.
	_, disableErr := st.DB().Exec(`UPDATE schedules SET enabled = 0 WHERE id = ?`, sid)
	if disableErr != nil {
		t.Fatalf("disable schedule: %v", disableErr)
	}
	auditBefore := countAuditAction(t, st, "pending_run.abandoned")

	ws := agentDial(t, srv, ts, hostID, token)
	sendHello(t, ws, "gone-sched-host")
	_ = drainUntil(t, ws, api.MsgScheduleSet)
	srv.DrainPending(bg, hostID)

	if left := countPendingForHost(t, st, hostID); left != 0 {
		t.Errorf("pending rows after schedule-gone abandon: got %d, want 0", left)
	}
	auditDelta := countAuditAction(t, st, "pending_run.abandoned") - auditBefore
	if auditDelta != 1 {
		t.Errorf("audit delta: got %d, want 1", auditDelta)
	}

	// Drain produced no backup envelope: read whatever arrives within a
	// short window and fail on the first backup command.run.
	for stopAt := time.Now().Add(400 * time.Millisecond); time.Now().Before(stopAt); {
		readCtx, cancel := context.WithTimeout(bg, 200*time.Millisecond)
		msgType, data, err := ws.Read(readCtx)
		cancel()
		if err != nil {
			break
		}
		if msgType != websocket.MessageText {
			continue
		}
		// Decode errors are ignored on purpose; an unparsable frame keeps
		// the zero values and never matches the checks below.
		var env api.Envelope
		_ = json.Unmarshal(data, &env)
		if env.Type != api.MsgCommandRun {
			continue
		}
		var payload api.CommandRunPayload
		_ = env.UnmarshalPayload(&payload)
		if payload.Kind == api.JobBackup {
			t.Fatalf("gone-schedule abandon still dispatched: %+v", payload)
		}
	}
}
// TestDrainAllDueSkipsOfflineHosts verifies that DrainAllDue leaves a due
// pending row untouched when its host is enrolled but has never connected:
// the row stays in place and no pending_run.abandoned audit entry appears.
func TestDrainAllDueSkipsOfflineHosts(t *testing.T) {
	t.Parallel()
	bg := context.Background()

	srv, _, st := rawTestServer(t)
	// The host is enrolled but deliberately never dialled, so it is offline.
	hostID, _ := enrolHostForWS(t, srv, st, "offline-host")
	gid, sid := seedSchedAndGroup(t, st, hostID, 5)

	pendingID := ulid.Make().String()
	now := time.Now().UTC()
	enqueueErr := st.EnqueuePendingRun(bg, &store.PendingRun{
		ID:            pendingID,
		ScheduleID:    sid,
		SourceGroupID: gid,
		HostID:        hostID,
		Attempt:       1,
		NextAttemptAt: now.Add(-time.Second),
		ScheduledAt:   now.Add(-time.Minute),
	})
	if enqueueErr != nil {
		t.Fatalf("enqueue: %v", enqueueErr)
	}
	auditBefore := countAuditAction(t, st, "pending_run.abandoned")

	srv.DrainAllDue(bg)

	// The row must survive: the host is offline, so the drainer skips it.
	if left := countPendingForHost(t, st, hostID); left != 1 {
		t.Errorf("pending rows after DrainAllDue against offline host: got %d, want 1", left)
	}
	auditDelta := countAuditAction(t, st, "pending_run.abandoned") - auditBefore
	if auditDelta != 0 {
		t.Errorf("audit unexpectedly changed: delta %d", auditDelta)
	}
}
// TestEnqueueOnDispatchFailure verifies that when dispatchScheduledJob
// cannot deliver to the agent (the client side of the socket is already
// closed), exactly one pending row is enqueued for the host with attempt=1,
// the original scheduled_at preserved, and a non-empty last_error.
func TestEnqueueOnDispatchFailure(t *testing.T) {
	t.Parallel()
	srv, ts, st := rawTestServer(t)
	hostID, token := enrolHostForWS(t, srv, st, "enqueue-host")
	_, sid := seedSchedAndGroup(t, st, hostID, 5)

	c := agentDial(t, srv, ts, hostID, token)
	sendHello(t, c, "enqueue-host")
	_ = drainUntil(t, c, api.MsgScheduleSet)

	// Capture the server-side conn before closing the client so the
	// dispatch below targets that exact connection. Guard against nil so a
	// registration failure is reported here rather than inside the dispatch.
	conn := connFromHub(t, srv, hostID)
	if conn == nil {
		t.Fatal("conn never registered")
	}

	// Close the client side so the server's next Send errors.
	if err := c.Close(websocket.StatusNormalClosure, "test"); err != nil {
		t.Fatalf("close: %v", err)
	}
	// Brief settle so the close is observed by the server.
	time.Sleep(100 * time.Millisecond)

	scheduledAt := time.Now().UTC().Add(-30 * time.Second)
	srv.dispatchScheduledJob(context.Background(), hostID, conn, sid, scheduledAt)

	// One pending row should have been enqueued (attempt=1) with the
	// scheduled_at preserved.
	rows, err := st.ListPendingRunsForHost(context.Background(), hostID)
	if err != nil {
		t.Fatalf("list: %v", err)
	}
	if len(rows) != 1 {
		t.Fatalf("pending rows: got %d, want 1", len(rows))
	}
	if rows[0].Attempt != 1 {
		t.Errorf("attempt: got %d, want 1", rows[0].Attempt)
	}
	// scheduled_at preserved (within RFC3339Nano round-trip tolerance).
	if rows[0].ScheduledAt.Sub(scheduledAt).Abs() > time.Microsecond {
		t.Errorf("scheduled_at drift: %v vs %v", rows[0].ScheduledAt, scheduledAt)
	}
	if rows[0].LastError == "" {
		t.Errorf("last_error empty")
	}
}