package http import ( "context" "encoding/json" "log/slog" "time" "gitea.dcglab.co.uk/steve/restic-manager/internal/api" "gitea.dcglab.co.uk/steve/restic-manager/internal/server/ws" "gitea.dcglab.co.uk/steve/restic-manager/internal/store" ) // loadScheduleSetPayload reads the host's current schedule set + the // canonical version into a wire-shape payload. Returns an empty // (but well-formed) payload with version 0 if the host has nothing // scheduled yet — that's still a valid state to push so the agent // drops any stale cron entries from a previous deployment. func (s *Server) loadScheduleSetPayload(ctx context.Context, hostID string) (api.ScheduleSetPayload, error) { rows, err := s.deps.Store.ListSchedulesByHost(ctx, hostID) if err != nil { return api.ScheduleSetPayload{}, err } version, err := s.deps.Store.GetHostScheduleVersion(ctx, hostID) if err != nil { return api.ScheduleSetPayload{}, err } out := api.ScheduleSetPayload{ Version: version, Schedules: make([]api.Schedule, 0, len(rows)), } for _, r := range rows { retJSON, _ := json.Marshal(r.RetentionPolicy) optJSON, _ := json.Marshal(r.Options) out.Schedules = append(out.Schedules, api.Schedule{ ID: r.ID, Kind: api.JobKind(r.Kind), CronExpr: r.CronExpr, Paths: r.Paths, Excludes: r.Excludes, Tags: r.Tags, RetentionPolicy: retJSON, Options: optJSON, PreHook: r.PreHook, PostHook: r.PostHook, Enabled: r.Enabled, }) } return out, nil } // pushScheduleSet ships the current schedule list to the agent over // the hub. Caller has already determined the agent is connected. // Errors are logged and returned; the next push (or the next hello) // will retry. Idempotent — sending the same version twice is a // harmless no-op on the agent side. func (s *Server) pushScheduleSet(ctx context.Context, hostID string) error { pl, err := s.loadScheduleSetPayload(ctx, hostID) if err != nil { slog.Warn("push schedule.set: load failed", "host_id", hostID, "err", err) return err } env, err := api.Marshal(api.MsgScheduleSet, "", pl) if err != nil { return err } sendCtx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() if err := s.deps.Hub.Send(sendCtx, hostID, env); err != nil { slog.Warn("push schedule.set: hub send failed", "host_id", hostID, "version", pl.Version, "err", err) return err } return nil } // pushScheduleSetOnConn is the on-hello flavour: writes directly to // the just-handshaken conn rather than racing through the hub. Used // by onAgentHello so a brand-new connection can't miss an early // push because Register hasn't completed yet. func (s *Server) pushScheduleSetOnConn(ctx context.Context, hostID string, conn *ws.Conn) { pl, err := s.loadScheduleSetPayload(ctx, hostID) if err != nil { slog.Warn("on-hello: load schedules", "host_id", hostID, "err", err) return } env, err := api.Marshal(api.MsgScheduleSet, "", pl) if err != nil { slog.Error("on-hello: marshal schedule.set", "host_id", hostID, "err", err) return } sendCtx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() if err := conn.Send(sendCtx, env); err != nil { slog.Warn("on-hello: send schedule.set", "host_id", hostID, "version", pl.Version, "err", err) } } // pushIfConnected dispatches a schedule.set push asynchronously when // the agent is online. Used by CRUD handlers: a missed push is // non-fatal because the next reconnect's on-hello path will catch // the agent up. Decoupled from the request so the operator's HTTP // response doesn't wait on the WS round-trip. func (s *Server) pushScheduleSetAsync(hostID string) { if s.deps.Hub == nil || !s.deps.Hub.Connected(hostID) { return } go func() { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() _ = s.pushScheduleSet(ctx, hostID) }() } // applyScheduleAck records the version the agent has confirmed via // schedule.ack. Called from the WS dispatcher (wired below). A bad // version (zero, or higher than what we've issued) is logged but // not fatal — the next push will set the agent straight. func (s *Server) applyScheduleAck(ctx context.Context, hostID string, version int64, appliedAt time.Time) { if version <= 0 { return } canonical, err := s.deps.Store.GetHostScheduleVersion(ctx, hostID) if err != nil { slog.Warn("schedule.ack: load canonical version", "host_id", hostID, "err", err) return } if version > canonical { slog.Warn("schedule.ack: agent reported version ahead of server", "host_id", hostID, "agent", version, "server", canonical) return } if err := s.deps.Store.SetHostAppliedScheduleVersion(ctx, hostID, version); err != nil { slog.Warn("schedule.ack: persist applied version", "host_id", hostID, "version", version, "err", err) return } slog.Info("schedule.ack: applied", "host_id", hostID, "version", version, "applied_at", appliedAt) } // Compile-time guard that the store actually implements the methods // schedule_push.go calls. Useful when mocking the store in tests. var _ scheduleStore = (*store.Store)(nil) type scheduleStore interface { ListSchedulesByHost(ctx context.Context, hostID string) ([]store.Schedule, error) GetHostScheduleVersion(ctx context.Context, hostID string) (int64, error) SetHostAppliedScheduleVersion(ctx context.Context, hostID string, version int64) error }