package scheduler import ( "log/slog" "sync" "time" "github.com/robfig/cron/v3" "gitea.dcglab.co.uk/steve/restic-manager/internal/api" ) // Sender abstracts away the agent's outbound WS channel — we use it // to fire schedule.fire and schedule.ack envelopes back at the // server. Same shape as runner.Sender; deliberately not shared so // the scheduler can be tested without dragging in the runner. type Sender interface { Send(env api.Envelope) error } // Scheduler maintains the agent's local cron entries. Schedules // arrive from the server via Apply (driven by MsgScheduleSet); on // each fire, the entry sends a schedule.fire to the server and // lets the server's existing dispatch path turn that into a // command.run. The scheduler itself never builds CommandRunPayloads. // // Lifecycle: // - Start once at agent boot. // - Apply on every MsgScheduleSet — replaces the active cron with // a fresh one, then emits schedule.ack with the version we just // applied. // - Stop on agent shutdown. // // The active Sender is updated on every Apply call. This handles // reconnects naturally: a new connection's first MsgScheduleSet // re-arms the scheduler with a working tx; cron entries that fire // against a dropped connection just log and skip the tick. type Scheduler struct { mu sync.Mutex current *cron.Cron version int64 tx Sender } // New builds a Scheduler. Doesn't start any cron yet — Apply is // what brings the loop alive. func New() *Scheduler { return &Scheduler{} } // Stop halts whatever cron is currently running. Safe to call // multiple times. func (s *Scheduler) Stop() { s.mu.Lock() defer s.mu.Unlock() if s.current != nil { <-s.current.Stop().Done() s.current = nil } } // Apply reconciles the active cron with payload. Stops the old cron // (waiting for in-flight entries to return), builds a new one from // every enabled entry, starts it, and emits schedule.ack with // payload.Version. Schedule entries with malformed cron exprs are // logged and skipped — the server's validator should have caught // these, but better skip-and-warn than crash the loop. // // Payload's order doesn't matter; we always rebuild from scratch. // Empty Schedules is a valid input that effectively disables every // timed job for this host. func (s *Scheduler) Apply(payload api.ScheduleSetPayload, tx Sender) { s.mu.Lock() s.tx = tx // Stop the previous cron, if any. cron.Stop returns once the // scheduler has stopped firing new entries; in-flight ones // continue in their own goroutines, which is what we want // (otherwise a long-running backup would block reconciliation). if s.current != nil { <-s.current.Stop().Done() s.current = nil } c := cron.New() added := 0 for _, sch := range payload.Schedules { if !sch.Enabled { continue } // Capture by value so the closure doesn't share id across iters. entry := sch _, err := c.AddFunc(entry.CronExpr, func() { s.fire(entry) }) if err != nil { slog.Warn("scheduler: skipping entry with bad cron expr", "schedule_id", entry.ID, "expr", entry.CronExpr, "err", err) continue } added++ } c.Start() s.current = c s.version = payload.Version ackTx := s.tx s.mu.Unlock() slog.Info("scheduler: applied", "version", payload.Version, "received", len(payload.Schedules), "active", added) // Ack outside the lock — Send() shouldn't take long, but holding // s.mu across an external call would needlessly serialise other // callers (e.g. a future Status() inspection from the UI). ackEnv, err := api.Marshal(api.MsgScheduleAck, "", api.ScheduleAckPayload{ Version: payload.Version, AppliedAt: time.Now().UTC(), }) if err != nil { slog.Error("scheduler: marshal schedule.ack", "err", err) return } if ackTx == nil { return } if err := ackTx.Send(ackEnv); err != nil { slog.Warn("scheduler: send schedule.ack — server will retry on reconnect", "version", payload.Version, "err", err) } } // Version returns the schedule version currently applied. Useful for // tests + diagnostics. func (s *Scheduler) Version() int64 { s.mu.Lock() defer s.mu.Unlock() return s.version } // fire runs when one of the cron entries' time arrives. Sends a // schedule.fire envelope to the server, which is responsible for // minting the job_id, persisting the row, and shipping back a // command.run envelope that the agent's existing dispatcher will // then execute. Fire-and-log: if the WS write fails we skip this // tick — the next one will fire normally, and a flapping link is // already noisy elsewhere. func (s *Scheduler) fire(entry api.Schedule) { s.mu.Lock() tx := s.tx s.mu.Unlock() if tx == nil { slog.Info("scheduler: tick fired with no active connection — skipping", "schedule_id", entry.ID) return } env, err := api.Marshal(api.MsgScheduleFire, "", api.ScheduleFirePayload{ ScheduleID: entry.ID, ScheduledAt: time.Now().UTC(), }) if err != nil { slog.Error("scheduler: marshal schedule.fire", "schedule_id", entry.ID, "err", err) return } if err := tx.Send(env); err != nil { slog.Warn("scheduler: send schedule.fire — skipping this tick", "schedule_id", entry.ID, "err", err) } }