server: drain pending_runs on tick + on agent reconnect
Two trigger paths land here:
- A 30s ticker in cmd/server calls Server.DrainAllDue(ctx). It walks pending_runs rows whose next_attempt_at <= now, dedupes by host, skips offline hosts, and runs DrainPending for each online host.
- onAgentHello spawns a background DrainPending(hostID). When a host comes back, every pending row for it is dispatchable now — due-ness becomes irrelevant once the wire is back.

Each row's schedule + group are reloaded; ErrNotFound, a disabled schedule, or a gone group abandons the row with a pending_run.abandoned audit. attempt >= retry_max also abandons. Otherwise dispatchBackupForGroup is invoked; success deletes the row, failure bumps attempt with exponential backoff capped at 30m.
This commit is contained in:
@@ -0,0 +1,169 @@
|
||||
// pending_drain.go — drains pending_runs rows that are due (or, on
|
||||
// agent reconnect, every row for that host).
|
||||
//
|
||||
// Two trigger paths:
|
||||
// 1. The 30s tick in cmd/server (DrainAllDue) — sweeps every host
|
||||
// with rows whose next_attempt_at <= now.
|
||||
// 2. onAgentHello (DrainPending(hostID)) — when a host comes back,
|
||||
// walk all of its pending rows synchronously so the operator
|
||||
// sees the queue drain promptly.
|
||||
package http
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"log/slog"
|
||||
"time"
|
||||
|
||||
"github.com/oklog/ulid/v2"
|
||||
|
||||
"gitea.dcglab.co.uk/steve/restic-manager/internal/server/ws"
|
||||
"gitea.dcglab.co.uk/steve/restic-manager/internal/store"
|
||||
)
|
||||
|
||||
// pendingDrainBatchLimit is the row limit DrainAllDue passes to
// DuePendingRuns on each sweep.
const pendingDrainBatchLimit = 100

// pendingDrainBackoffMax is the ceiling applied to the exponential
// retry backoff computed when a drain dispatch fails.
const pendingDrainBackoffMax time.Duration = 30 * time.Minute
|
||||
|
||||
// DrainPending re-dispatches every pending_runs row for hostID. The
|
||||
// host must already be connected (caller's responsibility — typically
|
||||
// onAgentHello). Each row's source group + schedule are loaded; if
|
||||
// either is gone the row is dropped (audit-logged as abandoned). If
|
||||
// the row's attempt count meets/exceeds the group's retry_max, the
|
||||
// row is dropped (audit-logged as abandoned). Otherwise we attempt
|
||||
// dispatch; success deletes the row, failure bumps the attempt and
|
||||
// reschedules with exponential backoff.
|
||||
func (s *Server) DrainPending(ctx context.Context, hostID string) {
|
||||
runs, err := s.deps.Store.ListPendingRunsForHost(ctx, hostID)
|
||||
if err != nil {
|
||||
slog.Warn("drain pending: list", "host_id", hostID, "err", err)
|
||||
return
|
||||
}
|
||||
if len(runs) == 0 {
|
||||
return
|
||||
}
|
||||
conn := s.deps.Hub.Conn(hostID)
|
||||
if conn == nil {
|
||||
// Host went offline between the connectedness check and now.
|
||||
// Skip — next tick or next reconnect will retry.
|
||||
return
|
||||
}
|
||||
for _, p := range runs {
|
||||
s.drainOne(ctx, conn, p)
|
||||
}
|
||||
}
|
||||
|
||||
// drainOne handles a single pending row. Refactored out so DrainPending
|
||||
// reads cleanly. Side-effects: delete, bump, audit, dispatch — all
|
||||
// per-row.
|
||||
func (s *Server) drainOne(ctx context.Context, conn *ws.Conn, p store.PendingRun) {
|
||||
sc, err := s.deps.Store.GetSchedule(ctx, p.HostID, p.ScheduleID)
|
||||
if err != nil {
|
||||
if errors.Is(err, store.ErrNotFound) {
|
||||
s.abandonPending(ctx, p, "schedule gone")
|
||||
return
|
||||
}
|
||||
slog.Warn("drain pending: load schedule",
|
||||
"host_id", p.HostID, "schedule_id", p.ScheduleID, "err", err)
|
||||
return
|
||||
}
|
||||
if !sc.Enabled {
|
||||
s.abandonPending(ctx, p, "schedule disabled")
|
||||
return
|
||||
}
|
||||
g, err := s.deps.Store.GetSourceGroup(ctx, p.HostID, p.SourceGroupID)
|
||||
if err != nil {
|
||||
s.abandonPending(ctx, p, "source group gone")
|
||||
return
|
||||
}
|
||||
if g.RetryMax > 0 && p.Attempt >= g.RetryMax {
|
||||
s.abandonPending(ctx, p, "retry_max exceeded")
|
||||
return
|
||||
}
|
||||
jobID := s.dispatchBackupForGroup(ctx, conn, p.HostID, p.ScheduleID, g, p.ScheduledAt)
|
||||
if jobID == "" {
|
||||
// Send failed again. Bump attempt with exponential backoff.
|
||||
// Note: dispatchBackupForGroup's failure path *also* enqueues a
|
||||
// fresh pending_runs row (G1.1). That's a duplicate, but harmless:
|
||||
// it'll be drained the same way and either succeed or hit
|
||||
// retry_max alongside this one. The bump below preserves this
|
||||
// row's history (attempt count, last error) for forensics.
|
||||
baseBackoff := time.Duration(g.RetryBackoffSeconds) * time.Second
|
||||
if baseBackoff <= 0 {
|
||||
baseBackoff = 60 * time.Second
|
||||
}
|
||||
backoff := baseBackoff
|
||||
for i := 0; i < p.Attempt; i++ {
|
||||
backoff *= 2
|
||||
if backoff >= pendingDrainBackoffMax {
|
||||
backoff = pendingDrainBackoffMax
|
||||
break
|
||||
}
|
||||
}
|
||||
next := time.Now().UTC().Add(backoff)
|
||||
if err := s.deps.Store.BumpPendingRunAttempt(ctx, p.ID, next, "drain dispatch failed"); err != nil {
|
||||
slog.Warn("drain pending: bump", "host_id", p.HostID, "id", p.ID, "err", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
// Success — drop the pending row.
|
||||
if err := s.deps.Store.DeletePendingRun(ctx, p.ID); err != nil {
|
||||
slog.Warn("drain pending: delete after dispatch", "host_id", p.HostID, "id", p.ID, "err", err)
|
||||
}
|
||||
slog.Info("drain pending: dispatched",
|
||||
"host_id", p.HostID, "schedule_id", p.ScheduleID, "group", g.Name,
|
||||
"attempt", p.Attempt, "job_id", jobID)
|
||||
}
|
||||
|
||||
// abandonPending deletes the row and records an audit entry. The row
|
||||
// is gone but the audit trail preserves the forensic record of why.
|
||||
func (s *Server) abandonPending(ctx context.Context, p store.PendingRun, reason string) {
|
||||
slog.Info("drain pending: abandoning",
|
||||
"host_id", p.HostID, "schedule_id", p.ScheduleID,
|
||||
"attempt", p.Attempt, "reason", reason)
|
||||
scheduleID := p.ScheduleID
|
||||
if err := s.deps.Store.AppendAudit(ctx, store.AuditEntry{
|
||||
ID: ulid.Make().String(),
|
||||
Actor: "system",
|
||||
Action: "pending_run.abandoned",
|
||||
TargetKind: ptr("schedule"),
|
||||
TargetID: &scheduleID,
|
||||
TS: time.Now().UTC(),
|
||||
}); err != nil {
|
||||
slog.Warn("drain pending: audit on abandon", "id", p.ID, "err", err)
|
||||
}
|
||||
if err := s.deps.Store.DeletePendingRun(ctx, p.ID); err != nil {
|
||||
slog.Warn("drain pending: delete on abandon", "id", p.ID, "err", err)
|
||||
}
|
||||
}
|
||||
|
||||
// DrainAllDue is the 30s-ticker entrypoint. Walks rows whose
|
||||
// next_attempt_at <= now (DuePendingRuns), dedupes by host, and calls
|
||||
// DrainPending per host. The DrainPending then re-walks the host's
|
||||
// rows (same DB hit as the dedupe iteration would have done — keeps
|
||||
// the per-host concurrency model simple).
|
||||
func (s *Server) DrainAllDue(ctx context.Context) {
|
||||
if s.deps.Hub == nil {
|
||||
return
|
||||
}
|
||||
due, err := s.deps.Store.DuePendingRuns(ctx, time.Now().UTC(), pendingDrainBatchLimit)
|
||||
if err != nil {
|
||||
slog.Warn("drain all due: list", "err", err)
|
||||
return
|
||||
}
|
||||
if len(due) == 0 {
|
||||
return
|
||||
}
|
||||
seen := make(map[string]struct{}, len(due))
|
||||
for _, p := range due {
|
||||
if _, ok := seen[p.HostID]; ok {
|
||||
continue
|
||||
}
|
||||
seen[p.HostID] = struct{}{}
|
||||
if !s.deps.Hub.Connected(p.HostID) {
|
||||
continue
|
||||
}
|
||||
s.DrainPending(ctx, p.HostID)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user