From a086b0eb75728104f524f55437d1136809c4bf10 Mon Sep 17 00:00:00 2001 From: Steve Cliff Date: Sat, 2 May 2026 11:22:06 +0100 Subject: [PATCH] P2-02 (server side): schedule reconciliation push + ack handling MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Server is now the source of truth for the agent's cron set. * Helpers in schedule_push.go: - loadScheduleSetPayload reads the host's schedules + canonical version into the wire shape. - pushScheduleSetOnConn writes directly to a just-handshaken conn (avoids racing against Hub.Register on a brand-new connection). - pushScheduleSetAsync is the post-CRUD flavour — no-op when the host is offline (the next reconnect's on-hello path catches it up, so a missed push is non-fatal). - applyScheduleAck records what version the agent has confirmed. * onAgentHello restructured: was returning early when the host had no repo credentials, which made the schedule push unreachable for fresh hosts. Split into pushRepoCredsOnHello (silent no-op on ErrNotFound) + pushScheduleSetOnConn (always runs). Empty schedule list is a valid push: tells the agent to drop stale cron entries. * WS dispatcher gains an OnScheduleAck hook on HandlerDeps; the http server wires it to applyScheduleAck. MsgScheduleAck moves out of the "TODO(P2)" group into a real case that decodes the payload and forwards to the callback. * Schedule CRUD handlers each fire pushScheduleSetAsync after the audit-log write so the agent picks up changes within seconds. Tests cover: - On-hello push of an already-created schedule, agent acks, applied_schedule_version flips on the host row. - Connect-then-CRUD: empty initial push (version 0), then a follow-on push at version 1 after the operator creates a schedule via REST. Agent-side `schedule.set` handler (parse, replace local cron, emit `schedule.ack`) is the remainder of P2-02 and lands with P2-03's local scheduler. Co-Authored-By: Claude Opus 4.7 (1M context) --- internal/server/http/host_credentials.go | 13 ++ internal/server/http/schedule_push.go | 152 +++++++++++++++ internal/server/http/schedule_push_test.go | 212 +++++++++++++++++++++ internal/server/http/schedules.go | 3 + internal/server/http/server.go | 9 +- internal/server/ws/handler.go | 15 +- tasks.md | 2 +- 7 files changed, 400 insertions(+), 6 deletions(-) create mode 100644 internal/server/http/schedule_push.go create mode 100644 internal/server/http/schedule_push_test.go diff --git a/internal/server/http/host_credentials.go b/internal/server/http/host_credentials.go index 1c64364..2d99a1e 100644 --- a/internal/server/http/host_credentials.go +++ b/internal/server/http/host_credentials.go @@ -192,6 +192,19 @@ func (s *Server) pushRepoCredsToAgent(ctx context.Context, hostID string, blob r // The conn argument is used directly (rather than via the hub) so we // don't race a brand-new register against an old still-closing conn. func (s *Server) onAgentHello(ctx context.Context, hostID string, conn *ws.Conn) { + s.pushRepoCredsOnHello(ctx, hostID, conn) + // Push the current schedule set in the same on-hello window so + // the agent's local cron is in sync before any command.run lands. + // An empty schedule list is a valid push: it tells the agent to + // drop any cron entries left over from a previous deployment. + // Always runs, even when the host has no repo credentials yet. + s.pushScheduleSetOnConn(ctx, hostID, conn) +} + +// pushRepoCredsOnHello loads + decrypts + sends the host's repo +// credentials. Silent no-op when the host has nothing on file +// (the operator hasn't bound creds to it yet). +func (s *Server) pushRepoCredsOnHello(ctx context.Context, hostID string, conn *ws.Conn) { enc, err := s.deps.Store.GetHostCredentials(ctx, hostID) if err != nil { if !errors.Is(err, store.ErrNotFound) { diff --git a/internal/server/http/schedule_push.go b/internal/server/http/schedule_push.go new file mode 100644 index 0000000..729d26b --- /dev/null +++ b/internal/server/http/schedule_push.go @@ -0,0 +1,152 @@ +package http + +import ( + "context" + "encoding/json" + "log/slog" + "time" + + "gitea.dcglab.co.uk/steve/restic-manager/internal/api" + "gitea.dcglab.co.uk/steve/restic-manager/internal/server/ws" + "gitea.dcglab.co.uk/steve/restic-manager/internal/store" +) + +// loadScheduleSetPayload reads the host's current schedule set + the +// canonical version into a wire-shape payload. Returns an empty +// (but well-formed) payload with version 0 if the host has nothing +// scheduled yet — that's still a valid state to push so the agent +// drops any stale cron entries from a previous deployment. +func (s *Server) loadScheduleSetPayload(ctx context.Context, hostID string) (api.ScheduleSetPayload, error) { + rows, err := s.deps.Store.ListSchedulesByHost(ctx, hostID) + if err != nil { + return api.ScheduleSetPayload{}, err + } + version, err := s.deps.Store.GetHostScheduleVersion(ctx, hostID) + if err != nil { + return api.ScheduleSetPayload{}, err + } + out := api.ScheduleSetPayload{ + Version: version, + Schedules: make([]api.Schedule, 0, len(rows)), + } + for _, r := range rows { + retJSON, _ := json.Marshal(r.RetentionPolicy) + optJSON, _ := json.Marshal(r.Options) + out.Schedules = append(out.Schedules, api.Schedule{ + ID: r.ID, + Kind: api.JobKind(r.Kind), + CronExpr: r.CronExpr, + Paths: r.Paths, + Excludes: r.Excludes, + Tags: r.Tags, + RetentionPolicy: retJSON, + Options: optJSON, + PreHook: r.PreHook, + PostHook: r.PostHook, + Enabled: r.Enabled, + }) + } + return out, nil +} + +// pushScheduleSet ships the current schedule list to the agent over +// the hub. Caller has already determined the agent is connected. +// Errors are logged and returned; the next push (or the next hello) +// will retry. Idempotent — sending the same version twice is a +// harmless no-op on the agent side. +func (s *Server) pushScheduleSet(ctx context.Context, hostID string) error { + pl, err := s.loadScheduleSetPayload(ctx, hostID) + if err != nil { + slog.Warn("push schedule.set: load failed", "host_id", hostID, "err", err) + return err + } + env, err := api.Marshal(api.MsgScheduleSet, "", pl) + if err != nil { + return err + } + sendCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + if err := s.deps.Hub.Send(sendCtx, hostID, env); err != nil { + slog.Warn("push schedule.set: hub send failed", + "host_id", hostID, "version", pl.Version, "err", err) + return err + } + return nil +} + +// pushScheduleSetOnConn is the on-hello flavour: writes directly to +// the just-handshaken conn rather than racing through the hub. Used +// by onAgentHello so a brand-new connection can't miss an early +// push because Register hasn't completed yet. +func (s *Server) pushScheduleSetOnConn(ctx context.Context, hostID string, conn *ws.Conn) { + pl, err := s.loadScheduleSetPayload(ctx, hostID) + if err != nil { + slog.Warn("on-hello: load schedules", "host_id", hostID, "err", err) + return + } + env, err := api.Marshal(api.MsgScheduleSet, "", pl) + if err != nil { + slog.Error("on-hello: marshal schedule.set", "host_id", hostID, "err", err) + return + } + sendCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + if err := conn.Send(sendCtx, env); err != nil { + slog.Warn("on-hello: send schedule.set", + "host_id", hostID, "version", pl.Version, "err", err) + } +} + +// pushIfConnected dispatches a schedule.set push asynchronously when +// the agent is online. Used by CRUD handlers: a missed push is +// non-fatal because the next reconnect's on-hello path will catch +// the agent up. Decoupled from the request so the operator's HTTP +// response doesn't wait on the WS round-trip. +func (s *Server) pushScheduleSetAsync(hostID string) { + if s.deps.Hub == nil || !s.deps.Hub.Connected(hostID) { + return + } + go func() { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + _ = s.pushScheduleSet(ctx, hostID) + }() +} + +// applyScheduleAck records the version the agent has confirmed via +// schedule.ack. Called from the WS dispatcher (wired below). A bad +// version (zero, or higher than what we've issued) is logged but +// not fatal — the next push will set the agent straight. +func (s *Server) applyScheduleAck(ctx context.Context, hostID string, version int64, appliedAt time.Time) { + if version <= 0 { + return + } + canonical, err := s.deps.Store.GetHostScheduleVersion(ctx, hostID) + if err != nil { + slog.Warn("schedule.ack: load canonical version", + "host_id", hostID, "err", err) + return + } + if version > canonical { + slog.Warn("schedule.ack: agent reported version ahead of server", + "host_id", hostID, "agent", version, "server", canonical) + return + } + if err := s.deps.Store.SetHostAppliedScheduleVersion(ctx, hostID, version); err != nil { + slog.Warn("schedule.ack: persist applied version", + "host_id", hostID, "version", version, "err", err) + return + } + slog.Info("schedule.ack: applied", + "host_id", hostID, "version", version, "applied_at", appliedAt) +} + +// Compile-time guard that the store actually implements the methods +// schedule_push.go calls. Useful when mocking the store in tests. +var _ scheduleStore = (*store.Store)(nil) + +type scheduleStore interface { + ListSchedulesByHost(ctx context.Context, hostID string) ([]store.Schedule, error) + GetHostScheduleVersion(ctx context.Context, hostID string) (int64, error) + SetHostAppliedScheduleVersion(ctx context.Context, hostID string, version int64) error +} diff --git a/internal/server/http/schedule_push_test.go b/internal/server/http/schedule_push_test.go new file mode 100644 index 0000000..712fba1 --- /dev/null +++ b/internal/server/http/schedule_push_test.go @@ -0,0 +1,212 @@ +package http + +import ( + "bytes" + "context" + "encoding/json" + "io" + stdhttp "net/http" + "strings" + "testing" + "time" + + "github.com/coder/websocket" + + "gitea.dcglab.co.uk/steve/restic-manager/internal/api" + "gitea.dcglab.co.uk/steve/restic-manager/internal/auth" + "gitea.dcglab.co.uk/steve/restic-manager/internal/store" +) + +// makePushHost is like makeHTTPHost but mints a known agent token so +// the test can dial /ws/agent as the host. Returns (hostID, raw token). +func makePushHost(t *testing.T, st *store.Store) (string, string) { + t.Helper() + const id = "01HSCHEDPUSH00000000000000" + tok, _ := auth.NewToken() + if err := st.CreateHost(context.Background(), store.Host{ + ID: id, Name: "ph", OS: "linux", Arch: "amd64", + AgentVersion: "dev", ResticVersion: "0.16.0", ProtocolVersion: 1, + EnrolledAt: time.Now().UTC(), + }, auth.HashToken(tok), ""); err != nil { + t.Fatalf("create host: %v", err) + } + return id, tok +} + +// readUntilType pumps messages from the WS until one of the wanted +// types arrives or ctx times out. Returns the matched envelope. +// Useful because the on-hello path may push several messages +// (config.update first if creds exist, schedule.set, …). +func readUntilType(ctx context.Context, t *testing.T, c *websocket.Conn, want api.MessageType) api.Envelope { + t.Helper() + for { + _, raw, err := c.Read(ctx) + if err != nil { + t.Fatalf("ws read waiting for %s: %v", want, err) + } + var env api.Envelope + if err := json.Unmarshal(raw, &env); err != nil { + t.Fatalf("envelope: %v (raw=%s)", err, raw) + } + t.Logf("recv: type=%s payload=%s", env.Type, env.Payload) + if env.Type == want { + return env + } + } +} + +func TestSchedulePushOnHelloAndAckRoundtrip(t *testing.T) { + t.Parallel() + srv, url, st := newTestServerWithHub(t) + _ = srv + cookie := loginAndCookie(t, url) + hostID, agentToken := makePushHost(t, st) + + // Pre-populate one schedule so we have something to push. + body, _ := json.Marshal(scheduleAPI{ + Kind: "backup", + CronExpr: "@hourly", + Paths: []string{"/etc"}, + Enabled: true, + }) + req, _ := stdhttp.NewRequest("POST", url+"/api/hosts/"+hostID+"/schedules", + bytes.NewReader(body)) + req.AddCookie(cookie) + req.Header.Set("Content-Type", "application/json") + res, err := stdhttp.DefaultClient.Do(req) + if err != nil { + t.Fatalf("create schedule: %v", err) + } + got, _ := io.ReadAll(res.Body) + res.Body.Close() + if res.StatusCode != stdhttp.StatusCreated { + t.Fatalf("create schedule: %d %s", res.StatusCode, got) + } + var created scheduleAPI + _ = json.Unmarshal(got, &created) + + // Dial the WS as the agent and send hello. + wsURL := "ws" + strings.TrimPrefix(url, "http") + "/ws/agent" + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + c, _, err := websocket.Dial(ctx, wsURL, &websocket.DialOptions{ + HTTPHeader: stdhttp.Header{"Authorization": []string{"Bearer " + agentToken}}, + }) + if err != nil { + t.Fatalf("dial: %v", err) + } + defer c.CloseNow() + + helloEnv, _ := api.Marshal(api.MsgHello, "", api.HelloPayload{ + ProtocolVersion: api.CurrentProtocolVersion, + AgentVersion: "test", ResticVersion: "test", + Hostname: "ph", OS: api.OSLinux, Arch: api.ArchAmd64, + }) + raw, _ := json.Marshal(helloEnv) + if err := c.Write(ctx, websocket.MessageText, raw); err != nil { + t.Fatalf("write hello: %v", err) + } + + // Server should push schedule.set (our host has no creds, so the + // config.update branch is silently skipped). + pushedEnv := readUntilType(ctx, t, c, api.MsgScheduleSet) + var pushed api.ScheduleSetPayload + if err := pushedEnv.UnmarshalPayload(&pushed); err != nil { + t.Fatalf("decode payload: %v", err) + } + if pushed.Version != 1 { + t.Fatalf("pushed version: got %d, want 1", pushed.Version) + } + if len(pushed.Schedules) != 1 || pushed.Schedules[0].ID != created.ID { + t.Fatalf("pushed schedules: %+v", pushed.Schedules) + } + if pushed.Schedules[0].CronExpr != "@hourly" || len(pushed.Schedules[0].Paths) != 1 { + t.Fatalf("schedule contents: %+v", pushed.Schedules[0]) + } + + // Ack the version. Server should record it on the host row. + ackEnv, _ := api.Marshal(api.MsgScheduleAck, "", api.ScheduleAckPayload{ + Version: pushed.Version, + AppliedAt: time.Now().UTC(), + }) + raw, _ = json.Marshal(ackEnv) + if err := c.Write(ctx, websocket.MessageText, raw); err != nil { + t.Fatalf("write ack: %v", err) + } + + // Wait for applied_schedule_version to flip. + deadline := time.Now().Add(2 * time.Second) + for time.Now().Before(deadline) { + h, err := st.GetHost(context.Background(), hostID) + if err == nil && h.AppliedScheduleVersion == pushed.Version { + return + } + time.Sleep(20 * time.Millisecond) + } + h, _ := st.GetHost(context.Background(), hostID) + t.Fatalf("applied_schedule_version did not advance: got %d, want %d", + h.AppliedScheduleVersion, pushed.Version) +} + +func TestSchedulePushOnCRUD(t *testing.T) { + t.Parallel() + srv, url, st := newTestServerWithHub(t) + _ = srv + cookie := loginAndCookie(t, url) + hostID, agentToken := makePushHost(t, st) + + // Connect first so the CRUD push has somewhere to land. + wsURL := "ws" + strings.TrimPrefix(url, "http") + "/ws/agent" + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + c, _, err := websocket.Dial(ctx, wsURL, &websocket.DialOptions{ + HTTPHeader: stdhttp.Header{"Authorization": []string{"Bearer " + agentToken}}, + }) + if err != nil { + t.Fatalf("dial: %v", err) + } + defer c.CloseNow() + + helloEnv, _ := api.Marshal(api.MsgHello, "", api.HelloPayload{ + ProtocolVersion: api.CurrentProtocolVersion, + AgentVersion: "test", ResticVersion: "test", + Hostname: "ph", OS: api.OSLinux, Arch: api.ArchAmd64, + }) + raw, _ := json.Marshal(helloEnv) + _ = c.Write(ctx, websocket.MessageText, raw) + + // Drain the on-hello schedule.set (will be version 0, empty list). + first := readUntilType(ctx, t, c, api.MsgScheduleSet) + var initial api.ScheduleSetPayload + _ = first.UnmarshalPayload(&initial) + if initial.Version != 0 || len(initial.Schedules) != 0 { + t.Fatalf("initial push: %+v", initial) + } + + // Now create a schedule via REST. The handler should fire a + // schedule.set push asynchronously. + body, _ := json.Marshal(scheduleAPI{ + Kind: "backup", CronExpr: "*/30 * * * *", + Paths: []string{"/var/lib"}, Enabled: true, + }) + req, _ := stdhttp.NewRequest("POST", + url+"/api/hosts/"+hostID+"/schedules", bytes.NewReader(body)) + req.AddCookie(cookie) + req.Header.Set("Content-Type", "application/json") + res, err := stdhttp.DefaultClient.Do(req) + if err != nil { + t.Fatalf("create: %v", err) + } + res.Body.Close() + if res.StatusCode != stdhttp.StatusCreated { + t.Fatalf("create: %d", res.StatusCode) + } + + // Wait for the pushed schedule.set with version 1. + pushed := readUntilType(ctx, t, c, api.MsgScheduleSet) + var pl api.ScheduleSetPayload + _ = pushed.UnmarshalPayload(&pl) + if pl.Version != 1 || len(pl.Schedules) != 1 { + t.Fatalf("push after create: %+v", pl) + } +} diff --git a/internal/server/http/schedules.go b/internal/server/http/schedules.go index ceff729..71cadb8 100644 --- a/internal/server/http/schedules.go +++ b/internal/server/http/schedules.go @@ -141,6 +141,7 @@ func (s *Server) handleCreateSchedule(w stdhttp.ResponseWriter, r *stdhttp.Reque TargetID: &row.ID, TS: nowUTC(), }) + s.pushScheduleSetAsync(hostID) writeJSON(w, stdhttp.StatusCreated, toScheduleAPI(row)) } @@ -207,6 +208,7 @@ func (s *Server) handleUpdateSchedule(w stdhttp.ResponseWriter, r *stdhttp.Reque TargetID: &existing.ID, TS: nowUTC(), }) + s.pushScheduleSetAsync(hostID) writeJSON(w, stdhttp.StatusOK, toScheduleAPI(*existing)) } @@ -239,6 +241,7 @@ func (s *Server) handleDeleteSchedule(w stdhttp.ResponseWriter, r *stdhttp.Reque TargetID: &scheduleID, TS: nowUTC(), }) + s.pushScheduleSetAsync(hostID) w.WriteHeader(stdhttp.StatusNoContent) } diff --git a/internal/server/http/server.go b/internal/server/http/server.go index 471d53a..8a5eee6 100644 --- a/internal/server/http/server.go +++ b/internal/server/http/server.go @@ -117,10 +117,11 @@ func (s *Server) routes(r chi.Router) { // Agent ↔ server WebSocket. Bearer-authenticated inside the handler. if s.deps.Hub != nil { r.Mount("/ws/agent", ws.AgentHandler(ws.HandlerDeps{ - Hub: s.deps.Hub, - Store: s.deps.Store, - JobHub: s.deps.JobHub, - OnHello: s.onAgentHello, + Hub: s.deps.Hub, + Store: s.deps.Store, + JobHub: s.deps.JobHub, + OnHello: s.onAgentHello, + OnScheduleAck: s.applyScheduleAck, })) } diff --git a/internal/server/ws/handler.go b/internal/server/ws/handler.go index e72e946..a406747 100644 --- a/internal/server/ws/handler.go +++ b/internal/server/ws/handler.go @@ -27,6 +27,9 @@ type HandlerDeps struct { // layer to push host_credentials down as a config.update before // the agent starts asking for jobs. Optional; nil = no-op. OnHello func(ctx context.Context, hostID string, conn *Conn) + // OnScheduleAck is called when an agent confirms it has applied + // a particular schedule version (P2-02 reconciliation). Optional. + OnScheduleAck func(ctx context.Context, hostID string, version int64, appliedAt time.Time) } // AgentHandler is the http.Handler that owns /ws/agent. Agents @@ -255,7 +258,17 @@ func dispatchAgentMessage(ctx context.Context, c *Conn, hostID string, env api.E } } - case api.MsgRepoStats, api.MsgScheduleAck, api.MsgCommandResult: + case api.MsgScheduleAck: + var p api.ScheduleAckPayload + if err := env.UnmarshalPayload(&p); err != nil { + slog.Warn("ws: bad schedule.ack payload", "host_id", hostID, "err", err) + break + } + if deps.OnScheduleAck != nil { + deps.OnScheduleAck(ctx, hostID, p.Version, p.AppliedAt) + } + + case api.MsgRepoStats, api.MsgCommandResult: // TODO(P2): persist these projections. slog.Debug("ws msg not yet handled", "type", env.Type, "host_id", hostID) diff --git a/tasks.md b/tasks.md index a2c2fc9..08fbff1 100644 --- a/tasks.md +++ b/tasks.md @@ -98,7 +98,7 @@ Sizes: **S** = under a day, **M** = 1–3 days, **L** = 3–7 days. ## Phase 2 — Scheduling, retention, repo operations - [x] **P2-01** (M) Schedule schema + CRUD API. `schedules` table was already laid down in 0001; this slice adds `store.Schedule`/`RetentionPolicy`/`ScheduleOptions` types, `CreateSchedule` / `GetSchedule` / `ListSchedulesByHost` / `UpdateSchedule` / `DeleteSchedule` / `GetHostScheduleVersion` / `SetHostAppliedScheduleVersion` (mutations bump `host_schedule_version` atomically in-tx), and REST endpoints `GET|POST /api/hosts/{id}/schedules` + `PUT|DELETE /api/hosts/{id}/schedules/{sid}`. Validation: cron-expr parses via `robfig/cron/v3` (same parser the agent will use, so anything that validates here will fire there); kind ∈ {backup, forget, prune, check} (init/unlock are operator-only); backup schedules require ≥1 path; hooks rejected on non-backup kinds (spec §14.3). Mutations audit-logged. Server + store tests cover the happy path, validation, and version bumps. -- [ ] **P2-02** (L) Server-pushed schedule reconciliation (server is source of truth; agent applies) +- [~] **P2-02** (L) Server-pushed schedule reconciliation. Server side complete: `pushScheduleSet*` helpers (one for the on-hello window, one for the post-CRUD async flavour), wiring in `onAgentHello` (always pushes, even when the host has no repo creds yet), `pushScheduleSetAsync` called from Create/Update/Delete handlers (no-op when the host is offline; on-hello catches up). `MsgScheduleAck` is now handled in the WS dispatcher: `OnScheduleAck` callback persists `applied_schedule_version`. Server-side end-to-end test covers (a) on-hello push of an already-created schedule + ack round-trip + applied_version write-through, and (b) connect-then-CRUD push. **Remaining:** agent-side `schedule.set` handler that applies the new state to its local cron + emits `schedule.ack` — lands in P2-03. - [ ] **P2-03** (M) Agent local scheduler (`robfig/cron/v3`); persists next-fire times across restarts - [ ] **P2-04** (M) Schedule editor UI (paths, excludes, tags, cron, retention) - [ ] **P2-05** (M) `forget` command with retention policy (keep-last/daily/weekly/monthly/yearly)