From 5667cdf13a1034725e853df452b2dd5687a6dc00 Mon Sep 17 00:00:00 2001 From: Steve Cliff Date: Sat, 2 May 2026 21:30:41 +0100 Subject: [PATCH] =?UTF-8?q?P2=20redesign=20=C2=B7=20phase=202:=20store=20r?= =?UTF-8?q?ewrite=20=E2=80=94=20sources,=20slim=20schedules,=20repo=20main?= =?UTF-8?q?tenance?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Go-side data model rebuilt against migration 0008. The fat-Schedule shape (paths/excludes/tags/retention/manual/kind/options/hooks) is gone; that surface lives on source_groups now. * store/types.go - Schedule slimmed to {id, host_id, cron, enabled, source_group_ids, timestamps}. SourceGroupIDs populated by Get/List, accepted on Create/Update so callers pass desired junction state in one shape. - SourceGroup added: name (= snapshot tag), includes/excludes, retention_policy, retry_max + retry_backoff_seconds, cached conflict_dimension. - HostRepoMaintenance added: forget/prune/check cadences + enabled. - PendingRun added: offline-retry queue. - Host loses RepoInitialisedAt; gains BandwidthUpKBps + BandwidthDownKBps. - RetentionPolicy moves home from "schedule field" to "source group field" but the type itself + Summary() method unchanged. * store/sources.go (new) — CRUD + GetByName + ConflictDimension cache. Group writes bump host_schedule_version; conflict cache writes don't (server-internal projection, agent doesn't see it). * store/maintenance.go (new) — CreateDefault is idempotent (INSERT OR IGNORE). UpdateRepoMaintenance doesn't bump schedule version because these run on the server's own ticker, not the agent's local cron. * store/pending.go (new) — Enqueue / DueRunsForRetry / Bump / Delete. * store/schedules.go — rewritten for slim shape + junction CRUD. Update wipes the schedule_source_groups junction wholesale and re-inserts (simpler than diffing). Adds SchedulesUsingGroup for retention-conflict detection + UI labels. * store/hosts.go — drops repo_initialised_at scan, adds bandwidth scan. New SetHostBandwidth helper. * HTTP layer — temporarily stubbed during this rewrite (501 returns with redesign_in_progress error code). Phase 3 fills these in against the new shape: - schedules.go REST CRUD - schedule_push.go agent reconciliation - ui_schedules.go HTML form CRUD Run-now-per-host + Init-repo handlers in ui_handlers.go also stubbed — both go away in the new model (Run-now per source group; auto-init at host enrolment). * enrollment.go — replaces "seed manual schedule from typed paths" with "seed default source group + repo-maintenance row." The default group gets the typed paths as its includes; operator edits later via Sources tab. * ws/handler.go — drops the MarkHostRepoInitialised projection (column is gone; auto-init makes it derivable from latest init job's status). Tests: * store: existing schedule test rewritten for slim shape + junction; new sources_test.go covers source-group CRUD, name uniqueness, conflict cache, repo-maintenance defaults + idempotent seed, pending-runs queue lifecycle. * http: schedules_test.go and schedule_push_test.go deleted — both exercised the obsolete fat-schedule API. Phase 3 rewrites them against the new endpoints. go test ./... green. cmd/server + cmd/agent build. The UI is broken end-to-end (schedules / sources / repo tabs all hit 501 stubs); Phase 3 restores REST + on-the-wire reconciliation; Phase 4 rewires the UI templates against the new model. Co-Authored-By: Claude Opus 4.7 (1M context) --- internal/server/http/enrollment.go | 38 +- internal/server/http/schedule_push.go | 266 +---------- internal/server/http/schedule_push_test.go | 304 ------------ internal/server/http/schedules.go | 311 +----------- internal/server/http/schedules_test.go | 190 -------- internal/server/http/ui_handlers.go | 150 +----- internal/server/http/ui_schedules.go | 526 +-------------------- internal/server/ws/handler.go | 22 +- internal/store/hosts.go | 51 +- internal/store/maintenance.go | 74 +++ internal/store/pending.go | 103 ++++ internal/store/schedules.go | 228 +++++---- internal/store/schedules_test.go | 98 ++-- internal/store/sources.go | 261 ++++++++++ internal/store/sources_test.go | 221 +++++++++ internal/store/types.go | 161 ++++--- 16 files changed, 1076 insertions(+), 1928 deletions(-) delete mode 100644 internal/server/http/schedule_push_test.go delete mode 100644 internal/server/http/schedules_test.go create mode 100644 internal/store/maintenance.go create mode 100644 internal/store/pending.go create mode 100644 internal/store/sources.go create mode 100644 internal/store/sources_test.go diff --git a/internal/server/http/enrollment.go b/internal/server/http/enrollment.go index cc1b542..e0f3f26 100644 --- a/internal/server/http/enrollment.go +++ b/internal/server/http/enrollment.go @@ -140,26 +140,24 @@ func (s *Server) handleAgentEnroll(w stdhttp.ResponseWriter, r *stdhttp.Request) return } - // Seed an initial manual schedule from whatever paths the - // operator typed into Add-host. The schedule is editable from - // the host's Schedules tab; the operator can add automated - // schedules alongside it later. We skip this when no paths - // were supplied — the host can still enrol; it just can't - // back up until the operator adds a schedule. - if len(attachments.InitialPaths) > 0 { - seed := store.Schedule{ - ID: ulid.Make().String(), - HostID: hostID, - Kind: string(api.JobBackup), - CronExpr: "", - Paths: attachments.InitialPaths, - Enabled: true, - Manual: true, - } - if err := s.deps.Store.CreateSchedule(r.Context(), &seed); err != nil { - slog.Warn("enrollment: seed manual schedule failed", - "host_id", hostID, "err", err) - } + // Seed the host's "default" source group with whatever paths the + // operator typed into Add-host (empty allowed; group is editable + // from the Sources tab post-enrol). Also seed the host's + // repo-maintenance row with default cadences so forget/prune/check + // start ticking on their own. Auto-init dispatch lands in Phase 6 + // of the redesign. + if err := s.deps.Store.CreateSourceGroup(r.Context(), &store.SourceGroup{ + ID: ulid.Make().String(), + HostID: hostID, + Name: "default", + Includes: attachments.InitialPaths, + }); err != nil { + slog.Warn("enrollment: seed default source group failed", + "host_id", hostID, "err", err) + } + if err := s.deps.Store.CreateDefaultRepoMaintenance(r.Context(), hostID); err != nil { + slog.Warn("enrollment: seed repo maintenance failed", + "host_id", hostID, "err", err) } // Promote the encrypted repo creds onto the freshly-created host diff --git a/internal/server/http/schedule_push.go b/internal/server/http/schedule_push.go index 8c45ccb..ff3ae8d 100644 --- a/internal/server/http/schedule_push.go +++ b/internal/server/http/schedule_push.go @@ -2,258 +2,36 @@ package http import ( "context" - "encoding/json" - "errors" "log/slog" "time" - "github.com/oklog/ulid/v2" - - "gitea.dcglab.co.uk/steve/restic-manager/internal/api" "gitea.dcglab.co.uk/steve/restic-manager/internal/server/ws" - "gitea.dcglab.co.uk/steve/restic-manager/internal/store" ) -// loadScheduleSetPayload reads the host's current schedule set + the -// canonical version into a wire-shape payload. Returns an empty -// (but well-formed) payload with version 0 if the host has nothing -// scheduled yet — that's still a valid state to push so the agent -// drops any stale cron entries from a previous deployment. -func (s *Server) loadScheduleSetPayload(ctx context.Context, hostID string) (api.ScheduleSetPayload, error) { - rows, err := s.deps.Store.ListSchedulesByHost(ctx, hostID) - if err != nil { - return api.ScheduleSetPayload{}, err - } - version, err := s.deps.Store.GetHostScheduleVersion(ctx, hostID) - if err != nil { - return api.ScheduleSetPayload{}, err - } - out := api.ScheduleSetPayload{ - Version: version, - Schedules: make([]api.Schedule, 0, len(rows)), - } - for _, r := range rows { - retJSON, _ := json.Marshal(r.RetentionPolicy) - optJSON, _ := json.Marshal(r.Options) - out.Schedules = append(out.Schedules, api.Schedule{ - ID: r.ID, - Kind: api.JobKind(r.Kind), - CronExpr: r.CronExpr, - Paths: r.Paths, - Excludes: r.Excludes, - Tags: r.Tags, - RetentionPolicy: retJSON, - Options: optJSON, - PreHook: r.PreHook, - PostHook: r.PostHook, - Enabled: r.Enabled, - Manual: r.Manual, - }) - } - return out, nil -} - -// pushScheduleSet ships the current schedule list to the agent over -// the hub. Caller has already determined the agent is connected. -// Errors are logged and returned; the next push (or the next hello) -// will retry. Idempotent — sending the same version twice is a -// harmless no-op on the agent side. -func (s *Server) pushScheduleSet(ctx context.Context, hostID string) error { - pl, err := s.loadScheduleSetPayload(ctx, hostID) - if err != nil { - slog.Warn("push schedule.set: load failed", "host_id", hostID, "err", err) - return err - } - env, err := api.Marshal(api.MsgScheduleSet, "", pl) - if err != nil { - return err - } - sendCtx, cancel := context.WithTimeout(ctx, 5*time.Second) - defer cancel() - if err := s.deps.Hub.Send(sendCtx, hostID, env); err != nil { - slog.Warn("push schedule.set: hub send failed", - "host_id", hostID, "version", pl.Version, "err", err) - return err - } - return nil -} - -// pushScheduleSetOnConn is the on-hello flavour: writes directly to -// the just-handshaken conn rather than racing through the hub. Used -// by onAgentHello so a brand-new connection can't miss an early -// push because Register hasn't completed yet. -func (s *Server) pushScheduleSetOnConn(ctx context.Context, hostID string, conn *ws.Conn) { - pl, err := s.loadScheduleSetPayload(ctx, hostID) - if err != nil { - slog.Warn("on-hello: load schedules", "host_id", hostID, "err", err) - return - } - env, err := api.Marshal(api.MsgScheduleSet, "", pl) - if err != nil { - slog.Error("on-hello: marshal schedule.set", "host_id", hostID, "err", err) - return - } - sendCtx, cancel := context.WithTimeout(ctx, 5*time.Second) - defer cancel() - if err := conn.Send(sendCtx, env); err != nil { - slog.Warn("on-hello: send schedule.set", - "host_id", hostID, "version", pl.Version, "err", err) - } -} - -// pushIfConnected dispatches a schedule.set push asynchronously when -// the agent is online. Used by CRUD handlers: a missed push is -// non-fatal because the next reconnect's on-hello path will catch -// the agent up. Decoupled from the request so the operator's HTTP -// response doesn't wait on the WS round-trip. -func (s *Server) pushScheduleSetAsync(hostID string) { - if s.deps.Hub == nil || !s.deps.Hub.Connected(hostID) { - return - } - go func() { - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - _ = s.pushScheduleSet(ctx, hostID) - }() -} - -// applyScheduleAck records the version the agent has confirmed via -// schedule.ack. Called from the WS dispatcher (wired below). A bad -// version (zero, or higher than what we've issued) is logged but -// not fatal — the next push will set the agent straight. -func (s *Server) applyScheduleAck(ctx context.Context, hostID string, version int64, appliedAt time.Time) { - if version <= 0 { - return - } - canonical, err := s.deps.Store.GetHostScheduleVersion(ctx, hostID) - if err != nil { - slog.Warn("schedule.ack: load canonical version", - "host_id", hostID, "err", err) - return - } - if version > canonical { - slog.Warn("schedule.ack: agent reported version ahead of server", - "host_id", hostID, "agent", version, "server", canonical) - return - } - if err := s.deps.Store.SetHostAppliedScheduleVersion(ctx, hostID, version); err != nil { - slog.Warn("schedule.ack: persist applied version", - "host_id", hostID, "version", version, "err", err) - return - } - slog.Info("schedule.ack: applied", - "host_id", hostID, "version", version, "applied_at", appliedAt) -} - -// dispatchScheduledJob is invoked when the agent reports a local -// cron fire via `schedule.fire`. Thin wrapper around the shared -// dispatcher; logs and discards the return values since the agent -// can't usefully act on them. -func (s *Server) dispatchScheduledJob(ctx context.Context, hostID string, _ *ws.Conn, scheduleID string, scheduledAt time.Time) { - jobID, err := s.dispatchScheduleNow(ctx, hostID, scheduleID, nil) - if err != nil { - slog.Warn("schedule.fire: dispatch failed", - "host_id", hostID, "schedule_id", scheduleID, "err", err) - return - } - slog.Info("schedule.fire: dispatched", - "host_id", hostID, "schedule_id", scheduleID, - "job_id", jobID, "scheduled_at", scheduledAt) -} - -// dispatchScheduleNow looks up a schedule, builds a CommandRunPayload, -// persists a jobs row (actor_kind=schedule, scheduled_id linking -// back), and ships MsgCommandRun to the host. Used by both the -// agent-driven path (cron fire reaches us as schedule.fire) and the -// UI-driven path (operator clicks Run-now on a schedule row). +// schedule_push.go — server → agent reconciliation push. // -// conn is optional: when set we write directly through it (no race -// against an in-flight Register). When nil we fall back to Hub.Send. -// Returns the new job_id on success. -func (s *Server) dispatchScheduleNow(ctx context.Context, hostID, scheduleID string, conn *ws.Conn) (string, error) { - sched, err := s.deps.Store.GetSchedule(ctx, hostID, scheduleID) - if err != nil { - if errors.Is(err, store.ErrNotFound) { - return "", errFmtf("schedule not found") - } - return "", errFmtf("internal: %s", err) - } - if !sched.Enabled { - return "", errFmtf("schedule is disabled") - } +// Stubbed during the P2 redesign rewrite. The new wire shape (slim +// schedules referencing source groups; groups inline by id at push +// time) lands in Phase 3. Until then on-hello and post-CRUD pushes +// are no-ops; the agent will keep its existing cron entries (none, +// since there are no schedules yet) and the only operator-driven +// jobs flow via run-now once the new UI is wired in Phase 4. - var args []string - if sched.Kind == string(api.JobBackup) { - args = append(args, sched.Paths...) - } - - // forget jobs need the retention policy on the wire — restic - // refuses to run without keep-* flags, and the agent doesn't - // hold a copy of the schedule (server is the source of truth). - var retentionJSON json.RawMessage - if sched.Kind == string(api.JobForget) { - if sched.RetentionPolicy == (store.RetentionPolicy{}) { - return "", errFmtf("schedule has no retention policy — refusing to forget (would delete every snapshot)") - } - b, err := json.Marshal(sched.RetentionPolicy) - if err != nil { - return "", errFmtf("marshal retention policy: %s", err) - } - retentionJSON = b - } - - jobID := ulid.Make().String() - now := time.Now().UTC() - if err := s.deps.Store.CreateJob(ctx, store.Job{ - ID: jobID, - HostID: hostID, - Kind: sched.Kind, - ScheduledID: &sched.ID, - ActorKind: "schedule", - ActorID: &sched.ID, - CreatedAt: now, - }); err != nil { - return "", errFmtf("create job: %s", err) - } - - env, err := api.Marshal(api.MsgCommandRun, jobID, api.CommandRunPayload{ - JobID: jobID, - Kind: api.JobKind(sched.Kind), - Args: args, - RetentionPolicy: retentionJSON, - }) - if err != nil { - return "", errFmtf("marshal command.run: %s", err) - } - sendCtx, cancel := context.WithTimeout(ctx, 5*time.Second) - defer cancel() - if conn != nil { - if err := conn.Send(sendCtx, env); err != nil { - return "", errFmtf("send command.run: %s", err) - } - } else { - if err := s.deps.Hub.Send(sendCtx, hostID, env); err != nil { - return "", errFmtf("send command.run: %s", err) - } - } - - _ = s.deps.Store.AppendAudit(ctx, store.AuditEntry{ - ID: ulid.Make().String(), - Actor: "schedule", - Action: "job.run_now", - TargetKind: ptr("job"), - TargetID: &jobID, - TS: now, - }) - return jobID, nil +func (s *Server) pushScheduleSetOnConn(ctx context.Context, hostID string, conn *ws.Conn) { + slog.Debug("schedule push: stubbed during P2 redesign", "host_id", hostID) } -// Compile-time guard that the store actually implements the methods -// schedule_push.go calls. Useful when mocking the store in tests. -var _ scheduleStore = (*store.Store)(nil) - -type scheduleStore interface { - ListSchedulesByHost(ctx context.Context, hostID string) ([]store.Schedule, error) - GetHostScheduleVersion(ctx context.Context, hostID string) (int64, error) - SetHostAppliedScheduleVersion(ctx context.Context, hostID string, version int64) error +func (s *Server) pushScheduleSetAsync(hostID string) { + slog.Debug("schedule push async: stubbed during P2 redesign", "host_id", hostID) +} + +func (s *Server) applyScheduleAck(ctx context.Context, hostID string, version int64, appliedAt time.Time) { + if err := s.deps.Store.SetHostAppliedScheduleVersion(ctx, hostID, version); err != nil { + slog.Warn("schedule.ack: persist applied version", "host_id", hostID, "err", err) + } +} + +func (s *Server) dispatchScheduledJob(ctx context.Context, hostID string, conn *ws.Conn, scheduleID string, scheduledAt time.Time) { + slog.Info("schedule.fire: stubbed during P2 redesign", + "host_id", hostID, "schedule_id", scheduleID, "scheduled_at", scheduledAt) } diff --git a/internal/server/http/schedule_push_test.go b/internal/server/http/schedule_push_test.go deleted file mode 100644 index 293b21c..0000000 --- a/internal/server/http/schedule_push_test.go +++ /dev/null @@ -1,304 +0,0 @@ -package http - -import ( - "bytes" - "context" - "encoding/json" - "io" - stdhttp "net/http" - "strings" - "testing" - "time" - - "github.com/coder/websocket" - - "gitea.dcglab.co.uk/steve/restic-manager/internal/api" - "gitea.dcglab.co.uk/steve/restic-manager/internal/auth" - "gitea.dcglab.co.uk/steve/restic-manager/internal/store" -) - -// makePushHost is like makeHTTPHost but mints a known agent token so -// the test can dial /ws/agent as the host. Returns (hostID, raw token). -func makePushHost(t *testing.T, st *store.Store) (string, string) { - t.Helper() - const id = "01HSCHEDPUSH00000000000000" - tok, _ := auth.NewToken() - if err := st.CreateHost(context.Background(), store.Host{ - ID: id, Name: "ph", OS: "linux", Arch: "amd64", - AgentVersion: "dev", ResticVersion: "0.16.0", ProtocolVersion: 1, - EnrolledAt: time.Now().UTC(), - }, auth.HashToken(tok), ""); err != nil { - t.Fatalf("create host: %v", err) - } - return id, tok -} - -// readUntilType pumps messages from the WS until one of the wanted -// types arrives or ctx times out. Returns the matched envelope. -// Useful because the on-hello path may push several messages -// (config.update first if creds exist, schedule.set, …). -func readUntilType(ctx context.Context, t *testing.T, c *websocket.Conn, want api.MessageType) api.Envelope { - t.Helper() - for { - _, raw, err := c.Read(ctx) - if err != nil { - t.Fatalf("ws read waiting for %s: %v", want, err) - } - var env api.Envelope - if err := json.Unmarshal(raw, &env); err != nil { - t.Fatalf("envelope: %v (raw=%s)", err, raw) - } - t.Logf("recv: type=%s payload=%s", env.Type, env.Payload) - if env.Type == want { - return env - } - } -} - -func TestSchedulePushOnHelloAndAckRoundtrip(t *testing.T) { - t.Parallel() - srv, url, st := newTestServerWithHub(t) - _ = srv - cookie := loginAndCookie(t, url) - hostID, agentToken := makePushHost(t, st) - - // Pre-populate one schedule so we have something to push. - body, _ := json.Marshal(scheduleAPI{ - Kind: "backup", - CronExpr: "@hourly", - Paths: []string{"/etc"}, - Enabled: true, - }) - req, _ := stdhttp.NewRequest("POST", url+"/api/hosts/"+hostID+"/schedules", - bytes.NewReader(body)) - req.AddCookie(cookie) - req.Header.Set("Content-Type", "application/json") - res, err := stdhttp.DefaultClient.Do(req) - if err != nil { - t.Fatalf("create schedule: %v", err) - } - got, _ := io.ReadAll(res.Body) - res.Body.Close() - if res.StatusCode != stdhttp.StatusCreated { - t.Fatalf("create schedule: %d %s", res.StatusCode, got) - } - var created scheduleAPI - _ = json.Unmarshal(got, &created) - - // Dial the WS as the agent and send hello. - wsURL := "ws" + strings.TrimPrefix(url, "http") + "/ws/agent" - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - c, _, err := websocket.Dial(ctx, wsURL, &websocket.DialOptions{ - HTTPHeader: stdhttp.Header{"Authorization": []string{"Bearer " + agentToken}}, - }) - if err != nil { - t.Fatalf("dial: %v", err) - } - defer c.CloseNow() - - helloEnv, _ := api.Marshal(api.MsgHello, "", api.HelloPayload{ - ProtocolVersion: api.CurrentProtocolVersion, - AgentVersion: "test", ResticVersion: "test", - Hostname: "ph", OS: api.OSLinux, Arch: api.ArchAmd64, - }) - raw, _ := json.Marshal(helloEnv) - if err := c.Write(ctx, websocket.MessageText, raw); err != nil { - t.Fatalf("write hello: %v", err) - } - - // Server should push schedule.set (our host has no creds, so the - // config.update branch is silently skipped). - pushedEnv := readUntilType(ctx, t, c, api.MsgScheduleSet) - var pushed api.ScheduleSetPayload - if err := pushedEnv.UnmarshalPayload(&pushed); err != nil { - t.Fatalf("decode payload: %v", err) - } - if pushed.Version != 1 { - t.Fatalf("pushed version: got %d, want 1", pushed.Version) - } - if len(pushed.Schedules) != 1 || pushed.Schedules[0].ID != created.ID { - t.Fatalf("pushed schedules: %+v", pushed.Schedules) - } - if pushed.Schedules[0].CronExpr != "@hourly" || len(pushed.Schedules[0].Paths) != 1 { - t.Fatalf("schedule contents: %+v", pushed.Schedules[0]) - } - - // Ack the version. Server should record it on the host row. - ackEnv, _ := api.Marshal(api.MsgScheduleAck, "", api.ScheduleAckPayload{ - Version: pushed.Version, - AppliedAt: time.Now().UTC(), - }) - raw, _ = json.Marshal(ackEnv) - if err := c.Write(ctx, websocket.MessageText, raw); err != nil { - t.Fatalf("write ack: %v", err) - } - - // Wait for applied_schedule_version to flip. - deadline := time.Now().Add(2 * time.Second) - for time.Now().Before(deadline) { - h, err := st.GetHost(context.Background(), hostID) - if err == nil && h.AppliedScheduleVersion == pushed.Version { - return - } - time.Sleep(20 * time.Millisecond) - } - h, _ := st.GetHost(context.Background(), hostID) - t.Fatalf("applied_schedule_version did not advance: got %d, want %d", - h.AppliedScheduleVersion, pushed.Version) -} - -func TestScheduleFireDispatchesCommandRun(t *testing.T) { - t.Parallel() - srv, url, st := newTestServerWithHub(t) - _ = srv - cookie := loginAndCookie(t, url) - hostID, agentToken := makePushHost(t, st) - - // Pre-create one backup schedule. - body, _ := json.Marshal(scheduleAPI{ - Kind: "backup", CronExpr: "@hourly", - Paths: []string{"/etc/hostname"}, Enabled: true, - }) - req, _ := stdhttp.NewRequest("POST", - url+"/api/hosts/"+hostID+"/schedules", bytes.NewReader(body)) - req.AddCookie(cookie) - req.Header.Set("Content-Type", "application/json") - res, err := stdhttp.DefaultClient.Do(req) - if err != nil { - t.Fatalf("create: %v", err) - } - got, _ := io.ReadAll(res.Body) - res.Body.Close() - var created scheduleAPI - _ = json.Unmarshal(got, &created) - - // Connect as the agent. - wsURL := "ws" + strings.TrimPrefix(url, "http") + "/ws/agent" - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - c, _, err := websocket.Dial(ctx, wsURL, &websocket.DialOptions{ - HTTPHeader: stdhttp.Header{"Authorization": []string{"Bearer " + agentToken}}, - }) - if err != nil { - t.Fatalf("dial: %v", err) - } - defer c.CloseNow() - - helloEnv, _ := api.Marshal(api.MsgHello, "", api.HelloPayload{ - ProtocolVersion: api.CurrentProtocolVersion, - AgentVersion: "test", ResticVersion: "test", - Hostname: "ph", OS: api.OSLinux, Arch: api.ArchAmd64, - }) - raw, _ := json.Marshal(helloEnv) - _ = c.Write(ctx, websocket.MessageText, raw) - - // Drain the on-hello schedule.set. - _ = readUntilType(ctx, t, c, api.MsgScheduleSet) - - // Pretend our local cron just fired this schedule. - fireEnv, _ := api.Marshal(api.MsgScheduleFire, "", api.ScheduleFirePayload{ - ScheduleID: created.ID, - ScheduledAt: time.Now().UTC(), - }) - raw, _ = json.Marshal(fireEnv) - if err := c.Write(ctx, websocket.MessageText, raw); err != nil { - t.Fatalf("write fire: %v", err) - } - - // Server should respond with command.run. - cmdEnv := readUntilType(ctx, t, c, api.MsgCommandRun) - var cmd api.CommandRunPayload - if err := cmdEnv.UnmarshalPayload(&cmd); err != nil { - t.Fatalf("decode command.run: %v", err) - } - if cmd.JobID == "" || cmd.Kind != api.JobBackup { - t.Fatalf("command.run: %+v", cmd) - } - if len(cmd.Args) != 1 || cmd.Args[0] != "/etc/hostname" { - t.Fatalf("command.run args: %+v", cmd.Args) - } - - // Verify the job row landed with actor_kind=schedule. - deadline := time.Now().Add(2 * time.Second) - for time.Now().Before(deadline) { - var actorKind, scheduledID string - row := st.DB().QueryRowContext(context.Background(), - `SELECT actor_kind, COALESCE(scheduled_id,'') FROM jobs WHERE id = ?`, - cmd.JobID) - if err := row.Scan(&actorKind, &scheduledID); err == nil { - if actorKind != "schedule" { - t.Fatalf("job actor_kind: %q", actorKind) - } - if scheduledID != created.ID { - t.Fatalf("job scheduled_id: %q want %q", scheduledID, created.ID) - } - return - } - time.Sleep(20 * time.Millisecond) - } - t.Fatalf("job row %s never landed", cmd.JobID) -} - -func TestSchedulePushOnCRUD(t *testing.T) { - t.Parallel() - srv, url, st := newTestServerWithHub(t) - _ = srv - cookie := loginAndCookie(t, url) - hostID, agentToken := makePushHost(t, st) - - // Connect first so the CRUD push has somewhere to land. - wsURL := "ws" + strings.TrimPrefix(url, "http") + "/ws/agent" - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - c, _, err := websocket.Dial(ctx, wsURL, &websocket.DialOptions{ - HTTPHeader: stdhttp.Header{"Authorization": []string{"Bearer " + agentToken}}, - }) - if err != nil { - t.Fatalf("dial: %v", err) - } - defer c.CloseNow() - - helloEnv, _ := api.Marshal(api.MsgHello, "", api.HelloPayload{ - ProtocolVersion: api.CurrentProtocolVersion, - AgentVersion: "test", ResticVersion: "test", - Hostname: "ph", OS: api.OSLinux, Arch: api.ArchAmd64, - }) - raw, _ := json.Marshal(helloEnv) - _ = c.Write(ctx, websocket.MessageText, raw) - - // Drain the on-hello schedule.set (will be version 0, empty list). - first := readUntilType(ctx, t, c, api.MsgScheduleSet) - var initial api.ScheduleSetPayload - _ = first.UnmarshalPayload(&initial) - if initial.Version != 0 || len(initial.Schedules) != 0 { - t.Fatalf("initial push: %+v", initial) - } - - // Now create a schedule via REST. The handler should fire a - // schedule.set push asynchronously. - body, _ := json.Marshal(scheduleAPI{ - Kind: "backup", CronExpr: "*/30 * * * *", - Paths: []string{"/var/lib"}, Enabled: true, - }) - req, _ := stdhttp.NewRequest("POST", - url+"/api/hosts/"+hostID+"/schedules", bytes.NewReader(body)) - req.AddCookie(cookie) - req.Header.Set("Content-Type", "application/json") - res, err := stdhttp.DefaultClient.Do(req) - if err != nil { - t.Fatalf("create: %v", err) - } - res.Body.Close() - if res.StatusCode != stdhttp.StatusCreated { - t.Fatalf("create: %d", res.StatusCode) - } - - // Wait for the pushed schedule.set with version 1. - pushed := readUntilType(ctx, t, c, api.MsgScheduleSet) - var pl api.ScheduleSetPayload - _ = pushed.UnmarshalPayload(&pl) - if pl.Version != 1 || len(pl.Schedules) != 1 { - t.Fatalf("push after create: %+v", pl) - } -} diff --git a/internal/server/http/schedules.go b/internal/server/http/schedules.go index eecf9c0..6383a03 100644 --- a/internal/server/http/schedules.go +++ b/internal/server/http/schedules.go @@ -1,310 +1,37 @@ package http import ( - "encoding/json" - "errors" stdhttp "net/http" - "strings" - - "github.com/go-chi/chi/v5" - "github.com/oklog/ulid/v2" - "github.com/robfig/cron/v3" - - "gitea.dcglab.co.uk/steve/restic-manager/internal/api" - "gitea.dcglab.co.uk/steve/restic-manager/internal/store" ) -// scheduleAPI is the JSON shape for /api/hosts/{id}/schedules. We -// avoid leaking host_id in the body since it's already in the URL, -// and we render booleans + retention as typed JSON rather than -// strings so the UI can edit fields directly. -type scheduleAPI struct { - ID string `json:"id,omitempty"` - Kind api.JobKind `json:"kind"` - CronExpr string `json:"cron_expr"` - Paths []string `json:"paths"` - Excludes []string `json:"excludes"` - Tags []string `json:"tags"` - RetentionPolicy store.RetentionPolicy `json:"retention_policy"` - Options store.ScheduleOptions `json:"options"` - PreHook string `json:"pre_hook,omitempty"` - PostHook string `json:"post_hook,omitempty"` - Enabled bool `json:"enabled"` - // Manual = no cron, fires only when the operator triggers a - // run-now. Cron expr is ignored when this is true. - Manual bool `json:"manual"` - CreatedAt string `json:"created_at,omitempty"` - UpdatedAt string `json:"updated_at,omitempty"` -} - -// listSchedulesResp wraps the array so the response is forward- -// compatible (we may want to add a top-level `version` later). -type listSchedulesResp struct { - Version int64 `json:"version"` - Schedules []scheduleAPI `json:"schedules"` -} - -// cron parser used for input validation. Standard 5-field syntax -// with descriptors (@hourly etc.) — same parser the agent will -// run against, so a schedule that validates here will fire there. -var cronParser = cron.NewParser( - cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor, -) +// schedules.go — REST API for /api/hosts/{id}/schedules. +// +// Stubbed during the P2 redesign data-model rewrite (commit chain). +// Phase 2 dropped the fat Schedule shape (paths/excludes/tags/ +// retention/manual/kind/options/hooks) — the slim Schedule + source +// groups model lives in store/. Phase 3 of the redesign will fill in +// these handlers against the new shape. +// +// Returning 501 here keeps the routes addressable; UI calls will +// surface the unimplemented state via the toast component until the +// new handlers land. func (s *Server) handleListSchedules(w stdhttp.ResponseWriter, r *stdhttp.Request) { - if _, ok := s.requireUser(r); !ok { - writeJSONError(w, stdhttp.StatusUnauthorized, "unauthorized", "") - return - } - hostID := chi.URLParam(r, "id") - if hostID == "" { - writeJSONError(w, stdhttp.StatusBadRequest, "missing_host_id", "") - return - } - if _, err := s.deps.Store.GetHost(r.Context(), hostID); err != nil { - if errors.Is(err, store.ErrNotFound) { - writeJSONError(w, stdhttp.StatusNotFound, "host_not_found", "") - return - } - writeJSONError(w, stdhttp.StatusInternalServerError, "internal", "") - return - } - rows, err := s.deps.Store.ListSchedulesByHost(r.Context(), hostID) - if err != nil { - writeJSONError(w, stdhttp.StatusInternalServerError, "internal", "") - return - } - version, err := s.deps.Store.GetHostScheduleVersion(r.Context(), hostID) - if err != nil { - writeJSONError(w, stdhttp.StatusInternalServerError, "internal", "") - return - } - out := listSchedulesResp{Version: version, Schedules: make([]scheduleAPI, len(rows))} - for i, row := range rows { - out.Schedules[i] = toScheduleAPI(row) - } - writeJSON(w, stdhttp.StatusOK, out) + writeJSONError(w, stdhttp.StatusNotImplemented, "redesign_in_progress", + "schedule REST API is being rebuilt — see P2 redesign Phase 3") } func (s *Server) handleCreateSchedule(w stdhttp.ResponseWriter, r *stdhttp.Request) { - user, ok := s.requireUser(r) - if !ok { - writeJSONError(w, stdhttp.StatusUnauthorized, "unauthorized", "") - return - } - hostID := chi.URLParam(r, "id") - if hostID == "" { - writeJSONError(w, stdhttp.StatusBadRequest, "missing_host_id", "") - return - } - if _, err := s.deps.Store.GetHost(r.Context(), hostID); err != nil { - if errors.Is(err, store.ErrNotFound) { - writeJSONError(w, stdhttp.StatusNotFound, "host_not_found", "") - return - } - writeJSONError(w, stdhttp.StatusInternalServerError, "internal", "") - return - } - - var req scheduleAPI - if err := json.NewDecoder(r.Body).Decode(&req); err != nil { - writeJSONError(w, stdhttp.StatusBadRequest, "invalid_json", err.Error()) - return - } - if code, msg := validateSchedule(&req); code != "" { - writeJSONError(w, stdhttp.StatusBadRequest, code, msg) - return - } - - row := store.Schedule{ - ID: ulid.Make().String(), - HostID: hostID, - Kind: string(req.Kind), - CronExpr: req.CronExpr, - Paths: req.Paths, - Excludes: req.Excludes, - Tags: req.Tags, - RetentionPolicy: req.RetentionPolicy, - Options: req.Options, - PreHook: req.PreHook, - PostHook: req.PostHook, - Enabled: req.Enabled, - } - if err := s.deps.Store.CreateSchedule(r.Context(), &row); err != nil { - writeJSONError(w, stdhttp.StatusInternalServerError, "internal", err.Error()) - return - } - - _ = s.deps.Store.AppendAudit(r.Context(), store.AuditEntry{ - ID: ulid.Make().String(), - UserID: &user.ID, - Actor: "user", - Action: "schedule.created", - TargetKind: ptr("schedule"), - TargetID: &row.ID, - TS: nowUTC(), - }) - s.pushScheduleSetAsync(hostID) - writeJSON(w, stdhttp.StatusCreated, toScheduleAPI(row)) + writeJSONError(w, stdhttp.StatusNotImplemented, "redesign_in_progress", + "schedule REST API is being rebuilt — see P2 redesign Phase 3") } func (s *Server) handleUpdateSchedule(w stdhttp.ResponseWriter, r *stdhttp.Request) { - user, ok := s.requireUser(r) - if !ok { - writeJSONError(w, stdhttp.StatusUnauthorized, "unauthorized", "") - return - } - hostID := chi.URLParam(r, "id") - scheduleID := chi.URLParam(r, "sid") - if hostID == "" || scheduleID == "" { - writeJSONError(w, stdhttp.StatusBadRequest, "missing_id", "") - return - } - existing, err := s.deps.Store.GetSchedule(r.Context(), hostID, scheduleID) - if err != nil { - if errors.Is(err, store.ErrNotFound) { - writeJSONError(w, stdhttp.StatusNotFound, "schedule_not_found", "") - return - } - writeJSONError(w, stdhttp.StatusInternalServerError, "internal", "") - return - } - - var req scheduleAPI - if err := json.NewDecoder(r.Body).Decode(&req); err != nil { - writeJSONError(w, stdhttp.StatusBadRequest, "invalid_json", err.Error()) - return - } - // Kind is immutable; ignore whatever was sent and fix up before - // validation so validateSchedule sees the existing kind. - req.Kind = api.JobKind(existing.Kind) - if code, msg := validateSchedule(&req); code != "" { - writeJSONError(w, stdhttp.StatusBadRequest, code, msg) - return - } - - existing.CronExpr = req.CronExpr - existing.Paths = req.Paths - existing.Excludes = req.Excludes - existing.Tags = req.Tags - existing.RetentionPolicy = req.RetentionPolicy - existing.Options = req.Options - existing.PreHook = req.PreHook - existing.PostHook = req.PostHook - existing.Enabled = req.Enabled - - if err := s.deps.Store.UpdateSchedule(r.Context(), existing); err != nil { - if errors.Is(err, store.ErrNotFound) { - writeJSONError(w, stdhttp.StatusNotFound, "schedule_not_found", "") - return - } - writeJSONError(w, stdhttp.StatusInternalServerError, "internal", err.Error()) - return - } - - _ = s.deps.Store.AppendAudit(r.Context(), store.AuditEntry{ - ID: ulid.Make().String(), - UserID: &user.ID, - Actor: "user", - Action: "schedule.updated", - TargetKind: ptr("schedule"), - TargetID: &existing.ID, - TS: nowUTC(), - }) - s.pushScheduleSetAsync(hostID) - writeJSON(w, stdhttp.StatusOK, toScheduleAPI(*existing)) + writeJSONError(w, stdhttp.StatusNotImplemented, "redesign_in_progress", + "schedule REST API is being rebuilt — see P2 redesign Phase 3") } func (s *Server) handleDeleteSchedule(w stdhttp.ResponseWriter, r *stdhttp.Request) { - user, ok := s.requireUser(r) - if !ok { - writeJSONError(w, stdhttp.StatusUnauthorized, "unauthorized", "") - return - } - hostID := chi.URLParam(r, "id") - scheduleID := chi.URLParam(r, "sid") - if hostID == "" || scheduleID == "" { - writeJSONError(w, stdhttp.StatusBadRequest, "missing_id", "") - return - } - if err := s.deps.Store.DeleteSchedule(r.Context(), hostID, scheduleID); err != nil { - if errors.Is(err, store.ErrNotFound) { - writeJSONError(w, stdhttp.StatusNotFound, "schedule_not_found", "") - return - } - writeJSONError(w, stdhttp.StatusInternalServerError, "internal", err.Error()) - return - } - _ = s.deps.Store.AppendAudit(r.Context(), store.AuditEntry{ - ID: ulid.Make().String(), - UserID: &user.ID, - Actor: "user", - Action: "schedule.deleted", - TargetKind: ptr("schedule"), - TargetID: &scheduleID, - TS: nowUTC(), - }) - s.pushScheduleSetAsync(hostID) - w.WriteHeader(stdhttp.StatusNoContent) -} - -// validateSchedule rejects malformed inputs with a stable error code. -// Returns ("", "") on success. -func validateSchedule(s *scheduleAPI) (code, msg string) { - switch s.Kind { - case api.JobBackup, api.JobForget, api.JobPrune, api.JobCheck: - // ok — valid schedule kinds (init/unlock are operator-driven only). - default: - return "invalid_kind", "kind must be one of backup|forget|prune|check" - } - if !s.Manual { - if strings.TrimSpace(s.CronExpr) == "" { - return "missing_cron_expr", "cron_expr is required (or set manual=true)" - } - if _, err := cronParser.Parse(s.CronExpr); err != nil { - return "invalid_cron_expr", err.Error() - } - } - if s.Kind == api.JobBackup && len(s.Paths) == 0 { - return "missing_paths", "backup schedules require at least one path" - } - // forget needs at least one keep-* dimension; otherwise restic - // would happily delete every snapshot. - if s.Kind == api.JobForget && (s.RetentionPolicy == store.RetentionPolicy{}) { - return "missing_retention", "forget schedules require at least one Keep-* value" - } - // Hooks are only meaningful on backup schedules (spec §14.3). - if s.Kind != api.JobBackup && (s.PreHook != "" || s.PostHook != "") { - return "hooks_not_allowed", "pre_hook / post_hook only apply to backup schedules" - } - return "", "" -} - -func toScheduleAPI(s store.Schedule) scheduleAPI { - out := scheduleAPI{ - ID: s.ID, - Kind: api.JobKind(s.Kind), - CronExpr: s.CronExpr, - Paths: s.Paths, - Excludes: s.Excludes, - Tags: s.Tags, - RetentionPolicy: s.RetentionPolicy, - Options: s.Options, - PreHook: s.PreHook, - PostHook: s.PostHook, - Enabled: s.Enabled, - Manual: s.Manual, - CreatedAt: s.CreatedAt.Format("2006-01-02T15:04:05.999999999Z07:00"), - UpdatedAt: s.UpdatedAt.Format("2006-01-02T15:04:05.999999999Z07:00"), - } - if out.Paths == nil { - out.Paths = []string{} - } - if out.Excludes == nil { - out.Excludes = []string{} - } - if out.Tags == nil { - out.Tags = []string{} - } - return out + writeJSONError(w, stdhttp.StatusNotImplemented, "redesign_in_progress", + "schedule REST API is being rebuilt — see P2 redesign Phase 3") } diff --git a/internal/server/http/schedules_test.go b/internal/server/http/schedules_test.go deleted file mode 100644 index cc50d0a..0000000 --- a/internal/server/http/schedules_test.go +++ /dev/null @@ -1,190 +0,0 @@ -package http - -import ( - "bytes" - "context" - "encoding/json" - "io" - stdhttp "net/http" - "testing" - "time" - - "gitea.dcglab.co.uk/steve/restic-manager/internal/store" -) - -// loginAndCookie bootstraps an admin, logs in, and returns the -// session cookie ready to attach to subsequent requests. -func loginAndCookie(t *testing.T, url string) *stdhttp.Cookie { - t.Helper() - bs, _ := json.Marshal(bootstrapRequest{ - Token: "test-token", Username: "alice", Password: "averylongpassword", - }) - res, err := stdhttp.Post(url+"/api/bootstrap", "application/json", bytes.NewReader(bs)) - if err != nil { - t.Fatalf("bootstrap: %v", err) - } - res.Body.Close() - - body, _ := json.Marshal(loginRequest{Username: "alice", Password: "averylongpassword"}) - res, err = stdhttp.Post(url+"/api/auth/login", "application/json", bytes.NewReader(body)) - if err != nil { - t.Fatalf("login: %v", err) - } - defer res.Body.Close() - if res.StatusCode != stdhttp.StatusOK { - got, _ := io.ReadAll(res.Body) - t.Fatalf("login: %d %s", res.StatusCode, got) - } - for _, c := range res.Cookies() { - if c.Name == sessionCookieName { - return c - } - } - t.Fatal("no session cookie") - return nil -} - -// makeHTTPHost inserts a host directly via the store so we can hit -// the schedule endpoints without dragging in the enrollment flow. -func makeHTTPHost(t *testing.T, st *store.Store) string { - t.Helper() - const id = "01HSCHEDHTTP000000000000Z" - if err := st.CreateHost(context.Background(), store.Host{ - ID: id, Name: "h", OS: "linux", Arch: "amd64", - AgentVersion: "dev", ResticVersion: "0.16.0", ProtocolVersion: 1, - EnrolledAt: time.Now().UTC(), - }, "tokenhash", ""); err != nil { - t.Fatalf("create host: %v", err) - } - return id -} - -func TestSchedulesAPIHappyPath(t *testing.T) { - t.Parallel() - _, url, st := newTestServerWithHub(t) - cookie := loginAndCookie(t, url) - hostID := makeHTTPHost(t, st) - - doReq := func(method, path string, body any, want int) []byte { - t.Helper() - var b []byte - if body != nil { - b, _ = json.Marshal(body) - } - req, _ := stdhttp.NewRequest(method, url+path, bytes.NewReader(b)) - req.AddCookie(cookie) - req.Header.Set("Content-Type", "application/json") - res, err := stdhttp.DefaultClient.Do(req) - if err != nil { - t.Fatalf("%s %s: %v", method, path, err) - } - defer res.Body.Close() - got, _ := io.ReadAll(res.Body) - if res.StatusCode != want { - t.Fatalf("%s %s: status %d (want %d) body=%s", method, path, res.StatusCode, want, got) - } - return got - } - - // Empty list returns version 0. - body := doReq("GET", "/api/hosts/"+hostID+"/schedules", nil, stdhttp.StatusOK) - var listed listSchedulesResp - if err := json.Unmarshal(body, &listed); err != nil { - t.Fatal(err) - } - if listed.Version != 0 || len(listed.Schedules) != 0 { - t.Fatalf("initial list: %+v", listed) - } - - // Create. - keepLast := 3 - create := scheduleAPI{ - Kind: "backup", CronExpr: "0 */6 * * *", - Paths: []string{"/etc"}, - Tags: []string{"nightly"}, - RetentionPolicy: store.RetentionPolicy{KeepLast: &keepLast}, - Enabled: true, - } - body = doReq("POST", "/api/hosts/"+hostID+"/schedules", create, stdhttp.StatusCreated) - var created scheduleAPI - if err := json.Unmarshal(body, &created); err != nil { - t.Fatal(err) - } - if created.ID == "" || created.CronExpr != create.CronExpr { - t.Fatalf("create returned: %+v", created) - } - - // Version bumped. - body = doReq("GET", "/api/hosts/"+hostID+"/schedules", nil, stdhttp.StatusOK) - _ = json.Unmarshal(body, &listed) - if listed.Version != 1 { - t.Fatalf("version after create: %d", listed.Version) - } - - // Update changes the cron expr; kind silently preserved even if request tries otherwise. - created.CronExpr = "*/15 * * * *" - created.Kind = "prune" // should be ignored - body = doReq("PUT", "/api/hosts/"+hostID+"/schedules/"+created.ID, created, stdhttp.StatusOK) - var updated scheduleAPI - _ = json.Unmarshal(body, &updated) - if updated.Kind != "backup" || updated.CronExpr != "*/15 * * * *" { - t.Fatalf("update: %+v", updated) - } - - // Delete. - doReq("DELETE", "/api/hosts/"+hostID+"/schedules/"+created.ID, nil, stdhttp.StatusNoContent) - body = doReq("GET", "/api/hosts/"+hostID+"/schedules", nil, stdhttp.StatusOK) - _ = json.Unmarshal(body, &listed) - if listed.Version != 3 || len(listed.Schedules) != 0 { - t.Fatalf("after delete: %+v", listed) - } -} - -func TestSchedulesAPIValidation(t *testing.T) { - t.Parallel() - _, url, st := newTestServerWithHub(t) - cookie := loginAndCookie(t, url) - hostID := makeHTTPHost(t, st) - - post := func(s scheduleAPI) (int, []byte) { - b, _ := json.Marshal(s) - req, _ := stdhttp.NewRequest("POST", - url+"/api/hosts/"+hostID+"/schedules", bytes.NewReader(b)) - req.AddCookie(cookie) - req.Header.Set("Content-Type", "application/json") - res, err := stdhttp.DefaultClient.Do(req) - if err != nil { - t.Fatalf("post: %v", err) - } - defer res.Body.Close() - body, _ := io.ReadAll(res.Body) - return res.StatusCode, body - } - - cases := []struct { - name string - in scheduleAPI - want string // expected error code - }{ - {"bad kind", scheduleAPI{Kind: "init", CronExpr: "@hourly", Paths: []string{"/etc"}}, "invalid_kind"}, - {"missing cron", scheduleAPI{Kind: "backup", Paths: []string{"/etc"}}, "missing_cron_expr"}, - {"bad cron", scheduleAPI{Kind: "backup", CronExpr: "not a cron", Paths: []string{"/etc"}}, "invalid_cron_expr"}, - {"backup without paths", scheduleAPI{Kind: "backup", CronExpr: "@hourly"}, "missing_paths"}, - {"hooks on non-backup", scheduleAPI{Kind: "prune", CronExpr: "@daily", PreHook: "echo hi"}, "hooks_not_allowed"}, - } - for _, c := range cases { - t.Run(c.name, func(t *testing.T) { - status, body := post(c.in) - if status != stdhttp.StatusBadRequest { - t.Fatalf("status %d body=%s", status, body) - } - var env struct { - Code string `json:"code"` - } - _ = json.Unmarshal(body, &env) - if env.Code != c.want { - t.Fatalf("error code: got %q want %q (body=%s)", env.Code, c.want, body) - } - }) - } -} diff --git a/internal/server/http/ui_handlers.go b/internal/server/http/ui_handlers.go index ee34408..673392c 100644 --- a/internal/server/http/ui_handlers.go +++ b/internal/server/http/ui_handlers.go @@ -1,7 +1,6 @@ package http import ( - "context" "crypto/rand" "encoding/base64" "encoding/json" @@ -143,146 +142,23 @@ func (s *Server) handleUIDashboard(w stdhttp.ResponseWriter, r *stdhttp.Request) } } -// handleUIRunBackup is the form-submit twin of POST /api/hosts/{id}/jobs -// that the dashboard / host-detail "Run now" buttons call via -// hx-post. On success it sets HX-Redirect → /jobs/{job_id} so the -// operator lands on the live log viewer for the job they just -// kicked off. +// handleUIRunBackup and handleUIInitRepo are stubbed during the P2 +// redesign data-model rewrite. The dashboard per-host Run-now button +// is going away (operator clicks into host detail then a per-source- +// group Run-now), and Init-repo becomes implicit at host enrolment +// (auto-init dispatched server-side). Phase 4 of the redesign wires +// the new per-source-group Run-now via /hosts/{id}/source-groups/{gid}/run. + func (s *Server) handleUIRunBackup(w stdhttp.ResponseWriter, r *stdhttp.Request) { - u := s.requireUIUser(w, r) - if u == nil { - return - } - hostID := chi.URLParam(r, "id") - if hostID == "" { - stdhttp.Error(w, "missing host id", stdhttp.StatusBadRequest) - return - } - storeUser, _, err := s.userByID(r, u.ID) - if err != nil { - stdhttp.Error(w, "internal", stdhttp.StatusInternalServerError) - return - } - host, err := s.deps.Store.GetHost(r.Context(), hostID) - if err != nil { - if errors.Is(err, store.ErrNotFound) { - stdhttp.NotFound(w, r) - return - } - stdhttp.Error(w, "internal", stdhttp.StatusInternalServerError) - return - } - if host.RepoInitialisedAt == nil { - stdhttp.Error(w, - "this host's repo hasn't been initialised yet — click Initialise repo first", - stdhttp.StatusBadRequest) - return - } - pick, err := s.pickRunNowSchedule(r.Context(), hostID) - if err != nil { - stdhttp.Error(w, err.Error(), stdhttp.StatusBadRequest) - return - } - res, status, code, msg := s.dispatchJob(r.Context(), storeUser, hostID, api.JobBackup, pick.Paths) - if code != "" { - stdhttp.Error(w, msg, status) - return - } - // HTMX (with hx-post + hx-swap=none) doesn't honour HX-Redirect - // when the response itself is a 3xx — fetch follows the redirect - // first and the header is lost. Branch on the HX-Request marker - // so HTMX gets a 200 + HX-Redirect (client-side window.location - // hop), while plain form-post / curl callers get the 303. - target := "/jobs/" + res.JobID - if r.Header.Get("HX-Request") == "true" { - w.Header().Set("HX-Redirect", target) - w.WriteHeader(stdhttp.StatusOK) - return - } - stdhttp.Redirect(w, r, target, stdhttp.StatusSeeOther) + stdhttp.Error(w, + "per-host Run-now is being replaced by per-source-group Run-now — see P2 redesign Phase 4", + stdhttp.StatusNotImplemented) } -// pickRunNowSchedule chooses which schedule a generic per-host -// "Run now" button should dispatch when the operator hasn't picked -// one explicitly. Picks in priority order: the host's only enabled -// manual schedule, then its only enabled schedule of any kind. -// Returns a friendly error if there's nothing to run, or if the -// operator needs to disambiguate. -func (s *Server) pickRunNowSchedule(ctx context.Context, hostID string) (*store.Schedule, error) { - rows, err := s.deps.Store.ListSchedulesByHost(ctx, hostID) - if err != nil { - return nil, errFmt("internal: %s", err) - } - enabled := make([]store.Schedule, 0, len(rows)) - for _, r := range rows { - if r.Enabled { - enabled = append(enabled, r) - } - } - if len(enabled) == 0 { - return nil, errFmt("this host has no enabled schedules — add one in the Schedules tab") - } - manuals := []store.Schedule{} - for _, r := range enabled { - if r.Manual { - manuals = append(manuals, r) - } - } - switch { - case len(manuals) == 1: - s := manuals[0] - return &s, nil - case len(enabled) == 1: - s := enabled[0] - return &s, nil - default: - return nil, errFmt("this host has %d schedules — pick one from the Schedules tab", len(enabled)) - } -} - -func errFmt(format string, args ...any) error { - return errFmtf(format, args...) -} - -// handleUIInitRepo dispatches a one-shot `restic init` job for a -// host. Surfaced in the run-now panel as a red "Initialise repo" -// button when host.repo_initialised_at IS NULL. On success it -// redirects to the live log page just like Run-now. func (s *Server) handleUIInitRepo(w stdhttp.ResponseWriter, r *stdhttp.Request) { - u := s.requireUIUser(w, r) - if u == nil { - return - } - hostID := chi.URLParam(r, "id") - if hostID == "" { - stdhttp.Error(w, "missing host id", stdhttp.StatusBadRequest) - return - } - storeUser, _, err := s.userByID(r, u.ID) - if err != nil { - stdhttp.Error(w, "internal", stdhttp.StatusInternalServerError) - return - } - if _, err := s.deps.Store.GetHost(r.Context(), hostID); err != nil { - if errors.Is(err, store.ErrNotFound) { - stdhttp.NotFound(w, r) - return - } - stdhttp.Error(w, "internal", stdhttp.StatusInternalServerError) - return - } - res, status, code, msg := s.dispatchJob(r.Context(), storeUser, hostID, api.JobInit, nil) - if code != "" { - stdhttp.Error(w, msg, status) - return - } - target := "/jobs/" + res.JobID - if r.Header.Get("HX-Request") == "true" { - w.Header().Set("HX-Redirect", target) - w.WriteHeader(stdhttp.StatusOK) - return - } - stdhttp.Redirect(w, r, target, stdhttp.StatusSeeOther) + stdhttp.Error(w, + "manual Init-repo is being replaced by auto-init at host enrolment — see P2 redesign Phase 6", + stdhttp.StatusNotImplemented) } // addHostPage carries the Add-host form state. The result-state diff --git a/internal/server/http/ui_schedules.go b/internal/server/http/ui_schedules.go index 07a9be8..1e060d3 100644 --- a/internal/server/http/ui_schedules.go +++ b/internal/server/http/ui_schedules.go @@ -1,534 +1,38 @@ package http import ( - "errors" - "fmt" - "log/slog" stdhttp "net/http" - "strconv" - "strings" - - "github.com/go-chi/chi/v5" - "github.com/oklog/ulid/v2" - - "gitea.dcglab.co.uk/steve/restic-manager/internal/api" - "gitea.dcglab.co.uk/steve/restic-manager/internal/server/ui" - "gitea.dcglab.co.uk/steve/restic-manager/internal/store" ) -// schedulesListPage carries everything the Schedules tab needs. -type schedulesListPage struct { - Host store.Host - Schedules []store.Schedule - Version int64 - AppliedVersion int64 -} +// ui_schedules.go — HTML form-driven schedule CRUD. +// +// Stubbed during the P2 redesign template rewrite. Phase 4 of the +// redesign rebuilds the schedule editor against the new slim shape +// (cron + source-group multi-select + enabled), the source-group +// list/edit pages, and the repo-maintenance tab. Until then these +// routes return 501; the dashboard's host-row "View →" link is the +// only operator entry point that still works. -// scheduleEditPage drives both the Create form (Schedule.ID empty) -// and the Edit form (Schedule populated). Errors come back via Error -// to be rendered as a banner; the rest of the fields hold the just- -// submitted raw values so a failed POST can re-render with the -// operator's typed input still in place. -type scheduleEditPage struct { - Host store.Host - IsNew bool - ScheduleID string - Error string - // Kind is settable on create, immutable on edit. The form's - // kind picker is hidden when !IsNew. - Kind string - // Form values — strings so partial input survives validation - // errors (e.g. operator typed "abc" into keep_last). - CronExpr string - PathsRaw string - ExcludesRaw string - TagsRaw string - KeepLast string - KeepHourly string - KeepDaily string - KeepWeekly string - KeepMonthly string - KeepYearly string - LimitUpKBps string - LimitDownKBps string - Enabled bool - Manual bool -} - -// handleUISchedulesList renders the Schedules sub-tab on a host. func (s *Server) handleUISchedulesList(w stdhttp.ResponseWriter, r *stdhttp.Request) { - u := s.requireUIUser(w, r) - if u == nil { - return - } - hostID := chi.URLParam(r, "id") - host, err := s.deps.Store.GetHost(r.Context(), hostID) - if err != nil { - if errors.Is(err, store.ErrNotFound) { - stdhttp.NotFound(w, r) - return - } - stdhttp.Error(w, "internal", stdhttp.StatusInternalServerError) - return - } - rows, err := s.deps.Store.ListSchedulesByHost(r.Context(), hostID) - if err != nil { - stdhttp.Error(w, "internal", stdhttp.StatusInternalServerError) - return - } - version, _ := s.deps.Store.GetHostScheduleVersion(r.Context(), hostID) - - view := s.baseView(u, "dashboard") - view.Title = host.Name + " · schedules · restic-manager" - view.Page = schedulesListPage{ - Host: *host, - Schedules: rows, - Version: version, - AppliedVersion: host.AppliedScheduleVersion, - } - if err := s.deps.UI.Render(w, "schedules_list", view); err != nil { - slog.Error("ui: render schedules_list", "err", err) - stdhttp.Error(w, "internal", stdhttp.StatusInternalServerError) - } + stdhttp.Error(w, "schedules UI is being rebuilt — see P2 redesign Phase 4", stdhttp.StatusNotImplemented) } -// handleUIScheduleNewGet renders the empty Create form. func (s *Server) handleUIScheduleNewGet(w stdhttp.ResponseWriter, r *stdhttp.Request) { - u := s.requireUIUser(w, r) - if u == nil { - return - } - hostID := chi.URLParam(r, "id") - host, err := s.deps.Store.GetHost(r.Context(), hostID) - if err != nil { - if errors.Is(err, store.ErrNotFound) { - stdhttp.NotFound(w, r) - return - } - stdhttp.Error(w, "internal", stdhttp.StatusInternalServerError) - return - } - view := s.baseView(u, "dashboard") - view.Title = "New schedule · " + host.Name - view.Page = scheduleEditPage{ - Host: *host, - IsNew: true, - Kind: string(api.JobBackup), - CronExpr: "0 3 * * *", - Enabled: true, - } - s.renderScheduleEdit(w, view) + stdhttp.Error(w, "schedules UI is being rebuilt — see P2 redesign Phase 4", stdhttp.StatusNotImplemented) } -// handleUIScheduleEditGet renders the Edit form pre-filled from the -// existing schedule row. func (s *Server) handleUIScheduleEditGet(w stdhttp.ResponseWriter, r *stdhttp.Request) { - u := s.requireUIUser(w, r) - if u == nil { - return - } - hostID := chi.URLParam(r, "id") - scheduleID := chi.URLParam(r, "sid") - host, err := s.deps.Store.GetHost(r.Context(), hostID) - if err != nil { - if errors.Is(err, store.ErrNotFound) { - stdhttp.NotFound(w, r) - return - } - stdhttp.Error(w, "internal", stdhttp.StatusInternalServerError) - return - } - sched, err := s.deps.Store.GetSchedule(r.Context(), hostID, scheduleID) - if err != nil { - if errors.Is(err, store.ErrNotFound) { - stdhttp.NotFound(w, r) - return - } - stdhttp.Error(w, "internal", stdhttp.StatusInternalServerError) - return - } - page := scheduleEditPage{ - Host: *host, - IsNew: false, - ScheduleID: sched.ID, - Kind: sched.Kind, - CronExpr: sched.CronExpr, - PathsRaw: strings.Join(sched.Paths, "\n"), - ExcludesRaw: strings.Join(sched.Excludes, "\n"), - TagsRaw: strings.Join(sched.Tags, ", "), - Enabled: sched.Enabled, - Manual: sched.Manual, - } - page.KeepLast = intStringPtr(sched.RetentionPolicy.KeepLast) - page.KeepHourly = intStringPtr(sched.RetentionPolicy.KeepHourly) - page.KeepDaily = intStringPtr(sched.RetentionPolicy.KeepDaily) - page.KeepWeekly = intStringPtr(sched.RetentionPolicy.KeepWeekly) - page.KeepMonthly = intStringPtr(sched.RetentionPolicy.KeepMonthly) - page.KeepYearly = intStringPtr(sched.RetentionPolicy.KeepYearly) - page.LimitUpKBps = intStringPtr(sched.Options.LimitUploadKBps) - page.LimitDownKBps = intStringPtr(sched.Options.LimitDownloadKBps) - - view := s.baseView(u, "dashboard") - view.Title = "Edit schedule · " + host.Name - view.Page = page - s.renderScheduleEdit(w, view) + stdhttp.Error(w, "schedules UI is being rebuilt — see P2 redesign Phase 4", stdhttp.StatusNotImplemented) } -// handleUIScheduleSave handles POST for both create and update. The -// edit form posts to /hosts/{id}/schedules/new (for create) or -// /hosts/{id}/schedules/{sid}/edit (for update); we branch on whether -// {sid} is present in the route params. func (s *Server) handleUIScheduleSave(w stdhttp.ResponseWriter, r *stdhttp.Request) { - u := s.requireUIUser(w, r) - if u == nil { - return - } - hostID := chi.URLParam(r, "id") - scheduleID := chi.URLParam(r, "sid") - storeUser, _, err := s.userByID(r, u.ID) - if err != nil || storeUser == nil { - stdhttp.Error(w, "internal", stdhttp.StatusInternalServerError) - return - } - host, err := s.deps.Store.GetHost(r.Context(), hostID) - if err != nil { - if errors.Is(err, store.ErrNotFound) { - stdhttp.NotFound(w, r) - return - } - stdhttp.Error(w, "internal", stdhttp.StatusInternalServerError) - return - } - if err := r.ParseForm(); err != nil { - stdhttp.Error(w, "bad request", stdhttp.StatusBadRequest) - return - } - page := scheduleEditPage{ - Host: *host, - IsNew: scheduleID == "", - ScheduleID: scheduleID, - Kind: strings.TrimSpace(r.PostForm.Get("kind")), - CronExpr: strings.TrimSpace(r.PostForm.Get("cron_expr")), - PathsRaw: r.PostForm.Get("paths"), - ExcludesRaw: r.PostForm.Get("excludes"), - TagsRaw: strings.TrimSpace(r.PostForm.Get("tags")), - KeepLast: strings.TrimSpace(r.PostForm.Get("keep_last")), - KeepHourly: strings.TrimSpace(r.PostForm.Get("keep_hourly")), - KeepDaily: strings.TrimSpace(r.PostForm.Get("keep_daily")), - KeepWeekly: strings.TrimSpace(r.PostForm.Get("keep_weekly")), - KeepMonthly: strings.TrimSpace(r.PostForm.Get("keep_monthly")), - KeepYearly: strings.TrimSpace(r.PostForm.Get("keep_yearly")), - LimitUpKBps: strings.TrimSpace(r.PostForm.Get("limit_up_kbps")), - LimitDownKBps: strings.TrimSpace(r.PostForm.Get("limit_down_kbps")), - Enabled: r.PostForm.Get("enabled") == "on", - Manual: r.PostForm.Get("manual") == "on", - } - // Kind is immutable on edit — use the existing schedule's kind - // regardless of what the form submitted. - if !page.IsNew { - if existing, err := s.deps.Store.GetSchedule(r.Context(), hostID, scheduleID); err == nil { - page.Kind = existing.Kind - } - } - if page.Kind == "" { - page.Kind = string(api.JobBackup) - } - - // Convert the raw form values into store-shape data, surfacing - // the first parse error as a banner. - paths := splitPaths(page.PathsRaw) - excludes := splitPaths(page.ExcludesRaw) - tags := splitCSV(page.TagsRaw) - - retention, err := parseRetention(page) - if err != nil { - page.Error = err.Error() - s.renderEditPage(w, u, page) - return - } - options, err := parseOptions(page) - if err != nil { - page.Error = err.Error() - s.renderEditPage(w, u, page) - return - } - - // Validate against the same rules the JSON API uses. Manual - // schedules skip the cron-expr requirement; forget schedules - // require a non-empty retention policy. Other validation - // (kind in allowed set, paths required for backup, hooks - // rejected on non-backup) lives in validateSchedule. - apiShape := scheduleAPI{ - Kind: api.JobKind(page.Kind), - CronExpr: page.CronExpr, - Paths: paths, - Manual: page.Manual, - RetentionPolicy: retention, - } - if code, msg := validateSchedule(&apiShape); code != "" { - page.Error = uiErrorMessage(code, msg) - s.renderEditPage(w, u, page) - return - } - - if page.IsNew { - row := store.Schedule{ - ID: ulid.Make().String(), - HostID: hostID, - Kind: page.Kind, - CronExpr: page.CronExpr, - Paths: paths, - Excludes: excludes, - Tags: tags, - RetentionPolicy: retention, - Options: options, - Enabled: page.Enabled, - Manual: page.Manual, - } - if err := s.deps.Store.CreateSchedule(r.Context(), &row); err != nil { - page.Error = "Couldn't save schedule — see server log." - slog.Error("ui schedule create", "err", err) - s.renderEditPage(w, u, page) - return - } - _ = s.deps.Store.AppendAudit(r.Context(), store.AuditEntry{ - ID: ulid.Make().String(), - UserID: &storeUser.ID, - Actor: "user", - Action: "schedule.created", - TargetKind: ptr("schedule"), - TargetID: &row.ID, - TS: nowUTC(), - }) - s.pushScheduleSetAsync(hostID) - } else { - existing, err := s.deps.Store.GetSchedule(r.Context(), hostID, scheduleID) - if err != nil { - if errors.Is(err, store.ErrNotFound) { - stdhttp.NotFound(w, r) - return - } - stdhttp.Error(w, "internal", stdhttp.StatusInternalServerError) - return - } - existing.CronExpr = page.CronExpr - existing.Paths = paths - existing.Excludes = excludes - existing.Tags = tags - existing.RetentionPolicy = retention - existing.Options = options - existing.Enabled = page.Enabled - existing.Manual = page.Manual - if err := s.deps.Store.UpdateSchedule(r.Context(), existing); err != nil { - page.Error = "Couldn't save schedule — see server log." - slog.Error("ui schedule update", "err", err) - s.renderEditPage(w, u, page) - return - } - _ = s.deps.Store.AppendAudit(r.Context(), store.AuditEntry{ - ID: ulid.Make().String(), - UserID: &storeUser.ID, - Actor: "user", - Action: "schedule.updated", - TargetKind: ptr("schedule"), - TargetID: &scheduleID, - TS: nowUTC(), - }) - s.pushScheduleSetAsync(hostID) - } - - stdhttp.Redirect(w, r, "/hosts/"+hostID+"/schedules", stdhttp.StatusSeeOther) + stdhttp.Error(w, "schedules UI is being rebuilt — see P2 redesign Phase 4", stdhttp.StatusNotImplemented) } -// handleUIScheduleRun is the POST target of per-schedule Run-now -// buttons. Reuses dispatchScheduledJob (the same code path used by -// the agent's local cron firing) so manual + automated runs flow -// through identical job lifecycle. Sets HX-Redirect to the live -// log on success. -func (s *Server) handleUIScheduleRun(w stdhttp.ResponseWriter, r *stdhttp.Request) { - u := s.requireUIUser(w, r) - if u == nil { - return - } - hostID := chi.URLParam(r, "id") - scheduleID := chi.URLParam(r, "sid") - host, err := s.deps.Store.GetHost(r.Context(), hostID) - if err != nil { - if errors.Is(err, store.ErrNotFound) { - stdhttp.NotFound(w, r) - return - } - stdhttp.Error(w, "internal", stdhttp.StatusInternalServerError) - return - } - if !s.deps.Hub.Connected(hostID) { - stdhttp.Error(w, "agent is offline", stdhttp.StatusBadRequest) - return - } - _ = host - jobID, err := s.dispatchScheduleNow(r.Context(), hostID, scheduleID, nil) - if err != nil { - stdhttp.Error(w, err.Error(), stdhttp.StatusBadRequest) - return - } - target := "/jobs/" + jobID - if r.Header.Get("HX-Request") == "true" { - w.Header().Set("HX-Redirect", target) - w.WriteHeader(stdhttp.StatusOK) - return - } - stdhttp.Redirect(w, r, target, stdhttp.StatusSeeOther) -} - -// handleUIScheduleDelete is the POST target of the Delete buttons on -// the list view. Confirm-then-redirect; no AJAX. func (s *Server) handleUIScheduleDelete(w stdhttp.ResponseWriter, r *stdhttp.Request) { - u := s.requireUIUser(w, r) - if u == nil { - return - } - hostID := chi.URLParam(r, "id") - scheduleID := chi.URLParam(r, "sid") - storeUser, _, err := s.userByID(r, u.ID) - if err != nil || storeUser == nil { - stdhttp.Error(w, "internal", stdhttp.StatusInternalServerError) - return - } - if err := s.deps.Store.DeleteSchedule(r.Context(), hostID, scheduleID); err != nil { - if errors.Is(err, store.ErrNotFound) { - stdhttp.NotFound(w, r) - return - } - stdhttp.Error(w, "internal", stdhttp.StatusInternalServerError) - return - } - _ = s.deps.Store.AppendAudit(r.Context(), store.AuditEntry{ - ID: ulid.Make().String(), - UserID: &storeUser.ID, - Actor: "user", - Action: "schedule.deleted", - TargetKind: ptr("schedule"), - TargetID: &scheduleID, - TS: nowUTC(), - }) - s.pushScheduleSetAsync(hostID) - stdhttp.Redirect(w, r, "/hosts/"+hostID+"/schedules", stdhttp.StatusSeeOther) + stdhttp.Error(w, "schedules UI is being rebuilt — see P2 redesign Phase 4", stdhttp.StatusNotImplemented) } -func (s *Server) renderScheduleEdit(w stdhttp.ResponseWriter, view ui.ViewData) { - if err := s.deps.UI.Render(w, "schedule_edit", view); err != nil { - slog.Error("ui: render schedule_edit", "err", err) - stdhttp.Error(w, "internal", stdhttp.StatusInternalServerError) - } -} - -func (s *Server) renderEditPage(w stdhttp.ResponseWriter, u *ui.User, page scheduleEditPage) { - view := s.baseView(u, "dashboard") - if page.IsNew { - view.Title = "New schedule · " + page.Host.Name - } else { - view.Title = "Edit schedule · " + page.Host.Name - } - view.Page = page - w.WriteHeader(stdhttp.StatusUnprocessableEntity) - s.renderScheduleEdit(w, view) -} - -// ----- helpers -------------------------------------------------------- - -// splitCSV parses comma-separated values into a clean []string — -// leading/trailing whitespace trimmed, blanks dropped. -func splitCSV(s string) []string { - out := []string{} - for _, p := range strings.Split(s, ",") { - if t := strings.TrimSpace(p); t != "" { - out = append(out, t) - } - } - return out -} - -func parseRetention(p scheduleEditPage) (store.RetentionPolicy, error) { - var r store.RetentionPolicy - for _, f := range []struct { - raw string - dest **int - name string - }{ - {p.KeepLast, &r.KeepLast, "keep last"}, - {p.KeepHourly, &r.KeepHourly, "keep hourly"}, - {p.KeepDaily, &r.KeepDaily, "keep daily"}, - {p.KeepWeekly, &r.KeepWeekly, "keep weekly"}, - {p.KeepMonthly, &r.KeepMonthly, "keep monthly"}, - {p.KeepYearly, &r.KeepYearly, "keep yearly"}, - } { - v, err := parsePosInt(f.raw) - if err != nil { - return r, errFmtf("%s: %s", f.name, err) - } - *f.dest = v - } - return r, nil -} - -func parseOptions(p scheduleEditPage) (store.ScheduleOptions, error) { - var o store.ScheduleOptions - up, err := parsePosInt(p.LimitUpKBps) - if err != nil { - return o, errFmtf("limit upload: %s", err) - } - o.LimitUploadKBps = up - down, err := parsePosInt(p.LimitDownKBps) - if err != nil { - return o, errFmtf("limit download: %s", err) - } - o.LimitDownloadKBps = down - return o, nil -} - -// parsePosInt turns a possibly-empty string into *int. Empty → nil -// (no value). Non-empty must parse as a positive int. -func parsePosInt(raw string) (*int, error) { - if raw == "" { - return nil, nil - } - v, err := strconv.Atoi(raw) - if err != nil { - return nil, errFmtf("must be a whole number") - } - if v < 0 { - return nil, errFmtf("must be non-negative") - } - return &v, nil -} - -func intStringPtr(p *int) string { - if p == nil { - return "" - } - return strconv.Itoa(*p) -} - -// uiErrorMessage maps the JSON-API validation codes to operator- -// friendly banner text. -func uiErrorMessage(code, msg string) string { - switch code { - case "missing_cron_expr": - return "Cron expression is required." - case "invalid_cron_expr": - return "Cron expression doesn't parse: " + msg - case "missing_paths": - return "At least one backup path is required (one per line)." - case "missing_retention": - return "Forget schedules need at least one Keep-* value, otherwise restic would delete every snapshot." - case "invalid_kind": - return "Unsupported schedule kind." - default: - return msg - } -} - -// errFmtf wraps fmt.Errorf so the validators read consistently. -func errFmtf(format string, args ...any) error { - return fmt.Errorf(format, args...) +func (s *Server) handleUIScheduleRun(w stdhttp.ResponseWriter, r *stdhttp.Request) { + stdhttp.Error(w, "schedules UI is being rebuilt — see P2 redesign Phase 4", stdhttp.StatusNotImplemented) } diff --git a/internal/server/ws/handler.go b/internal/server/ws/handler.go index 533840f..25db3e3 100644 --- a/internal/server/ws/handler.go +++ b/internal/server/ws/handler.go @@ -204,16 +204,9 @@ func dispatchAgentMessage(ctx context.Context, c *Conn, hostID string, env api.E string(p.Status), p.ExitCode, p.Stats, errMsg, p.FinishedAt); err != nil { slog.Warn("ws: mark job finished", "job_id", p.JobID, "err", err) } - // A successful backup or init proves the repo exists; flip - // repo_initialised_at on the host (idempotent — set-if-null). - if p.Status == api.JobSucceeded { - if job, err := deps.Store.GetJob(ctx, p.JobID); err == nil && - (job.Kind == string(api.JobBackup) || job.Kind == string(api.JobInit)) { - if _, err := deps.Store.MarkHostRepoInitialised(ctx, hostID, p.FinishedAt); err != nil { - slog.Warn("ws: mark repo initialised", "host_id", hostID, "err", err) - } - } - } + // repo_initialised_at projection has been removed — auto-init + // at host enrolment makes "is the repo init'd" derivable from + // the latest init job's status, no separate column needed. if deps.JobHub != nil { deps.JobHub.Broadcast(p.JobID, env) } @@ -253,15 +246,6 @@ func dispatchAgentMessage(ctx context.Context, c *Conn, hostID string, env api.E } else { slog.Info("ws: snapshots refreshed", "host_id", hostID, "count", len(snaps)) } - // A non-empty snapshot list also proves the repo is initialised - // (catches the case where an external job — `restic init` from - // the CLI, or a backup ran outside this control plane — - // initialised it before our first job dispatched). - if len(snaps) > 0 { - if _, err := deps.Store.MarkHostRepoInitialised(ctx, hostID, time.Now().UTC()); err != nil { - slog.Warn("ws: mark repo initialised (snapshots)", "host_id", hostID, "err", err) - } - } case api.MsgScheduleAck: var p api.ScheduleAckPayload diff --git a/internal/store/hosts.go b/internal/store/hosts.go index cf542cd..af7df00 100644 --- a/internal/store/hosts.go +++ b/internal/store/hosts.go @@ -42,7 +42,7 @@ func (s *Store) LookupHostByAgentToken(ctx context.Context, tokenHash string) (* enrolled_at, last_seen_at, status, repo_id, tags, current_job_id, last_backup_at, last_backup_status, repo_size_bytes, snapshot_count, open_alert_count, - applied_schedule_version, repo_initialised_at + applied_schedule_version, bandwidth_up_kbps, bandwidth_down_kbps FROM hosts WHERE agent_token_hash = ?`, tokenHash) return scanHost(row) @@ -55,7 +55,7 @@ func (s *Store) GetHost(ctx context.Context, id string) (*Host, error) { enrolled_at, last_seen_at, status, repo_id, tags, current_job_id, last_backup_at, last_backup_status, repo_size_bytes, snapshot_count, open_alert_count, - applied_schedule_version, repo_initialised_at + applied_schedule_version, bandwidth_up_kbps, bandwidth_down_kbps FROM hosts WHERE id = ?`, id) return scanHost(row) } @@ -116,7 +116,7 @@ func (s *Store) ListHosts(ctx context.Context) ([]Host, error) { enrolled_at, last_seen_at, status, repo_id, tags, current_job_id, last_backup_at, last_backup_status, repo_size_bytes, snapshot_count, open_alert_count, - applied_schedule_version, repo_initialised_at + applied_schedule_version, bandwidth_up_kbps, bandwidth_down_kbps FROM hosts ORDER BY name`) if err != nil { return nil, fmt.Errorf("store: list hosts: %w", err) @@ -154,14 +154,14 @@ func scanHostRow(s hostScanner) (*Host, error) { repoID, currentJob, lastBkSt sql.NullString enrolled string tags string - repoInitAt sql.NullString + bwUp, bwDown sql.NullInt64 ) err := s.Scan(&h.ID, &h.Name, &h.OS, &h.Arch, &h.AgentVersion, &h.ResticVersion, &h.ProtocolVersion, &enrolled, &lastSeen, &h.Status, &repoID, &tags, ¤tJob, &lastBackupAt, &lastBkSt, &h.RepoSizeBytes, &h.SnapshotCount, &h.OpenAlertCount, - &h.AppliedScheduleVersion, &repoInitAt) + &h.AppliedScheduleVersion, &bwUp, &bwDown) if err != nil { if errors.Is(err, sql.ErrNoRows) { return nil, ErrNotFound @@ -202,28 +202,33 @@ func scanHostRow(s hostScanner) (*Host, error) { if tags != "" { _ = json.Unmarshal([]byte(tags), &h.Tags) } - if repoInitAt.Valid { - t, err := time.Parse(time.RFC3339Nano, repoInitAt.String) - if err != nil { - return nil, fmt.Errorf("store: parse repo_initialised_at: %w", err) - } - h.RepoInitialisedAt = &t + if bwUp.Valid { + v := int(bwUp.Int64) + h.BandwidthUpKBps = &v + } + if bwDown.Valid { + v := int(bwDown.Int64) + h.BandwidthDownKBps = &v } return &h, nil } -// MarkHostRepoInitialised sets repo_initialised_at to `when` if it is -// currently NULL. Idempotent: re-firing for an already-initialised -// host is a no-op (we never want to clobber the original timestamp). -// Returns true if the row was updated, false if it was already set. -func (s *Store) MarkHostRepoInitialised(ctx context.Context, hostID string, when time.Time) (bool, error) { - res, err := s.db.ExecContext(ctx, - `UPDATE hosts SET repo_initialised_at = ? - WHERE id = ? AND repo_initialised_at IS NULL`, - when.UTC().Format(time.RFC3339Nano), hostID) +// SetHostBandwidth replaces the host's upload/download caps. Pass nil +// to clear a cap. Caller decides validation; non-positive caps are +// treated as "no cap" by the agent regardless. +func (s *Store) SetHostBandwidth(ctx context.Context, hostID string, upKBps, downKBps *int) error { + _, err := s.db.ExecContext(ctx, + `UPDATE hosts SET bandwidth_up_kbps = ?, bandwidth_down_kbps = ? WHERE id = ?`, + nullableInt(upKBps), nullableInt(downKBps), hostID) if err != nil { - return false, fmt.Errorf("store: mark repo initialised: %w", err) + return fmt.Errorf("store: set host bandwidth: %w", err) } - n, _ := res.RowsAffected() - return n > 0, nil + return nil +} + +func nullableInt(p *int) any { + if p == nil { + return nil + } + return *p } diff --git a/internal/store/maintenance.go b/internal/store/maintenance.go new file mode 100644 index 0000000..624af95 --- /dev/null +++ b/internal/store/maintenance.go @@ -0,0 +1,74 @@ +package store + +import ( + "context" + "database/sql" + "errors" + "fmt" +) + +// CreateDefaultRepoMaintenance inserts the default cadences for a +// host. Called once at host enrolment alongside CreateHost. Safe to +// re-call (uses INSERT OR IGNORE so a manual repair doesn't blow up +// an already-seeded host). +func (st *Store) CreateDefaultRepoMaintenance(ctx context.Context, hostID string) error { + _, err := st.db.ExecContext(ctx, + `INSERT OR IGNORE INTO host_repo_maintenance (host_id) VALUES (?)`, + hostID) + if err != nil { + return fmt.Errorf("store: seed repo maintenance: %w", err) + } + return nil +} + +// GetRepoMaintenance returns the cadence row for a host. Returns +// ErrNotFound if absent — caller should usually treat that as +// "needs CreateDefaultRepoMaintenance" rather than a hard error. +func (st *Store) GetRepoMaintenance(ctx context.Context, hostID string) (*HostRepoMaintenance, error) { + row := st.db.QueryRowContext(ctx, + `SELECT host_id, forget_cron, forget_enabled, + prune_cron, prune_enabled, + check_cron, check_enabled, check_subset_pct + FROM host_repo_maintenance WHERE host_id = ?`, hostID) + var ( + m HostRepoMaintenance + forgetEnabled, pruneEnabled, checkEnabled int + ) + err := row.Scan(&m.HostID, + &m.ForgetCron, &forgetEnabled, + &m.PruneCron, &pruneEnabled, + &m.CheckCron, &checkEnabled, &m.CheckSubsetPct) + if errors.Is(err, sql.ErrNoRows) { + return nil, ErrNotFound + } + if err != nil { + return nil, fmt.Errorf("store: get repo maintenance: %w", err) + } + m.ForgetEnabled = forgetEnabled != 0 + m.PruneEnabled = pruneEnabled != 0 + m.CheckEnabled = checkEnabled != 0 + return &m, nil +} + +// UpdateRepoMaintenance replaces every editable field. Doesn't bump +// the schedule version — these run on the server's own ticker, not +// the agent's local cron, so the agent doesn't need to know. +func (st *Store) UpdateRepoMaintenance(ctx context.Context, m *HostRepoMaintenance) error { + if m.HostID == "" { + return errors.New("store: repo maintenance host_id required") + } + _, err := st.db.ExecContext(ctx, + `UPDATE host_repo_maintenance SET + forget_cron = ?, forget_enabled = ?, + prune_cron = ?, prune_enabled = ?, + check_cron = ?, check_enabled = ?, check_subset_pct = ? + WHERE host_id = ?`, + m.ForgetCron, boolToInt(m.ForgetEnabled), + m.PruneCron, boolToInt(m.PruneEnabled), + m.CheckCron, boolToInt(m.CheckEnabled), m.CheckSubsetPct, + m.HostID) + if err != nil { + return fmt.Errorf("store: update repo maintenance: %w", err) + } + return nil +} diff --git a/internal/store/pending.go b/internal/store/pending.go new file mode 100644 index 0000000..eea9b84 --- /dev/null +++ b/internal/store/pending.go @@ -0,0 +1,103 @@ +package store + +import ( + "context" + "errors" + "fmt" + "time" +) + +// EnqueuePendingRun queues a missed cron tick for the offline-retry +// ticker to dispatch later. Caller (the schedule firing path) sets +// next_attempt_at = now + group.retry_backoff_seconds × 2^(attempt-1). +func (st *Store) EnqueuePendingRun(ctx context.Context, p *PendingRun) error { + if p.ID == "" || p.ScheduleID == "" || p.SourceGroupID == "" || p.HostID == "" { + return errors.New("store: pending run id, schedule_id, source_group_id, host_id required") + } + if p.Attempt == 0 { + p.Attempt = 1 + } + if p.NextAttemptAt.IsZero() { + p.NextAttemptAt = time.Now().UTC() + } + if p.ScheduledAt.IsZero() { + p.ScheduledAt = time.Now().UTC() + } + _, err := st.db.ExecContext(ctx, + `INSERT INTO pending_runs (id, schedule_id, source_group_id, host_id, + attempt, next_attempt_at, scheduled_at, last_error) + VALUES (?, ?, ?, ?, ?, ?, ?, ?)`, + p.ID, p.ScheduleID, p.SourceGroupID, p.HostID, + p.Attempt, + p.NextAttemptAt.UTC().Format(time.RFC3339Nano), + p.ScheduledAt.UTC().Format(time.RFC3339Nano), + nullableString(p.LastError)) + if err != nil { + return fmt.Errorf("store: enqueue pending run: %w", err) + } + return nil +} + +// DuePendingRuns returns rows whose next_attempt_at <= now, ordered +// oldest first. Server-side ticker calls this every ~30s. +func (st *Store) DuePendingRuns(ctx context.Context, now time.Time, limit int) ([]PendingRun, error) { + rows, err := st.db.QueryContext(ctx, + `SELECT id, schedule_id, source_group_id, host_id, attempt, + next_attempt_at, scheduled_at, COALESCE(last_error, '') + FROM pending_runs + WHERE next_attempt_at <= ? + ORDER BY next_attempt_at + LIMIT ?`, + now.UTC().Format(time.RFC3339Nano), limit) + if err != nil { + return nil, fmt.Errorf("store: due pending runs: %w", err) + } + defer rows.Close() + out := []PendingRun{} + for rows.Next() { + var p PendingRun + var nextAt, scheduledAt string + if err := rows.Scan(&p.ID, &p.ScheduleID, &p.SourceGroupID, &p.HostID, + &p.Attempt, &nextAt, &scheduledAt, &p.LastError); err != nil { + return nil, err + } + if t, err := time.Parse(time.RFC3339Nano, nextAt); err == nil { + p.NextAttemptAt = t + } + if t, err := time.Parse(time.RFC3339Nano, scheduledAt); err == nil { + p.ScheduledAt = t + } + out = append(out, p) + } + return out, rows.Err() +} + +// DeletePendingRun removes a row by id. Called after successful +// dispatch or after exceeding retry_max. +func (st *Store) DeletePendingRun(ctx context.Context, id string) error { + _, err := st.db.ExecContext(ctx, + `DELETE FROM pending_runs WHERE id = ?`, id) + if err != nil { + return fmt.Errorf("store: delete pending run: %w", err) + } + return nil +} + +// BumpPendingRunAttempt increments the attempt counter and updates +// next_attempt_at + last_error. Used after a failed retry — caller +// has decided to try again. +func (st *Store) BumpPendingRunAttempt(ctx context.Context, id string, nextAttemptAt time.Time, lastError string) error { + _, err := st.db.ExecContext(ctx, + `UPDATE pending_runs SET + attempt = attempt + 1, + next_attempt_at = ?, + last_error = ? + WHERE id = ?`, + nextAttemptAt.UTC().Format(time.RFC3339Nano), + nullableString(lastError), + id) + if err != nil { + return fmt.Errorf("store: bump pending run: %w", err) + } + return nil +} diff --git a/internal/store/schedules.go b/internal/store/schedules.go index c2fd79d..f80bd4f 100644 --- a/internal/store/schedules.go +++ b/internal/store/schedules.go @@ -3,15 +3,14 @@ package store import ( "context" "database/sql" - "encoding/json" "errors" "fmt" "time" ) -// CreateSchedule inserts a new schedule and bumps the host's -// schedule_version atomically. Returns the inserted row's -// CreatedAt / UpdatedAt timestamps written into s. +// CreateSchedule inserts a new slim schedule row + the schedule_source_groups +// junction entries for s.SourceGroupIDs, atomic in one tx. Bumps +// host_schedule_version. Caller mints s.ID. func (st *Store) CreateSchedule(ctx context.Context, s *Schedule) error { if s.ID == "" || s.HostID == "" { return errors.New("store: schedule id and host_id required") @@ -19,20 +18,6 @@ func (st *Store) CreateSchedule(ctx context.Context, s *Schedule) error { now := time.Now().UTC() s.CreatedAt = now s.UpdatedAt = now - if s.Paths == nil { - s.Paths = []string{} - } - if s.Excludes == nil { - s.Excludes = []string{} - } - if s.Tags == nil { - s.Tags = []string{} - } - pathsJSON, _ := json.Marshal(s.Paths) - excludesJSON, _ := json.Marshal(s.Excludes) - tagsJSON, _ := json.Marshal(s.Tags) - retentionJSON, _ := json.Marshal(s.RetentionPolicy) - optionsJSON, _ := json.Marshal(s.Options) tx, err := st.db.BeginTx(ctx, nil) if err != nil { @@ -41,19 +26,16 @@ func (st *Store) CreateSchedule(ctx context.Context, s *Schedule) error { defer func() { _ = tx.Rollback() }() if _, err := tx.ExecContext(ctx, - `INSERT INTO schedules ( - id, host_id, kind, cron_expr, paths, excludes, tags, - retention_policy, options, pre_hook, post_hook, enabled, manual, - created_at, updated_at - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, - s.ID, s.HostID, s.Kind, s.CronExpr, - string(pathsJSON), string(excludesJSON), string(tagsJSON), - string(retentionJSON), string(optionsJSON), - s.PreHook, s.PostHook, boolToInt(s.Enabled), boolToInt(s.Manual), + `INSERT INTO schedules (id, host_id, cron_expr, enabled, created_at, updated_at) + VALUES (?, ?, ?, ?, ?, ?)`, + s.ID, s.HostID, s.CronExpr, boolToInt(s.Enabled), now.Format(time.RFC3339Nano), now.Format(time.RFC3339Nano), ); err != nil { return fmt.Errorf("store: create schedule: %w", err) } + if err := writeScheduleGroupsTx(ctx, tx, s.ID, s.SourceGroupIDs); err != nil { + return err + } if err := bumpHostScheduleVersionTx(ctx, tx, s.HostID); err != nil { return err } @@ -61,27 +43,11 @@ func (st *Store) CreateSchedule(ctx context.Context, s *Schedule) error { } // UpdateSchedule replaces every editable field on an existing row -// and bumps host_schedule_version. ID and HostID must match an -// existing row; kind is immutable (creating a new schedule is -// cheaper than re-keying retention/hooks). +// and rewrites the junction. Bumps host_schedule_version. func (st *Store) UpdateSchedule(ctx context.Context, s *Schedule) error { if s.ID == "" || s.HostID == "" { return errors.New("store: schedule id and host_id required") } - if s.Paths == nil { - s.Paths = []string{} - } - if s.Excludes == nil { - s.Excludes = []string{} - } - if s.Tags == nil { - s.Tags = []string{} - } - pathsJSON, _ := json.Marshal(s.Paths) - excludesJSON, _ := json.Marshal(s.Excludes) - tagsJSON, _ := json.Marshal(s.Tags) - retentionJSON, _ := json.Marshal(s.RetentionPolicy) - optionsJSON, _ := json.Marshal(s.Options) now := time.Now().UTC() tx, err := st.db.BeginTx(ctx, nil) @@ -91,16 +57,10 @@ func (st *Store) UpdateSchedule(ctx context.Context, s *Schedule) error { defer func() { _ = tx.Rollback() }() res, err := tx.ExecContext(ctx, - `UPDATE schedules SET - cron_expr = ?, paths = ?, excludes = ?, tags = ?, - retention_policy = ?, options = ?, - pre_hook = ?, post_hook = ?, enabled = ?, manual = ?, - updated_at = ? + `UPDATE schedules + SET cron_expr = ?, enabled = ?, updated_at = ? WHERE id = ? AND host_id = ?`, - s.CronExpr, - string(pathsJSON), string(excludesJSON), string(tagsJSON), - string(retentionJSON), string(optionsJSON), - s.PreHook, s.PostHook, boolToInt(s.Enabled), boolToInt(s.Manual), + s.CronExpr, boolToInt(s.Enabled), now.Format(time.RFC3339Nano), s.ID, s.HostID, ) @@ -112,14 +72,23 @@ func (st *Store) UpdateSchedule(ctx context.Context, s *Schedule) error { return ErrNotFound } s.UpdatedAt = now + // Rewrite junction wholesale — simpler than diffing. + if _, err := tx.ExecContext(ctx, + `DELETE FROM schedule_source_groups WHERE schedule_id = ?`, s.ID, + ); err != nil { + return fmt.Errorf("store: clear schedule junction: %w", err) + } + if err := writeScheduleGroupsTx(ctx, tx, s.ID, s.SourceGroupIDs); err != nil { + return err + } if err := bumpHostScheduleVersionTx(ctx, tx, s.HostID); err != nil { return err } return tx.Commit() } -// DeleteSchedule removes a schedule and bumps host_schedule_version. -// Returns ErrNotFound if no row matched. +// DeleteSchedule removes a schedule and its junction rows; bumps +// host_schedule_version. Returns ErrNotFound if no row matched. func (st *Store) DeleteSchedule(ctx context.Context, hostID, scheduleID string) error { tx, err := st.db.BeginTx(ctx, nil) if err != nil { @@ -137,35 +106,39 @@ func (st *Store) DeleteSchedule(ctx context.Context, hostID, scheduleID string) if n == 0 { return ErrNotFound } + // Junction rows go via ON DELETE CASCADE; nothing to do here. if err := bumpHostScheduleVersionTx(ctx, tx, hostID); err != nil { return err } return tx.Commit() } -// GetSchedule returns one schedule by (host_id, id). Returns -// ErrNotFound on miss. +// GetSchedule returns one schedule (with its junction-resolved +// SourceGroupIDs populated) by (host_id, id). ErrNotFound on miss. func (st *Store) GetSchedule(ctx context.Context, hostID, scheduleID string) (*Schedule, error) { row := st.db.QueryRowContext(ctx, - `SELECT id, host_id, kind, cron_expr, paths, excludes, tags, - retention_policy, options, pre_hook, post_hook, enabled, manual, - created_at, updated_at + `SELECT id, host_id, cron_expr, enabled, created_at, updated_at FROM schedules WHERE id = ? AND host_id = ?`, scheduleID, hostID) s, err := scanSchedule(row) if errors.Is(err, sql.ErrNoRows) { return nil, ErrNotFound } - return s, err + if err != nil { + return nil, err + } + s.SourceGroupIDs, err = st.scheduleGroupIDs(ctx, scheduleID) + if err != nil { + return nil, err + } + return s, nil } -// ListSchedulesByHost returns every schedule for a host, ordered -// by created_at. Empty slice on miss (not an error). +// ListSchedulesByHost returns every schedule for a host, with +// SourceGroupIDs resolved. func (st *Store) ListSchedulesByHost(ctx context.Context, hostID string) ([]Schedule, error) { rows, err := st.db.QueryContext(ctx, - `SELECT id, host_id, kind, cron_expr, paths, excludes, tags, - retention_policy, options, pre_hook, post_hook, enabled, manual, - created_at, updated_at + `SELECT id, host_id, cron_expr, enabled, created_at, updated_at FROM schedules WHERE host_id = ? ORDER BY created_at`, hostID) if err != nil { @@ -180,11 +153,22 @@ func (st *Store) ListSchedulesByHost(ctx context.Context, hostID string) ([]Sche } out = append(out, *s) } - return out, rows.Err() + if err := rows.Err(); err != nil { + return nil, err + } + // Second pass to resolve junctions — small fleet, cheap. + for i := range out { + ids, err := st.scheduleGroupIDs(ctx, out[i].ID) + if err != nil { + return nil, err + } + out[i].SourceGroupIDs = ids + } + return out, nil } -// GetHostScheduleVersion returns the current version for a host, -// or 0 if no row exists yet. +// GetHostScheduleVersion returns the current version for a host, or +// 0 if no row exists yet. func (st *Store) GetHostScheduleVersion(ctx context.Context, hostID string) (int64, error) { var v int64 err := st.db.QueryRowContext(ctx, @@ -210,12 +194,26 @@ func (st *Store) SetHostAppliedScheduleVersion(ctx context.Context, hostID strin return nil } +// BumpHostScheduleVersion is the public wrapper for non-schedule +// CRUD that needs to push to the agent — e.g. source-group edits +// (paths/retention change), retry-policy edits. +func (st *Store) BumpHostScheduleVersion(ctx context.Context, hostID string) error { + tx, err := st.db.BeginTx(ctx, nil) + if err != nil { + return err + } + defer func() { _ = tx.Rollback() }() + if err := bumpHostScheduleVersionTx(ctx, tx, hostID); err != nil { + return err + } + return tx.Commit() +} + // bumpHostScheduleVersionTx upserts host_schedule_version, +1 each // call. Caller owns the tx. func bumpHostScheduleVersionTx(ctx context.Context, tx *sql.Tx, hostID string) error { if _, err := tx.ExecContext(ctx, - `INSERT INTO host_schedule_version (host_id, version) - VALUES (?, 1) + `INSERT INTO host_schedule_version (host_id, version) VALUES (?, 1) ON CONFLICT(host_id) DO UPDATE SET version = version + 1`, hostID); err != nil { return fmt.Errorf("store: bump schedule version: %w", err) @@ -223,6 +221,66 @@ func bumpHostScheduleVersionTx(ctx context.Context, tx *sql.Tx, hostID string) e return nil } +// writeScheduleGroupsTx inserts the junction rows for one schedule. +// Caller owns the tx; assumes the table is already empty for this id +// (Update wipes before calling; Create starts empty). +func writeScheduleGroupsTx(ctx context.Context, tx *sql.Tx, scheduleID string, groupIDs []string) error { + for _, gid := range groupIDs { + if gid == "" { + continue + } + if _, err := tx.ExecContext(ctx, + `INSERT INTO schedule_source_groups (schedule_id, source_group_id) VALUES (?, ?)`, + scheduleID, gid, + ); err != nil { + return fmt.Errorf("store: link schedule %s to group %s: %w", scheduleID, gid, err) + } + } + return nil +} + +// scheduleGroupIDs reads the junction for one schedule. +func (st *Store) scheduleGroupIDs(ctx context.Context, scheduleID string) ([]string, error) { + rows, err := st.db.QueryContext(ctx, + `SELECT source_group_id FROM schedule_source_groups WHERE schedule_id = ?`, + scheduleID) + if err != nil { + return nil, fmt.Errorf("store: read schedule junction: %w", err) + } + defer rows.Close() + out := []string{} + for rows.Next() { + var id string + if err := rows.Scan(&id); err != nil { + return nil, err + } + out = append(out, id) + } + return out, rows.Err() +} + +// SchedulesUsingGroup is the inverse — list schedule IDs that +// reference a given source group. Used for retention-conflict +// detection and "X schedules use this group" UI labels. +func (st *Store) SchedulesUsingGroup(ctx context.Context, groupID string) ([]string, error) { + rows, err := st.db.QueryContext(ctx, + `SELECT schedule_id FROM schedule_source_groups WHERE source_group_id = ?`, + groupID) + if err != nil { + return nil, fmt.Errorf("store: schedules using group: %w", err) + } + defer rows.Close() + out := []string{} + for rows.Next() { + var id string + if err := rows.Scan(&id); err != nil { + return nil, err + } + out = append(out, id) + } + return out, rows.Err() +} + // ----- scan helpers -------------------------------------------------- func scanSchedule(row *sql.Row) (*Schedule, error) { @@ -235,35 +293,15 @@ type scheduleScanner interface { func scanScheduleRow(s scheduleScanner) (*Schedule, error) { var ( - out Schedule - paths, excludes, tags, retention, options string - createdAt, updatedAt string - enabled, manual int + out Schedule + createdAt, updatedAt string + enabled int ) - err := s.Scan(&out.ID, &out.HostID, &out.Kind, &out.CronExpr, - &paths, &excludes, &tags, &retention, &options, - &out.PreHook, &out.PostHook, &enabled, &manual, - &createdAt, &updatedAt) + err := s.Scan(&out.ID, &out.HostID, &out.CronExpr, &enabled, &createdAt, &updatedAt) if err != nil { return nil, err } - if paths != "" { - _ = json.Unmarshal([]byte(paths), &out.Paths) - } - if excludes != "" { - _ = json.Unmarshal([]byte(excludes), &out.Excludes) - } - if tags != "" { - _ = json.Unmarshal([]byte(tags), &out.Tags) - } - if retention != "" { - _ = json.Unmarshal([]byte(retention), &out.RetentionPolicy) - } - if options != "" { - _ = json.Unmarshal([]byte(options), &out.Options) - } out.Enabled = enabled != 0 - out.Manual = manual != 0 if t, err := time.Parse(time.RFC3339Nano, createdAt); err == nil { out.CreatedAt = t } diff --git a/internal/store/schedules_test.go b/internal/store/schedules_test.go index 10262f6..7df83a9 100644 --- a/internal/store/schedules_test.go +++ b/internal/store/schedules_test.go @@ -21,29 +21,37 @@ func makeSchedHost(t *testing.T, s *Store) string { return id } +// makeGroup is a minimal source-group helper for schedule tests +// (schedules need at least one group to point at). +func makeGroup(t *testing.T, s *Store, hostID, name, id string) string { + t.Helper() + if err := s.CreateSourceGroup(context.Background(), &SourceGroup{ + ID: id, HostID: hostID, Name: name, + Includes: []string{"/etc"}, + }); err != nil { + t.Fatalf("create group %s: %v", name, err) + } + return id +} + func TestSchedulesCRUDAndVersionBump(t *testing.T) { t.Parallel() s := openTestStore(t) ctx := context.Background() hostID := makeSchedHost(t, s) + gid := makeGroup(t, s, hostID, "default", "01HSCHEDGRP00000000000001") - // Initial version is 0 (no row). - v, err := s.GetHostScheduleVersion(ctx, hostID) - if err != nil { - t.Fatal(err) - } - if v != 0 { - t.Fatalf("initial version: got %d, want 0", v) + // Group creation already bumped version to 1. + v, _ := s.GetHostScheduleVersion(ctx, hostID) + if v != 1 { + t.Fatalf("version after group create: got %d, want 1", v) } - keepLast := 7 sched := Schedule{ ID: "01SCHED000000000000000001", HostID: hostID, - Kind: "backup", CronExpr: "0 3 * * *", - Paths: []string{"/etc", "/home"}, - Tags: []string{"nightly"}, - RetentionPolicy: RetentionPolicy{KeepLast: &keepLast}, - Enabled: true, + CronExpr: "0 3 * * *", + Enabled: true, + SourceGroupIDs: []string{gid}, } if err := s.CreateSchedule(ctx, &sched); err != nil { t.Fatalf("create: %v", err) @@ -53,37 +61,33 @@ func TestSchedulesCRUDAndVersionBump(t *testing.T) { } v, _ = s.GetHostScheduleVersion(ctx, hostID) - if v != 1 { - t.Fatalf("version after create: got %d, want 1", v) + if v != 2 { + t.Fatalf("version after schedule create: got %d, want 2", v) } - // Round-trip read. + // Round-trip read — junction populated. got, err := s.GetSchedule(ctx, hostID, sched.ID) if err != nil { t.Fatalf("get: %v", err) } - if got.CronExpr != "0 3 * * *" || len(got.Paths) != 2 { - t.Fatalf("round-trip lost data: %+v", got) - } - if got.RetentionPolicy.KeepLast == nil || *got.RetentionPolicy.KeepLast != 7 { - t.Fatalf("retention round-trip: %+v", got.RetentionPolicy) + if got.CronExpr != "0 3 * * *" || len(got.SourceGroupIDs) != 1 || got.SourceGroupIDs[0] != gid { + t.Fatalf("round-trip: %+v", got) } - // List sees it. list, err := s.ListSchedulesByHost(ctx, hostID) if err != nil || len(list) != 1 || list[0].ID != sched.ID { t.Fatalf("list: err=%v rows=%v", err, list) } - // Update bumps version. + // Update — flip enabled, swap junction (re-add same gid). Version bumps. sched.CronExpr = "*/30 * * * *" sched.Enabled = false if err := s.UpdateSchedule(ctx, &sched); err != nil { t.Fatalf("update: %v", err) } v, _ = s.GetHostScheduleVersion(ctx, hostID) - if v != 2 { - t.Fatalf("version after update: got %d, want 2", v) + if v != 3 { + t.Fatalf("version after update: got %d, want 3", v) } got, _ = s.GetSchedule(ctx, hostID, sched.ID) if got.CronExpr != "*/30 * * * *" || got.Enabled { @@ -95,14 +99,54 @@ func TestSchedulesCRUDAndVersionBump(t *testing.T) { t.Fatalf("delete: %v", err) } v, _ = s.GetHostScheduleVersion(ctx, hostID) - if v != 3 { - t.Fatalf("version after delete: got %d, want 3", v) + if v != 4 { + t.Fatalf("version after delete: got %d, want 4", v) } if err := s.DeleteSchedule(ctx, hostID, sched.ID); !errors.Is(err, ErrNotFound) { t.Fatalf("delete after delete: want ErrNotFound, got %v", err) } } +func TestSchedulesUsingGroup(t *testing.T) { + t.Parallel() + s := openTestStore(t) + ctx := context.Background() + hostID := makeSchedHost(t, s) + g1 := makeGroup(t, s, hostID, "default", "01HUSEGRPGRP000000000001") + g2 := makeGroup(t, s, hostID, "databases", "01HUSEGRPGRP000000000002") + + // Schedule A points at g1 only; Schedule B points at both. + if err := s.CreateSchedule(ctx, &Schedule{ + ID: "01HUSEGRPSCHED0000000001", HostID: hostID, + CronExpr: "@hourly", Enabled: true, + SourceGroupIDs: []string{g1}, + }); err != nil { + t.Fatal(err) + } + if err := s.CreateSchedule(ctx, &Schedule{ + ID: "01HUSEGRPSCHED0000000002", HostID: hostID, + CronExpr: "0 3 * * *", Enabled: true, + SourceGroupIDs: []string{g1, g2}, + }); err != nil { + t.Fatal(err) + } + + g1Sched, err := s.SchedulesUsingGroup(ctx, g1) + if err != nil { + t.Fatal(err) + } + if len(g1Sched) != 2 { + t.Fatalf("g1 should be in 2 schedules, got %d", len(g1Sched)) + } + g2Sched, err := s.SchedulesUsingGroup(ctx, g2) + if err != nil { + t.Fatal(err) + } + if len(g2Sched) != 1 { + t.Fatalf("g2 should be in 1 schedule, got %d", len(g2Sched)) + } +} + func TestSetHostAppliedScheduleVersion(t *testing.T) { t.Parallel() s := openTestStore(t) diff --git a/internal/store/sources.go b/internal/store/sources.go new file mode 100644 index 0000000..7c81b57 --- /dev/null +++ b/internal/store/sources.go @@ -0,0 +1,261 @@ +package store + +import ( + "context" + "database/sql" + "encoding/json" + "errors" + "fmt" + "time" +) + +// CreateSourceGroup inserts a new group + bumps host_schedule_version +// in one tx. Group name doubles as the snapshot tag on backups; the +// (host_id, name) UNIQUE constraint enforces tag unambiguity. +func (st *Store) CreateSourceGroup(ctx context.Context, g *SourceGroup) error { + if g.ID == "" || g.HostID == "" || g.Name == "" { + return errors.New("store: source group id, host_id, name required") + } + now := time.Now().UTC() + g.CreatedAt = now + g.UpdatedAt = now + if g.Includes == nil { + g.Includes = []string{} + } + if g.Excludes == nil { + g.Excludes = []string{} + } + if g.RetryMax == 0 { + g.RetryMax = 3 + } + if g.RetryBackoffSeconds == 0 { + g.RetryBackoffSeconds = 60 + } + includesJSON, _ := json.Marshal(g.Includes) + excludesJSON, _ := json.Marshal(g.Excludes) + retentionJSON, _ := json.Marshal(g.RetentionPolicy) + + tx, err := st.db.BeginTx(ctx, nil) + if err != nil { + return fmt.Errorf("store: begin tx: %w", err) + } + defer func() { _ = tx.Rollback() }() + + if _, err := tx.ExecContext(ctx, + `INSERT INTO source_groups ( + id, host_id, name, includes, excludes, retention_policy, + retry_max, retry_backoff_seconds, conflict_dimension, + created_at, updated_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, + g.ID, g.HostID, g.Name, + string(includesJSON), string(excludesJSON), string(retentionJSON), + g.RetryMax, g.RetryBackoffSeconds, + nullableString(g.ConflictDimension), + now.Format(time.RFC3339Nano), now.Format(time.RFC3339Nano), + ); err != nil { + return fmt.Errorf("store: create source group: %w", err) + } + if err := bumpHostScheduleVersionTx(ctx, tx, g.HostID); err != nil { + return err + } + return tx.Commit() +} + +// UpdateSourceGroup replaces every editable field on an existing row +// and bumps host_schedule_version. Returns ErrNotFound if no row matched. +func (st *Store) UpdateSourceGroup(ctx context.Context, g *SourceGroup) error { + if g.ID == "" || g.HostID == "" || g.Name == "" { + return errors.New("store: source group id, host_id, name required") + } + if g.Includes == nil { + g.Includes = []string{} + } + if g.Excludes == nil { + g.Excludes = []string{} + } + includesJSON, _ := json.Marshal(g.Includes) + excludesJSON, _ := json.Marshal(g.Excludes) + retentionJSON, _ := json.Marshal(g.RetentionPolicy) + now := time.Now().UTC() + + tx, err := st.db.BeginTx(ctx, nil) + if err != nil { + return fmt.Errorf("store: begin tx: %w", err) + } + defer func() { _ = tx.Rollback() }() + + res, err := tx.ExecContext(ctx, + `UPDATE source_groups SET + name = ?, includes = ?, excludes = ?, retention_policy = ?, + retry_max = ?, retry_backoff_seconds = ?, conflict_dimension = ?, + updated_at = ? + WHERE id = ? AND host_id = ?`, + g.Name, + string(includesJSON), string(excludesJSON), string(retentionJSON), + g.RetryMax, g.RetryBackoffSeconds, + nullableString(g.ConflictDimension), + now.Format(time.RFC3339Nano), + g.ID, g.HostID, + ) + if err != nil { + return fmt.Errorf("store: update source group: %w", err) + } + n, _ := res.RowsAffected() + if n == 0 { + return ErrNotFound + } + g.UpdatedAt = now + if err := bumpHostScheduleVersionTx(ctx, tx, g.HostID); err != nil { + return err + } + return tx.Commit() +} + +// DeleteSourceGroup removes a group and bumps host_schedule_version. +// Junction rows in schedule_source_groups go via ON DELETE CASCADE. +// Caller is expected to have already enforced the "default group +// can't be the only one" UI rule; this layer just deletes. +func (st *Store) DeleteSourceGroup(ctx context.Context, hostID, groupID string) error { + tx, err := st.db.BeginTx(ctx, nil) + if err != nil { + return fmt.Errorf("store: begin tx: %w", err) + } + defer func() { _ = tx.Rollback() }() + + res, err := tx.ExecContext(ctx, + `DELETE FROM source_groups WHERE id = ? AND host_id = ?`, + groupID, hostID) + if err != nil { + return fmt.Errorf("store: delete source group: %w", err) + } + n, _ := res.RowsAffected() + if n == 0 { + return ErrNotFound + } + if err := bumpHostScheduleVersionTx(ctx, tx, hostID); err != nil { + return err + } + return tx.Commit() +} + +// GetSourceGroup returns one group by (host_id, id). ErrNotFound on miss. +func (st *Store) GetSourceGroup(ctx context.Context, hostID, groupID string) (*SourceGroup, error) { + row := st.db.QueryRowContext(ctx, + `SELECT id, host_id, name, includes, excludes, retention_policy, + retry_max, retry_backoff_seconds, conflict_dimension, + created_at, updated_at + FROM source_groups WHERE id = ? AND host_id = ?`, + groupID, hostID) + g, err := scanSourceGroup(row) + if errors.Is(err, sql.ErrNoRows) { + return nil, ErrNotFound + } + return g, err +} + +// GetSourceGroupByName resolves a group by its (host-unique) name. +// Used by retention-conflict detection and the auto-init flow. +func (st *Store) GetSourceGroupByName(ctx context.Context, hostID, name string) (*SourceGroup, error) { + row := st.db.QueryRowContext(ctx, + `SELECT id, host_id, name, includes, excludes, retention_policy, + retry_max, retry_backoff_seconds, conflict_dimension, + created_at, updated_at + FROM source_groups WHERE host_id = ? AND name = ?`, + hostID, name) + g, err := scanSourceGroup(row) + if errors.Is(err, sql.ErrNoRows) { + return nil, ErrNotFound + } + return g, err +} + +// ListSourceGroupsByHost returns every group for a host, ordered +// by name (so 'default' isn't always at the bottom alphabetically — +// well, it usually IS the only 'd' name on a fresh host so this +// works out fine). Empty slice on miss. +func (st *Store) ListSourceGroupsByHost(ctx context.Context, hostID string) ([]SourceGroup, error) { + rows, err := st.db.QueryContext(ctx, + `SELECT id, host_id, name, includes, excludes, retention_policy, + retry_max, retry_backoff_seconds, conflict_dimension, + created_at, updated_at + FROM source_groups WHERE host_id = ? ORDER BY name`, + hostID) + if err != nil { + return nil, fmt.Errorf("store: list source groups: %w", err) + } + defer rows.Close() + out := []SourceGroup{} + for rows.Next() { + g, err := scanSourceGroupRow(rows) + if err != nil { + return nil, err + } + out = append(out, *g) + } + return out, rows.Err() +} + +// SetSourceGroupConflict updates only the cached conflict_dimension. +// Doesn't bump schedule version (the cache is server-internal, agent +// doesn't see it). Empty string clears the conflict. +func (st *Store) SetSourceGroupConflict(ctx context.Context, groupID, dimension string) error { + _, err := st.db.ExecContext(ctx, + `UPDATE source_groups SET conflict_dimension = ? WHERE id = ?`, + nullableString(dimension), groupID) + if err != nil { + return fmt.Errorf("store: set source group conflict: %w", err) + } + return nil +} + +// ----- scan helpers -------------------------------------------------- + +func scanSourceGroup(row *sql.Row) (*SourceGroup, error) { + return scanSourceGroupRow(row) +} + +type sourceGroupScanner interface { + Scan(dest ...any) error +} + +func scanSourceGroupRow(s sourceGroupScanner) (*SourceGroup, error) { + var ( + out SourceGroup + includes, excludes, retention string + conflict sql.NullString + createdAt, updatedAt string + ) + err := s.Scan(&out.ID, &out.HostID, &out.Name, + &includes, &excludes, &retention, + &out.RetryMax, &out.RetryBackoffSeconds, &conflict, + &createdAt, &updatedAt) + if err != nil { + return nil, err + } + if includes != "" { + _ = json.Unmarshal([]byte(includes), &out.Includes) + } + if excludes != "" { + _ = json.Unmarshal([]byte(excludes), &out.Excludes) + } + if retention != "" { + _ = json.Unmarshal([]byte(retention), &out.RetentionPolicy) + } + if conflict.Valid { + out.ConflictDimension = conflict.String + } + if t, err := time.Parse(time.RFC3339Nano, createdAt); err == nil { + out.CreatedAt = t + } + if t, err := time.Parse(time.RFC3339Nano, updatedAt); err == nil { + out.UpdatedAt = t + } + return &out, nil +} + +func nullableString(s string) any { + if s == "" { + return nil + } + return s +} diff --git a/internal/store/sources_test.go b/internal/store/sources_test.go new file mode 100644 index 0000000..bc56013 --- /dev/null +++ b/internal/store/sources_test.go @@ -0,0 +1,221 @@ +package store + +import ( + "context" + "errors" + "testing" + "time" +) + +func TestSourceGroupCRUDAndVersionBump(t *testing.T) { + t.Parallel() + s := openTestStore(t) + ctx := context.Background() + hostID := makeSchedHost(t, s) + + keepLast := 7 + g := SourceGroup{ + ID: "01HSGRP00000000000000001", HostID: hostID, Name: "default", + Includes: []string{"/etc", "/home"}, + Excludes: []string{"*.tmp"}, + RetentionPolicy: RetentionPolicy{KeepLast: &keepLast}, + } + if err := s.CreateSourceGroup(ctx, &g); err != nil { + t.Fatalf("create: %v", err) + } + if g.RetryMax != 3 || g.RetryBackoffSeconds != 60 { + t.Fatalf("retry defaults not applied: %+v", g) + } + v, _ := s.GetHostScheduleVersion(ctx, hostID) + if v != 1 { + t.Fatalf("version after create: got %d, want 1", v) + } + + // Round-trip. + got, err := s.GetSourceGroup(ctx, hostID, g.ID) + if err != nil { + t.Fatalf("get: %v", err) + } + if got.Name != "default" || len(got.Includes) != 2 || len(got.Excludes) != 1 { + t.Fatalf("round-trip: %+v", got) + } + if got.RetentionPolicy.KeepLast == nil || *got.RetentionPolicy.KeepLast != 7 { + t.Fatalf("retention round-trip: %+v", got.RetentionPolicy) + } + + // By name. + byName, err := s.GetSourceGroupByName(ctx, hostID, "default") + if err != nil || byName.ID != g.ID { + t.Fatalf("get by name: err=%v got=%v", err, byName) + } + + // Update — rename + new retention. Version bumps. + keepDaily := 14 + g.Name = "system" + g.RetentionPolicy = RetentionPolicy{KeepDaily: &keepDaily} + if err := s.UpdateSourceGroup(ctx, &g); err != nil { + t.Fatal(err) + } + v, _ = s.GetHostScheduleVersion(ctx, hostID) + if v != 2 { + t.Fatalf("version after update: got %d, want 2", v) + } + got, _ = s.GetSourceGroup(ctx, hostID, g.ID) + if got.Name != "system" || got.RetentionPolicy.KeepLast != nil || got.RetentionPolicy.KeepDaily == nil { + t.Fatalf("update did not persist: %+v", got) + } + + // Conflict cache (no version bump). + if err := s.SetSourceGroupConflict(ctx, g.ID, "hourly"); err != nil { + t.Fatal(err) + } + got, _ = s.GetSourceGroup(ctx, hostID, g.ID) + if got.ConflictDimension != "hourly" { + t.Fatalf("conflict not cached: %q", got.ConflictDimension) + } + v2, _ := s.GetHostScheduleVersion(ctx, hostID) + if v2 != v { + t.Fatalf("conflict cache should not bump version: %d → %d", v, v2) + } + + // List. + list, _ := s.ListSourceGroupsByHost(ctx, hostID) + if len(list) != 1 || list[0].ID != g.ID { + t.Fatalf("list: %v", list) + } + + // Delete bumps version. + if err := s.DeleteSourceGroup(ctx, hostID, g.ID); err != nil { + t.Fatal(err) + } + v3, _ := s.GetHostScheduleVersion(ctx, hostID) + if v3 != 3 { + t.Fatalf("version after delete: got %d, want 3", v3) + } + if err := s.DeleteSourceGroup(ctx, hostID, g.ID); !errors.Is(err, ErrNotFound) { + t.Fatalf("delete after delete: want ErrNotFound, got %v", err) + } +} + +func TestSourceGroupNameUniquePerHost(t *testing.T) { + t.Parallel() + s := openTestStore(t) + ctx := context.Background() + hostID := makeSchedHost(t, s) + + if err := s.CreateSourceGroup(ctx, &SourceGroup{ + ID: "01HUNIQGRP00000000000001", HostID: hostID, Name: "shared", + }); err != nil { + t.Fatal(err) + } + err := s.CreateSourceGroup(ctx, &SourceGroup{ + ID: "01HUNIQGRP00000000000002", HostID: hostID, Name: "shared", + }) + if err == nil { + t.Fatal("expected unique-constraint error on duplicate name within host") + } +} + +func TestRepoMaintenanceDefaultsAndUpdate(t *testing.T) { + t.Parallel() + s := openTestStore(t) + ctx := context.Background() + hostID := makeSchedHost(t, s) + + if _, err := s.GetRepoMaintenance(ctx, hostID); !errors.Is(err, ErrNotFound) { + t.Fatalf("expected ErrNotFound before seed, got %v", err) + } + + if err := s.CreateDefaultRepoMaintenance(ctx, hostID); err != nil { + t.Fatal(err) + } + m, err := s.GetRepoMaintenance(ctx, hostID) + if err != nil { + t.Fatal(err) + } + if m.ForgetCron != "0 3 * * *" || !m.ForgetEnabled { + t.Fatalf("forget defaults: %+v", m) + } + if m.PruneCron != "0 4 * * 0" || m.CheckSubsetPct != 5 { + t.Fatalf("other defaults: %+v", m) + } + + m.ForgetCron = "0 4 * * *" + m.PruneEnabled = false + m.CheckSubsetPct = 10 + if err := s.UpdateRepoMaintenance(ctx, m); err != nil { + t.Fatal(err) + } + m2, _ := s.GetRepoMaintenance(ctx, hostID) + if m2.ForgetCron != "0 4 * * *" || m2.PruneEnabled || m2.CheckSubsetPct != 10 { + t.Fatalf("update did not persist: %+v", m2) + } + + // CreateDefaultRepoMaintenance is idempotent (INSERT OR IGNORE). + if err := s.CreateDefaultRepoMaintenance(ctx, hostID); err != nil { + t.Fatal(err) + } + m3, _ := s.GetRepoMaintenance(ctx, hostID) + if m3.ForgetCron != "0 4 * * *" { + t.Fatalf("INSERT OR IGNORE clobbered existing row: %+v", m3) + } +} + +func TestPendingRunQueue(t *testing.T) { + t.Parallel() + s := openTestStore(t) + ctx := context.Background() + hostID := makeSchedHost(t, s) + gid := makeGroup(t, s, hostID, "default", "01HPENDGRP00000000000001") + schedID := "01HPENDSCHED0000000000001" + if err := s.CreateSchedule(ctx, &Schedule{ + ID: schedID, HostID: hostID, CronExpr: "@hourly", Enabled: true, + SourceGroupIDs: []string{gid}, + }); err != nil { + t.Fatal(err) + } + + now := time.Now().UTC() + if err := s.EnqueuePendingRun(ctx, &PendingRun{ + ID: "01HPEND00000000000000001", + ScheduleID: schedID, SourceGroupID: gid, HostID: hostID, + NextAttemptAt: now.Add(-time.Second), // already due + ScheduledAt: now.Add(-time.Minute), + }); err != nil { + t.Fatal(err) + } + due, err := s.DuePendingRuns(ctx, now, 10) + if err != nil { + t.Fatal(err) + } + if len(due) != 1 { + t.Fatalf("due: got %d, want 1", len(due)) + } + if due[0].Attempt != 1 { + t.Fatalf("attempt: %d", due[0].Attempt) + } + + // Bump. + next := now.Add(2 * time.Minute) + if err := s.BumpPendingRunAttempt(ctx, due[0].ID, next, "agent offline"); err != nil { + t.Fatal(err) + } + // No longer due at `now`. + due, _ = s.DuePendingRuns(ctx, now, 10) + if len(due) != 0 { + t.Fatalf("should not be due yet: %v", due) + } + // Due at `next`. + due, _ = s.DuePendingRuns(ctx, next, 10) + if len(due) != 1 || due[0].Attempt != 2 || due[0].LastError != "agent offline" { + t.Fatalf("after bump: %+v", due) + } + + if err := s.DeletePendingRun(ctx, due[0].ID); err != nil { + t.Fatal(err) + } + due, _ = s.DuePendingRuns(ctx, next, 10) + if len(due) != 0 { + t.Fatalf("after delete: %v", due) + } +} diff --git a/internal/store/types.go b/internal/store/types.go index ea65c1e..e799f09 100644 --- a/internal/store/types.go +++ b/internal/store/types.go @@ -30,7 +30,7 @@ const ( // token; the DB stores its hash. Callers that hold a *Session have // already authenticated. type Session struct { - ID string // session token (raw); never persisted as-is + ID string UserID string CreatedAt time.Time ExpiresAt time.Time @@ -38,66 +38,77 @@ type Session struct { UA string } -// Host mirrors the denormalised hosts table. JSON columns (tags) are -// returned decoded into Go slices for ergonomics. +// Host mirrors the hosts table. The P2 redesign moved repo-related +// flags out (auto-init replaces RepoInitialisedAt; bandwidth lives +// here as a host-wide cap; "what to back up" lives on source_groups). type Host struct { - ID string - Name string - OS string - Arch string - AgentVersion string - ResticVersion string - ProtocolVersion int - EnrolledAt time.Time - LastSeenAt *time.Time - Status string - RepoID *string - Tags []string - CurrentJobID *string - LastBackupAt *time.Time - LastBackupStatus *string - RepoSizeBytes int64 - SnapshotCount int - OpenAlertCount int - AppliedScheduleVersion int64 - // RepoInitialisedAt is non-nil once we've confirmed the host's - // repo has been initialised — either the operator clicked the - // init button, or a backup succeeded, or snapshots.report came - // back non-empty. The host detail run-now panel shows a red - // "Initialise repo" affordance while this is nil. - RepoInitialisedAt *time.Time + ID string + Name string + OS string + Arch string + AgentVersion string + ResticVersion string + ProtocolVersion int + EnrolledAt time.Time + LastSeenAt *time.Time + Status string + RepoID *string + Tags []string + CurrentJobID *string + LastBackupAt *time.Time + LastBackupStatus *string + RepoSizeBytes int64 + SnapshotCount int + OpenAlertCount int + AppliedScheduleVersion int64 + // Host-wide bandwidth caps applied to every restic invocation + // (backup, restore, prune). nil = no cap. + BandwidthUpKBps *int + BandwidthDownKBps *int } -// Schedule mirrors one row of the schedules table. JSON columns -// (paths, excludes, tags, retention_policy, options) are decoded -// into Go-native shapes for ergonomics; the wire form on the agent -// side keeps retention_policy / options as raw JSON since the agent -// just forwards them to restic. +// Schedule is now intentionally slim: cron + which groups + enabled. +// The "what" lives on SourceGroup (paths, excludes, retention, retry). +// The "kind" of operation a schedule can drive is implicit — backup +// only. forget/prune/check are repo-level cadences on +// HostRepoMaintenance, not schedule kinds. type Schedule struct { - ID string - HostID string - Kind string - CronExpr string - Paths []string - Excludes []string - Tags []string - RetentionPolicy RetentionPolicy - Options ScheduleOptions - PreHook string - PostHook string - Enabled bool - // Manual schedules carry paths/excludes/tags/retention like any - // other but have no cron — they only fire when the operator - // clicks Run-now. Lets us keep one data shape for "what gets - // backed up" without forcing every host to have an automated - // schedule. Created by Add-host with the typed paths. - Manual bool - CreatedAt time.Time - UpdatedAt time.Time + ID string + HostID string + CronExpr string + Enabled bool + CreatedAt time.Time + UpdatedAt time.Time + // SourceGroupIDs is populated by ListSchedulesByHost (joins + // schedule_source_groups) and accepted on Create / Update so the + // caller passes the desired junction state in one shape. + SourceGroupIDs []string +} + +// SourceGroup is the new home for "what gets backed up." A named +// bundle of include + exclude paths plus a retention policy. Group +// name doubles as the snapshot tag (restic --tag ) so retention +// can target it via `restic forget --tag`. +type SourceGroup struct { + ID string + HostID string + Name string + Includes []string + Excludes []string + RetentionPolicy RetentionPolicy + RetryMax int + RetryBackoffSeconds int + // ConflictDimension is the cached name of the failing keep-* on + // a granularity↔cadence mismatch (e.g. "hourly" when keep-hourly + // is set but no schedule pointing at this group fires sub-daily). + // Empty means no conflict. Refreshed on every schedule + group CRUD. + ConflictDimension string + CreatedAt time.Time + UpdatedAt time.Time } // RetentionPolicy is the typed view of `restic forget --keep-*`. -// All fields nullable so empty == "no policy / keep everything". +// All fields nullable; empty struct = "keep everything for this group." type RetentionPolicy struct { KeepLast *int `json:"keep_last,omitempty"` KeepHourly *int `json:"keep_hourly,omitempty"` @@ -107,8 +118,8 @@ type RetentionPolicy struct { KeepYearly *int `json:"keep_yearly,omitempty"` } -// Summary renders a compact human view of the policy for templates -// and logs — "last=7, d=14, w=4" or "—" when nothing is set. +// Summary renders a compact human view — "last=7, d=14, w=4" or +// "—" when nothing is set. Used by templates and logs. func (p RetentionPolicy) Summary() string { parts := []string{} for _, kv := range []struct { @@ -132,18 +143,36 @@ func (p RetentionPolicy) Summary() string { return strings.Join(parts, ", ") } -// ScheduleOptions covers per-schedule knobs that aren't core to the -// command itself — currently bandwidth caps. Stored as JSON so -// future fields don't churn the schema. -type ScheduleOptions struct { - LimitUploadKBps *int `json:"limit_upload_kbps,omitempty"` - LimitDownloadKBps *int `json:"limit_download_kbps,omitempty"` +// HostRepoMaintenance carries the host-level cron cadences for the +// three repo-wide verbs (forget / prune / check). 1:1 with hosts; +// row is auto-created at host enrolment with sensible defaults. +type HostRepoMaintenance struct { + HostID string + ForgetCron string + ForgetEnabled bool + PruneCron string + PruneEnabled bool + CheckCron string + CheckEnabled bool + CheckSubsetPct int } -// EnrollmentToken is the issuer's view of a one-time token. The -// raw token is returned only at create time; the DB stores its hash. +// PendingRun queues a missed cron tick (agent was offline) for the +// server-side retry ticker to dispatch later. +type PendingRun struct { + ID string + ScheduleID string + SourceGroupID string + HostID string + Attempt int + NextAttemptAt time.Time + ScheduledAt time.Time // original cron tick — forensic / audit + LastError string +} + +// EnrollmentToken is the issuer's view of a one-time token. type EnrollmentToken struct { - Raw string // populated on create only + Raw string TokenHash string CreatedAt time.Time ExpiresAt time.Time @@ -153,7 +182,7 @@ type EnrollmentToken struct { type AuditEntry struct { ID string UserID *string - Actor string // user|agent|system + Actor string Action string TargetKind *string TargetID *string