server: drainer abandons only on ErrNotFound, not transient errors
GetSourceGroup errors in drainOne now gate on errors.Is(err, store.ErrNotFound) before calling abandonPending, mirroring the existing GetSchedule pattern. Transient errors (SQLITE_BUSY, context cancellation) now log a warning and return without deleting the row. Add regression test TestDrainPendingDropsRowsForGoneSourceGroup confirming the ErrNotFound path still abandons correctly. Also add a comment above the backoff-doubling loop explaining the progression.
This commit is contained in:
@@ -74,7 +74,12 @@ func (s *Server) drainOne(ctx context.Context, conn *ws.Conn, p store.PendingRun
|
|||||||
}
|
}
|
||||||
g, err := s.deps.Store.GetSourceGroup(ctx, p.HostID, p.SourceGroupID)
|
g, err := s.deps.Store.GetSourceGroup(ctx, p.HostID, p.SourceGroupID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.abandonPending(ctx, p, "source group gone")
|
if errors.Is(err, store.ErrNotFound) {
|
||||||
|
s.abandonPending(ctx, p, "source group gone")
|
||||||
|
} else {
|
||||||
|
slog.Warn("drain pending: load source group",
|
||||||
|
"host_id", p.HostID, "group_id", p.SourceGroupID, "err", err)
|
||||||
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if g.RetryMax > 0 && p.Attempt >= g.RetryMax {
|
if g.RetryMax > 0 && p.Attempt >= g.RetryMax {
|
||||||
@@ -90,6 +95,10 @@ func (s *Server) drainOne(ctx context.Context, conn *ws.Conn, p store.PendingRun
|
|||||||
jobID, _ := s.dispatchBackupForGroupCore(ctx, conn, p.HostID, p.ScheduleID, g, p.ScheduledAt)
|
jobID, _ := s.dispatchBackupForGroupCore(ctx, conn, p.HostID, p.ScheduleID, g, p.ScheduledAt)
|
||||||
if jobID == "" {
|
if jobID == "" {
|
||||||
// Send failed again. Bump attempt with exponential backoff.
|
// Send failed again. Bump attempt with exponential backoff.
|
||||||
|
// Exponential backoff doubles immediately on the first drain
|
||||||
|
// retry: enqueue at base, attempt=1 → drain → 2*base, attempt=2 →
|
||||||
|
// drain → 4*base, etc. Capped at pendingDrainBackoffMax. With
|
||||||
|
// defaults (60s base, retry_max=3) the schedule is 60→120→240s.
|
||||||
baseBackoff := time.Duration(g.RetryBackoffSeconds) * time.Second
|
baseBackoff := time.Duration(g.RetryBackoffSeconds) * time.Second
|
||||||
if baseBackoff <= 0 {
|
if baseBackoff <= 0 {
|
||||||
baseBackoff = 60 * time.Second
|
baseBackoff = 60 * time.Second
|
||||||
|
|||||||
@@ -341,6 +341,62 @@ func TestDrainPendingDropsRowsForGoneSchedule(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestDrainPendingDropsRowsForGoneSourceGroup verifies that when a
|
||||||
|
// source group is gone (ErrNotFound) the pending row is abandoned and
|
||||||
|
// an audit entry is written. Transient-error paths (SQLITE_BUSY,
|
||||||
|
// context cancellation) are not covered here because the real *Store
|
||||||
|
// doesn't expose a fault-injection seam; the code-review check above
|
||||||
|
// is the gate for that path.
|
||||||
|
func TestDrainPendingDropsRowsForGoneSourceGroup(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
srv, ts, st := rawTestServer(t)
|
||||||
|
hostID, token := enrolHostForWS(t, srv, st, "gone-group-host")
|
||||||
|
_, sid := seedSchedAndGroup(t, st, hostID, 5)
|
||||||
|
|
||||||
|
// Use a source_group_id that never existed. pending_runs carries a
|
||||||
|
// FK to source_groups, so we must bypass FK enforcement for this
|
||||||
|
// insert. PRAGMA foreign_keys is connection-scoped and can only be
|
||||||
|
// changed outside a transaction; DB().Exec runs on an arbitrary
|
||||||
|
// pooled connection, so we pin it with a dedicated *sql.Conn.
|
||||||
|
fakeGroupID := ulid.Make().String()
|
||||||
|
pendingID := ulid.Make().String()
|
||||||
|
now := time.Now().UTC()
|
||||||
|
conn, err := st.DB().Conn(context.Background())
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("db conn: %v", err)
|
||||||
|
}
|
||||||
|
defer conn.Close()
|
||||||
|
if _, err := conn.ExecContext(context.Background(), `PRAGMA foreign_keys = OFF`); err != nil {
|
||||||
|
t.Fatalf("fk off: %v", err)
|
||||||
|
}
|
||||||
|
if _, err := conn.ExecContext(context.Background(),
|
||||||
|
`INSERT INTO pending_runs (id, schedule_id, source_group_id, host_id, attempt, next_attempt_at, scheduled_at)
|
||||||
|
VALUES (?, ?, ?, ?, 1, ?, ?)`,
|
||||||
|
pendingID, sid, fakeGroupID, hostID,
|
||||||
|
now.Add(-time.Second), now.Add(-time.Minute),
|
||||||
|
); err != nil {
|
||||||
|
t.Fatalf("insert pending: %v", err)
|
||||||
|
}
|
||||||
|
if _, err := conn.ExecContext(context.Background(), `PRAGMA foreign_keys = ON`); err != nil {
|
||||||
|
t.Fatalf("fk on: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
auditBefore := countAuditAction(t, st, "pending_run.abandoned")
|
||||||
|
|
||||||
|
c := agentDial(t, srv, ts, hostID, token)
|
||||||
|
sendHello(t, c, "gone-group-host")
|
||||||
|
_ = drainUntil(t, c, api.MsgScheduleSet)
|
||||||
|
|
||||||
|
srv.DrainPending(context.Background(), hostID)
|
||||||
|
|
||||||
|
if n := countPendingForHost(t, st, hostID); n != 0 {
|
||||||
|
t.Errorf("pending rows after source-group-gone abandon: got %d, want 0", n)
|
||||||
|
}
|
||||||
|
if d := countAuditAction(t, st, "pending_run.abandoned") - auditBefore; d != 1 {
|
||||||
|
t.Errorf("audit delta: got %d, want 1", d)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestDrainAllDueSkipsOfflineHosts(t *testing.T) {
|
func TestDrainAllDueSkipsOfflineHosts(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
srv, _, st := rawTestServer(t)
|
srv, _, st := rawTestServer(t)
|
||||||
|
|||||||
Reference in New Issue
Block a user