package http import ( "context" "encoding/json" "errors" "log/slog" "time" "github.com/oklog/ulid/v2" "gitea.dcglab.co.uk/steve/restic-manager/internal/api" "gitea.dcglab.co.uk/steve/restic-manager/internal/server/ws" "gitea.dcglab.co.uk/steve/restic-manager/internal/store" ) // loadScheduleSetPayload reads the host's current schedule set + the // canonical version into a wire-shape payload. Returns an empty // (but well-formed) payload with version 0 if the host has nothing // scheduled yet — that's still a valid state to push so the agent // drops any stale cron entries from a previous deployment. func (s *Server) loadScheduleSetPayload(ctx context.Context, hostID string) (api.ScheduleSetPayload, error) { rows, err := s.deps.Store.ListSchedulesByHost(ctx, hostID) if err != nil { return api.ScheduleSetPayload{}, err } version, err := s.deps.Store.GetHostScheduleVersion(ctx, hostID) if err != nil { return api.ScheduleSetPayload{}, err } out := api.ScheduleSetPayload{ Version: version, Schedules: make([]api.Schedule, 0, len(rows)), } for _, r := range rows { retJSON, _ := json.Marshal(r.RetentionPolicy) optJSON, _ := json.Marshal(r.Options) out.Schedules = append(out.Schedules, api.Schedule{ ID: r.ID, Kind: api.JobKind(r.Kind), CronExpr: r.CronExpr, Paths: r.Paths, Excludes: r.Excludes, Tags: r.Tags, RetentionPolicy: retJSON, Options: optJSON, PreHook: r.PreHook, PostHook: r.PostHook, Enabled: r.Enabled, Manual: r.Manual, }) } return out, nil } // pushScheduleSet ships the current schedule list to the agent over // the hub. Caller has already determined the agent is connected. // Errors are logged and returned; the next push (or the next hello) // will retry. Idempotent — sending the same version twice is a // harmless no-op on the agent side. func (s *Server) pushScheduleSet(ctx context.Context, hostID string) error { pl, err := s.loadScheduleSetPayload(ctx, hostID) if err != nil { slog.Warn("push schedule.set: load failed", "host_id", hostID, "err", err) return err } env, err := api.Marshal(api.MsgScheduleSet, "", pl) if err != nil { return err } sendCtx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() if err := s.deps.Hub.Send(sendCtx, hostID, env); err != nil { slog.Warn("push schedule.set: hub send failed", "host_id", hostID, "version", pl.Version, "err", err) return err } return nil } // pushScheduleSetOnConn is the on-hello flavour: writes directly to // the just-handshaken conn rather than racing through the hub. Used // by onAgentHello so a brand-new connection can't miss an early // push because Register hasn't completed yet. func (s *Server) pushScheduleSetOnConn(ctx context.Context, hostID string, conn *ws.Conn) { pl, err := s.loadScheduleSetPayload(ctx, hostID) if err != nil { slog.Warn("on-hello: load schedules", "host_id", hostID, "err", err) return } env, err := api.Marshal(api.MsgScheduleSet, "", pl) if err != nil { slog.Error("on-hello: marshal schedule.set", "host_id", hostID, "err", err) return } sendCtx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() if err := conn.Send(sendCtx, env); err != nil { slog.Warn("on-hello: send schedule.set", "host_id", hostID, "version", pl.Version, "err", err) } } // pushIfConnected dispatches a schedule.set push asynchronously when // the agent is online. Used by CRUD handlers: a missed push is // non-fatal because the next reconnect's on-hello path will catch // the agent up. Decoupled from the request so the operator's HTTP // response doesn't wait on the WS round-trip. func (s *Server) pushScheduleSetAsync(hostID string) { if s.deps.Hub == nil || !s.deps.Hub.Connected(hostID) { return } go func() { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() _ = s.pushScheduleSet(ctx, hostID) }() } // applyScheduleAck records the version the agent has confirmed via // schedule.ack. Called from the WS dispatcher (wired below). A bad // version (zero, or higher than what we've issued) is logged but // not fatal — the next push will set the agent straight. func (s *Server) applyScheduleAck(ctx context.Context, hostID string, version int64, appliedAt time.Time) { if version <= 0 { return } canonical, err := s.deps.Store.GetHostScheduleVersion(ctx, hostID) if err != nil { slog.Warn("schedule.ack: load canonical version", "host_id", hostID, "err", err) return } if version > canonical { slog.Warn("schedule.ack: agent reported version ahead of server", "host_id", hostID, "agent", version, "server", canonical) return } if err := s.deps.Store.SetHostAppliedScheduleVersion(ctx, hostID, version); err != nil { slog.Warn("schedule.ack: persist applied version", "host_id", hostID, "version", version, "err", err) return } slog.Info("schedule.ack: applied", "host_id", hostID, "version", version, "applied_at", appliedAt) } // dispatchScheduledJob is invoked when the agent reports a local // cron fire via `schedule.fire`. Thin wrapper around the shared // dispatcher; logs and discards the return values since the agent // can't usefully act on them. func (s *Server) dispatchScheduledJob(ctx context.Context, hostID string, _ *ws.Conn, scheduleID string, scheduledAt time.Time) { jobID, err := s.dispatchScheduleNow(ctx, hostID, scheduleID, nil) if err != nil { slog.Warn("schedule.fire: dispatch failed", "host_id", hostID, "schedule_id", scheduleID, "err", err) return } slog.Info("schedule.fire: dispatched", "host_id", hostID, "schedule_id", scheduleID, "job_id", jobID, "scheduled_at", scheduledAt) } // dispatchScheduleNow looks up a schedule, builds a CommandRunPayload, // persists a jobs row (actor_kind=schedule, scheduled_id linking // back), and ships MsgCommandRun to the host. Used by both the // agent-driven path (cron fire reaches us as schedule.fire) and the // UI-driven path (operator clicks Run-now on a schedule row). // // conn is optional: when set we write directly through it (no race // against an in-flight Register). When nil we fall back to Hub.Send. // Returns the new job_id on success. func (s *Server) dispatchScheduleNow(ctx context.Context, hostID, scheduleID string, conn *ws.Conn) (string, error) { sched, err := s.deps.Store.GetSchedule(ctx, hostID, scheduleID) if err != nil { if errors.Is(err, store.ErrNotFound) { return "", errFmtf("schedule not found") } return "", errFmtf("internal: %s", err) } if !sched.Enabled { return "", errFmtf("schedule is disabled") } var args []string if sched.Kind == string(api.JobBackup) { args = append(args, sched.Paths...) } jobID := ulid.Make().String() now := time.Now().UTC() if err := s.deps.Store.CreateJob(ctx, store.Job{ ID: jobID, HostID: hostID, Kind: sched.Kind, ScheduledID: &sched.ID, ActorKind: "schedule", ActorID: &sched.ID, CreatedAt: now, }); err != nil { return "", errFmtf("create job: %s", err) } env, err := api.Marshal(api.MsgCommandRun, jobID, api.CommandRunPayload{ JobID: jobID, Kind: api.JobKind(sched.Kind), Args: args, }) if err != nil { return "", errFmtf("marshal command.run: %s", err) } sendCtx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() if conn != nil { if err := conn.Send(sendCtx, env); err != nil { return "", errFmtf("send command.run: %s", err) } } else { if err := s.deps.Hub.Send(sendCtx, hostID, env); err != nil { return "", errFmtf("send command.run: %s", err) } } _ = s.deps.Store.AppendAudit(ctx, store.AuditEntry{ ID: ulid.Make().String(), Actor: "schedule", Action: "job.run_now", TargetKind: ptr("job"), TargetID: &jobID, TS: now, }) return jobID, nil } // Compile-time guard that the store actually implements the methods // schedule_push.go calls. Useful when mocking the store in tests. var _ scheduleStore = (*store.Store)(nil) type scheduleStore interface { ListSchedulesByHost(ctx context.Context, hostID string) ([]store.Schedule, error) GetHostScheduleVersion(ctx context.Context, hostID string) (int64, error) SetHostAppliedScheduleVersion(ctx context.Context, hostID string, version int64) error }