From e64075d5d7cb2872b03968dca0a0eb45c6728588 Mon Sep 17 00:00:00 2001 From: Steve Cliff Date: Tue, 16 Jun 2026 13:29:47 +0100 Subject: [PATCH] test(pending-drain): de-flake TestDrainPendingSerializesPerHost Keep the test WS client actively reading (a real agent always is) so the server-side conn stays registered under parallel load, and drain to completion via condition polling instead of asserting one-shot completeness. The conn could be dropped/unregistered under CI load, making DrainPending correctly no-op (conn==nil) and the test observe a partial/empty drain. -race confirms no production data race; the exactly-5-jobs assertion (proving the per-host mutex blocks double-dispatch) is unchanged. Verified: 0 failures over 25 loaded runs + 4 -race iterations. --- internal/server/http/pending_drain_test.go | 53 +++++++++++++++------- 1 file changed, 36 insertions(+), 17 deletions(-) diff --git a/internal/server/http/pending_drain_test.go b/internal/server/http/pending_drain_test.go index a1714a9..abb64a1 100644 --- a/internal/server/http/pending_drain_test.go +++ b/internal/server/http/pending_drain_test.go @@ -512,11 +512,27 @@ func TestDrainPendingSerializesPerHost(t *testing.T) { // Connect the agent so DrainPending can dispatch. c := agentDial(t, srv, ts, hostID, token) sendHello(t, c, "serialise-host") - // Drain the on-hello goroutine's pass first (no pending rows yet), - // then wait for the schedule.set so the connection is fully settled. + // Wait for the on-hello push to settle. _ = drainUntil(t, c, api.MsgScheduleSet) - // Insert 5 pending rows now that the on-hello drain has already run. + // A real agent is always in a read loop. Keep this test client + // reading in the background for the rest of the test: without an + // active reader the server-side conn can be dropped under parallel + // load, which unregisters it from the hub and makes DrainPending + // no-op (conn == nil) — the historical source of this test's + // flakiness (it would observe 0 or a partial drain). The reader also + // consumes the command.run envelopes our drains emit. + readerCtx, stopReader := context.WithCancel(context.Background()) + defer stopReader() + go func() { + for { + if _, _, err := c.Read(readerCtx); err != nil { + return + } + } + }() + + // Insert 5 due pending rows. now := time.Now().UTC() for i := range 5 { pid := ulid.Make().String() @@ -533,7 +549,8 @@ func TestDrainPendingSerializesPerHost(t *testing.T) { } } - // Spawn 10 goroutines all calling DrainPending concurrently. + // Fire 10 concurrent DrainPending calls. The per-host mutex must + // ensure each row is dispatched at most once (no double-dispatch). var wg sync.WaitGroup for range 10 { wg.Add(1) @@ -544,24 +561,26 @@ func TestDrainPendingSerializesPerHost(t *testing.T) { } wg.Wait() - // Drain any envelopes the agent received so we don't block below. - // We read with short timeouts and stop when the connection goes quiet. - drainDeadline := time.Now().Add(500 * time.Millisecond) - for time.Now().Before(drainDeadline) { - ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) - _, _, err := c.Read(ctx) - cancel() - if err != nil { - break - } + // Drain to completion. The fire-and-forget on-hello DrainPending + // shares the same per-host mutex and can hold it during the burst, + // leaving rows for a later pass — exactly how production drains + // (repeatedly, via the 30s tick / on reconnect). Re-drain until the + // queue is empty; because every drain is still serialised, each row + // is dispatched at most once, so the exactly-5 job count below proves + // there was no double-dispatch. + deadline := time.Now().Add(5 * time.Second) + for countPendingForHost(t, st, hostID) > 0 && time.Now().Before(deadline) { + srv.DrainPending(context.Background(), hostID) + time.Sleep(10 * time.Millisecond) } - // All 5 pending rows must be gone. + // All 5 pending rows must be drained. if n := countPendingForHost(t, st, hostID); n != 0 { - t.Errorf("pending rows after concurrent drain: got %d, want 0", n) + t.Errorf("pending rows after drain-to-completion: got %d, want 0", n) } - // Exactly 5 backup job rows (one per pending row), not 10+ from a race. + // Exactly 5 backup job rows (one per pending row) — never more, which + // would mean the per-host mutex failed to prevent double-dispatch. var n int _ = st.DB().QueryRow( `SELECT COUNT(*) FROM jobs WHERE host_id = ? AND kind = 'backup' AND actor_kind = 'schedule'`,