Files
restic-manager/internal/server/http/pending_drain.go
T
steve 02dbe59d68 server: drainer uses dispatch-core to avoid duplicate pending_run enqueue
Extract dispatchBackupForGroupCore (persist+marshal+send, no enqueue on
failure) from dispatchBackupForGroup. drainOne now calls the core
directly so a failed Send only bumps the existing pending_runs row via
BumpPendingRunAttempt instead of creating a second row, which stops the
geometric duplication on repeated drain failures.

dispatchBackupForGroup (schedule.fire path) wraps the core and keeps
its enqueue-on-failure behaviour unchanged.

TestDrainPendingBumpsOnSendFailure strengthened: asserts exactly 1 row
remains after a send failure (was tolerating >=1 duplicate rows).
2026-05-04 10:15:18 +01:00

171 lines
5.6 KiB
Go

// pending_drain.go — drains pending_runs rows that are due (or, on
// agent reconnect, every row for that host).
//
// Two trigger paths:
// 1. The 30s tick in cmd/server (DrainAllDue) — sweeps every host
// with rows whose next_attempt_at <= now.
// 2. onAgentHello (DrainPending(hostID)) — when a host comes back,
// walk all of its pending rows synchronously so the operator
// sees the queue drain promptly.
package http
import (
"context"
"errors"
"log/slog"
"time"
"github.com/oklog/ulid/v2"
"gitea.dcglab.co.uk/steve/restic-manager/internal/server/ws"
"gitea.dcglab.co.uk/steve/restic-manager/internal/store"
)
// pendingDrainBatchLimit is the row limit passed to DuePendingRuns on
// each DrainAllDue tick.
const pendingDrainBatchLimit = 100

// pendingDrainBackoffMax is the ceiling on the exponential retry
// backoff applied when a drain dispatch fails.
const pendingDrainBackoffMax = 30 * time.Minute
// DrainPending re-dispatches every pending_runs row for hostID,
// regardless of each row's next_attempt_at. This is the reconnect path,
// where the operator expects the host's whole queue to move promptly.
//
// The host must already be connected (caller's responsibility —
// typically onAgentHello). If its connection has gone away by the time
// it is looked up, nothing is drained and the rows are left for the next
// tick or reconnect. Per-row handling (abandon, dispatch, then delete or
// bump with backoff) lives in drainOne.
//
// Errors are logged, not returned. Draining stops between rows once ctx
// is cancelled.
func (s *Server) DrainPending(ctx context.Context, hostID string) {
	runs, err := s.deps.Store.ListPendingRunsForHost(ctx, hostID)
	if err != nil {
		slog.Warn("drain pending: list", "host_id", hostID, "err", err)
		return
	}
	if len(runs) == 0 {
		return
	}
	conn := s.deps.Hub.Conn(hostID)
	if conn == nil {
		// Host went offline between the connectedness check and now.
		// Skip — next tick or next reconnect will retry.
		return
	}
	for _, p := range runs {
		// On shutdown, stop here rather than letting each remaining row
		// fail its store calls (and log a warning) one by one.
		if ctx.Err() != nil {
			return
		}
		s.drainOne(ctx, conn, p)
	}
}
// drainOne handles a single pending row. All side-effects (abandon,
// dispatch, delete, bump) are per-row. Errors are logged, not returned.
//
// The row is abandoned (audit-logged and deleted via abandonPending) in
// these cases:
//   - its schedule or source group no longer exists;
//   - the schedule is disabled;
//   - the group's RetryMax is positive and the row's Attempt has reached
//     it. RetryMax <= 0 means retry indefinitely.
//
// A transient store error while loading the schedule or group leaves the
// row untouched, so a later drain retries it.
//
// Otherwise the row is dispatched on conn. On success the row is deleted.
// On failure its attempt is bumped and next_attempt_at is pushed out by
// pendingDrainBackoff.
func (s *Server) drainOne(ctx context.Context, conn *ws.Conn, p store.PendingRun) {
	sc, err := s.deps.Store.GetSchedule(ctx, p.HostID, p.ScheduleID)
	if err != nil {
		if errors.Is(err, store.ErrNotFound) {
			s.abandonPending(ctx, p, "schedule gone")
			return
		}
		slog.Warn("drain pending: load schedule",
			"host_id", p.HostID, "schedule_id", p.ScheduleID, "err", err)
		return
	}
	if !sc.Enabled {
		s.abandonPending(ctx, p, "schedule disabled")
		return
	}
	g, err := s.deps.Store.GetSourceGroup(ctx, p.HostID, p.SourceGroupID)
	if err != nil {
		// Only a definite "not found" justifies dropping the row. A
		// transient error (DB busy, ctx cancelled) must not destroy a
		// queued backup. This mirrors the schedule lookup above.
		// NOTE(review): assumes GetSourceGroup reports a missing group as
		// store.ErrNotFound, as GetSchedule does — confirm.
		if errors.Is(err, store.ErrNotFound) {
			s.abandonPending(ctx, p, "source group gone")
			return
		}
		slog.Warn("drain pending: load source group",
			"host_id", p.HostID, "source_group_id", p.SourceGroupID, "err", err)
		return
	}
	if g.RetryMax > 0 && p.Attempt >= g.RetryMax {
		s.abandonPending(ctx, p, "retry_max exceeded")
		return
	}
	// Calls dispatchBackupForGroupCore (not dispatchBackupForGroup) so a
	// failed Send doesn't double-enqueue: dispatchBackupForGroup's
	// enqueue-on-failure path would create a NEW pending_runs row while
	// this function already bumps the EXISTING row via
	// BumpPendingRunAttempt, producing geometric duplicates on repeated
	// failures.
	//
	// Failure is detected via an empty jobID. The second result is
	// deliberately ignored here.
	// NOTE(review): presumably the core logs its own failure — confirm.
	jobID, _ := s.dispatchBackupForGroupCore(ctx, conn, p.HostID, p.ScheduleID, g, p.ScheduledAt)
	if jobID == "" {
		// Send failed again: bump the existing row and push its next
		// attempt out with exponential backoff.
		base := time.Duration(g.RetryBackoffSeconds) * time.Second
		next := time.Now().UTC().Add(pendingDrainBackoff(base, p.Attempt))
		if err := s.deps.Store.BumpPendingRunAttempt(ctx, p.ID, next, "drain dispatch failed"); err != nil {
			slog.Warn("drain pending: bump", "host_id", p.HostID, "id", p.ID, "err", err)
		}
		return
	}
	// Success — drop the pending row.
	if err := s.deps.Store.DeletePendingRun(ctx, p.ID); err != nil {
		slog.Warn("drain pending: delete after dispatch", "host_id", p.HostID, "id", p.ID, "err", err)
	}
	slog.Info("drain pending: dispatched",
		"host_id", p.HostID, "schedule_id", p.ScheduleID, "group", g.Name,
		"attempt", p.Attempt, "job_id", jobID)
}

// pendingDrainBackoff returns the delay before the next drain attempt.
// It starts from base (60s when base is non-positive), doubles once per
// prior attempt, and clamps the result to pendingDrainBackoffMax.
//
// The clamp applies even when attempt is 0. Otherwise a base larger than
// the cap would yield a longer wait on attempt 0 than on attempt 1.
// Doubling stops as soon as the cap is reached, so it cannot overflow.
func pendingDrainBackoff(base time.Duration, attempt int) time.Duration {
	if base <= 0 {
		base = 60 * time.Second
	}
	backoff := base
	for i := 0; i < attempt && backoff < pendingDrainBackoffMax; i++ {
		backoff *= 2
	}
	if backoff > pendingDrainBackoffMax {
		backoff = pendingDrainBackoffMax
	}
	return backoff
}
// abandonPending gives up on a pending row. In order, it:
//  1. logs the reason via slog;
//  2. appends a "pending_run.abandoned" audit entry targeting the row's
//     schedule;
//  3. deletes the row.
//
// Both store calls are best-effort: a failure is logged and the next step
// still runs, so a failed audit write never leaves the row behind.
//
// NOTE(review): reason reaches only the slog line. No reason is set on the
// AuditEntry built here, so the audit trail records that a row was
// abandoned but not why.
func (s *Server) abandonPending(ctx context.Context, p store.PendingRun, reason string) {
	slog.Info("drain pending: abandoning",
		"host_id", p.HostID, "schedule_id", p.ScheduleID,
		"attempt", p.Attempt, "reason", reason)

	target := p.ScheduleID
	entry := store.AuditEntry{
		ID:         ulid.Make().String(),
		Actor:      "system",
		Action:     "pending_run.abandoned",
		TargetKind: ptr("schedule"),
		TargetID:   &target,
		TS:         time.Now().UTC(),
	}
	if auditErr := s.deps.Store.AppendAudit(ctx, entry); auditErr != nil {
		slog.Warn("drain pending: audit on abandon", "id", p.ID, "err", auditErr)
	}

	if delErr := s.deps.Store.DeletePendingRun(ctx, p.ID); delErr != nil {
		slog.Warn("drain pending: delete on abandon", "id", p.ID, "err", delErr)
	}
}
// DrainAllDue is the 30s-ticker entrypoint. It fetches rows whose
// next_attempt_at <= now (DuePendingRuns, limited to
// pendingDrainBatchLimit) and hands each one to drainOne. Each host's
// connection is looked up once per tick, and rows for offline hosts are
// skipped.
//
// Only the due rows are touched. Draining the host's whole queue here, as
// the reconnect path (DrainPending) does, would re-dispatch rows still
// inside their backoff window whenever a sibling row on the same host came
// due. Each such failed retry bumps the row's attempt early, driving it
// toward retry_max abandonment far faster than the configured backoff
// intends.
//
// Errors are logged, not returned. The walk stops once ctx is cancelled.
func (s *Server) DrainAllDue(ctx context.Context) {
	if s.deps.Hub == nil {
		return
	}
	due, err := s.deps.Store.DuePendingRuns(ctx, time.Now().UTC(), pendingDrainBatchLimit)
	if err != nil {
		slog.Warn("drain all due: list", "err", err)
		return
	}
	// Cache each host's connection (nil = offline) so the hub is consulted
	// once per host per tick, not once per row.
	conns := make(map[string]*ws.Conn)
	for _, p := range due {
		if ctx.Err() != nil {
			return
		}
		conn, looked := conns[p.HostID]
		if !looked {
			conn = s.deps.Hub.Conn(p.HostID)
			conns[p.HostID] = conn
		}
		if conn == nil {
			continue
		}
		s.drainOne(ctx, conn, p)
	}
}