e0eae0a96f
GetSourceGroup errors in drainOne now gate on errors.Is(err, store.ErrNotFound) before calling abandonPending, mirroring the existing GetSchedule pattern. Transient errors (SQLITE_BUSY, context cancellation) now log a warning and return without deleting the row. Add regression test TestDrainPendingDropsRowsForGoneSourceGroup confirming the ErrNotFound path still abandons correctly. Also add a comment above the backoff-doubling loop explaining the progression.
180 lines
6.1 KiB
Go
180 lines
6.1 KiB
Go
// pending_drain.go — drains pending_runs rows that are due (or, on
|
|
// agent reconnect, every row for that host).
|
|
//
|
|
// Two trigger paths:
|
|
// 1. The 30s tick in cmd/server (DrainAllDue) — sweeps every host
|
|
// with rows whose next_attempt_at <= now.
|
|
// 2. onAgentHello (DrainPending(hostID)) — when a host comes back,
|
|
// walk all of its pending rows synchronously so the operator
|
|
// sees the queue drain promptly.
|
|
package http
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"log/slog"
|
|
"time"
|
|
|
|
"github.com/oklog/ulid/v2"
|
|
|
|
"gitea.dcglab.co.uk/steve/restic-manager/internal/server/ws"
|
|
"gitea.dcglab.co.uk/steve/restic-manager/internal/store"
|
|
)
|
|
|
|
// Drain tuning knobs.
//
// pendingDrainBackoffMax is the ceiling that drainOne's exponential
// retry backoff doubles up to after a failed re-dispatch.
//
// pendingDrainBatchLimit bounds how many due rows DrainAllDue asks
// DuePendingRuns for on each tick.
const (
	pendingDrainBackoffMax = time.Minute * 30
	pendingDrainBatchLimit = 100
)
|
|
|
|
// DrainPending re-dispatches every pending_runs row for hostID. The
|
|
// host must already be connected (caller's responsibility — typically
|
|
// onAgentHello). Each row's source group + schedule are loaded; if
|
|
// either is gone the row is dropped (audit-logged as abandoned). If
|
|
// the row's attempt count meets/exceeds the group's retry_max, the
|
|
// row is dropped (audit-logged as abandoned). Otherwise we attempt
|
|
// dispatch; success deletes the row, failure bumps the attempt and
|
|
// reschedules with exponential backoff.
|
|
func (s *Server) DrainPending(ctx context.Context, hostID string) {
|
|
runs, err := s.deps.Store.ListPendingRunsForHost(ctx, hostID)
|
|
if err != nil {
|
|
slog.Warn("drain pending: list", "host_id", hostID, "err", err)
|
|
return
|
|
}
|
|
if len(runs) == 0 {
|
|
return
|
|
}
|
|
conn := s.deps.Hub.Conn(hostID)
|
|
if conn == nil {
|
|
// Host went offline between the connectedness check and now.
|
|
// Skip — next tick or next reconnect will retry.
|
|
return
|
|
}
|
|
for _, p := range runs {
|
|
s.drainOne(ctx, conn, p)
|
|
}
|
|
}
|
|
|
|
// drainOne handles a single pending row. Refactored out so DrainPending
|
|
// reads cleanly. Side-effects: delete, bump, audit, dispatch — all
|
|
// per-row.
|
|
func (s *Server) drainOne(ctx context.Context, conn *ws.Conn, p store.PendingRun) {
|
|
sc, err := s.deps.Store.GetSchedule(ctx, p.HostID, p.ScheduleID)
|
|
if err != nil {
|
|
if errors.Is(err, store.ErrNotFound) {
|
|
s.abandonPending(ctx, p, "schedule gone")
|
|
return
|
|
}
|
|
slog.Warn("drain pending: load schedule",
|
|
"host_id", p.HostID, "schedule_id", p.ScheduleID, "err", err)
|
|
return
|
|
}
|
|
if !sc.Enabled {
|
|
s.abandonPending(ctx, p, "schedule disabled")
|
|
return
|
|
}
|
|
g, err := s.deps.Store.GetSourceGroup(ctx, p.HostID, p.SourceGroupID)
|
|
if err != nil {
|
|
if errors.Is(err, store.ErrNotFound) {
|
|
s.abandonPending(ctx, p, "source group gone")
|
|
} else {
|
|
slog.Warn("drain pending: load source group",
|
|
"host_id", p.HostID, "group_id", p.SourceGroupID, "err", err)
|
|
}
|
|
return
|
|
}
|
|
if g.RetryMax > 0 && p.Attempt >= g.RetryMax {
|
|
s.abandonPending(ctx, p, "retry_max exceeded")
|
|
return
|
|
}
|
|
// Calls dispatchBackupForGroupCore (not dispatchBackupForGroup) so a
|
|
// failed Send doesn't double-enqueue: dispatchBackupForGroup's
|
|
// enqueue-on-failure path would create a NEW pending_runs row while
|
|
// this function already bumps the EXISTING row via
|
|
// BumpPendingRunAttempt, producing geometric duplicates on repeated
|
|
// failures.
|
|
jobID, _ := s.dispatchBackupForGroupCore(ctx, conn, p.HostID, p.ScheduleID, g, p.ScheduledAt)
|
|
if jobID == "" {
|
|
// Send failed again. Bump attempt with exponential backoff.
|
|
// Exponential backoff doubles immediately on the first drain
|
|
// retry: enqueue at base, attempt=1 → drain → 2*base, attempt=2 →
|
|
// drain → 4*base, etc. Capped at pendingDrainBackoffMax. With
|
|
// defaults (60s base, retry_max=3) the schedule is 60→120→240s.
|
|
baseBackoff := time.Duration(g.RetryBackoffSeconds) * time.Second
|
|
if baseBackoff <= 0 {
|
|
baseBackoff = 60 * time.Second
|
|
}
|
|
backoff := baseBackoff
|
|
for i := 0; i < p.Attempt; i++ {
|
|
backoff *= 2
|
|
if backoff >= pendingDrainBackoffMax {
|
|
backoff = pendingDrainBackoffMax
|
|
break
|
|
}
|
|
}
|
|
next := time.Now().UTC().Add(backoff)
|
|
if err := s.deps.Store.BumpPendingRunAttempt(ctx, p.ID, next, "drain dispatch failed"); err != nil {
|
|
slog.Warn("drain pending: bump", "host_id", p.HostID, "id", p.ID, "err", err)
|
|
}
|
|
return
|
|
}
|
|
// Success — drop the pending row.
|
|
if err := s.deps.Store.DeletePendingRun(ctx, p.ID); err != nil {
|
|
slog.Warn("drain pending: delete after dispatch", "host_id", p.HostID, "id", p.ID, "err", err)
|
|
}
|
|
slog.Info("drain pending: dispatched",
|
|
"host_id", p.HostID, "schedule_id", p.ScheduleID, "group", g.Name,
|
|
"attempt", p.Attempt, "job_id", jobID)
|
|
}
|
|
|
|
// abandonPending deletes the row and records an audit entry. The row
|
|
// is gone but the audit trail preserves the forensic record of why.
|
|
func (s *Server) abandonPending(ctx context.Context, p store.PendingRun, reason string) {
|
|
slog.Info("drain pending: abandoning",
|
|
"host_id", p.HostID, "schedule_id", p.ScheduleID,
|
|
"attempt", p.Attempt, "reason", reason)
|
|
scheduleID := p.ScheduleID
|
|
if err := s.deps.Store.AppendAudit(ctx, store.AuditEntry{
|
|
ID: ulid.Make().String(),
|
|
Actor: "system",
|
|
Action: "pending_run.abandoned",
|
|
TargetKind: ptr("schedule"),
|
|
TargetID: &scheduleID,
|
|
TS: time.Now().UTC(),
|
|
}); err != nil {
|
|
slog.Warn("drain pending: audit on abandon", "id", p.ID, "err", err)
|
|
}
|
|
if err := s.deps.Store.DeletePendingRun(ctx, p.ID); err != nil {
|
|
slog.Warn("drain pending: delete on abandon", "id", p.ID, "err", err)
|
|
}
|
|
}
|
|
|
|
// DrainAllDue is the 30s-ticker entrypoint. Walks rows whose
|
|
// next_attempt_at <= now (DuePendingRuns), dedupes by host, and calls
|
|
// DrainPending per host. The DrainPending then re-walks the host's
|
|
// rows (same DB hit as the dedupe iteration would have done — keeps
|
|
// the per-host concurrency model simple).
|
|
func (s *Server) DrainAllDue(ctx context.Context) {
|
|
if s.deps.Hub == nil {
|
|
return
|
|
}
|
|
due, err := s.deps.Store.DuePendingRuns(ctx, time.Now().UTC(), pendingDrainBatchLimit)
|
|
if err != nil {
|
|
slog.Warn("drain all due: list", "err", err)
|
|
return
|
|
}
|
|
if len(due) == 0 {
|
|
return
|
|
}
|
|
seen := make(map[string]struct{}, len(due))
|
|
for _, p := range due {
|
|
if _, ok := seen[p.HostID]; ok {
|
|
continue
|
|
}
|
|
seen[p.HostID] = struct{}{}
|
|
if !s.deps.Hub.Connected(p.HostID) {
|
|
continue
|
|
}
|
|
s.DrainPending(ctx, p.HostID)
|
|
}
|
|
}
|