From aa9fc330fc3062746fd6a55f62fa6fb1cd61d4ae Mon Sep 17 00:00:00 2001 From: Steve Cliff Date: Sat, 2 May 2026 11:12:58 +0100 Subject: [PATCH] P2-01: schedule schema + CRUD API MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The `schedules` table was already laid down in migration 0001; this slice adds the Go-side data model, store CRUD with atomic version bumps, and REST endpoints. * `store.Schedule` + `RetentionPolicy` + `ScheduleOptions` typed views (the wire form on the agent side keeps retention/options as raw JSON since the agent just forwards them to restic). * Store CRUD: CreateSchedule / GetSchedule / ListSchedulesByHost / UpdateSchedule / DeleteSchedule. Each mutation bumps `host_schedule_version` atomically in the same tx via UPSERT on `host_schedule_version`. SetHostAppliedScheduleVersion records what the agent has confirmed via schedule.ack (P2-02 will use it). * REST endpoints under /api/hosts/{id}/schedules + /{sid}: GET (list, with the version envelope so callers can detect drift), POST (create), PUT (update — kind is immutable), DELETE. * Validation: cron expressions parse via robfig/cron/v3 (same parser the agent will use, so anything that validates here will fire there); kind ∈ {backup, forget, prune, check} (init/unlock are operator-only one-shot kinds, not schedulable); backup schedules require ≥1 path; hooks rejected on non-backup kinds (spec §14.3). * All mutations audit-logged. * Tests: store-level CRUD + version-bump invariants; REST happy path (create→list→update→delete with version progression); REST validation table covers each rejection code. newTestServerWithHub now sets BootstrapToken so the schedules handler tests can use the existing login flow without a parallel test-server constructor. Co-Authored-By: Claude Opus 4.7 (1M context) --- go.mod | 1 + go.sum | 2 + internal/server/http/enrollment_test.go | 9 +- internal/server/http/schedules.go | 296 ++++++++++++++++++++++++ internal/server/http/schedules_test.go | 190 +++++++++++++++ internal/server/http/server.go | 8 + internal/store/schedules.go | 280 ++++++++++++++++++++++ internal/store/schedules_test.go | 122 ++++++++++ internal/store/types.go | 41 ++++ tasks.md | 2 +- 10 files changed, 946 insertions(+), 5 deletions(-) create mode 100644 internal/server/http/schedules.go create mode 100644 internal/server/http/schedules_test.go create mode 100644 internal/store/schedules.go create mode 100644 internal/store/schedules_test.go diff --git a/go.mod b/go.mod index 9af0d28..b392766 100644 --- a/go.mod +++ b/go.mod @@ -17,6 +17,7 @@ require ( github.com/mattn/go-isatty v0.0.20 // indirect github.com/ncruces/go-strftime v1.0.0 // indirect github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect + github.com/robfig/cron/v3 v3.0.1 // indirect golang.org/x/sys v0.43.0 // indirect modernc.org/libc v1.72.0 // indirect modernc.org/mathutil v1.7.1 // indirect diff --git a/go.sum b/go.sum index e241a4c..3fcf455 100644 --- a/go.sum +++ b/go.sum @@ -19,6 +19,8 @@ github.com/oklog/ulid/v2 v2.1.1/go.mod h1:rcEKHmBBKfef9DhnvX7y1HZBYxjXb0cP5ExxNs github.com/pborman/getopt v0.0.0-20170112200414-7148bc3a4c30/go.mod h1:85jBQOZwpVEaDAr341tbn15RS4fCAsIst0qp7i8ex1o= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= +github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= +github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= golang.org/x/crypto v0.50.0 h1:zO47/JPrL6vsNkINmLoo/PH1gcxpls50DNogFvB5ZGI= golang.org/x/crypto v0.50.0/go.mod h1:3muZ7vA7PBCE6xgPX7nkzzjiUq87kRItoJQM1Yo8S+Q= golang.org/x/mod v0.33.0 h1:tHFzIWbBifEmbwtGz65eaWyGiGZatSrT9prnU8DbVL8= diff --git a/internal/server/http/enrollment_test.go b/internal/server/http/enrollment_test.go index e0c1af6..65a26a7 100644 --- a/internal/server/http/enrollment_test.go +++ b/internal/server/http/enrollment_test.go @@ -36,10 +36,11 @@ func newTestServerWithHub(t *testing.T) (*Server, string, *store.Store) { aead, _ := crypto.NewAEAD(key) deps := Deps{ - Cfg: config.Config{Listen: ":0", DataDir: dir, SecretKeyFile: keyPath}, - Store: st, - AEAD: aead, - Hub: ws.NewHub(), + Cfg: config.Config{Listen: ":0", DataDir: dir, SecretKeyFile: keyPath}, + Store: st, + AEAD: aead, + Hub: ws.NewHub(), + BootstrapToken: "test-token", } s := New(deps) ts := httptest.NewServer(s.srv.Handler) diff --git a/internal/server/http/schedules.go b/internal/server/http/schedules.go new file mode 100644 index 0000000..ceff729 --- /dev/null +++ b/internal/server/http/schedules.go @@ -0,0 +1,296 @@ +package http + +import ( + "encoding/json" + "errors" + stdhttp "net/http" + "strings" + + "github.com/go-chi/chi/v5" + "github.com/oklog/ulid/v2" + "github.com/robfig/cron/v3" + + "gitea.dcglab.co.uk/steve/restic-manager/internal/api" + "gitea.dcglab.co.uk/steve/restic-manager/internal/store" +) + +// scheduleAPI is the JSON shape for /api/hosts/{id}/schedules. We +// avoid leaking host_id in the body since it's already in the URL, +// and we render booleans + retention as typed JSON rather than +// strings so the UI can edit fields directly. +type scheduleAPI struct { + ID string `json:"id,omitempty"` + Kind api.JobKind `json:"kind"` + CronExpr string `json:"cron_expr"` + Paths []string `json:"paths"` + Excludes []string `json:"excludes"` + Tags []string `json:"tags"` + RetentionPolicy store.RetentionPolicy `json:"retention_policy"` + Options store.ScheduleOptions `json:"options"` + PreHook string `json:"pre_hook,omitempty"` + PostHook string `json:"post_hook,omitempty"` + Enabled bool `json:"enabled"` + CreatedAt string `json:"created_at,omitempty"` + UpdatedAt string `json:"updated_at,omitempty"` +} + +// listSchedulesResp wraps the array so the response is forward- +// compatible (we may want to add a top-level `version` later). +type listSchedulesResp struct { + Version int64 `json:"version"` + Schedules []scheduleAPI `json:"schedules"` +} + +// cron parser used for input validation. Standard 5-field syntax +// with descriptors (@hourly etc.) — same parser the agent will +// run against, so a schedule that validates here will fire there. +var cronParser = cron.NewParser( + cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor, +) + +func (s *Server) handleListSchedules(w stdhttp.ResponseWriter, r *stdhttp.Request) { + if _, ok := s.requireUser(r); !ok { + writeJSONError(w, stdhttp.StatusUnauthorized, "unauthorized", "") + return + } + hostID := chi.URLParam(r, "id") + if hostID == "" { + writeJSONError(w, stdhttp.StatusBadRequest, "missing_host_id", "") + return + } + if _, err := s.deps.Store.GetHost(r.Context(), hostID); err != nil { + if errors.Is(err, store.ErrNotFound) { + writeJSONError(w, stdhttp.StatusNotFound, "host_not_found", "") + return + } + writeJSONError(w, stdhttp.StatusInternalServerError, "internal", "") + return + } + rows, err := s.deps.Store.ListSchedulesByHost(r.Context(), hostID) + if err != nil { + writeJSONError(w, stdhttp.StatusInternalServerError, "internal", "") + return + } + version, err := s.deps.Store.GetHostScheduleVersion(r.Context(), hostID) + if err != nil { + writeJSONError(w, stdhttp.StatusInternalServerError, "internal", "") + return + } + out := listSchedulesResp{Version: version, Schedules: make([]scheduleAPI, len(rows))} + for i, row := range rows { + out.Schedules[i] = toScheduleAPI(row) + } + writeJSON(w, stdhttp.StatusOK, out) +} + +func (s *Server) handleCreateSchedule(w stdhttp.ResponseWriter, r *stdhttp.Request) { + user, ok := s.requireUser(r) + if !ok { + writeJSONError(w, stdhttp.StatusUnauthorized, "unauthorized", "") + return + } + hostID := chi.URLParam(r, "id") + if hostID == "" { + writeJSONError(w, stdhttp.StatusBadRequest, "missing_host_id", "") + return + } + if _, err := s.deps.Store.GetHost(r.Context(), hostID); err != nil { + if errors.Is(err, store.ErrNotFound) { + writeJSONError(w, stdhttp.StatusNotFound, "host_not_found", "") + return + } + writeJSONError(w, stdhttp.StatusInternalServerError, "internal", "") + return + } + + var req scheduleAPI + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + writeJSONError(w, stdhttp.StatusBadRequest, "invalid_json", err.Error()) + return + } + if code, msg := validateSchedule(&req); code != "" { + writeJSONError(w, stdhttp.StatusBadRequest, code, msg) + return + } + + row := store.Schedule{ + ID: ulid.Make().String(), + HostID: hostID, + Kind: string(req.Kind), + CronExpr: req.CronExpr, + Paths: req.Paths, + Excludes: req.Excludes, + Tags: req.Tags, + RetentionPolicy: req.RetentionPolicy, + Options: req.Options, + PreHook: req.PreHook, + PostHook: req.PostHook, + Enabled: req.Enabled, + } + if err := s.deps.Store.CreateSchedule(r.Context(), &row); err != nil { + writeJSONError(w, stdhttp.StatusInternalServerError, "internal", err.Error()) + return + } + + _ = s.deps.Store.AppendAudit(r.Context(), store.AuditEntry{ + ID: ulid.Make().String(), + UserID: &user.ID, + Actor: "user", + Action: "schedule.created", + TargetKind: ptr("schedule"), + TargetID: &row.ID, + TS: nowUTC(), + }) + writeJSON(w, stdhttp.StatusCreated, toScheduleAPI(row)) +} + +func (s *Server) handleUpdateSchedule(w stdhttp.ResponseWriter, r *stdhttp.Request) { + user, ok := s.requireUser(r) + if !ok { + writeJSONError(w, stdhttp.StatusUnauthorized, "unauthorized", "") + return + } + hostID := chi.URLParam(r, "id") + scheduleID := chi.URLParam(r, "sid") + if hostID == "" || scheduleID == "" { + writeJSONError(w, stdhttp.StatusBadRequest, "missing_id", "") + return + } + existing, err := s.deps.Store.GetSchedule(r.Context(), hostID, scheduleID) + if err != nil { + if errors.Is(err, store.ErrNotFound) { + writeJSONError(w, stdhttp.StatusNotFound, "schedule_not_found", "") + return + } + writeJSONError(w, stdhttp.StatusInternalServerError, "internal", "") + return + } + + var req scheduleAPI + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + writeJSONError(w, stdhttp.StatusBadRequest, "invalid_json", err.Error()) + return + } + // Kind is immutable; ignore whatever was sent and fix up before + // validation so validateSchedule sees the existing kind. + req.Kind = api.JobKind(existing.Kind) + if code, msg := validateSchedule(&req); code != "" { + writeJSONError(w, stdhttp.StatusBadRequest, code, msg) + return + } + + existing.CronExpr = req.CronExpr + existing.Paths = req.Paths + existing.Excludes = req.Excludes + existing.Tags = req.Tags + existing.RetentionPolicy = req.RetentionPolicy + existing.Options = req.Options + existing.PreHook = req.PreHook + existing.PostHook = req.PostHook + existing.Enabled = req.Enabled + + if err := s.deps.Store.UpdateSchedule(r.Context(), existing); err != nil { + if errors.Is(err, store.ErrNotFound) { + writeJSONError(w, stdhttp.StatusNotFound, "schedule_not_found", "") + return + } + writeJSONError(w, stdhttp.StatusInternalServerError, "internal", err.Error()) + return + } + + _ = s.deps.Store.AppendAudit(r.Context(), store.AuditEntry{ + ID: ulid.Make().String(), + UserID: &user.ID, + Actor: "user", + Action: "schedule.updated", + TargetKind: ptr("schedule"), + TargetID: &existing.ID, + TS: nowUTC(), + }) + writeJSON(w, stdhttp.StatusOK, toScheduleAPI(*existing)) +} + +func (s *Server) handleDeleteSchedule(w stdhttp.ResponseWriter, r *stdhttp.Request) { + user, ok := s.requireUser(r) + if !ok { + writeJSONError(w, stdhttp.StatusUnauthorized, "unauthorized", "") + return + } + hostID := chi.URLParam(r, "id") + scheduleID := chi.URLParam(r, "sid") + if hostID == "" || scheduleID == "" { + writeJSONError(w, stdhttp.StatusBadRequest, "missing_id", "") + return + } + if err := s.deps.Store.DeleteSchedule(r.Context(), hostID, scheduleID); err != nil { + if errors.Is(err, store.ErrNotFound) { + writeJSONError(w, stdhttp.StatusNotFound, "schedule_not_found", "") + return + } + writeJSONError(w, stdhttp.StatusInternalServerError, "internal", err.Error()) + return + } + _ = s.deps.Store.AppendAudit(r.Context(), store.AuditEntry{ + ID: ulid.Make().String(), + UserID: &user.ID, + Actor: "user", + Action: "schedule.deleted", + TargetKind: ptr("schedule"), + TargetID: &scheduleID, + TS: nowUTC(), + }) + w.WriteHeader(stdhttp.StatusNoContent) +} + +// validateSchedule rejects malformed inputs with a stable error code. +// Returns ("", "") on success. +func validateSchedule(s *scheduleAPI) (code, msg string) { + switch s.Kind { + case api.JobBackup, api.JobForget, api.JobPrune, api.JobCheck: + // ok — valid schedule kinds (init/unlock are operator-driven only). + default: + return "invalid_kind", "kind must be one of backup|forget|prune|check" + } + if strings.TrimSpace(s.CronExpr) == "" { + return "missing_cron_expr", "cron_expr is required" + } + if _, err := cronParser.Parse(s.CronExpr); err != nil { + return "invalid_cron_expr", err.Error() + } + if s.Kind == api.JobBackup && len(s.Paths) == 0 { + return "missing_paths", "backup schedules require at least one path" + } + // Hooks are only meaningful on backup schedules (spec §14.3). + if s.Kind != api.JobBackup && (s.PreHook != "" || s.PostHook != "") { + return "hooks_not_allowed", "pre_hook / post_hook only apply to backup schedules" + } + return "", "" +} + +func toScheduleAPI(s store.Schedule) scheduleAPI { + out := scheduleAPI{ + ID: s.ID, + Kind: api.JobKind(s.Kind), + CronExpr: s.CronExpr, + Paths: s.Paths, + Excludes: s.Excludes, + Tags: s.Tags, + RetentionPolicy: s.RetentionPolicy, + Options: s.Options, + PreHook: s.PreHook, + PostHook: s.PostHook, + Enabled: s.Enabled, + CreatedAt: s.CreatedAt.Format("2006-01-02T15:04:05.999999999Z07:00"), + UpdatedAt: s.UpdatedAt.Format("2006-01-02T15:04:05.999999999Z07:00"), + } + if out.Paths == nil { + out.Paths = []string{} + } + if out.Excludes == nil { + out.Excludes = []string{} + } + if out.Tags == nil { + out.Tags = []string{} + } + return out +} diff --git a/internal/server/http/schedules_test.go b/internal/server/http/schedules_test.go new file mode 100644 index 0000000..cc50d0a --- /dev/null +++ b/internal/server/http/schedules_test.go @@ -0,0 +1,190 @@ +package http + +import ( + "bytes" + "context" + "encoding/json" + "io" + stdhttp "net/http" + "testing" + "time" + + "gitea.dcglab.co.uk/steve/restic-manager/internal/store" +) + +// loginAndCookie bootstraps an admin, logs in, and returns the +// session cookie ready to attach to subsequent requests. +func loginAndCookie(t *testing.T, url string) *stdhttp.Cookie { + t.Helper() + bs, _ := json.Marshal(bootstrapRequest{ + Token: "test-token", Username: "alice", Password: "averylongpassword", + }) + res, err := stdhttp.Post(url+"/api/bootstrap", "application/json", bytes.NewReader(bs)) + if err != nil { + t.Fatalf("bootstrap: %v", err) + } + res.Body.Close() + + body, _ := json.Marshal(loginRequest{Username: "alice", Password: "averylongpassword"}) + res, err = stdhttp.Post(url+"/api/auth/login", "application/json", bytes.NewReader(body)) + if err != nil { + t.Fatalf("login: %v", err) + } + defer res.Body.Close() + if res.StatusCode != stdhttp.StatusOK { + got, _ := io.ReadAll(res.Body) + t.Fatalf("login: %d %s", res.StatusCode, got) + } + for _, c := range res.Cookies() { + if c.Name == sessionCookieName { + return c + } + } + t.Fatal("no session cookie") + return nil +} + +// makeHTTPHost inserts a host directly via the store so we can hit +// the schedule endpoints without dragging in the enrollment flow. +func makeHTTPHost(t *testing.T, st *store.Store) string { + t.Helper() + const id = "01HSCHEDHTTP000000000000Z" + if err := st.CreateHost(context.Background(), store.Host{ + ID: id, Name: "h", OS: "linux", Arch: "amd64", + AgentVersion: "dev", ResticVersion: "0.16.0", ProtocolVersion: 1, + EnrolledAt: time.Now().UTC(), + }, "tokenhash", ""); err != nil { + t.Fatalf("create host: %v", err) + } + return id +} + +func TestSchedulesAPIHappyPath(t *testing.T) { + t.Parallel() + _, url, st := newTestServerWithHub(t) + cookie := loginAndCookie(t, url) + hostID := makeHTTPHost(t, st) + + doReq := func(method, path string, body any, want int) []byte { + t.Helper() + var b []byte + if body != nil { + b, _ = json.Marshal(body) + } + req, _ := stdhttp.NewRequest(method, url+path, bytes.NewReader(b)) + req.AddCookie(cookie) + req.Header.Set("Content-Type", "application/json") + res, err := stdhttp.DefaultClient.Do(req) + if err != nil { + t.Fatalf("%s %s: %v", method, path, err) + } + defer res.Body.Close() + got, _ := io.ReadAll(res.Body) + if res.StatusCode != want { + t.Fatalf("%s %s: status %d (want %d) body=%s", method, path, res.StatusCode, want, got) + } + return got + } + + // Empty list returns version 0. + body := doReq("GET", "/api/hosts/"+hostID+"/schedules", nil, stdhttp.StatusOK) + var listed listSchedulesResp + if err := json.Unmarshal(body, &listed); err != nil { + t.Fatal(err) + } + if listed.Version != 0 || len(listed.Schedules) != 0 { + t.Fatalf("initial list: %+v", listed) + } + + // Create. + keepLast := 3 + create := scheduleAPI{ + Kind: "backup", CronExpr: "0 */6 * * *", + Paths: []string{"/etc"}, + Tags: []string{"nightly"}, + RetentionPolicy: store.RetentionPolicy{KeepLast: &keepLast}, + Enabled: true, + } + body = doReq("POST", "/api/hosts/"+hostID+"/schedules", create, stdhttp.StatusCreated) + var created scheduleAPI + if err := json.Unmarshal(body, &created); err != nil { + t.Fatal(err) + } + if created.ID == "" || created.CronExpr != create.CronExpr { + t.Fatalf("create returned: %+v", created) + } + + // Version bumped. + body = doReq("GET", "/api/hosts/"+hostID+"/schedules", nil, stdhttp.StatusOK) + _ = json.Unmarshal(body, &listed) + if listed.Version != 1 { + t.Fatalf("version after create: %d", listed.Version) + } + + // Update changes the cron expr; kind silently preserved even if request tries otherwise. + created.CronExpr = "*/15 * * * *" + created.Kind = "prune" // should be ignored + body = doReq("PUT", "/api/hosts/"+hostID+"/schedules/"+created.ID, created, stdhttp.StatusOK) + var updated scheduleAPI + _ = json.Unmarshal(body, &updated) + if updated.Kind != "backup" || updated.CronExpr != "*/15 * * * *" { + t.Fatalf("update: %+v", updated) + } + + // Delete. + doReq("DELETE", "/api/hosts/"+hostID+"/schedules/"+created.ID, nil, stdhttp.StatusNoContent) + body = doReq("GET", "/api/hosts/"+hostID+"/schedules", nil, stdhttp.StatusOK) + _ = json.Unmarshal(body, &listed) + if listed.Version != 3 || len(listed.Schedules) != 0 { + t.Fatalf("after delete: %+v", listed) + } +} + +func TestSchedulesAPIValidation(t *testing.T) { + t.Parallel() + _, url, st := newTestServerWithHub(t) + cookie := loginAndCookie(t, url) + hostID := makeHTTPHost(t, st) + + post := func(s scheduleAPI) (int, []byte) { + b, _ := json.Marshal(s) + req, _ := stdhttp.NewRequest("POST", + url+"/api/hosts/"+hostID+"/schedules", bytes.NewReader(b)) + req.AddCookie(cookie) + req.Header.Set("Content-Type", "application/json") + res, err := stdhttp.DefaultClient.Do(req) + if err != nil { + t.Fatalf("post: %v", err) + } + defer res.Body.Close() + body, _ := io.ReadAll(res.Body) + return res.StatusCode, body + } + + cases := []struct { + name string + in scheduleAPI + want string // expected error code + }{ + {"bad kind", scheduleAPI{Kind: "init", CronExpr: "@hourly", Paths: []string{"/etc"}}, "invalid_kind"}, + {"missing cron", scheduleAPI{Kind: "backup", Paths: []string{"/etc"}}, "missing_cron_expr"}, + {"bad cron", scheduleAPI{Kind: "backup", CronExpr: "not a cron", Paths: []string{"/etc"}}, "invalid_cron_expr"}, + {"backup without paths", scheduleAPI{Kind: "backup", CronExpr: "@hourly"}, "missing_paths"}, + {"hooks on non-backup", scheduleAPI{Kind: "prune", CronExpr: "@daily", PreHook: "echo hi"}, "hooks_not_allowed"}, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + status, body := post(c.in) + if status != stdhttp.StatusBadRequest { + t.Fatalf("status %d body=%s", status, body) + } + var env struct { + Code string `json:"code"` + } + _ = json.Unmarshal(body, &env) + if env.Code != c.want { + t.Fatalf("error code: got %q want %q (body=%s)", env.Code, c.want, body) + } + }) + } +} diff --git a/internal/server/http/server.go b/internal/server/http/server.go index fc90d17..471d53a 100644 --- a/internal/server/http/server.go +++ b/internal/server/http/server.go @@ -104,6 +104,14 @@ func (s *Server) routes(r chi.Router) { // GET returns a redacted view (URL, username, has_password). r.Get("/hosts/{id}/repo-credentials", s.handleGetHostCredentials) r.Put("/hosts/{id}/repo-credentials", s.handleSetHostCredentials) + + // Per-host schedule CRUD. Mutations bump host_schedule_version; + // the agent sync path (P2-02) picks up the new version on the + // next reconciliation tick. + r.Get("/hosts/{id}/schedules", s.handleListSchedules) + r.Post("/hosts/{id}/schedules", s.handleCreateSchedule) + r.Put("/hosts/{id}/schedules/{sid}", s.handleUpdateSchedule) + r.Delete("/hosts/{id}/schedules/{sid}", s.handleDeleteSchedule) }) // Agent ↔ server WebSocket. Bearer-authenticated inside the handler. diff --git a/internal/store/schedules.go b/internal/store/schedules.go new file mode 100644 index 0000000..5688f88 --- /dev/null +++ b/internal/store/schedules.go @@ -0,0 +1,280 @@ +package store + +import ( + "context" + "database/sql" + "encoding/json" + "errors" + "fmt" + "time" +) + +// CreateSchedule inserts a new schedule and bumps the host's +// schedule_version atomically. Returns the inserted row's +// CreatedAt / UpdatedAt timestamps written into s. +func (st *Store) CreateSchedule(ctx context.Context, s *Schedule) error { + if s.ID == "" || s.HostID == "" { + return errors.New("store: schedule id and host_id required") + } + now := time.Now().UTC() + s.CreatedAt = now + s.UpdatedAt = now + if s.Paths == nil { + s.Paths = []string{} + } + if s.Excludes == nil { + s.Excludes = []string{} + } + if s.Tags == nil { + s.Tags = []string{} + } + pathsJSON, _ := json.Marshal(s.Paths) + excludesJSON, _ := json.Marshal(s.Excludes) + tagsJSON, _ := json.Marshal(s.Tags) + retentionJSON, _ := json.Marshal(s.RetentionPolicy) + optionsJSON, _ := json.Marshal(s.Options) + + tx, err := st.db.BeginTx(ctx, nil) + if err != nil { + return fmt.Errorf("store: begin tx: %w", err) + } + defer func() { _ = tx.Rollback() }() + + if _, err := tx.ExecContext(ctx, + `INSERT INTO schedules ( + id, host_id, kind, cron_expr, paths, excludes, tags, + retention_policy, options, pre_hook, post_hook, enabled, + created_at, updated_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, + s.ID, s.HostID, s.Kind, s.CronExpr, + string(pathsJSON), string(excludesJSON), string(tagsJSON), + string(retentionJSON), string(optionsJSON), + s.PreHook, s.PostHook, boolToInt(s.Enabled), + now.Format(time.RFC3339Nano), now.Format(time.RFC3339Nano), + ); err != nil { + return fmt.Errorf("store: create schedule: %w", err) + } + if err := bumpHostScheduleVersionTx(ctx, tx, s.HostID); err != nil { + return err + } + return tx.Commit() +} + +// UpdateSchedule replaces every editable field on an existing row +// and bumps host_schedule_version. ID and HostID must match an +// existing row; kind is immutable (creating a new schedule is +// cheaper than re-keying retention/hooks). +func (st *Store) UpdateSchedule(ctx context.Context, s *Schedule) error { + if s.ID == "" || s.HostID == "" { + return errors.New("store: schedule id and host_id required") + } + if s.Paths == nil { + s.Paths = []string{} + } + if s.Excludes == nil { + s.Excludes = []string{} + } + if s.Tags == nil { + s.Tags = []string{} + } + pathsJSON, _ := json.Marshal(s.Paths) + excludesJSON, _ := json.Marshal(s.Excludes) + tagsJSON, _ := json.Marshal(s.Tags) + retentionJSON, _ := json.Marshal(s.RetentionPolicy) + optionsJSON, _ := json.Marshal(s.Options) + now := time.Now().UTC() + + tx, err := st.db.BeginTx(ctx, nil) + if err != nil { + return fmt.Errorf("store: begin tx: %w", err) + } + defer func() { _ = tx.Rollback() }() + + res, err := tx.ExecContext(ctx, + `UPDATE schedules SET + cron_expr = ?, paths = ?, excludes = ?, tags = ?, + retention_policy = ?, options = ?, + pre_hook = ?, post_hook = ?, enabled = ?, + updated_at = ? + WHERE id = ? AND host_id = ?`, + s.CronExpr, + string(pathsJSON), string(excludesJSON), string(tagsJSON), + string(retentionJSON), string(optionsJSON), + s.PreHook, s.PostHook, boolToInt(s.Enabled), + now.Format(time.RFC3339Nano), + s.ID, s.HostID, + ) + if err != nil { + return fmt.Errorf("store: update schedule: %w", err) + } + n, _ := res.RowsAffected() + if n == 0 { + return ErrNotFound + } + s.UpdatedAt = now + if err := bumpHostScheduleVersionTx(ctx, tx, s.HostID); err != nil { + return err + } + return tx.Commit() +} + +// DeleteSchedule removes a schedule and bumps host_schedule_version. +// Returns ErrNotFound if no row matched. +func (st *Store) DeleteSchedule(ctx context.Context, hostID, scheduleID string) error { + tx, err := st.db.BeginTx(ctx, nil) + if err != nil { + return fmt.Errorf("store: begin tx: %w", err) + } + defer func() { _ = tx.Rollback() }() + + res, err := tx.ExecContext(ctx, + `DELETE FROM schedules WHERE id = ? AND host_id = ?`, + scheduleID, hostID) + if err != nil { + return fmt.Errorf("store: delete schedule: %w", err) + } + n, _ := res.RowsAffected() + if n == 0 { + return ErrNotFound + } + if err := bumpHostScheduleVersionTx(ctx, tx, hostID); err != nil { + return err + } + return tx.Commit() +} + +// GetSchedule returns one schedule by (host_id, id). Returns +// ErrNotFound on miss. +func (st *Store) GetSchedule(ctx context.Context, hostID, scheduleID string) (*Schedule, error) { + row := st.db.QueryRowContext(ctx, + `SELECT id, host_id, kind, cron_expr, paths, excludes, tags, + retention_policy, options, pre_hook, post_hook, enabled, + created_at, updated_at + FROM schedules WHERE id = ? AND host_id = ?`, + scheduleID, hostID) + s, err := scanSchedule(row) + if errors.Is(err, sql.ErrNoRows) { + return nil, ErrNotFound + } + return s, err +} + +// ListSchedulesByHost returns every schedule for a host, ordered +// by created_at. Empty slice on miss (not an error). +func (st *Store) ListSchedulesByHost(ctx context.Context, hostID string) ([]Schedule, error) { + rows, err := st.db.QueryContext(ctx, + `SELECT id, host_id, kind, cron_expr, paths, excludes, tags, + retention_policy, options, pre_hook, post_hook, enabled, + created_at, updated_at + FROM schedules WHERE host_id = ? ORDER BY created_at`, + hostID) + if err != nil { + return nil, fmt.Errorf("store: list schedules: %w", err) + } + defer rows.Close() + out := []Schedule{} + for rows.Next() { + s, err := scanScheduleRow(rows) + if err != nil { + return nil, err + } + out = append(out, *s) + } + return out, rows.Err() +} + +// GetHostScheduleVersion returns the current version for a host, +// or 0 if no row exists yet. +func (st *Store) GetHostScheduleVersion(ctx context.Context, hostID string) (int64, error) { + var v int64 + err := st.db.QueryRowContext(ctx, + `SELECT version FROM host_schedule_version WHERE host_id = ?`, hostID).Scan(&v) + if errors.Is(err, sql.ErrNoRows) { + return 0, nil + } + if err != nil { + return 0, fmt.Errorf("store: get schedule version: %w", err) + } + return v, nil +} + +// SetHostAppliedScheduleVersion records the version the agent has +// confirmed via schedule.ack. Idempotent. +func (st *Store) SetHostAppliedScheduleVersion(ctx context.Context, hostID string, version int64) error { + _, err := st.db.ExecContext(ctx, + `UPDATE hosts SET applied_schedule_version = ? WHERE id = ?`, + version, hostID) + if err != nil { + return fmt.Errorf("store: set applied schedule version: %w", err) + } + return nil +} + +// bumpHostScheduleVersionTx upserts host_schedule_version, +1 each +// call. Caller owns the tx. +func bumpHostScheduleVersionTx(ctx context.Context, tx *sql.Tx, hostID string) error { + if _, err := tx.ExecContext(ctx, + `INSERT INTO host_schedule_version (host_id, version) + VALUES (?, 1) + ON CONFLICT(host_id) DO UPDATE SET version = version + 1`, + hostID); err != nil { + return fmt.Errorf("store: bump schedule version: %w", err) + } + return nil +} + +// ----- scan helpers -------------------------------------------------- + +func scanSchedule(row *sql.Row) (*Schedule, error) { + return scanScheduleRow(row) +} + +type scheduleScanner interface { + Scan(dest ...any) error +} + +func scanScheduleRow(s scheduleScanner) (*Schedule, error) { + var ( + out Schedule + paths, excludes, tags, retention, options string + createdAt, updatedAt string + enabled int + ) + err := s.Scan(&out.ID, &out.HostID, &out.Kind, &out.CronExpr, + &paths, &excludes, &tags, &retention, &options, + &out.PreHook, &out.PostHook, &enabled, + &createdAt, &updatedAt) + if err != nil { + return nil, err + } + if paths != "" { + _ = json.Unmarshal([]byte(paths), &out.Paths) + } + if excludes != "" { + _ = json.Unmarshal([]byte(excludes), &out.Excludes) + } + if tags != "" { + _ = json.Unmarshal([]byte(tags), &out.Tags) + } + if retention != "" { + _ = json.Unmarshal([]byte(retention), &out.RetentionPolicy) + } + if options != "" { + _ = json.Unmarshal([]byte(options), &out.Options) + } + out.Enabled = enabled != 0 + if t, err := time.Parse(time.RFC3339Nano, createdAt); err == nil { + out.CreatedAt = t + } + if t, err := time.Parse(time.RFC3339Nano, updatedAt); err == nil { + out.UpdatedAt = t + } + return &out, nil +} + +func boolToInt(b bool) int { + if b { + return 1 + } + return 0 +} diff --git a/internal/store/schedules_test.go b/internal/store/schedules_test.go new file mode 100644 index 0000000..10262f6 --- /dev/null +++ b/internal/store/schedules_test.go @@ -0,0 +1,122 @@ +package store + +import ( + "context" + "errors" + "testing" + "time" +) + +// makeSchedHost is a minimal host row to hang schedule tests off. +func makeSchedHost(t *testing.T, s *Store) string { + t.Helper() + const id = "01HSCHEDHOST0000000000000" + if err := s.CreateHost(context.Background(), Host{ + ID: id, Name: "sched-host", OS: "linux", Arch: "amd64", + AgentVersion: "dev", ResticVersion: "0.16.0", ProtocolVersion: 1, + EnrolledAt: time.Now().UTC(), + }, "tokenhash", ""); err != nil { + t.Fatalf("create host: %v", err) + } + return id +} + +func TestSchedulesCRUDAndVersionBump(t *testing.T) { + t.Parallel() + s := openTestStore(t) + ctx := context.Background() + hostID := makeSchedHost(t, s) + + // Initial version is 0 (no row). + v, err := s.GetHostScheduleVersion(ctx, hostID) + if err != nil { + t.Fatal(err) + } + if v != 0 { + t.Fatalf("initial version: got %d, want 0", v) + } + + keepLast := 7 + sched := Schedule{ + ID: "01SCHED000000000000000001", HostID: hostID, + Kind: "backup", CronExpr: "0 3 * * *", + Paths: []string{"/etc", "/home"}, + Tags: []string{"nightly"}, + RetentionPolicy: RetentionPolicy{KeepLast: &keepLast}, + Enabled: true, + } + if err := s.CreateSchedule(ctx, &sched); err != nil { + t.Fatalf("create: %v", err) + } + if sched.CreatedAt.IsZero() || sched.UpdatedAt.IsZero() { + t.Fatalf("Create should populate timestamps") + } + + v, _ = s.GetHostScheduleVersion(ctx, hostID) + if v != 1 { + t.Fatalf("version after create: got %d, want 1", v) + } + + // Round-trip read. + got, err := s.GetSchedule(ctx, hostID, sched.ID) + if err != nil { + t.Fatalf("get: %v", err) + } + if got.CronExpr != "0 3 * * *" || len(got.Paths) != 2 { + t.Fatalf("round-trip lost data: %+v", got) + } + if got.RetentionPolicy.KeepLast == nil || *got.RetentionPolicy.KeepLast != 7 { + t.Fatalf("retention round-trip: %+v", got.RetentionPolicy) + } + + // List sees it. + list, err := s.ListSchedulesByHost(ctx, hostID) + if err != nil || len(list) != 1 || list[0].ID != sched.ID { + t.Fatalf("list: err=%v rows=%v", err, list) + } + + // Update bumps version. + sched.CronExpr = "*/30 * * * *" + sched.Enabled = false + if err := s.UpdateSchedule(ctx, &sched); err != nil { + t.Fatalf("update: %v", err) + } + v, _ = s.GetHostScheduleVersion(ctx, hostID) + if v != 2 { + t.Fatalf("version after update: got %d, want 2", v) + } + got, _ = s.GetSchedule(ctx, hostID, sched.ID) + if got.CronExpr != "*/30 * * * *" || got.Enabled { + t.Fatalf("update did not persist: %+v", got) + } + + // Delete bumps version, returns ErrNotFound on second try. + if err := s.DeleteSchedule(ctx, hostID, sched.ID); err != nil { + t.Fatalf("delete: %v", err) + } + v, _ = s.GetHostScheduleVersion(ctx, hostID) + if v != 3 { + t.Fatalf("version after delete: got %d, want 3", v) + } + if err := s.DeleteSchedule(ctx, hostID, sched.ID); !errors.Is(err, ErrNotFound) { + t.Fatalf("delete after delete: want ErrNotFound, got %v", err) + } +} + +func TestSetHostAppliedScheduleVersion(t *testing.T) { + t.Parallel() + s := openTestStore(t) + ctx := context.Background() + hostID := makeSchedHost(t, s) + + if err := s.SetHostAppliedScheduleVersion(ctx, hostID, 42); err != nil { + t.Fatal(err) + } + host, err := s.GetHost(ctx, hostID) + if err != nil { + t.Fatal(err) + } + if host.AppliedScheduleVersion != 42 { + t.Fatalf("got %d, want 42", host.AppliedScheduleVersion) + } +} diff --git a/internal/store/types.go b/internal/store/types.go index 5b09732..dc77b77 100644 --- a/internal/store/types.go +++ b/internal/store/types.go @@ -70,6 +70,47 @@ type Host struct { RepoInitialisedAt *time.Time } +// Schedule mirrors one row of the schedules table. JSON columns +// (paths, excludes, tags, retention_policy, options) are decoded +// into Go-native shapes for ergonomics; the wire form on the agent +// side keeps retention_policy / options as raw JSON since the agent +// just forwards them to restic. +type Schedule struct { + ID string + HostID string + Kind string + CronExpr string + Paths []string + Excludes []string + Tags []string + RetentionPolicy RetentionPolicy + Options ScheduleOptions + PreHook string + PostHook string + Enabled bool + CreatedAt time.Time + UpdatedAt time.Time +} + +// RetentionPolicy is the typed view of `restic forget --keep-*`. +// All fields nullable so empty == "no policy / keep everything". +type RetentionPolicy struct { + KeepLast *int `json:"keep_last,omitempty"` + KeepHourly *int `json:"keep_hourly,omitempty"` + KeepDaily *int `json:"keep_daily,omitempty"` + KeepWeekly *int `json:"keep_weekly,omitempty"` + KeepMonthly *int `json:"keep_monthly,omitempty"` + KeepYearly *int `json:"keep_yearly,omitempty"` +} + +// ScheduleOptions covers per-schedule knobs that aren't core to the +// command itself — currently bandwidth caps. Stored as JSON so +// future fields don't churn the schema. +type ScheduleOptions struct { + LimitUploadKBps *int `json:"limit_upload_kbps,omitempty"` + LimitDownloadKBps *int `json:"limit_download_kbps,omitempty"` +} + // EnrollmentToken is the issuer's view of a one-time token. The // raw token is returned only at create time; the DB stores its hash. type EnrollmentToken struct { diff --git a/tasks.md b/tasks.md index 677220c..a2c2fc9 100644 --- a/tasks.md +++ b/tasks.md @@ -97,7 +97,7 @@ Sizes: **S** = under a day, **M** = 1–3 days, **L** = 3–7 days. ## Phase 2 — Scheduling, retention, repo operations -- [ ] **P2-01** (M) Schedule schema + CRUD API +- [x] **P2-01** (M) Schedule schema + CRUD API. `schedules` table was already laid down in 0001; this slice adds `store.Schedule`/`RetentionPolicy`/`ScheduleOptions` types, `CreateSchedule` / `GetSchedule` / `ListSchedulesByHost` / `UpdateSchedule` / `DeleteSchedule` / `GetHostScheduleVersion` / `SetHostAppliedScheduleVersion` (mutations bump `host_schedule_version` atomically in-tx), and REST endpoints `GET|POST /api/hosts/{id}/schedules` + `PUT|DELETE /api/hosts/{id}/schedules/{sid}`. Validation: cron-expr parses via `robfig/cron/v3` (same parser the agent will use, so anything that validates here will fire there); kind ∈ {backup, forget, prune, check} (init/unlock are operator-only); backup schedules require ≥1 path; hooks rejected on non-backup kinds (spec §14.3). Mutations audit-logged. Server + store tests cover the happy path, validation, and version bumps. - [ ] **P2-02** (L) Server-pushed schedule reconciliation (server is source of truth; agent applies) - [ ] **P2-03** (M) Agent local scheduler (`robfig/cron/v3`); persists next-fire times across restarts - [ ] **P2-04** (M) Schedule editor UI (paths, excludes, tags, cron, retention)