142 lines
5.1 KiB
Go
142 lines
5.1 KiB
Go
// catchup.go — server-side catch-up for intermittent (non-always-on)
|
|
// hosts. When such a host reconnects we wait a short settle window,
|
|
// then dispatch a backup for any schedule whose window elapsed while
|
|
// the host was asleep. This is separate from pending_runs: a host that
|
|
// was asleep never fired its local cron, so no pending row exists.
|
|
package http
|
|
|
|
import (
|
|
"context"
|
|
"log/slog"
|
|
"time"
|
|
)
|
|
|
|
// scheduleOverdue reports whether a schedule's most recent expected
|
|
// fire is newer than the host's last successful backup — i.e. a window
|
|
// passed with no backup. A nil lastBackup means "never backed up" and
|
|
// is always overdue (provided the cron parses). An unparseable cron is
|
|
// treated as not-overdue so a bad expression can never trigger a
|
|
// surprise dispatch. Uses the same cronParser the agent's scheduler
|
|
// and schedule validation use, so interpretation is identical.
|
|
func scheduleOverdue(cronExpr string, lastBackup *time.Time, now time.Time) bool {
|
|
sched, err := cronParser.Parse(cronExpr)
|
|
if err != nil {
|
|
return false
|
|
}
|
|
if lastBackup == nil {
|
|
return true
|
|
}
|
|
next := sched.Next(*lastBackup)
|
|
return !next.After(now)
|
|
}
|
|
|
|
// catchupSettle is the delay between an intermittent host reconnecting
// and catch-up being evaluated for it. The delay keeps a laptop that
// wakes only briefly and then sleeps again from starting a backup it
// cannot finish. The spec calls for roughly one minute.
const catchupSettle = time.Minute
|
|
|
|
// ArmCatchup records that an intermittent host just reconnected and
|
|
// should be evaluated for a missed backup after the settle window.
|
|
// No-op for always-on hosts (caller passes only intermittent hosts).
|
|
// Re-arming overwrites the timer (debounce — flapping doesn't stack).
|
|
func (s *Server) ArmCatchup(hostID string, now time.Time) {
|
|
s.catchupMu.Lock()
|
|
defer s.catchupMu.Unlock()
|
|
s.catchupDueAt[hostID] = now.Add(catchupSettle)
|
|
}
|
|
|
|
// dueCatchups returns the hostIDs whose settle window has elapsed and
|
|
// removes them from the map. Caller evaluates each.
|
|
func (s *Server) dueCatchups(now time.Time) []string {
|
|
s.catchupMu.Lock()
|
|
defer s.catchupMu.Unlock()
|
|
var due []string
|
|
for id, at := range s.catchupDueAt {
|
|
if !now.Before(at) {
|
|
due = append(due, id)
|
|
delete(s.catchupDueAt, id)
|
|
}
|
|
}
|
|
return due
|
|
}
|
|
|
|
// RunCatchupsDue is the tick entrypoint. For each host past its settle
|
|
// window it dispatches a backup for every enabled schedule that is
|
|
// overdue. Skips hosts that bounced back offline, that are already
|
|
// running/queued a job, or that turned out to be always-on.
|
|
func (s *Server) RunCatchupsDue(ctx context.Context) {
|
|
if s.deps.Hub == nil {
|
|
return
|
|
}
|
|
now := time.Now().UTC()
|
|
for _, hostID := range s.dueCatchups(now) {
|
|
s.runCatchup(ctx, hostID, now)
|
|
}
|
|
}
|
|
|
|
// runCatchup evaluates and dispatches catch-up backups for a single
|
|
// host. Kept separate so RunCatchupsDue reads cleanly.
|
|
func (s *Server) runCatchup(ctx context.Context, hostID string, now time.Time) {
|
|
conn := s.deps.Hub.Conn(hostID)
|
|
if conn == nil {
|
|
return // bounced offline during the settle window; re-arms on next hello
|
|
}
|
|
host, err := s.deps.Store.GetHost(ctx, hostID)
|
|
if err != nil {
|
|
slog.Warn("catchup: load host", "host_id", hostID, "err", err)
|
|
return
|
|
}
|
|
if host.AlwaysOn {
|
|
return // mode flipped during settle window
|
|
}
|
|
// Skip if a backup is already queued or running for this host —
|
|
// don't pile a catch-up on top of in-flight work. (hosts.current_job_id
|
|
// is not maintained, so we check the jobs table directly.)
|
|
active, err := s.deps.Store.HasActiveBackupJob(ctx, hostID)
|
|
if err != nil {
|
|
slog.Warn("catchup: check active backup", "host_id", hostID, "err", err)
|
|
return
|
|
}
|
|
if active {
|
|
return
|
|
}
|
|
schedules, err := s.deps.Store.ListSchedulesByHost(ctx, hostID)
|
|
if err != nil {
|
|
slog.Warn("catchup: list schedules", "host_id", hostID, "err", err)
|
|
return
|
|
}
|
|
// NOTE: overdue is measured against host.LastBackupAt, which is the
|
|
// most recent *successful backup of any schedule* on this host — not
|
|
// a per-schedule timestamp. For the common intermittent host (a
|
|
// single backup schedule) this is exact. With multiple schedules of
|
|
// different cadences, a recent backup from one schedule can mask
|
|
// another schedule's missed window. Acceptable for v1; revisit with
|
|
// per-schedule last-success tracking if multi-cadence laptops appear.
|
|
for _, sc := range schedules {
|
|
if !sc.Enabled || len(sc.SourceGroupIDs) == 0 {
|
|
continue
|
|
}
|
|
if !scheduleOverdue(sc.CronExpr, host.LastBackupAt, now) {
|
|
continue
|
|
}
|
|
for _, gid := range sc.SourceGroupIDs {
|
|
g, err := s.deps.Store.GetSourceGroup(ctx, hostID, gid)
|
|
if err != nil {
|
|
slog.Warn("catchup: load source group",
|
|
"host_id", hostID, "schedule_id", sc.ID, "group_id", gid, "err", err)
|
|
continue
|
|
}
|
|
if _, derr := s.dispatchBackupForGroupCore(ctx, conn, hostID, sc.ID, g, now); derr != nil {
|
|
// Send failed for this group — host may have dropped
|
|
// again. Earlier groups in this batch were already
|
|
// dispatched; re-arm so a later reconnect re-evaluates
|
|
// any still-overdue schedules.
|
|
s.ArmCatchup(hostID, now)
|
|
return
|
|
}
|
|
slog.Info("catchup: dispatched missed backup",
|
|
"host_id", hostID, "schedule_id", sc.ID, "group", g.Name)
|
|
}
|
|
}
|
|
}
|