From 5c4e0275d9c5c88a4090f952f88e6ddc54cae85b Mon Sep 17 00:00:00 2001 From: Steve Cliff Date: Mon, 15 Jun 2026 21:02:04 +0100 Subject: [PATCH] feat(catchup): arm on hello, fire missed-window backups on tick --- cmd/server/main.go | 1 + internal/server/http/catchup.go | 98 ++++++++++++++++++++++++ internal/server/http/host_credentials.go | 6 ++ internal/server/http/server.go | 18 +++-- 4 files changed, 118 insertions(+), 5 deletions(-) diff --git a/cmd/server/main.go b/cmd/server/main.go index 1cd5151..ccb962b 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -227,6 +227,7 @@ func run() error { } case <-pendingDrainTick.C: srv.DrainAllDue(ctx) + srv.RunCatchupsDue(ctx) case <-pendingExpiryTick.C: if n, err := st.DeleteExpiredPendingHosts(ctx, time.Now().UTC()); err == nil && n > 0 { slog.Info("expired pending hosts swept", "n", n) diff --git a/internal/server/http/catchup.go b/internal/server/http/catchup.go index 15ae429..68f7d92 100644 --- a/internal/server/http/catchup.go +++ b/internal/server/http/catchup.go @@ -6,6 +6,8 @@ package http import ( + "context" + "log/slog" "time" ) @@ -27,3 +29,99 @@ func scheduleOverdue(cronExpr string, lastBackup *time.Time, now time.Time) bool next := sched.Next(*lastBackup) return !next.After(now) } + +// catchupSettle is how long after a reconnect we wait before evaluating +// catch-up, so a laptop that wakes briefly and sleeps again doesn't +// trigger a backup it can't finish. ~1 minute per the spec. +const catchupSettle = 60 * time.Second + +// ArmCatchup records that an intermittent host just reconnected and +// should be evaluated for a missed backup after the settle window. +// No-op for always-on hosts (caller passes only intermittent hosts). +// Re-arming overwrites the timer (debounce — flapping doesn't stack). +func (s *Server) ArmCatchup(hostID string, now time.Time) { + s.catchupMu.Lock() + defer s.catchupMu.Unlock() + if s.catchupDueAt == nil { + s.catchupDueAt = make(map[string]time.Time) + } + s.catchupDueAt[hostID] = now.Add(catchupSettle) +} + +// dueCatchups returns the hostIDs whose settle window has elapsed and +// removes them from the map. Caller evaluates each. +func (s *Server) dueCatchups(now time.Time) []string { + s.catchupMu.Lock() + defer s.catchupMu.Unlock() + var due []string + for id, at := range s.catchupDueAt { + if !now.Before(at) { + due = append(due, id) + delete(s.catchupDueAt, id) + } + } + return due +} + +// RunCatchupsDue is the tick entrypoint. For each host past its settle +// window it dispatches a backup for every enabled schedule that is +// overdue. Skips hosts that bounced back offline, that are already +// running/queued a job, or that turned out to be always-on. +func (s *Server) RunCatchupsDue(ctx context.Context) { + if s.deps.Hub == nil { + return + } + now := time.Now().UTC() + for _, hostID := range s.dueCatchups(now) { + s.runCatchup(ctx, hostID, now) + } +} + +// runCatchup evaluates and dispatches catch-up backups for a single +// host. Kept separate so RunCatchupsDue reads cleanly. +func (s *Server) runCatchup(ctx context.Context, hostID string, now time.Time) { + conn := s.deps.Hub.Conn(hostID) + if conn == nil { + return // bounced offline during the settle window; re-arms on next hello + } + host, err := s.deps.Store.GetHost(ctx, hostID) + if err != nil { + slog.Warn("catchup: load host", "host_id", hostID, "err", err) + return + } + if host.AlwaysOn { + return // mode flipped during settle window + } + if host.CurrentJobID != nil { + return // a job is already running; don't pile on + } + schedules, err := s.deps.Store.ListSchedulesByHost(ctx, hostID) + if err != nil { + slog.Warn("catchup: list schedules", "host_id", hostID, "err", err) + return + } + for _, sc := range schedules { + if !sc.Enabled || len(sc.SourceGroupIDs) == 0 { + continue + } + if !scheduleOverdue(sc.CronExpr, host.LastBackupAt, now) { + continue + } + for _, gid := range sc.SourceGroupIDs { + g, err := s.deps.Store.GetSourceGroup(ctx, hostID, gid) + if err != nil { + slog.Warn("catchup: load source group", + "host_id", hostID, "schedule_id", sc.ID, "group_id", gid, "err", err) + continue + } + if _, derr := s.dispatchBackupForGroupCore(ctx, conn, hostID, sc.ID, g, now); derr != nil { + // Send failed — host dropped again. Re-arm so the next + // reconnect retries; stop processing this host. + s.ArmCatchup(hostID, now) + return + } + slog.Info("catchup: dispatched missed backup", + "host_id", hostID, "schedule_id", sc.ID, "group", g.Name) + } + } +} diff --git a/internal/server/http/host_credentials.go b/internal/server/http/host_credentials.go index ed8b504..5a52f4e 100644 --- a/internal/server/http/host_credentials.go +++ b/internal/server/http/host_credentials.go @@ -483,6 +483,12 @@ func (s *Server) onAgentHello(ctx context.Context, hostID string, conn *ws.Conn) // and the drain may take seconds across many rows. A non-blocking // goroutine keeps the hello path snappy. go s.DrainPending(context.Background(), hostID) + // Intermittent hosts that just reconnected may have slept through a + // backup window. Arm a catch-up evaluation after a settle delay; the + // pending-drain tick fires it. Always-on hosts never need this. + if host, err := s.deps.Store.GetHost(ctx, hostID); err == nil && !host.AlwaysOn { + s.ArmCatchup(hostID, time.Now().UTC()) + } } // maybeAutoInit dispatches a `restic init` job iff the host has no diff --git a/internal/server/http/server.go b/internal/server/http/server.go index 7d79cbf..15f8473 100644 --- a/internal/server/http/server.go +++ b/internal/server/http/server.go @@ -90,6 +90,13 @@ type Server struct { // directories (P3-X2). Pre-allocated in New so the lazy-init // race is impossible. treeCache *treeCache + + // catchupDueAt tracks intermittent hosts that reconnected and are + // in their settle window. Keyed hostID → earliest time to evaluate + // catch-up. Best-effort + in-memory: a server restart simply re-arms + // on the next hello. Guarded by catchupMu. + catchupMu sync.Mutex + catchupDueAt map[string]time.Time } // New builds a configured but not-yet-started server. @@ -104,11 +111,12 @@ func New(deps Deps) *Server { r.Use(requestLogger) s := &Server{ - deps: deps, - drainLocks: make(map[string]*sync.Mutex), - announceRL: newAnnounceLimiter(), - pendingHub: newPendingHub(), - treeCache: newTreeCache(), + deps: deps, + drainLocks: make(map[string]*sync.Mutex), + announceRL: newAnnounceLimiter(), + pendingHub: newPendingHub(), + treeCache: newTreeCache(), + catchupDueAt: make(map[string]time.Time), } s.routes(r)