From e0eae0a96fa70887d1f55b15516bef4b81211d28 Mon Sep 17 00:00:00 2001 From: Steve Cliff Date: Mon, 4 May 2026 00:07:33 +0100 Subject: [PATCH] server: drainer abandons only on ErrNotFound, not transient errors GetSourceGroup errors in drainOne now gate on errors.Is(err, store.ErrNotFound) before calling abandonPending, mirroring the existing GetSchedule pattern. Transient errors (SQLITE_BUSY, context cancellation) now log a warning and return without deleting the row. Add regression test TestDrainPendingDropsRowsForGoneSourceGroup confirming the ErrNotFound path still abandons correctly. Also add a comment above the backoff-doubling loop explaining the progression. --- internal/server/http/pending_drain.go | 11 ++++- internal/server/http/pending_drain_test.go | 56 ++++++++++++++++++++++ 2 files changed, 66 insertions(+), 1 deletion(-) diff --git a/internal/server/http/pending_drain.go b/internal/server/http/pending_drain.go index 04dcd60..5433794 100644 --- a/internal/server/http/pending_drain.go +++ b/internal/server/http/pending_drain.go @@ -74,7 +74,12 @@ func (s *Server) drainOne(ctx context.Context, conn *ws.Conn, p store.PendingRun } g, err := s.deps.Store.GetSourceGroup(ctx, p.HostID, p.SourceGroupID) if err != nil { - s.abandonPending(ctx, p, "source group gone") + if errors.Is(err, store.ErrNotFound) { + s.abandonPending(ctx, p, "source group gone") + } else { + slog.Warn("drain pending: load source group", + "host_id", p.HostID, "group_id", p.SourceGroupID, "err", err) + } return } if g.RetryMax > 0 && p.Attempt >= g.RetryMax { @@ -90,6 +95,10 @@ func (s *Server) drainOne(ctx context.Context, conn *ws.Conn, p store.PendingRun jobID, _ := s.dispatchBackupForGroupCore(ctx, conn, p.HostID, p.ScheduleID, g, p.ScheduledAt) if jobID == "" { // Send failed again. Bump attempt with exponential backoff. + // Exponential backoff doubles immediately on the first drain + // retry: enqueue at base, attempt=1 → drain → 2*base, attempt=2 → + // drain → 4*base, etc. Capped at pendingDrainBackoffMax. With + // defaults (60s base, retry_max=3) the schedule is 60→120→240s. baseBackoff := time.Duration(g.RetryBackoffSeconds) * time.Second if baseBackoff <= 0 { baseBackoff = 60 * time.Second diff --git a/internal/server/http/pending_drain_test.go b/internal/server/http/pending_drain_test.go index 488bcfc..12c435b 100644 --- a/internal/server/http/pending_drain_test.go +++ b/internal/server/http/pending_drain_test.go @@ -341,6 +341,62 @@ func TestDrainPendingDropsRowsForGoneSchedule(t *testing.T) { } } +// TestDrainPendingDropsRowsForGoneSourceGroup verifies that when a +// source group is gone (ErrNotFound) the pending row is abandoned and +// an audit entry is written. Transient-error paths (SQLITE_BUSY, +// context cancellation) are not covered here because the real *Store +// doesn't expose a fault-injection seam; the code-review check above +// is the gate for that path. +func TestDrainPendingDropsRowsForGoneSourceGroup(t *testing.T) { + t.Parallel() + srv, ts, st := rawTestServer(t) + hostID, token := enrolHostForWS(t, srv, st, "gone-group-host") + _, sid := seedSchedAndGroup(t, st, hostID, 5) + + // Use a source_group_id that never existed. pending_runs carries a + // FK to source_groups, so we must bypass FK enforcement for this + // insert. PRAGMA foreign_keys is connection-scoped and can only be + // changed outside a transaction; DB().Exec runs on an arbitrary + // pooled connection, so we pin it with a dedicated *sql.Conn. + fakeGroupID := ulid.Make().String() + pendingID := ulid.Make().String() + now := time.Now().UTC() + conn, err := st.DB().Conn(context.Background()) + if err != nil { + t.Fatalf("db conn: %v", err) + } + defer conn.Close() + if _, err := conn.ExecContext(context.Background(), `PRAGMA foreign_keys = OFF`); err != nil { + t.Fatalf("fk off: %v", err) + } + if _, err := conn.ExecContext(context.Background(), + `INSERT INTO pending_runs (id, schedule_id, source_group_id, host_id, attempt, next_attempt_at, scheduled_at) + VALUES (?, ?, ?, ?, 1, ?, ?)`, + pendingID, sid, fakeGroupID, hostID, + now.Add(-time.Second), now.Add(-time.Minute), + ); err != nil { + t.Fatalf("insert pending: %v", err) + } + if _, err := conn.ExecContext(context.Background(), `PRAGMA foreign_keys = ON`); err != nil { + t.Fatalf("fk on: %v", err) + } + + auditBefore := countAuditAction(t, st, "pending_run.abandoned") + + c := agentDial(t, srv, ts, hostID, token) + sendHello(t, c, "gone-group-host") + _ = drainUntil(t, c, api.MsgScheduleSet) + + srv.DrainPending(context.Background(), hostID) + + if n := countPendingForHost(t, st, hostID); n != 0 { + t.Errorf("pending rows after source-group-gone abandon: got %d, want 0", n) + } + if d := countAuditAction(t, st, "pending_run.abandoned") - auditBefore; d != 1 { + t.Errorf("audit delta: got %d, want 1", d) + } +} + func TestDrainAllDueSkipsOfflineHosts(t *testing.T) { t.Parallel() srv, _, st := rawTestServer(t)