From 02dbe59d68d49789a63a0bb5ef706b9f64934708 Mon Sep 17 00:00:00 2001 From: Steve Cliff Date: Mon, 4 May 2026 00:01:42 +0100 Subject: [PATCH] server: drainer uses dispatch-core to avoid duplicate pending_run enqueue MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Extract dispatchBackupForGroupCore (persist+marshal+send, no enqueue on failure) from dispatchBackupForGroup. drainOne now calls the core directly so a failed Send only bumps the existing pending_runs row via BumpPendingRunAttempt — rather than creating a second row — stopping the geometric duplication on repeated drain failures. dispatchBackupForGroup (schedule.fire path) wraps the core and keeps its enqueue-on-failure behaviour for failed Sends; because it enqueues on any error returned by the core, persist and marshal failures are now queued for retry as well. TestDrainPendingBumpsOnSendFailure strengthened: asserts exactly 1 row remains after a send failure (was tolerating >=1 duplicate rows). --- internal/server/http/pending_drain.go | 13 ++-- internal/server/http/pending_drain_test.go | 18 ++--- internal/server/http/schedule_push.go | 89 +++++++++++++--------- 3 files changed, 67 insertions(+), 53 deletions(-) diff --git a/internal/server/http/pending_drain.go b/internal/server/http/pending_drain.go index 3419894..04dcd60 100644 --- a/internal/server/http/pending_drain.go +++ b/internal/server/http/pending_drain.go @@ -81,14 +81,15 @@ func (s *Server) drainOne(ctx context.Context, conn *ws.Conn, p store.PendingRun s.abandonPending(ctx, p, "retry_max exceeded") return } - jobID := s.dispatchBackupForGroup(ctx, conn, p.HostID, p.ScheduleID, g, p.ScheduledAt) + // Calls dispatchBackupForGroupCore (not dispatchBackupForGroup) so a + // failed Send doesn't double-enqueue: dispatchBackupForGroup's + // enqueue-on-failure path would create a NEW pending_runs row while + // this function already bumps the EXISTING row via + // BumpPendingRunAttempt, producing geometric duplicates on repeated + // failures.
+ jobID, _ := s.dispatchBackupForGroupCore(ctx, conn, p.HostID, p.ScheduleID, g, p.ScheduledAt) if jobID == "" { // Send failed again. Bump attempt with exponential backoff. - // Note: dispatchBackupForGroup's failure path *also* enqueues a - // fresh pending_runs row (G1.1). That's a duplicate, but harmless: - // it'll be drained the same way and either succeed or hit - // retry_max alongside this one. The bump below preserves this - // row's history (attempt count, last error) for forensics. baseBackoff := time.Duration(g.RetryBackoffSeconds) * time.Second if baseBackoff <= 0 { baseBackoff = 60 * time.Second diff --git a/internal/server/http/pending_drain_test.go b/internal/server/http/pending_drain_test.go index 1028bea..488bcfc 100644 --- a/internal/server/http/pending_drain_test.go +++ b/internal/server/http/pending_drain_test.go @@ -255,11 +255,13 @@ func TestDrainPendingBumpsOnSendFailure(t *testing.T) { Attempt: 1, NextAttemptAt: now.Add(-time.Second), ScheduledAt: now.Add(-time.Minute), }) - // The original row was bumped (attempt=2) — the G1.1 path may have - // also enqueued a duplicate row from inside dispatchBackupForGroup's - // failed Send. So we expect exactly the original row updated, plus - // possibly one duplicate. Either way: pending count >= 1, no row - // deleted, and the original row's attempt bumped to 2. + // The original row must be bumped to attempt=2 with a non-empty + // last_error. Critically, NO duplicate row should have been created: + // drainOne calls dispatchBackupForGroupCore (not dispatchBackupForGroup) + // so the enqueue-on-failure path is bypassed and the count stays at 1. 
+ if n := countPendingForHost(t, st, hostID); n != 1 { + t.Errorf("pending rows after send failure: got %d, want 1 (no duplicate enqueue)", n) + } var attempt int var lastErr string if err := st.DB().QueryRow( @@ -273,12 +275,6 @@ func TestDrainPendingBumpsOnSendFailure(t *testing.T) { if lastErr == "" { t.Errorf("last_error empty after bump") } - // No successful backup job persisted via DrainPending. - // (dispatchBackupForGroup *does* create a job row before attempting - // the send and leaves it on send-failure; that row exists. The - // "successful job" we care about would be one that wasn't followed - // by an enqueue — there aren't any here. Asserting on the bump is - // the cleaner signal.) } func TestDrainPendingDropsRowsForGoneSchedule(t *testing.T) { diff --git a/internal/server/http/schedule_push.go b/internal/server/http/schedule_push.go index 42fadfe..6bcffb6 100644 --- a/internal/server/http/schedule_push.go +++ b/internal/server/http/schedule_push.go @@ -164,15 +164,19 @@ func (s *Server) dispatchScheduledJob(ctx context.Context, hostID string, conn * } } -// dispatchBackupForGroup builds and sends a single backup command.run -// envelope on conn for the given group. Persists the job row first so -// the live log viewer can subscribe to it. -// dispatchBackupForGroup persists a backup job row, sends the -// command.run envelope to the agent, and audit-logs the dispatch. -// Returns the persisted job ID on success, or "" on any failure -// (failures are slog.Warn-ed). Callers may use the returned ID to, -// e.g., redirect the UI to the live job log. -func (s *Server) dispatchBackupForGroup(ctx context.Context, conn *ws.Conn, hostID, scheduleID string, g *store.SourceGroup, scheduledAt time.Time) string { +// dispatchBackupForGroupCore persists a backup job row, marshals and +// sends the command.run envelope, and audit-logs the dispatch. It does +// NOT enqueue a PendingRun on failure — that responsibility belongs to +// the caller when appropriate. 
+// +// Returns (jobID, nil) on success. Returns ("", err) on any failure; +// the error is also slog.Warn-ed inside this function so callers don't +// need to log it again. +// +// Used by both dispatchBackupForGroup (schedule.fire path, which adds +// enqueue-on-failure) and drainOne (which handles failure via +// BumpPendingRunAttempt on the existing row, avoiding double-enqueue). +func (s *Server) dispatchBackupForGroupCore(ctx context.Context, conn *ws.Conn, hostID, scheduleID string, g *store.SourceGroup, scheduledAt time.Time) (string, error) { jobID := ulid.Make().String() now := time.Now().UTC() scheduleRef := scheduleID @@ -186,7 +190,7 @@ func (s *Server) dispatchBackupForGroup(ctx context.Context, conn *ws.Conn, host }); err != nil { slog.Warn("schedule.fire: persist job", "host_id", hostID, "schedule_id", scheduleID, "group", g.Name, "err", err) - return "" + return "", err } // Backup ignores RetentionPolicy — the forget cadence lives on // host_repo_maintenance and is driven by the server-side ticker @@ -201,36 +205,17 @@ func (s *Server) dispatchBackupForGroup(ctx context.Context, conn *ws.Conn, host if err != nil { slog.Warn("schedule.fire: marshal command.run", "host_id", hostID, "schedule_id", scheduleID, "err", err) - return "" + return "", err } sendCtx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() if err := conn.Send(sendCtx, env); err != nil { - slog.Warn("schedule.fire: send command.run failed, queueing for retry", + slog.Warn("schedule.fire: send command.run failed", "host_id", hostID, "schedule_id", scheduleID, "group", g.Name, "err", err) - backoff := time.Duration(g.RetryBackoffSeconds) * time.Second - if backoff <= 0 { - backoff = 60 * time.Second - } - if enqueueErr := s.deps.Store.EnqueuePendingRun(ctx, &store.PendingRun{ - ID: ulid.Make().String(), - ScheduleID: scheduleID, - SourceGroupID: g.ID, - HostID: hostID, - Attempt: 1, - NextAttemptAt: time.Now().UTC().Add(backoff), - ScheduledAt: scheduledAt, - 
LastError: err.Error(), - }); enqueueErr != nil { - slog.Warn("schedule.fire: enqueue pending run failed", - "host_id", hostID, "schedule_id", scheduleID, "group", g.Name, "err", enqueueErr) - } - // The job row was already persisted earlier in this function — leave - // it in `queued` status. The drainer will re-dispatch (creating a - // new job row) and the orphaned queued row stays for forensic - // visibility. Don't delete it: the audit trail still wants to know - // "we tried and the wire was wedged." - return "" + // The job row was already persisted — leave it in `queued` status. + // The drainer will re-dispatch (creating a new job row) and the + // orphaned queued row stays for forensic visibility. + return "", err } _ = s.deps.Store.AppendAudit(ctx, store.AuditEntry{ ID: ulid.Make().String(), @@ -243,5 +228,37 @@ func (s *Server) dispatchBackupForGroup(ctx context.Context, conn *ws.Conn, host slog.Info("schedule.fire: dispatched backup", "host_id", hostID, "schedule_id", scheduleID, "group", g.Name, "job_id", jobID, "scheduled_at", scheduledAt) - return jobID + return jobID, nil +} + +// dispatchBackupForGroup is the schedule.fire entry point. Wraps +// dispatchBackupForGroupCore with enqueue-on-failure: a failed Send +// queues a fresh PendingRun for the drainer to retry later. +// +// Returns the persisted job ID on success, or "" on any failure. +func (s *Server) dispatchBackupForGroup(ctx context.Context, conn *ws.Conn, hostID, scheduleID string, g *store.SourceGroup, scheduledAt time.Time) string { + jobID, err := s.dispatchBackupForGroupCore(ctx, conn, hostID, scheduleID, g, scheduledAt) + if err == nil { + return jobID + } + // Send (or an earlier step) failed — err was already logged inside + // the core. Enqueue a fresh PendingRun for the drainer to retry. 
+ backoff := time.Duration(g.RetryBackoffSeconds) * time.Second + if backoff <= 0 { + backoff = 60 * time.Second + } + if enqueueErr := s.deps.Store.EnqueuePendingRun(ctx, &store.PendingRun{ + ID: ulid.Make().String(), + ScheduleID: scheduleID, + SourceGroupID: g.ID, + HostID: hostID, + Attempt: 1, + NextAttemptAt: time.Now().UTC().Add(backoff), + ScheduledAt: scheduledAt, + LastError: err.Error(), + }); enqueueErr != nil { + slog.Warn("schedule.fire: enqueue pending run failed", + "host_id", hostID, "schedule_id", scheduleID, "group", g.Name, "err", enqueueErr) + } + return "" }