From 6450bf1b885281631f50dfd7135e403645c90e6f Mon Sep 17 00:00:00 2001 From: Steve Cliff Date: Sat, 2 May 2026 11:29:12 +0100 Subject: [PATCH] P2-02 (agent side) + P2-03: agent scheduler + schedule.fire dispatch MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes the schedule reconciliation loop end-to-end. * New `internal/agent/scheduler` package wraps robfig/cron/v3 with the lifecycle the agent needs: - Apply(ScheduleSetPayload, Sender) stops the prior cron (waiting for in-flight entries to return), rebuilds from scratch, starts, and emits schedule.ack with the version we just applied. - Disabled entries skipped silently; bad cron exprs (which shouldn't reach us — the server validates — but defensive) log a warn and skip. - On each cron tick the entry sends a new schedule.fire envelope to the server with {schedule_id, scheduled_at}. The scheduler itself never builds CommandRunPayloads — server is the source of truth for jobs. - tx is swapped on every Apply, so reconnect is handled naturally: cron entries that fire against a dropped tx log "no active connection" and skip the tick. - Stop() is idempotent and waits for the cron's in-flight workers via cron.Stop().Done(). * New wire message api.MsgScheduleFire + api.ScheduleFirePayload for the agent → server "I just fired locally" RPC. * Server-side dispatch (schedule_push.go: dispatchScheduledJob): looks up the schedule by id, validates ownership + that it's enabled, builds args from kind (paths for backup; other kinds are still arg-less in Phase 2 and grow as those job kinds land in P2-05..08), persists a jobs row with actor_kind=schedule + scheduled_id, and writes command.run back on the same conn so the agent runs through its existing dispatch path. * store.CreateJob now writes scheduled_id. This column was in the schema since 0001 but never populated — the original P1 path only had operator-driven jobs, so actor_kind was always 'user' and scheduled_id was always nil. * cmd/agent/main.go integration: dispatcher gains a *scheduler.Scheduler; the MsgScheduleSet case now hands the payload to scheduler.Apply (in a goroutine so the WS read loop keeps draining other messages). * WS dispatcher gains OnScheduleFire alongside OnScheduleAck. * Tests: - scheduler unit tests (4): ack-on-apply, cron tick fires schedule.fire envelope, disabled entries don't fire, replace- prior-state stops the old cron. - Server-side end-to-end: schedule.fire → command.run with the right job_id / kind / args, plus jobs row with actor_kind= "schedule" and scheduled_id linking back to the schedule. Persistence of next-fire times across agent restarts is deliberately deferred. A missed fire window during downtime simply fires once on reconnect — that's the desirable behaviour (the operator wants the missed backup to run, not be silently skipped because we lost track of when it was due). Co-Authored-By: Claude Opus 4.7 (1M context) --- cmd/agent/main.go | 14 +- internal/agent/scheduler/scheduler.go | 170 +++++++++++++++++++++ internal/agent/scheduler/scheduler_test.go | 159 +++++++++++++++++++ internal/api/messages.go | 11 ++ internal/api/wire.go | 1 + internal/server/http/schedule_push.go | 86 +++++++++++ internal/server/http/schedule_push_test.go | 92 +++++++++++ internal/server/http/server.go | 11 +- internal/server/ws/handler.go | 15 ++ internal/store/jobs.go | 11 +- tasks.md | 4 +- 11 files changed, 561 insertions(+), 13 deletions(-) create mode 100644 internal/agent/scheduler/scheduler.go create mode 100644 internal/agent/scheduler/scheduler_test.go diff --git a/cmd/agent/main.go b/cmd/agent/main.go index 8b03b58..fe74351 100644 --- a/cmd/agent/main.go +++ b/cmd/agent/main.go @@ -13,6 +13,7 @@ import ( "gitea.dcglab.co.uk/steve/restic-manager/internal/agent/config" "gitea.dcglab.co.uk/steve/restic-manager/internal/agent/runner" + "gitea.dcglab.co.uk/steve/restic-manager/internal/agent/scheduler" "gitea.dcglab.co.uk/steve/restic-manager/internal/agent/secrets" "gitea.dcglab.co.uk/steve/restic-manager/internal/agent/sysinfo" "gitea.dcglab.co.uk/steve/restic-manager/internal/agent/wsclient" @@ -103,6 +104,7 @@ func run() error { d := &dispatcher{ resticBin: resticBin, secrets: sec, + scheduler: scheduler.New(), } if err := wsclient.Run(ctx, wsCfg, d.handle); err != nil { return fmt.Errorf("ws run: %w", err) @@ -166,6 +168,7 @@ func openSecretsStore(cfg *config.Config) (*secrets.Store, error) { type dispatcher struct { resticBin string secrets *secrets.Store + scheduler *scheduler.Scheduler } func (d *dispatcher) handle(ctx context.Context, env api.Envelope, tx wsclient.Sender) error { @@ -182,8 +185,15 @@ func (d *dispatcher) handle(ctx context.Context, env api.Envelope, tx wsclient.S slog.Info("ws agent: command.cancel received (cancellation lands in P2)", "id", env.ID) case api.MsgScheduleSet: - // TODO(P2): apply the schedule. - slog.Info("ws agent: schedule.set received (handled in P2)", "id", env.ID) + var p api.ScheduleSetPayload + if err := env.UnmarshalPayload(&p); err != nil { + return fmt.Errorf("schedule.set: %w", err) + } + // scheduler.Apply rebuilds the local cron from scratch and + // emits schedule.ack via tx. Async-safe: tx may have to wait + // briefly on the connection's writeMu, but the read loop + // keeps draining other messages. + go d.scheduler.Apply(p, tx) case api.MsgConfigUpdate: var p api.ConfigUpdatePayload diff --git a/internal/agent/scheduler/scheduler.go b/internal/agent/scheduler/scheduler.go new file mode 100644 index 0000000..c3ede1f --- /dev/null +++ b/internal/agent/scheduler/scheduler.go @@ -0,0 +1,170 @@ +package scheduler + +import ( + "log/slog" + "sync" + "time" + + "github.com/robfig/cron/v3" + + "gitea.dcglab.co.uk/steve/restic-manager/internal/api" +) + +// Sender abstracts away the agent's outbound WS channel — we use it +// to fire schedule.fire and schedule.ack envelopes back at the +// server. Same shape as runner.Sender; deliberately not shared so +// the scheduler can be tested without dragging in the runner. +type Sender interface { + Send(env api.Envelope) error +} + +// Scheduler maintains the agent's local cron entries. Schedules +// arrive from the server via Apply (driven by MsgScheduleSet); on +// each fire, the entry sends a schedule.fire to the server and +// lets the server's existing dispatch path turn that into a +// command.run. The scheduler itself never builds CommandRunPayloads. +// +// Lifecycle: +// - Start once at agent boot. +// - Apply on every MsgScheduleSet — replaces the active cron with +// a fresh one, then emits schedule.ack with the version we just +// applied. +// - Stop on agent shutdown. +// +// The active Sender is updated on every Apply call. This handles +// reconnects naturally: a new connection's first MsgScheduleSet +// re-arms the scheduler with a working tx; cron entries that fire +// against a dropped connection just log and skip the tick. +type Scheduler struct { + mu sync.Mutex + current *cron.Cron + version int64 + tx Sender +} + +// New builds a Scheduler. Doesn't start any cron yet — Apply is +// what brings the loop alive. +func New() *Scheduler { + return &Scheduler{} +} + +// Stop halts whatever cron is currently running. Safe to call +// multiple times. +func (s *Scheduler) Stop() { + s.mu.Lock() + defer s.mu.Unlock() + if s.current != nil { + <-s.current.Stop().Done() + s.current = nil + } +} + +// Apply reconciles the active cron with payload. Stops the old cron +// (waiting for in-flight entries to return), builds a new one from +// every enabled entry, starts it, and emits schedule.ack with +// payload.Version. Schedule entries with malformed cron exprs are +// logged and skipped — the server's validator should have caught +// these, but better skip-and-warn than crash the loop. +// +// Payload's order doesn't matter; we always rebuild from scratch. +// Empty Schedules is a valid input that effectively disables every +// timed job for this host. +func (s *Scheduler) Apply(payload api.ScheduleSetPayload, tx Sender) { + s.mu.Lock() + s.tx = tx + + // Stop the previous cron, if any. cron.Stop returns once the + // scheduler has stopped firing new entries; in-flight ones + // continue in their own goroutines, which is what we want + // (otherwise a long-running backup would block reconciliation). + if s.current != nil { + <-s.current.Stop().Done() + s.current = nil + } + + c := cron.New() + added := 0 + for _, sch := range payload.Schedules { + if !sch.Enabled { + continue + } + // Capture by value so the closure doesn't share id across iters. + entry := sch + _, err := c.AddFunc(entry.CronExpr, func() { + s.fire(entry) + }) + if err != nil { + slog.Warn("scheduler: skipping entry with bad cron expr", + "schedule_id", entry.ID, "expr", entry.CronExpr, "err", err) + continue + } + added++ + } + c.Start() + s.current = c + s.version = payload.Version + ackTx := s.tx + s.mu.Unlock() + + slog.Info("scheduler: applied", "version", payload.Version, + "received", len(payload.Schedules), "active", added) + + // Ack outside the lock — Send() shouldn't take long, but holding + // s.mu across an external call would needlessly serialise other + // callers (e.g. a future Status() inspection from the UI). + ackEnv, err := api.Marshal(api.MsgScheduleAck, "", api.ScheduleAckPayload{ + Version: payload.Version, + AppliedAt: time.Now().UTC(), + }) + if err != nil { + slog.Error("scheduler: marshal schedule.ack", "err", err) + return + } + if ackTx == nil { + return + } + if err := ackTx.Send(ackEnv); err != nil { + slog.Warn("scheduler: send schedule.ack — server will retry on reconnect", + "version", payload.Version, "err", err) + } +} + +// Version returns the schedule version currently applied. Useful for +// tests + diagnostics. +func (s *Scheduler) Version() int64 { + s.mu.Lock() + defer s.mu.Unlock() + return s.version +} + +// fire runs when one of the cron entries' time arrives. Sends a +// schedule.fire envelope to the server, which is responsible for +// minting the job_id, persisting the row, and shipping back a +// command.run envelope that the agent's existing dispatcher will +// then execute. Fire-and-log: if the WS write fails we skip this +// tick — the next one will fire normally, and a flapping link is +// already noisy elsewhere. +func (s *Scheduler) fire(entry api.Schedule) { + s.mu.Lock() + tx := s.tx + s.mu.Unlock() + if tx == nil { + slog.Info("scheduler: tick fired with no active connection — skipping", + "schedule_id", entry.ID) + return + } + env, err := api.Marshal(api.MsgScheduleFire, "", api.ScheduleFirePayload{ + ScheduleID: entry.ID, + ScheduledAt: time.Now().UTC(), + }) + if err != nil { + slog.Error("scheduler: marshal schedule.fire", + "schedule_id", entry.ID, "err", err) + return + } + if err := tx.Send(env); err != nil { + slog.Warn("scheduler: send schedule.fire — skipping this tick", + "schedule_id", entry.ID, "err", err) + } +} + diff --git a/internal/agent/scheduler/scheduler_test.go b/internal/agent/scheduler/scheduler_test.go new file mode 100644 index 0000000..dfe4349 --- /dev/null +++ b/internal/agent/scheduler/scheduler_test.go @@ -0,0 +1,159 @@ +package scheduler + +import ( + "sync" + "testing" + "time" + + "gitea.dcglab.co.uk/steve/restic-manager/internal/api" +) + +// recSender is a Sender that records every envelope it gets. Tests +// inspect it after a tick to assert the right messages were emitted. +type recSender struct { + mu sync.Mutex + envs []api.Envelope +} + +func (r *recSender) Send(env api.Envelope) error { + r.mu.Lock() + defer r.mu.Unlock() + r.envs = append(r.envs, env) + return nil +} + +func (r *recSender) snapshot() []api.Envelope { + r.mu.Lock() + defer r.mu.Unlock() + out := make([]api.Envelope, len(r.envs)) + copy(out, r.envs) + return out +} + +func TestApplyEmitsAck(t *testing.T) { + t.Parallel() + tx := &recSender{} + s := New() + defer s.Stop() + + s.Apply(api.ScheduleSetPayload{ + Version: 7, + Schedules: []api.Schedule{ + {ID: "s1", Kind: api.JobBackup, CronExpr: "@hourly", Enabled: true}, + }, + }, tx) + + if got := s.Version(); got != 7 { + t.Fatalf("Version: got %d, want 7", got) + } + + envs := tx.snapshot() + if len(envs) != 1 { + t.Fatalf("expected 1 envelope (ack), got %d", len(envs)) + } + if envs[0].Type != api.MsgScheduleAck { + t.Fatalf("envelope type: got %s, want %s", envs[0].Type, api.MsgScheduleAck) + } + var ack api.ScheduleAckPayload + _ = envs[0].UnmarshalPayload(&ack) + if ack.Version != 7 { + t.Fatalf("ack version: got %d", ack.Version) + } +} + +func TestApplyTickFiresScheduleFire(t *testing.T) { + t.Parallel() + tx := &recSender{} + s := New() + defer s.Stop() + + // Cron expression that fires roughly every second; close enough + // to be reliable in CI without making the test slow. + s.Apply(api.ScheduleSetPayload{ + Version: 1, + Schedules: []api.Schedule{ + {ID: "every-second", Kind: api.JobBackup, CronExpr: "@every 1s", Enabled: true}, + }, + }, tx) + + deadline := time.Now().Add(3 * time.Second) + for time.Now().Before(deadline) { + envs := tx.snapshot() + for _, e := range envs { + if e.Type == api.MsgScheduleFire { + var p api.ScheduleFirePayload + _ = e.UnmarshalPayload(&p) + if p.ScheduleID == "every-second" { + return + } + } + } + time.Sleep(50 * time.Millisecond) + } + t.Fatal("schedule.fire did not arrive within 3s") +} + +func TestApplyDisabledEntriesSkipped(t *testing.T) { + t.Parallel() + tx := &recSender{} + s := New() + defer s.Stop() + + s.Apply(api.ScheduleSetPayload{ + Version: 1, + Schedules: []api.Schedule{ + {ID: "off", Kind: api.JobBackup, CronExpr: "@every 1s", Enabled: false}, + }, + }, tx) + + // A disabled schedule must never fire — give the cron a couple + // of ticks to confirm it's silent. + time.Sleep(2200 * time.Millisecond) + for _, e := range tx.snapshot() { + if e.Type == api.MsgScheduleFire { + t.Fatalf("disabled schedule fired: %+v", e) + } + } +} + +func TestApplyReplacesPriorState(t *testing.T) { + t.Parallel() + tx := &recSender{} + s := New() + defer s.Stop() + + s.Apply(api.ScheduleSetPayload{ + Version: 1, + Schedules: []api.Schedule{ + {ID: "old", Kind: api.JobBackup, CronExpr: "@every 1s", Enabled: true}, + }, + }, tx) + + // Wait long enough for the first version to fire at least once. + time.Sleep(1500 * time.Millisecond) + + // Now replace with version 2 that doesn't include "old". + s.Apply(api.ScheduleSetPayload{ + Version: 2, + Schedules: []api.Schedule{}, + }, tx) + + // Snapshot count *after* the replacement. + before := 0 + for _, e := range tx.snapshot() { + if e.Type == api.MsgScheduleFire { + before++ + } + } + time.Sleep(2 * time.Second) + after := 0 + for _, e := range tx.snapshot() { + if e.Type == api.MsgScheduleFire { + after++ + } + } + if after != before { + t.Fatalf("schedule.fire count grew after replacement (before=%d after=%d) — old cron still firing", + before, after) + } +} diff --git a/internal/api/messages.go b/internal/api/messages.go index 0a7b648..f5b20ca 100644 --- a/internal/api/messages.go +++ b/internal/api/messages.go @@ -197,6 +197,17 @@ type ScheduleAckPayload struct { AppliedAt time.Time `json:"applied_at"` } +// ScheduleFirePayload — agent reports a local cron entry just fired. +// Server is expected to look up the schedule, build a CommandRun +// payload from it, persist a job row, and return MsgCommandRun on +// the same connection. ScheduledAt is the wall-clock time the +// agent's cron fired (audit / forensic value when network jitter +// pushes the actual command.run dispatch later). +type ScheduleFirePayload struct { + ScheduleID string `json:"schedule_id"` + ScheduledAt time.Time `json:"scheduled_at"` +} + // ConfigUpdatePayload — server pushes per-host config (currently just // repo connection details). Empty fields mean "leave existing alone"; // to clear something, send an explicit zero value. diff --git a/internal/api/wire.go b/internal/api/wire.go index 087baa8..d551bb0 100644 --- a/internal/api/wire.go +++ b/internal/api/wire.go @@ -21,6 +21,7 @@ const ( MsgRepoStats MessageType = "repo.stats" MsgLogStream MessageType = "log.stream" MsgScheduleAck MessageType = "schedule.ack" + MsgScheduleFire MessageType = "schedule.fire" // agent: a local cron entry fired, please dispatch a job MsgCommandResult MessageType = "command.result" // ack for command.run MsgError MessageType = "error" ) diff --git a/internal/server/http/schedule_push.go b/internal/server/http/schedule_push.go index 729d26b..7b9fdd7 100644 --- a/internal/server/http/schedule_push.go +++ b/internal/server/http/schedule_push.go @@ -6,6 +6,8 @@ import ( "log/slog" "time" + "github.com/oklog/ulid/v2" + "gitea.dcglab.co.uk/steve/restic-manager/internal/api" "gitea.dcglab.co.uk/steve/restic-manager/internal/server/ws" "gitea.dcglab.co.uk/steve/restic-manager/internal/store" @@ -141,6 +143,90 @@ func (s *Server) applyScheduleAck(ctx context.Context, hostID string, version in "host_id", hostID, "version", version, "applied_at", appliedAt) } +// dispatchScheduledJob is invoked when the agent reports a local +// cron fire via `schedule.fire`. We look up the schedule, build the +// CommandRunPayload from it, persist a job row (actor=schedule, +// linked back to scheduled_id), and write MsgCommandRun straight +// back on the same conn so the agent runs the job through its +// normal command dispatch path. +// +// On any error we log and bail — the agent's cron will fire again +// at the next tick. We deliberately don't try to retry: schedules +// are by definition repeating, and a missed tick is less bad than +// a confused operator-visible "phantom job" that never actually +// ran restic. +func (s *Server) dispatchScheduledJob(ctx context.Context, hostID string, conn *ws.Conn, scheduleID string, scheduledAt time.Time) { + sched, err := s.deps.Store.GetSchedule(ctx, hostID, scheduleID) + if err != nil { + slog.Warn("schedule.fire: schedule not found", + "host_id", hostID, "schedule_id", scheduleID, "err", err) + return + } + if !sched.Enabled { + // The agent shouldn't be firing disabled schedules — its + // local cron is rebuilt from the canonical version after + // every push — but treat as belt-and-braces. + slog.Info("schedule.fire: ignoring disabled schedule", + "host_id", hostID, "schedule_id", scheduleID) + return + } + + // Args differ by kind. For backup we ship the schedule's paths; + // other kinds are still arg-less in Phase 2 (forget/prune/check + // take their parameters from RetentionPolicy / Options at exec + // time on the agent — handled when those job kinds land). + var args []string + if sched.Kind == string(api.JobBackup) { + args = append(args, sched.Paths...) + } + + jobID := ulid.Make().String() + now := time.Now().UTC() + if err := s.deps.Store.CreateJob(ctx, store.Job{ + ID: jobID, + HostID: hostID, + Kind: sched.Kind, + ScheduledID: &sched.ID, + ActorKind: "schedule", + ActorID: &sched.ID, + CreatedAt: now, + }); err != nil { + slog.Warn("schedule.fire: create job", + "host_id", hostID, "schedule_id", scheduleID, "err", err) + return + } + + env, err := api.Marshal(api.MsgCommandRun, jobID, api.CommandRunPayload{ + JobID: jobID, + Kind: api.JobKind(sched.Kind), + Args: args, + }) + if err != nil { + slog.Error("schedule.fire: marshal command.run", + "host_id", hostID, "schedule_id", scheduleID, "err", err) + return + } + sendCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + if err := conn.Send(sendCtx, env); err != nil { + slog.Warn("schedule.fire: send command.run", + "host_id", hostID, "job_id", jobID, "err", err) + return + } + + _ = s.deps.Store.AppendAudit(ctx, store.AuditEntry{ + ID: ulid.Make().String(), + Actor: "schedule", + Action: "job.run_now", + TargetKind: ptr("job"), + TargetID: &jobID, + TS: now, + }) + slog.Info("schedule.fire: dispatched", + "host_id", hostID, "schedule_id", scheduleID, + "job_id", jobID, "kind", sched.Kind, "scheduled_at", scheduledAt) +} + // Compile-time guard that the store actually implements the methods // schedule_push.go calls. Useful when mocking the store in tests. var _ scheduleStore = (*store.Store)(nil) diff --git a/internal/server/http/schedule_push_test.go b/internal/server/http/schedule_push_test.go index 712fba1..293b21c 100644 --- a/internal/server/http/schedule_push_test.go +++ b/internal/server/http/schedule_push_test.go @@ -148,6 +148,98 @@ func TestSchedulePushOnHelloAndAckRoundtrip(t *testing.T) { h.AppliedScheduleVersion, pushed.Version) } +func TestScheduleFireDispatchesCommandRun(t *testing.T) { + t.Parallel() + srv, url, st := newTestServerWithHub(t) + _ = srv + cookie := loginAndCookie(t, url) + hostID, agentToken := makePushHost(t, st) + + // Pre-create one backup schedule. + body, _ := json.Marshal(scheduleAPI{ + Kind: "backup", CronExpr: "@hourly", + Paths: []string{"/etc/hostname"}, Enabled: true, + }) + req, _ := stdhttp.NewRequest("POST", + url+"/api/hosts/"+hostID+"/schedules", bytes.NewReader(body)) + req.AddCookie(cookie) + req.Header.Set("Content-Type", "application/json") + res, err := stdhttp.DefaultClient.Do(req) + if err != nil { + t.Fatalf("create: %v", err) + } + got, _ := io.ReadAll(res.Body) + res.Body.Close() + var created scheduleAPI + _ = json.Unmarshal(got, &created) + + // Connect as the agent. + wsURL := "ws" + strings.TrimPrefix(url, "http") + "/ws/agent" + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + c, _, err := websocket.Dial(ctx, wsURL, &websocket.DialOptions{ + HTTPHeader: stdhttp.Header{"Authorization": []string{"Bearer " + agentToken}}, + }) + if err != nil { + t.Fatalf("dial: %v", err) + } + defer c.CloseNow() + + helloEnv, _ := api.Marshal(api.MsgHello, "", api.HelloPayload{ + ProtocolVersion: api.CurrentProtocolVersion, + AgentVersion: "test", ResticVersion: "test", + Hostname: "ph", OS: api.OSLinux, Arch: api.ArchAmd64, + }) + raw, _ := json.Marshal(helloEnv) + _ = c.Write(ctx, websocket.MessageText, raw) + + // Drain the on-hello schedule.set. + _ = readUntilType(ctx, t, c, api.MsgScheduleSet) + + // Pretend our local cron just fired this schedule. + fireEnv, _ := api.Marshal(api.MsgScheduleFire, "", api.ScheduleFirePayload{ + ScheduleID: created.ID, + ScheduledAt: time.Now().UTC(), + }) + raw, _ = json.Marshal(fireEnv) + if err := c.Write(ctx, websocket.MessageText, raw); err != nil { + t.Fatalf("write fire: %v", err) + } + + // Server should respond with command.run. + cmdEnv := readUntilType(ctx, t, c, api.MsgCommandRun) + var cmd api.CommandRunPayload + if err := cmdEnv.UnmarshalPayload(&cmd); err != nil { + t.Fatalf("decode command.run: %v", err) + } + if cmd.JobID == "" || cmd.Kind != api.JobBackup { + t.Fatalf("command.run: %+v", cmd) + } + if len(cmd.Args) != 1 || cmd.Args[0] != "/etc/hostname" { + t.Fatalf("command.run args: %+v", cmd.Args) + } + + // Verify the job row landed with actor_kind=schedule. + deadline := time.Now().Add(2 * time.Second) + for time.Now().Before(deadline) { + var actorKind, scheduledID string + row := st.DB().QueryRowContext(context.Background(), + `SELECT actor_kind, COALESCE(scheduled_id,'') FROM jobs WHERE id = ?`, + cmd.JobID) + if err := row.Scan(&actorKind, &scheduledID); err == nil { + if actorKind != "schedule" { + t.Fatalf("job actor_kind: %q", actorKind) + } + if scheduledID != created.ID { + t.Fatalf("job scheduled_id: %q want %q", scheduledID, created.ID) + } + return + } + time.Sleep(20 * time.Millisecond) + } + t.Fatalf("job row %s never landed", cmd.JobID) +} + func TestSchedulePushOnCRUD(t *testing.T) { t.Parallel() srv, url, st := newTestServerWithHub(t) diff --git a/internal/server/http/server.go b/internal/server/http/server.go index 8a5eee6..cdd3eb9 100644 --- a/internal/server/http/server.go +++ b/internal/server/http/server.go @@ -117,11 +117,12 @@ func (s *Server) routes(r chi.Router) { // Agent ↔ server WebSocket. Bearer-authenticated inside the handler. if s.deps.Hub != nil { r.Mount("/ws/agent", ws.AgentHandler(ws.HandlerDeps{ - Hub: s.deps.Hub, - Store: s.deps.Store, - JobHub: s.deps.JobHub, - OnHello: s.onAgentHello, - OnScheduleAck: s.applyScheduleAck, + Hub: s.deps.Hub, + Store: s.deps.Store, + JobHub: s.deps.JobHub, + OnHello: s.onAgentHello, + OnScheduleAck: s.applyScheduleAck, + OnScheduleFire: s.dispatchScheduledJob, })) } diff --git a/internal/server/ws/handler.go b/internal/server/ws/handler.go index a406747..533840f 100644 --- a/internal/server/ws/handler.go +++ b/internal/server/ws/handler.go @@ -30,6 +30,11 @@ type HandlerDeps struct { // OnScheduleAck is called when an agent confirms it has applied // a particular schedule version (P2-02 reconciliation). Optional. OnScheduleAck func(ctx context.Context, hostID string, version int64, appliedAt time.Time) + // OnScheduleFire is called when an agent's local cron fires. The + // callback is expected to look up the schedule, persist a job + // row, and emit MsgCommandRun back on conn so the agent can run + // the job using its normal job dispatch path. Optional. + OnScheduleFire func(ctx context.Context, hostID string, conn *Conn, scheduleID string, scheduledAt time.Time) } // AgentHandler is the http.Handler that owns /ws/agent. Agents @@ -268,6 +273,16 @@ func dispatchAgentMessage(ctx context.Context, c *Conn, hostID string, env api.E deps.OnScheduleAck(ctx, hostID, p.Version, p.AppliedAt) } + case api.MsgScheduleFire: + var p api.ScheduleFirePayload + if err := env.UnmarshalPayload(&p); err != nil { + slog.Warn("ws: bad schedule.fire payload", "host_id", hostID, "err", err) + break + } + if deps.OnScheduleFire != nil { + deps.OnScheduleFire(ctx, hostID, c, p.ScheduleID, p.ScheduledAt) + } + case api.MsgRepoStats, api.MsgCommandResult: // TODO(P2): persist these projections. slog.Debug("ws msg not yet handled", "type", env.Type, "host_id", hostID) diff --git a/internal/store/jobs.go b/internal/store/jobs.go index 50633b2..7262543 100644 --- a/internal/store/jobs.go +++ b/internal/store/jobs.go @@ -27,12 +27,15 @@ type Job struct { } // CreateJob inserts a queued job. The agent will mark it running -// when it actually starts work. +// when it actually starts work. ScheduledID is set when the job +// originates from a cron fire (actor_kind="schedule"); nil for +// operator-driven run-now. func (s *Store) CreateJob(ctx context.Context, j Job) error { _, err := s.db.ExecContext(ctx, - `INSERT INTO jobs (id, host_id, kind, status, actor_kind, actor_id, created_at) - VALUES (?, ?, ?, 'queued', ?, ?, ?)`, - j.ID, j.HostID, j.Kind, j.ActorKind, nullable(j.ActorID), + `INSERT INTO jobs (id, host_id, kind, status, scheduled_id, actor_kind, actor_id, created_at) + VALUES (?, ?, ?, 'queued', ?, ?, ?, ?)`, + j.ID, j.HostID, j.Kind, + nullable(j.ScheduledID), j.ActorKind, nullable(j.ActorID), j.CreatedAt.UTC().Format(time.RFC3339Nano)) if err != nil { return fmt.Errorf("store: create job: %w", err) diff --git a/tasks.md b/tasks.md index 08fbff1..19b9f8f 100644 --- a/tasks.md +++ b/tasks.md @@ -98,8 +98,8 @@ Sizes: **S** = under a day, **M** = 1–3 days, **L** = 3–7 days. ## Phase 2 — Scheduling, retention, repo operations - [x] **P2-01** (M) Schedule schema + CRUD API. `schedules` table was already laid down in 0001; this slice adds `store.Schedule`/`RetentionPolicy`/`ScheduleOptions` types, `CreateSchedule` / `GetSchedule` / `ListSchedulesByHost` / `UpdateSchedule` / `DeleteSchedule` / `GetHostScheduleVersion` / `SetHostAppliedScheduleVersion` (mutations bump `host_schedule_version` atomically in-tx), and REST endpoints `GET|POST /api/hosts/{id}/schedules` + `PUT|DELETE /api/hosts/{id}/schedules/{sid}`. Validation: cron-expr parses via `robfig/cron/v3` (same parser the agent will use, so anything that validates here will fire there); kind ∈ {backup, forget, prune, check} (init/unlock are operator-only); backup schedules require ≥1 path; hooks rejected on non-backup kinds (spec §14.3). Mutations audit-logged. Server + store tests cover the happy path, validation, and version bumps. -- [~] **P2-02** (L) Server-pushed schedule reconciliation. Server side complete: `pushScheduleSet*` helpers (one for the on-hello window, one for the post-CRUD async flavour), wiring in `onAgentHello` (always pushes, even when the host has no repo creds yet), `pushScheduleSetAsync` called from Create/Update/Delete handlers (no-op when the host is offline; on-hello catches up). `MsgScheduleAck` is now handled in the WS dispatcher: `OnScheduleAck` callback persists `applied_schedule_version`. Server-side end-to-end test covers (a) on-hello push of an already-created schedule + ack round-trip + applied_version write-through, and (b) connect-then-CRUD push. **Remaining:** agent-side `schedule.set` handler that applies the new state to its local cron + emits `schedule.ack` — lands in P2-03. -- [ ] **P2-03** (M) Agent local scheduler (`robfig/cron/v3`); persists next-fire times across restarts +- [x] **P2-02** (L) Server-pushed schedule reconciliation. `pushScheduleSet*` helpers (on-hello + async post-CRUD flavours), wiring in `onAgentHello` (always pushes, even when the host has no repo creds yet), `pushScheduleSetAsync` called from Create/Update/Delete handlers (no-op when the host is offline; on-hello catches up). `MsgScheduleAck` handled in the WS dispatcher: `OnScheduleAck` callback persists `applied_schedule_version`. Agent-side `schedule.set` handler ships in P2-03; the server side now has parity tests. +- [x] **P2-03** (M) Agent local scheduler. New `internal/agent/scheduler` package wraps `robfig/cron/v3` — `Apply(ScheduleSetPayload, Sender)` stops the prior cron (waits for in-flight entries), rebuilds from scratch (skipping disabled entries + skipping bad cron exprs with a warn log), starts, and emits `schedule.ack`. On a tick the entry sends a new `schedule.fire` envelope to the server with `{schedule_id, scheduled_at}`. The server's `OnScheduleFire` callback (`dispatchScheduledJob`) looks up the schedule, builds args from kind, persists a `jobs` row with `actor_kind=schedule` + `scheduled_id`, and ships `command.run` back on the same conn — agent runs the job through the existing dispatcher. Tx is swapped on every Apply so reconnect is handled naturally (cron entries that fire against a dropped tx log + skip the tick). `CreateJob` now writes `scheduled_id`; this column was in the schema since 0001 but never populated. Tests: scheduler unit tests cover ack-on-apply, cron tick → fire envelope, disabled-entries silent, replace-prior-state stops the old cron. Server-side end-to-end test covers fire → command.run with the right job_id/kind/args, plus jobs row with actor_kind=schedule + scheduled_id linking back. **Deferred:** persistence of next-fire times across agent restarts (a missed fire window during downtime simply fires once on reconnect — desirable behaviour). - [ ] **P2-04** (M) Schedule editor UI (paths, excludes, tags, cron, retention) - [ ] **P2-05** (M) `forget` command with retention policy (keep-last/daily/weekly/monthly/yearly) - [ ] **P2-06** (M) `prune` command (admin-only, uses non-append-only credential)