package http

import (
	"context"
	"encoding/json"
	"log/slog"
	"time"

	"github.com/oklog/ulid/v2"

	"gitea.dcglab.co.uk/steve/restic-manager/internal/api"
	"gitea.dcglab.co.uk/steve/restic-manager/internal/server/ws"
	"gitea.dcglab.co.uk/steve/restic-manager/internal/store"
)

// loadScheduleSetPayload reads the host's current schedule set + the
// canonical version into a wire-shape payload. Returns an empty
// (but well-formed) payload with version 0 if the host has nothing
// scheduled yet — that's still a valid state to push so the agent
// drops any stale cron entries from a previous deployment.
//
// Any failure — listing the schedules, reading the version, or
// JSON-encoding a row's retention policy / options — aborts the load
// and returns a zero payload together with the error. A partially
// encoded schedule is never returned.
func (s *Server) loadScheduleSetPayload(ctx context.Context, hostID string) (api.ScheduleSetPayload, error) {
	rows, err := s.deps.Store.ListSchedulesByHost(ctx, hostID)
	if err != nil {
		return api.ScheduleSetPayload{}, err
	}
	version, err := s.deps.Store.GetHostScheduleVersion(ctx, hostID)
	if err != nil {
		return api.ScheduleSetPayload{}, err
	}

	out := api.ScheduleSetPayload{
		Version:   version,
		Schedules: make([]api.Schedule, 0, len(rows)),
	}
	for _, r := range rows {
		// Encoding errors must not be swallowed: a nil blob would reach
		// the agent looking like "no retention policy" / "no options".
		retJSON, err := json.Marshal(r.RetentionPolicy)
		if err != nil {
			return api.ScheduleSetPayload{}, err
		}
		optJSON, err := json.Marshal(r.Options)
		if err != nil {
			return api.ScheduleSetPayload{}, err
		}
		out.Schedules = append(out.Schedules, api.Schedule{
			ID:              r.ID,
			Kind:            api.JobKind(r.Kind),
			CronExpr:        r.CronExpr,
			Paths:           r.Paths,
			Excludes:        r.Excludes,
			Tags:            r.Tags,
			RetentionPolicy: retJSON,
			Options:         optJSON,
			PreHook:         r.PreHook,
			PostHook:        r.PostHook,
			Enabled:         r.Enabled,
		})
	}
	return out, nil
}

// pushScheduleSet ships the current schedule list to the agent over
// the hub. Caller has already determined the agent is connected.
// Errors are logged and returned; the next push (or the next hello)
// will retry. Idempotent — sending the same version twice is a
// harmless no-op on the agent side.
func (s *Server) pushScheduleSet(ctx context.Context, hostID string) error { pl, err := s.loadScheduleSetPayload(ctx, hostID) if err != nil { slog.Warn("push schedule.set: load failed", "host_id", hostID, "err", err) return err } env, err := api.Marshal(api.MsgScheduleSet, "", pl) if err != nil { return err } sendCtx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() if err := s.deps.Hub.Send(sendCtx, hostID, env); err != nil { slog.Warn("push schedule.set: hub send failed", "host_id", hostID, "version", pl.Version, "err", err) return err } return nil } // pushScheduleSetOnConn is the on-hello flavour: writes directly to // the just-handshaken conn rather than racing through the hub. Used // by onAgentHello so a brand-new connection can't miss an early // push because Register hasn't completed yet. func (s *Server) pushScheduleSetOnConn(ctx context.Context, hostID string, conn *ws.Conn) { pl, err := s.loadScheduleSetPayload(ctx, hostID) if err != nil { slog.Warn("on-hello: load schedules", "host_id", hostID, "err", err) return } env, err := api.Marshal(api.MsgScheduleSet, "", pl) if err != nil { slog.Error("on-hello: marshal schedule.set", "host_id", hostID, "err", err) return } sendCtx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() if err := conn.Send(sendCtx, env); err != nil { slog.Warn("on-hello: send schedule.set", "host_id", hostID, "version", pl.Version, "err", err) } } // pushIfConnected dispatches a schedule.set push asynchronously when // the agent is online. Used by CRUD handlers: a missed push is // non-fatal because the next reconnect's on-hello path will catch // the agent up. Decoupled from the request so the operator's HTTP // response doesn't wait on the WS round-trip. 
func (s *Server) pushScheduleSetAsync(hostID string) { if s.deps.Hub == nil || !s.deps.Hub.Connected(hostID) { return } go func() { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() _ = s.pushScheduleSet(ctx, hostID) }() } // applyScheduleAck records the version the agent has confirmed via // schedule.ack. Called from the WS dispatcher (wired below). A bad // version (zero, or higher than what we've issued) is logged but // not fatal — the next push will set the agent straight. func (s *Server) applyScheduleAck(ctx context.Context, hostID string, version int64, appliedAt time.Time) { if version <= 0 { return } canonical, err := s.deps.Store.GetHostScheduleVersion(ctx, hostID) if err != nil { slog.Warn("schedule.ack: load canonical version", "host_id", hostID, "err", err) return } if version > canonical { slog.Warn("schedule.ack: agent reported version ahead of server", "host_id", hostID, "agent", version, "server", canonical) return } if err := s.deps.Store.SetHostAppliedScheduleVersion(ctx, hostID, version); err != nil { slog.Warn("schedule.ack: persist applied version", "host_id", hostID, "version", version, "err", err) return } slog.Info("schedule.ack: applied", "host_id", hostID, "version", version, "applied_at", appliedAt) } // dispatchScheduledJob is invoked when the agent reports a local // cron fire via `schedule.fire`. We look up the schedule, build the // CommandRunPayload from it, persist a job row (actor=schedule, // linked back to scheduled_id), and write MsgCommandRun straight // back on the same conn so the agent runs the job through its // normal command dispatch path. // // On any error we log and bail — the agent's cron will fire again // at the next tick. We deliberately don't try to retry: schedules // are by definition repeating, and a missed tick is less bad than // a confused operator-visible "phantom job" that never actually // ran restic. 
func (s *Server) dispatchScheduledJob(ctx context.Context, hostID string, conn *ws.Conn, scheduleID string, scheduledAt time.Time) { sched, err := s.deps.Store.GetSchedule(ctx, hostID, scheduleID) if err != nil { slog.Warn("schedule.fire: schedule not found", "host_id", hostID, "schedule_id", scheduleID, "err", err) return } if !sched.Enabled { // The agent shouldn't be firing disabled schedules — its // local cron is rebuilt from the canonical version after // every push — but treat as belt-and-braces. slog.Info("schedule.fire: ignoring disabled schedule", "host_id", hostID, "schedule_id", scheduleID) return } // Args differ by kind. For backup we ship the schedule's paths; // other kinds are still arg-less in Phase 2 (forget/prune/check // take their parameters from RetentionPolicy / Options at exec // time on the agent — handled when those job kinds land). var args []string if sched.Kind == string(api.JobBackup) { args = append(args, sched.Paths...) } jobID := ulid.Make().String() now := time.Now().UTC() if err := s.deps.Store.CreateJob(ctx, store.Job{ ID: jobID, HostID: hostID, Kind: sched.Kind, ScheduledID: &sched.ID, ActorKind: "schedule", ActorID: &sched.ID, CreatedAt: now, }); err != nil { slog.Warn("schedule.fire: create job", "host_id", hostID, "schedule_id", scheduleID, "err", err) return } env, err := api.Marshal(api.MsgCommandRun, jobID, api.CommandRunPayload{ JobID: jobID, Kind: api.JobKind(sched.Kind), Args: args, }) if err != nil { slog.Error("schedule.fire: marshal command.run", "host_id", hostID, "schedule_id", scheduleID, "err", err) return } sendCtx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() if err := conn.Send(sendCtx, env); err != nil { slog.Warn("schedule.fire: send command.run", "host_id", hostID, "job_id", jobID, "err", err) return } _ = s.deps.Store.AppendAudit(ctx, store.AuditEntry{ ID: ulid.Make().String(), Actor: "schedule", Action: "job.run_now", TargetKind: ptr("job"), TargetID: &jobID, TS: now, }) 
slog.Info("schedule.fire: dispatched", "host_id", hostID, "schedule_id", scheduleID, "job_id", jobID, "kind", sched.Kind, "scheduled_at", scheduledAt) } // Compile-time guard that the store actually implements the methods // schedule_push.go calls. Useful when mocking the store in tests. var _ scheduleStore = (*store.Store)(nil) type scheduleStore interface { ListSchedulesByHost(ctx context.Context, hostID string) ([]store.Schedule, error) GetHostScheduleVersion(ctx context.Context, hostID string) (int64, error) SetHostAppliedScheduleVersion(ctx context.Context, hostID string, version int64) error }