server: drainer uses dispatch-core to avoid duplicate pending_run enqueue

Extract dispatchBackupForGroupCore (persist+marshal+send, no enqueue on
failure) from dispatchBackupForGroup. drainOne now calls the core
directly so a failed Send only bumps the existing pending_runs row via
BumpPendingRunAttempt — not create a second row — stopping the
geometric duplication on repeated drain failures.

dispatchBackupForGroup (schedule.fire path) wraps the core and keeps
its enqueue-on-failure behaviour unchanged.

TestDrainPendingBumpsOnSendFailure strengthened: asserts exactly 1 row
remains after a send failure (was tolerating >=1 duplicate rows).
This commit is contained in:
2026-05-04 00:01:42 +01:00
parent 5b4a590508
commit d6dcdd5ec4
3 changed files with 67 additions and 53 deletions
+53 -36
View File
@@ -164,15 +164,19 @@ func (s *Server) dispatchScheduledJob(ctx context.Context, hostID string, conn *
}
}
// dispatchBackupForGroup builds and sends a single backup command.run
// envelope on conn for the given group. Persists the job row first so
// the live log viewer can subscribe to it.
// dispatchBackupForGroup persists a backup job row, sends the
// command.run envelope to the agent, and audit-logs the dispatch.
// Returns the persisted job ID on success, or "" on any failure
// (failures are slog.Warn-ed). Callers may use the returned ID to,
// e.g., redirect the UI to the live job log.
func (s *Server) dispatchBackupForGroup(ctx context.Context, conn *ws.Conn, hostID, scheduleID string, g *store.SourceGroup, scheduledAt time.Time) string {
// dispatchBackupForGroupCore persists a backup job row, marshals and
// sends the command.run envelope, and audit-logs the dispatch. It does
// NOT enqueue a PendingRun on failure — that responsibility belongs to
// the caller when appropriate.
//
// Returns (jobID, nil) on success. Returns ("", err) on any failure;
// the error is also slog.Warn-ed inside this function so callers don't
// need to log it again.
//
// Used by both dispatchBackupForGroup (schedule.fire path, which adds
// enqueue-on-failure) and drainOne (which handles failure via
// BumpPendingRunAttempt on the existing row, avoiding double-enqueue).
func (s *Server) dispatchBackupForGroupCore(ctx context.Context, conn *ws.Conn, hostID, scheduleID string, g *store.SourceGroup, scheduledAt time.Time) (string, error) {
jobID := ulid.Make().String()
now := time.Now().UTC()
scheduleRef := scheduleID
@@ -186,7 +190,7 @@ func (s *Server) dispatchBackupForGroup(ctx context.Context, conn *ws.Conn, host
}); err != nil {
slog.Warn("schedule.fire: persist job", "host_id", hostID,
"schedule_id", scheduleID, "group", g.Name, "err", err)
return ""
return "", err
}
// Backup ignores RetentionPolicy — the forget cadence lives on
// host_repo_maintenance and is driven by the server-side ticker
@@ -201,36 +205,17 @@ func (s *Server) dispatchBackupForGroup(ctx context.Context, conn *ws.Conn, host
if err != nil {
slog.Warn("schedule.fire: marshal command.run",
"host_id", hostID, "schedule_id", scheduleID, "err", err)
return ""
return "", err
}
sendCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
if err := conn.Send(sendCtx, env); err != nil {
slog.Warn("schedule.fire: send command.run failed, queueing for retry",
slog.Warn("schedule.fire: send command.run failed",
"host_id", hostID, "schedule_id", scheduleID, "group", g.Name, "err", err)
backoff := time.Duration(g.RetryBackoffSeconds) * time.Second
if backoff <= 0 {
backoff = 60 * time.Second
}
if enqueueErr := s.deps.Store.EnqueuePendingRun(ctx, &store.PendingRun{
ID: ulid.Make().String(),
ScheduleID: scheduleID,
SourceGroupID: g.ID,
HostID: hostID,
Attempt: 1,
NextAttemptAt: time.Now().UTC().Add(backoff),
ScheduledAt: scheduledAt,
LastError: err.Error(),
}); enqueueErr != nil {
slog.Warn("schedule.fire: enqueue pending run failed",
"host_id", hostID, "schedule_id", scheduleID, "group", g.Name, "err", enqueueErr)
}
// The job row was already persisted earlier in this function — leave
// it in `queued` status. The drainer will re-dispatch (creating a
// new job row) and the orphaned queued row stays for forensic
// visibility. Don't delete it: the audit trail still wants to know
// "we tried and the wire was wedged."
return ""
// The job row was already persisted — leave it in `queued` status.
// The drainer will re-dispatch (creating a new job row) and the
// orphaned queued row stays for forensic visibility.
return "", err
}
_ = s.deps.Store.AppendAudit(ctx, store.AuditEntry{
ID: ulid.Make().String(),
@@ -243,5 +228,37 @@ func (s *Server) dispatchBackupForGroup(ctx context.Context, conn *ws.Conn, host
slog.Info("schedule.fire: dispatched backup",
"host_id", hostID, "schedule_id", scheduleID,
"group", g.Name, "job_id", jobID, "scheduled_at", scheduledAt)
return jobID
return jobID, nil
}
// dispatchBackupForGroup is the schedule.fire entry point. Wraps
// dispatchBackupForGroupCore with enqueue-on-failure: a failed Send
// queues a fresh PendingRun for the drainer to retry later.
//
// Returns the persisted job ID on success, or "" on any failure.
func (s *Server) dispatchBackupForGroup(ctx context.Context, conn *ws.Conn, hostID, scheduleID string, g *store.SourceGroup, scheduledAt time.Time) string {
jobID, err := s.dispatchBackupForGroupCore(ctx, conn, hostID, scheduleID, g, scheduledAt)
if err == nil {
return jobID
}
// Send (or an earlier step) failed — err was already logged inside
// the core. Enqueue a fresh PendingRun for the drainer to retry.
backoff := time.Duration(g.RetryBackoffSeconds) * time.Second
if backoff <= 0 {
backoff = 60 * time.Second
}
if enqueueErr := s.deps.Store.EnqueuePendingRun(ctx, &store.PendingRun{
ID: ulid.Make().String(),
ScheduleID: scheduleID,
SourceGroupID: g.ID,
HostID: hostID,
Attempt: 1,
NextAttemptAt: time.Now().UTC().Add(backoff),
ScheduledAt: scheduledAt,
LastError: err.Error(),
}); enqueueErr != nil {
slog.Warn("schedule.fire: enqueue pending run failed",
"host_id", hostID, "schedule_id", scheduleID, "group", g.Name, "err", enqueueErr)
}
return ""
}