// catchup.go — server-side catch-up for intermittent (non-always-on)
// hosts. When such a host reconnects we wait a short settle window,
// then dispatch a backup for any schedule whose window elapsed while
// the host was asleep. This is separate from pending_runs: a host that
// was asleep never fired its local cron, so no pending row exists.
package http
import (
"context"
"log/slog"
"time"
)
// scheduleOverdue tells the caller whether a backup window went by
// without a backup. It parses cronExpr, asks the resulting schedule for
// its next fire time relative to lastBackup, and returns true when that
// fire time is at or before now.
//
// Special cases:
//   - cronExpr does not parse: false, so a malformed expression can
//     never cause a surprise dispatch.
//   - lastBackup is nil ("never backed up"): true, provided the
//     expression parsed.
//
// Parsing goes through the package-level cronParser — the same parser
// the agent's scheduler and schedule validation use — so the
// expression is interpreted identically everywhere.
func scheduleOverdue(cronExpr string, lastBackup *time.Time, now time.Time) bool {
	schedule, parseErr := cronParser.Parse(cronExpr)
	switch {
	case parseErr != nil:
		return false
	case lastBackup == nil:
		return true
	}
	expected := schedule.Next(*lastBackup)
	// Overdue once now has reached (or passed) the expected fire time.
	return !now.Before(expected)
}
// catchupSettle is the quiet period between a host reconnecting and the
// server evaluating it for catch-up. It keeps a laptop that wakes for a
// moment and goes straight back to sleep from being handed a backup it
// cannot finish. The spec calls for roughly one minute.
const catchupSettle = time.Minute
// ArmCatchup schedules hostID for a catch-up evaluation once the settle
// window (catchupSettle) has passed, measured from now.
//
// The method does not look at the host's mode: callers are expected to
// invoke it only for intermittent hosts. Arming a host that is already
// armed replaces its deadline rather than adding a second one, so a
// flapping connection is debounced instead of stacking evaluations.
//
// The deadline map is created on first use, so a Server whose map was
// never initialized does not panic on the write.
func (s *Server) ArmCatchup(hostID string, now time.Time) {
	s.catchupMu.Lock()
	defer s.catchupMu.Unlock()
	if s.catchupDueAt == nil {
		// Assigning into a nil map panics; allocate lazily.
		s.catchupDueAt = make(map[string]time.Time)
	}
	s.catchupDueAt[hostID] = now.Add(catchupSettle)
}
// dueCatchups pops every host whose catch-up deadline is at or before
// now: the matching IDs are returned and their entries are removed from
// the pending map, all under catchupMu. Hosts still inside their settle
// window are left untouched. The result is nil when nothing is due, and
// its order follows map iteration, so callers must not rely on it.
func (s *Server) dueCatchups(now time.Time) []string {
	s.catchupMu.Lock()
	defer s.catchupMu.Unlock()

	var ready []string
	for hostID, deadline := range s.catchupDueAt {
		if now.Before(deadline) {
			continue // still settling
		}
		ready = append(ready, hostID)
	}
	for _, hostID := range ready {
		delete(s.catchupDueAt, hostID)
	}
	return ready
}
// RunCatchupsDue is the tick entrypoint. It takes one UTC timestamp,
// pops the hosts whose settle window has ended as of that timestamp,
// and hands each of them to runCatchup together with the same
// timestamp. runCatchup decides per host whether anything is actually
// dispatched.
//
// When the server has no Hub configured the call returns immediately
// and no pending entries are consumed.
func (s *Server) RunCatchupsDue(ctx context.Context) {
	if s.deps.Hub == nil {
		return
	}
	tick := time.Now().UTC()
	ready := s.dueCatchups(tick)
	for i := range ready {
		s.runCatchup(ctx, ready[i], tick)
	}
}
// runCatchup evaluates a single host whose settle window has elapsed
// and dispatches a backup for every source group of every enabled,
// overdue schedule. Kept separate so RunCatchupsDue reads cleanly.
//
// The host is skipped, with nothing dispatched, when:
//   - the Hub has no connection for it (it dropped during the settle
//     window);
//   - it is flagged AlwaysOn;
//   - the store reports a backup job already active for it;
//   - loading the host, the active-job check, or listing its schedules
//     fails (each is logged as a warning).
//
// A source group that cannot be loaded is logged and skipped; the
// remaining groups are still attempted. A failed dispatch is logged,
// the host is re-armed for another settle window, and the rest of the
// batch is abandoned.
//
// now is the tick timestamp: it drives the overdue test, is passed to
// the dispatch call, and is the base of any re-arm.
func (s *Server) runCatchup(ctx context.Context, hostID string, now time.Time) {
	conn := s.deps.Hub.Conn(hostID)
	if conn == nil {
		return // bounced offline during the settle window; re-arms on next hello
	}
	host, err := s.deps.Store.GetHost(ctx, hostID)
	if err != nil {
		slog.Warn("catchup: load host", "host_id", hostID, "err", err)
		return
	}
	if host.AlwaysOn {
		return // mode flipped during settle window
	}
	// Skip if a backup is already queued or running for this host —
	// don't pile a catch-up on top of in-flight work. (hosts.current_job_id
	// is not maintained, so we check the jobs table directly.)
	active, err := s.deps.Store.HasActiveBackupJob(ctx, hostID)
	if err != nil {
		slog.Warn("catchup: check active backup", "host_id", hostID, "err", err)
		return
	}
	if active {
		return
	}
	schedules, err := s.deps.Store.ListSchedulesByHost(ctx, hostID)
	if err != nil {
		slog.Warn("catchup: list schedules", "host_id", hostID, "err", err)
		return
	}
	// NOTE: overdue is measured against host.LastBackupAt, which is the
	// most recent *successful backup of any schedule* on this host — not
	// a per-schedule timestamp. For the common intermittent host (a
	// single backup schedule) this is exact. With multiple schedules of
	// different cadences, a recent backup from one schedule can mask
	// another schedule's missed window. Acceptable for v1; revisit with
	// per-schedule last-success tracking if multi-cadence laptops appear.
	for _, sc := range schedules {
		if !sc.Enabled || len(sc.SourceGroupIDs) == 0 {
			continue
		}
		if !scheduleOverdue(sc.CronExpr, host.LastBackupAt, now) {
			continue
		}
		for _, gid := range sc.SourceGroupIDs {
			g, err := s.deps.Store.GetSourceGroup(ctx, hostID, gid)
			if err != nil {
				slog.Warn("catchup: load source group",
					"host_id", hostID, "schedule_id", sc.ID, "group_id", gid, "err", err)
				continue
			}
			if _, derr := s.dispatchBackupForGroupCore(ctx, conn, hostID, sc.ID, g, now); derr != nil {
				// Send failed for this group — host may have dropped
				// again. Earlier groups in this batch were already
				// dispatched. Log the failure (this is the last place
				// the error is seen) and re-arm so the host is
				// re-evaluated after another settle window.
				slog.Warn("catchup: dispatch backup",
					"host_id", hostID, "schedule_id", sc.ID, "group_id", gid, "err", derr)
				s.ArmCatchup(hostID, now)
				return
			}
			slog.Info("catchup: dispatched missed backup",
				"host_id", hostID, "schedule_id", sc.ID, "group", g.Name)
		}
	}
}