// catchup.go — server-side catch-up for intermittent (non-always-on) // hosts. When such a host reconnects we wait a short settle window, // then dispatch a backup for any schedule whose window elapsed while // the host was asleep. This is separate from pending_runs: a host that // was asleep never fired its local cron, so no pending row exists. package http import ( "context" "log/slog" "time" ) // scheduleOverdue reports whether a schedule's most recent expected // fire is newer than the host's last successful backup — i.e. a window // passed with no backup. A nil lastBackup means "never backed up" and // is always overdue (provided the cron parses). An unparseable cron is // treated as not-overdue so a bad expression can never trigger a // surprise dispatch. Uses the same cronParser the agent's scheduler // and schedule validation use, so interpretation is identical. func scheduleOverdue(cronExpr string, lastBackup *time.Time, now time.Time) bool { sched, err := cronParser.Parse(cronExpr) if err != nil { return false } if lastBackup == nil { return true } next := sched.Next(*lastBackup) return !next.After(now) } // catchupSettle is how long after a reconnect we wait before evaluating // catch-up, so a laptop that wakes briefly and sleeps again doesn't // trigger a backup it can't finish. ~1 minute per the spec. const catchupSettle = 60 * time.Second // ArmCatchup records that an intermittent host just reconnected and // should be evaluated for a missed backup after the settle window. // No-op for always-on hosts (caller passes only intermittent hosts). // Re-arming overwrites the timer (debounce — flapping doesn't stack). func (s *Server) ArmCatchup(hostID string, now time.Time) { s.catchupMu.Lock() defer s.catchupMu.Unlock() s.catchupDueAt[hostID] = now.Add(catchupSettle) } // dueCatchups returns the hostIDs whose settle window has elapsed and // removes them from the map. Caller evaluates each. func (s *Server) dueCatchups(now time.Time) []string { s.catchupMu.Lock() defer s.catchupMu.Unlock() var due []string for id, at := range s.catchupDueAt { if !now.Before(at) { due = append(due, id) delete(s.catchupDueAt, id) } } return due } // RunCatchupsDue is the tick entrypoint. For each host past its settle // window it dispatches a backup for every enabled schedule that is // overdue. Skips hosts that bounced back offline, that are already // running/queued a job, or that turned out to be always-on. func (s *Server) RunCatchupsDue(ctx context.Context) { if s.deps.Hub == nil { return } now := time.Now().UTC() for _, hostID := range s.dueCatchups(now) { s.runCatchup(ctx, hostID, now) } } // runCatchup evaluates and dispatches catch-up backups for a single // host. Kept separate so RunCatchupsDue reads cleanly. func (s *Server) runCatchup(ctx context.Context, hostID string, now time.Time) { conn := s.deps.Hub.Conn(hostID) if conn == nil { return // bounced offline during the settle window; re-arms on next hello } host, err := s.deps.Store.GetHost(ctx, hostID) if err != nil { slog.Warn("catchup: load host", "host_id", hostID, "err", err) return } if host.AlwaysOn { return // mode flipped during settle window } if host.CurrentJobID != nil { return // a job is already running; don't pile on } schedules, err := s.deps.Store.ListSchedulesByHost(ctx, hostID) if err != nil { slog.Warn("catchup: list schedules", "host_id", hostID, "err", err) return } // NOTE: overdue is measured against host.LastBackupAt, which is the // most recent *successful backup of any schedule* on this host — not // a per-schedule timestamp. For the common intermittent host (a // single backup schedule) this is exact. With multiple schedules of // different cadences, a recent backup from one schedule can mask // another schedule's missed window. Acceptable for v1; revisit with // per-schedule last-success tracking if multi-cadence laptops appear. for _, sc := range schedules { if !sc.Enabled || len(sc.SourceGroupIDs) == 0 { continue } if !scheduleOverdue(sc.CronExpr, host.LastBackupAt, now) { continue } for _, gid := range sc.SourceGroupIDs { g, err := s.deps.Store.GetSourceGroup(ctx, hostID, gid) if err != nil { slog.Warn("catchup: load source group", "host_id", hostID, "schedule_id", sc.ID, "group_id", gid, "err", err) continue } if _, derr := s.dispatchBackupForGroupCore(ctx, conn, hostID, sc.ID, g, now); derr != nil { // Send failed for this group — host may have dropped // again. Earlier groups in this batch were already // dispatched; re-arm so a later reconnect re-evaluates // any still-overdue schedules. s.ArmCatchup(hostID, now) return } slog.Info("catchup: dispatched missed backup", "host_id", hostID, "schedule_id", sc.ID, "group", g.Name) } } }