feat(catchup): arm on hello, fire missed-window backups on tick
This commit is contained in:
@@ -227,6 +227,7 @@ func run() error {
|
||||
}
|
||||
case <-pendingDrainTick.C:
|
||||
srv.DrainAllDue(ctx)
|
||||
srv.RunCatchupsDue(ctx)
|
||||
case <-pendingExpiryTick.C:
|
||||
if n, err := st.DeleteExpiredPendingHosts(ctx, time.Now().UTC()); err == nil && n > 0 {
|
||||
slog.Info("expired pending hosts swept", "n", n)
|
||||
|
||||
@@ -6,6 +6,8 @@
|
||||
package http
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log/slog"
|
||||
"time"
|
||||
)
|
||||
|
||||
@@ -27,3 +29,99 @@ func scheduleOverdue(cronExpr string, lastBackup *time.Time, now time.Time) bool
|
||||
next := sched.Next(*lastBackup)
|
||||
return !next.After(now)
|
||||
}
|
||||
|
||||
// catchupSettle is how long after a reconnect we wait before evaluating
|
||||
// catch-up, so a laptop that wakes briefly and sleeps again doesn't
|
||||
// trigger a backup it can't finish. ~1 minute per the spec.
|
||||
const catchupSettle = 60 * time.Second
|
||||
|
||||
// ArmCatchup records that an intermittent host just reconnected and
|
||||
// should be evaluated for a missed backup after the settle window.
|
||||
// No-op for always-on hosts (caller passes only intermittent hosts).
|
||||
// Re-arming overwrites the timer (debounce — flapping doesn't stack).
|
||||
func (s *Server) ArmCatchup(hostID string, now time.Time) {
|
||||
s.catchupMu.Lock()
|
||||
defer s.catchupMu.Unlock()
|
||||
if s.catchupDueAt == nil {
|
||||
s.catchupDueAt = make(map[string]time.Time)
|
||||
}
|
||||
s.catchupDueAt[hostID] = now.Add(catchupSettle)
|
||||
}
|
||||
|
||||
// dueCatchups returns the hostIDs whose settle window has elapsed and
|
||||
// removes them from the map. Caller evaluates each.
|
||||
func (s *Server) dueCatchups(now time.Time) []string {
|
||||
s.catchupMu.Lock()
|
||||
defer s.catchupMu.Unlock()
|
||||
var due []string
|
||||
for id, at := range s.catchupDueAt {
|
||||
if !now.Before(at) {
|
||||
due = append(due, id)
|
||||
delete(s.catchupDueAt, id)
|
||||
}
|
||||
}
|
||||
return due
|
||||
}
|
||||
|
||||
// RunCatchupsDue is the tick entrypoint. For each host past its settle
|
||||
// window it dispatches a backup for every enabled schedule that is
|
||||
// overdue. Skips hosts that bounced back offline, that are already
|
||||
// running/queued a job, or that turned out to be always-on.
|
||||
func (s *Server) RunCatchupsDue(ctx context.Context) {
|
||||
if s.deps.Hub == nil {
|
||||
return
|
||||
}
|
||||
now := time.Now().UTC()
|
||||
for _, hostID := range s.dueCatchups(now) {
|
||||
s.runCatchup(ctx, hostID, now)
|
||||
}
|
||||
}
|
||||
|
||||
// runCatchup evaluates and dispatches catch-up backups for a single
|
||||
// host. Kept separate so RunCatchupsDue reads cleanly.
|
||||
func (s *Server) runCatchup(ctx context.Context, hostID string, now time.Time) {
|
||||
conn := s.deps.Hub.Conn(hostID)
|
||||
if conn == nil {
|
||||
return // bounced offline during the settle window; re-arms on next hello
|
||||
}
|
||||
host, err := s.deps.Store.GetHost(ctx, hostID)
|
||||
if err != nil {
|
||||
slog.Warn("catchup: load host", "host_id", hostID, "err", err)
|
||||
return
|
||||
}
|
||||
if host.AlwaysOn {
|
||||
return // mode flipped during settle window
|
||||
}
|
||||
if host.CurrentJobID != nil {
|
||||
return // a job is already running; don't pile on
|
||||
}
|
||||
schedules, err := s.deps.Store.ListSchedulesByHost(ctx, hostID)
|
||||
if err != nil {
|
||||
slog.Warn("catchup: list schedules", "host_id", hostID, "err", err)
|
||||
return
|
||||
}
|
||||
for _, sc := range schedules {
|
||||
if !sc.Enabled || len(sc.SourceGroupIDs) == 0 {
|
||||
continue
|
||||
}
|
||||
if !scheduleOverdue(sc.CronExpr, host.LastBackupAt, now) {
|
||||
continue
|
||||
}
|
||||
for _, gid := range sc.SourceGroupIDs {
|
||||
g, err := s.deps.Store.GetSourceGroup(ctx, hostID, gid)
|
||||
if err != nil {
|
||||
slog.Warn("catchup: load source group",
|
||||
"host_id", hostID, "schedule_id", sc.ID, "group_id", gid, "err", err)
|
||||
continue
|
||||
}
|
||||
if _, derr := s.dispatchBackupForGroupCore(ctx, conn, hostID, sc.ID, g, now); derr != nil {
|
||||
// Send failed — host dropped again. Re-arm so the next
|
||||
// reconnect retries; stop processing this host.
|
||||
s.ArmCatchup(hostID, now)
|
||||
return
|
||||
}
|
||||
slog.Info("catchup: dispatched missed backup",
|
||||
"host_id", hostID, "schedule_id", sc.ID, "group", g.Name)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -483,6 +483,12 @@ func (s *Server) onAgentHello(ctx context.Context, hostID string, conn *ws.Conn)
|
||||
// and the drain may take seconds across many rows. A non-blocking
|
||||
// goroutine keeps the hello path snappy.
|
||||
go s.DrainPending(context.Background(), hostID)
|
||||
// Intermittent hosts that just reconnected may have slept through a
|
||||
// backup window. Arm a catch-up evaluation after a settle delay; the
|
||||
// pending-drain tick fires it. Always-on hosts never need this.
|
||||
if host, err := s.deps.Store.GetHost(ctx, hostID); err == nil && !host.AlwaysOn {
|
||||
s.ArmCatchup(hostID, time.Now().UTC())
|
||||
}
|
||||
}
|
||||
|
||||
// maybeAutoInit dispatches a `restic init` job iff the host has no
|
||||
|
||||
@@ -90,6 +90,13 @@ type Server struct {
|
||||
// directories (P3-X2). Pre-allocated in New so the lazy-init
|
||||
// race is impossible.
|
||||
treeCache *treeCache
|
||||
|
||||
// catchupDueAt tracks intermittent hosts that reconnected and are
|
||||
// in their settle window. Keyed hostID → earliest time to evaluate
|
||||
// catch-up. Best-effort + in-memory: a server restart simply re-arms
|
||||
// on the next hello. Guarded by catchupMu.
|
||||
catchupMu sync.Mutex
|
||||
catchupDueAt map[string]time.Time
|
||||
}
|
||||
|
||||
// New builds a configured but not-yet-started server.
|
||||
@@ -104,11 +111,12 @@ func New(deps Deps) *Server {
|
||||
r.Use(requestLogger)
|
||||
|
||||
s := &Server{
|
||||
deps: deps,
|
||||
drainLocks: make(map[string]*sync.Mutex),
|
||||
announceRL: newAnnounceLimiter(),
|
||||
pendingHub: newPendingHub(),
|
||||
treeCache: newTreeCache(),
|
||||
deps: deps,
|
||||
drainLocks: make(map[string]*sync.Mutex),
|
||||
announceRL: newAnnounceLimiter(),
|
||||
pendingHub: newPendingHub(),
|
||||
treeCache: newTreeCache(),
|
||||
catchupDueAt: make(map[string]time.Time),
|
||||
}
|
||||
s.routes(r)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user