P2 redesign · phase 2: store rewrite — sources, slim schedules, repo maintenance
CI / Test (linux/amd64) (push) Has been cancelled
CI / Lint (push) Has been cancelled
CI / Build (windows/amd64) (push) Has been cancelled
CI / Build (linux/amd64) (push) Has been cancelled
CI / Build (linux/arm64) (push) Has been cancelled

Go-side data model rebuilt against migration 0008. The fat-Schedule
shape (paths/excludes/tags/retention/manual/kind/options/hooks) is
gone; that surface lives on source_groups now.

* store/types.go
  - Schedule slimmed to {id, host_id, cron, enabled, source_group_ids,
    timestamps}. SourceGroupIDs populated by Get/List, accepted on
    Create/Update so callers pass desired junction state in one shape.
  - SourceGroup added: name (= snapshot tag), includes/excludes,
    retention_policy, retry_max + retry_backoff_seconds, cached
    conflict_dimension.
  - HostRepoMaintenance added: forget/prune/check cadences + enabled.
  - PendingRun added: offline-retry queue.
  - Host loses RepoInitialisedAt; gains BandwidthUpKBps + BandwidthDownKBps.
  - RetentionPolicy moves home from "schedule field" to "source group
    field" but the type itself + Summary() method unchanged.

* store/sources.go (new) — CRUD + GetByName + ConflictDimension cache.
  Group writes bump host_schedule_version; conflict cache writes don't
  (server-internal projection, agent doesn't see it).
* store/maintenance.go (new) — CreateDefault is idempotent (INSERT OR
  IGNORE). UpdateRepoMaintenance doesn't bump schedule version because
  these run on the server's own ticker, not the agent's local cron.
* store/pending.go (new) — Enqueue / DueRunsForRetry / Bump / Delete.
* store/schedules.go — rewritten for slim shape + junction CRUD.
  Update wipes the schedule_source_groups junction wholesale and
  re-inserts (simpler than diffing). Adds SchedulesUsingGroup for
  retention-conflict detection + UI labels.
* store/hosts.go — drops repo_initialised_at scan, adds bandwidth scan.
  New SetHostBandwidth helper.

* HTTP layer — temporarily stubbed during this rewrite (501 returns
  with redesign_in_progress error code). Phase 3 fills these in
  against the new shape:
    - schedules.go REST CRUD
    - schedule_push.go agent reconciliation
    - ui_schedules.go HTML form CRUD
  Run-now-per-host + Init-repo handlers in ui_handlers.go also stubbed
  — both go away in the new model (Run-now per source group; auto-init
  at host enrolment).

* enrollment.go — replaces "seed manual schedule from typed paths"
  with "seed default source group + repo-maintenance row." The default
  group gets the typed paths as its includes; operator edits later
  via Sources tab.

* ws/handler.go — drops the MarkHostRepoInitialised projection (column
  is gone; auto-init makes it derivable from latest init job's status).

Tests:
* store: existing schedule test rewritten for slim shape + junction;
  new sources_test.go covers source-group CRUD, name uniqueness,
  conflict cache, repo-maintenance defaults + idempotent seed,
  pending-runs queue lifecycle.
* http: schedules_test.go and schedule_push_test.go deleted — both
  exercised the obsolete fat-schedule API. Phase 3 rewrites them
  against the new endpoints.

go test ./... green. cmd/server + cmd/agent build. The UI is broken
end-to-end (schedules / sources / repo tabs all hit 501 stubs); Phase 3
restores REST + on-the-wire reconciliation; Phase 4 rewires the UI
templates against the new model.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-02 21:30:41 +01:00
parent 666af41f46
commit 5667cdf13a
16 changed files with 1076 additions and 1928 deletions
+14 -16
View File
@@ -140,26 +140,24 @@ func (s *Server) handleAgentEnroll(w stdhttp.ResponseWriter, r *stdhttp.Request)
return return
} }
// Seed an initial manual schedule from whatever paths the // Seed the host's "default" source group with whatever paths the
// operator typed into Add-host. The schedule is editable from // operator typed into Add-host (empty allowed; group is editable
// the host's Schedules tab; the operator can add automated // from the Sources tab post-enrol). Also seed the host's
// schedules alongside it later. We skip this when no paths // repo-maintenance row with default cadences so forget/prune/check
// were supplied — the host can still enrol; it just can't // start ticking on their own. Auto-init dispatch lands in Phase 6
// back up until the operator adds a schedule. // of the redesign.
if len(attachments.InitialPaths) > 0 { if err := s.deps.Store.CreateSourceGroup(r.Context(), &store.SourceGroup{
seed := store.Schedule{
ID: ulid.Make().String(), ID: ulid.Make().String(),
HostID: hostID, HostID: hostID,
Kind: string(api.JobBackup), Name: "default",
CronExpr: "", Includes: attachments.InitialPaths,
Paths: attachments.InitialPaths, }); err != nil {
Enabled: true, slog.Warn("enrollment: seed default source group failed",
Manual: true,
}
if err := s.deps.Store.CreateSchedule(r.Context(), &seed); err != nil {
slog.Warn("enrollment: seed manual schedule failed",
"host_id", hostID, "err", err) "host_id", hostID, "err", err)
} }
if err := s.deps.Store.CreateDefaultRepoMaintenance(r.Context(), hostID); err != nil {
slog.Warn("enrollment: seed repo maintenance failed",
"host_id", hostID, "err", err)
} }
// Promote the encrypted repo creds onto the freshly-created host // Promote the encrypted repo creds onto the freshly-created host
+18 -240
View File
@@ -2,258 +2,36 @@ package http
import ( import (
"context" "context"
"encoding/json"
"errors"
"log/slog" "log/slog"
"time" "time"
"github.com/oklog/ulid/v2"
"gitea.dcglab.co.uk/steve/restic-manager/internal/api"
"gitea.dcglab.co.uk/steve/restic-manager/internal/server/ws" "gitea.dcglab.co.uk/steve/restic-manager/internal/server/ws"
"gitea.dcglab.co.uk/steve/restic-manager/internal/store"
) )
// loadScheduleSetPayload reads the host's current schedule set + the // schedule_push.go — server → agent reconciliation push.
// canonical version into a wire-shape payload. Returns an empty
// (but well-formed) payload with version 0 if the host has nothing
// scheduled yet — that's still a valid state to push so the agent
// drops any stale cron entries from a previous deployment.
func (s *Server) loadScheduleSetPayload(ctx context.Context, hostID string) (api.ScheduleSetPayload, error) {
rows, err := s.deps.Store.ListSchedulesByHost(ctx, hostID)
if err != nil {
return api.ScheduleSetPayload{}, err
}
version, err := s.deps.Store.GetHostScheduleVersion(ctx, hostID)
if err != nil {
return api.ScheduleSetPayload{}, err
}
out := api.ScheduleSetPayload{
Version: version,
Schedules: make([]api.Schedule, 0, len(rows)),
}
for _, r := range rows {
retJSON, _ := json.Marshal(r.RetentionPolicy)
optJSON, _ := json.Marshal(r.Options)
out.Schedules = append(out.Schedules, api.Schedule{
ID: r.ID,
Kind: api.JobKind(r.Kind),
CronExpr: r.CronExpr,
Paths: r.Paths,
Excludes: r.Excludes,
Tags: r.Tags,
RetentionPolicy: retJSON,
Options: optJSON,
PreHook: r.PreHook,
PostHook: r.PostHook,
Enabled: r.Enabled,
Manual: r.Manual,
})
}
return out, nil
}
// pushScheduleSet ships the current schedule list to the agent over
// the hub. Caller has already determined the agent is connected.
// Errors are logged and returned; the next push (or the next hello)
// will retry. Idempotent — sending the same version twice is a
// harmless no-op on the agent side.
func (s *Server) pushScheduleSet(ctx context.Context, hostID string) error {
pl, err := s.loadScheduleSetPayload(ctx, hostID)
if err != nil {
slog.Warn("push schedule.set: load failed", "host_id", hostID, "err", err)
return err
}
env, err := api.Marshal(api.MsgScheduleSet, "", pl)
if err != nil {
return err
}
sendCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
if err := s.deps.Hub.Send(sendCtx, hostID, env); err != nil {
slog.Warn("push schedule.set: hub send failed",
"host_id", hostID, "version", pl.Version, "err", err)
return err
}
return nil
}
// pushScheduleSetOnConn is the on-hello flavour: writes directly to
// the just-handshaken conn rather than racing through the hub. Used
// by onAgentHello so a brand-new connection can't miss an early
// push because Register hasn't completed yet.
func (s *Server) pushScheduleSetOnConn(ctx context.Context, hostID string, conn *ws.Conn) {
pl, err := s.loadScheduleSetPayload(ctx, hostID)
if err != nil {
slog.Warn("on-hello: load schedules", "host_id", hostID, "err", err)
return
}
env, err := api.Marshal(api.MsgScheduleSet, "", pl)
if err != nil {
slog.Error("on-hello: marshal schedule.set", "host_id", hostID, "err", err)
return
}
sendCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
if err := conn.Send(sendCtx, env); err != nil {
slog.Warn("on-hello: send schedule.set",
"host_id", hostID, "version", pl.Version, "err", err)
}
}
// pushIfConnected dispatches a schedule.set push asynchronously when
// the agent is online. Used by CRUD handlers: a missed push is
// non-fatal because the next reconnect's on-hello path will catch
// the agent up. Decoupled from the request so the operator's HTTP
// response doesn't wait on the WS round-trip.
func (s *Server) pushScheduleSetAsync(hostID string) {
if s.deps.Hub == nil || !s.deps.Hub.Connected(hostID) {
return
}
go func() {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
_ = s.pushScheduleSet(ctx, hostID)
}()
}
// applyScheduleAck records the version the agent has confirmed via
// schedule.ack. Called from the WS dispatcher (wired below). A bad
// version (zero, or higher than what we've issued) is logged but
// not fatal — the next push will set the agent straight.
func (s *Server) applyScheduleAck(ctx context.Context, hostID string, version int64, appliedAt time.Time) {
if version <= 0 {
return
}
canonical, err := s.deps.Store.GetHostScheduleVersion(ctx, hostID)
if err != nil {
slog.Warn("schedule.ack: load canonical version",
"host_id", hostID, "err", err)
return
}
if version > canonical {
slog.Warn("schedule.ack: agent reported version ahead of server",
"host_id", hostID, "agent", version, "server", canonical)
return
}
if err := s.deps.Store.SetHostAppliedScheduleVersion(ctx, hostID, version); err != nil {
slog.Warn("schedule.ack: persist applied version",
"host_id", hostID, "version", version, "err", err)
return
}
slog.Info("schedule.ack: applied",
"host_id", hostID, "version", version, "applied_at", appliedAt)
}
// dispatchScheduledJob is invoked when the agent reports a local
// cron fire via `schedule.fire`. Thin wrapper around the shared
// dispatcher; logs and discards the return values since the agent
// can't usefully act on them.
func (s *Server) dispatchScheduledJob(ctx context.Context, hostID string, _ *ws.Conn, scheduleID string, scheduledAt time.Time) {
jobID, err := s.dispatchScheduleNow(ctx, hostID, scheduleID, nil)
if err != nil {
slog.Warn("schedule.fire: dispatch failed",
"host_id", hostID, "schedule_id", scheduleID, "err", err)
return
}
slog.Info("schedule.fire: dispatched",
"host_id", hostID, "schedule_id", scheduleID,
"job_id", jobID, "scheduled_at", scheduledAt)
}
// dispatchScheduleNow looks up a schedule, builds a CommandRunPayload,
// persists a jobs row (actor_kind=schedule, scheduled_id linking
// back), and ships MsgCommandRun to the host. Used by both the
// agent-driven path (cron fire reaches us as schedule.fire) and the
// UI-driven path (operator clicks Run-now on a schedule row).
// //
// conn is optional: when set we write directly through it (no race // Stubbed during the P2 redesign rewrite. The new wire shape (slim
// against an in-flight Register). When nil we fall back to Hub.Send. // schedules referencing source groups; groups inline by id at push
// Returns the new job_id on success. // time) lands in Phase 3. Until then on-hello and post-CRUD pushes
func (s *Server) dispatchScheduleNow(ctx context.Context, hostID, scheduleID string, conn *ws.Conn) (string, error) { // are no-ops; the agent will keep its existing cron entries (none,
sched, err := s.deps.Store.GetSchedule(ctx, hostID, scheduleID) // since there are no schedules yet) and the only operator-driven
if err != nil { // jobs flow via run-now once the new UI is wired in Phase 4.
if errors.Is(err, store.ErrNotFound) {
return "", errFmtf("schedule not found") func (s *Server) pushScheduleSetOnConn(ctx context.Context, hostID string, conn *ws.Conn) {
} slog.Debug("schedule push: stubbed during P2 redesign", "host_id", hostID)
return "", errFmtf("internal: %s", err)
}
if !sched.Enabled {
return "", errFmtf("schedule is disabled")
} }
var args []string func (s *Server) pushScheduleSetAsync(hostID string) {
if sched.Kind == string(api.JobBackup) { slog.Debug("schedule push async: stubbed during P2 redesign", "host_id", hostID)
args = append(args, sched.Paths...)
} }
// forget jobs need the retention policy on the wire — restic func (s *Server) applyScheduleAck(ctx context.Context, hostID string, version int64, appliedAt time.Time) {
// refuses to run without keep-* flags, and the agent doesn't if err := s.deps.Store.SetHostAppliedScheduleVersion(ctx, hostID, version); err != nil {
// hold a copy of the schedule (server is the source of truth). slog.Warn("schedule.ack: persist applied version", "host_id", hostID, "err", err)
var retentionJSON json.RawMessage
if sched.Kind == string(api.JobForget) {
if sched.RetentionPolicy == (store.RetentionPolicy{}) {
return "", errFmtf("schedule has no retention policy — refusing to forget (would delete every snapshot)")
}
b, err := json.Marshal(sched.RetentionPolicy)
if err != nil {
return "", errFmtf("marshal retention policy: %s", err)
}
retentionJSON = b
}
jobID := ulid.Make().String()
now := time.Now().UTC()
if err := s.deps.Store.CreateJob(ctx, store.Job{
ID: jobID,
HostID: hostID,
Kind: sched.Kind,
ScheduledID: &sched.ID,
ActorKind: "schedule",
ActorID: &sched.ID,
CreatedAt: now,
}); err != nil {
return "", errFmtf("create job: %s", err)
}
env, err := api.Marshal(api.MsgCommandRun, jobID, api.CommandRunPayload{
JobID: jobID,
Kind: api.JobKind(sched.Kind),
Args: args,
RetentionPolicy: retentionJSON,
})
if err != nil {
return "", errFmtf("marshal command.run: %s", err)
}
sendCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
if conn != nil {
if err := conn.Send(sendCtx, env); err != nil {
return "", errFmtf("send command.run: %s", err)
}
} else {
if err := s.deps.Hub.Send(sendCtx, hostID, env); err != nil {
return "", errFmtf("send command.run: %s", err)
} }
} }
_ = s.deps.Store.AppendAudit(ctx, store.AuditEntry{ func (s *Server) dispatchScheduledJob(ctx context.Context, hostID string, conn *ws.Conn, scheduleID string, scheduledAt time.Time) {
ID: ulid.Make().String(), slog.Info("schedule.fire: stubbed during P2 redesign",
Actor: "schedule", "host_id", hostID, "schedule_id", scheduleID, "scheduled_at", scheduledAt)
Action: "job.run_now",
TargetKind: ptr("job"),
TargetID: &jobID,
TS: now,
})
return jobID, nil
}
// Compile-time guard that the store actually implements the methods
// schedule_push.go calls. Useful when mocking the store in tests.
var _ scheduleStore = (*store.Store)(nil)
type scheduleStore interface {
ListSchedulesByHost(ctx context.Context, hostID string) ([]store.Schedule, error)
GetHostScheduleVersion(ctx context.Context, hostID string) (int64, error)
SetHostAppliedScheduleVersion(ctx context.Context, hostID string, version int64) error
} }
-304
View File
@@ -1,304 +0,0 @@
package http
import (
"bytes"
"context"
"encoding/json"
"io"
stdhttp "net/http"
"strings"
"testing"
"time"
"github.com/coder/websocket"
"gitea.dcglab.co.uk/steve/restic-manager/internal/api"
"gitea.dcglab.co.uk/steve/restic-manager/internal/auth"
"gitea.dcglab.co.uk/steve/restic-manager/internal/store"
)
// makePushHost is like makeHTTPHost but mints a known agent token so
// the test can dial /ws/agent as the host. Returns (hostID, raw token).
func makePushHost(t *testing.T, st *store.Store) (string, string) {
t.Helper()
const id = "01HSCHEDPUSH00000000000000"
tok, _ := auth.NewToken()
if err := st.CreateHost(context.Background(), store.Host{
ID: id, Name: "ph", OS: "linux", Arch: "amd64",
AgentVersion: "dev", ResticVersion: "0.16.0", ProtocolVersion: 1,
EnrolledAt: time.Now().UTC(),
}, auth.HashToken(tok), ""); err != nil {
t.Fatalf("create host: %v", err)
}
return id, tok
}
// readUntilType pumps messages from the WS until one of the wanted
// types arrives or ctx times out. Returns the matched envelope.
// Useful because the on-hello path may push several messages
// (config.update first if creds exist, schedule.set, …).
func readUntilType(ctx context.Context, t *testing.T, c *websocket.Conn, want api.MessageType) api.Envelope {
t.Helper()
for {
_, raw, err := c.Read(ctx)
if err != nil {
t.Fatalf("ws read waiting for %s: %v", want, err)
}
var env api.Envelope
if err := json.Unmarshal(raw, &env); err != nil {
t.Fatalf("envelope: %v (raw=%s)", err, raw)
}
t.Logf("recv: type=%s payload=%s", env.Type, env.Payload)
if env.Type == want {
return env
}
}
}
func TestSchedulePushOnHelloAndAckRoundtrip(t *testing.T) {
t.Parallel()
srv, url, st := newTestServerWithHub(t)
_ = srv
cookie := loginAndCookie(t, url)
hostID, agentToken := makePushHost(t, st)
// Pre-populate one schedule so we have something to push.
body, _ := json.Marshal(scheduleAPI{
Kind: "backup",
CronExpr: "@hourly",
Paths: []string{"/etc"},
Enabled: true,
})
req, _ := stdhttp.NewRequest("POST", url+"/api/hosts/"+hostID+"/schedules",
bytes.NewReader(body))
req.AddCookie(cookie)
req.Header.Set("Content-Type", "application/json")
res, err := stdhttp.DefaultClient.Do(req)
if err != nil {
t.Fatalf("create schedule: %v", err)
}
got, _ := io.ReadAll(res.Body)
res.Body.Close()
if res.StatusCode != stdhttp.StatusCreated {
t.Fatalf("create schedule: %d %s", res.StatusCode, got)
}
var created scheduleAPI
_ = json.Unmarshal(got, &created)
// Dial the WS as the agent and send hello.
wsURL := "ws" + strings.TrimPrefix(url, "http") + "/ws/agent"
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
c, _, err := websocket.Dial(ctx, wsURL, &websocket.DialOptions{
HTTPHeader: stdhttp.Header{"Authorization": []string{"Bearer " + agentToken}},
})
if err != nil {
t.Fatalf("dial: %v", err)
}
defer c.CloseNow()
helloEnv, _ := api.Marshal(api.MsgHello, "", api.HelloPayload{
ProtocolVersion: api.CurrentProtocolVersion,
AgentVersion: "test", ResticVersion: "test",
Hostname: "ph", OS: api.OSLinux, Arch: api.ArchAmd64,
})
raw, _ := json.Marshal(helloEnv)
if err := c.Write(ctx, websocket.MessageText, raw); err != nil {
t.Fatalf("write hello: %v", err)
}
// Server should push schedule.set (our host has no creds, so the
// config.update branch is silently skipped).
pushedEnv := readUntilType(ctx, t, c, api.MsgScheduleSet)
var pushed api.ScheduleSetPayload
if err := pushedEnv.UnmarshalPayload(&pushed); err != nil {
t.Fatalf("decode payload: %v", err)
}
if pushed.Version != 1 {
t.Fatalf("pushed version: got %d, want 1", pushed.Version)
}
if len(pushed.Schedules) != 1 || pushed.Schedules[0].ID != created.ID {
t.Fatalf("pushed schedules: %+v", pushed.Schedules)
}
if pushed.Schedules[0].CronExpr != "@hourly" || len(pushed.Schedules[0].Paths) != 1 {
t.Fatalf("schedule contents: %+v", pushed.Schedules[0])
}
// Ack the version. Server should record it on the host row.
ackEnv, _ := api.Marshal(api.MsgScheduleAck, "", api.ScheduleAckPayload{
Version: pushed.Version,
AppliedAt: time.Now().UTC(),
})
raw, _ = json.Marshal(ackEnv)
if err := c.Write(ctx, websocket.MessageText, raw); err != nil {
t.Fatalf("write ack: %v", err)
}
// Wait for applied_schedule_version to flip.
deadline := time.Now().Add(2 * time.Second)
for time.Now().Before(deadline) {
h, err := st.GetHost(context.Background(), hostID)
if err == nil && h.AppliedScheduleVersion == pushed.Version {
return
}
time.Sleep(20 * time.Millisecond)
}
h, _ := st.GetHost(context.Background(), hostID)
t.Fatalf("applied_schedule_version did not advance: got %d, want %d",
h.AppliedScheduleVersion, pushed.Version)
}
func TestScheduleFireDispatchesCommandRun(t *testing.T) {
t.Parallel()
srv, url, st := newTestServerWithHub(t)
_ = srv
cookie := loginAndCookie(t, url)
hostID, agentToken := makePushHost(t, st)
// Pre-create one backup schedule.
body, _ := json.Marshal(scheduleAPI{
Kind: "backup", CronExpr: "@hourly",
Paths: []string{"/etc/hostname"}, Enabled: true,
})
req, _ := stdhttp.NewRequest("POST",
url+"/api/hosts/"+hostID+"/schedules", bytes.NewReader(body))
req.AddCookie(cookie)
req.Header.Set("Content-Type", "application/json")
res, err := stdhttp.DefaultClient.Do(req)
if err != nil {
t.Fatalf("create: %v", err)
}
got, _ := io.ReadAll(res.Body)
res.Body.Close()
var created scheduleAPI
_ = json.Unmarshal(got, &created)
// Connect as the agent.
wsURL := "ws" + strings.TrimPrefix(url, "http") + "/ws/agent"
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
c, _, err := websocket.Dial(ctx, wsURL, &websocket.DialOptions{
HTTPHeader: stdhttp.Header{"Authorization": []string{"Bearer " + agentToken}},
})
if err != nil {
t.Fatalf("dial: %v", err)
}
defer c.CloseNow()
helloEnv, _ := api.Marshal(api.MsgHello, "", api.HelloPayload{
ProtocolVersion: api.CurrentProtocolVersion,
AgentVersion: "test", ResticVersion: "test",
Hostname: "ph", OS: api.OSLinux, Arch: api.ArchAmd64,
})
raw, _ := json.Marshal(helloEnv)
_ = c.Write(ctx, websocket.MessageText, raw)
// Drain the on-hello schedule.set.
_ = readUntilType(ctx, t, c, api.MsgScheduleSet)
// Pretend our local cron just fired this schedule.
fireEnv, _ := api.Marshal(api.MsgScheduleFire, "", api.ScheduleFirePayload{
ScheduleID: created.ID,
ScheduledAt: time.Now().UTC(),
})
raw, _ = json.Marshal(fireEnv)
if err := c.Write(ctx, websocket.MessageText, raw); err != nil {
t.Fatalf("write fire: %v", err)
}
// Server should respond with command.run.
cmdEnv := readUntilType(ctx, t, c, api.MsgCommandRun)
var cmd api.CommandRunPayload
if err := cmdEnv.UnmarshalPayload(&cmd); err != nil {
t.Fatalf("decode command.run: %v", err)
}
if cmd.JobID == "" || cmd.Kind != api.JobBackup {
t.Fatalf("command.run: %+v", cmd)
}
if len(cmd.Args) != 1 || cmd.Args[0] != "/etc/hostname" {
t.Fatalf("command.run args: %+v", cmd.Args)
}
// Verify the job row landed with actor_kind=schedule.
deadline := time.Now().Add(2 * time.Second)
for time.Now().Before(deadline) {
var actorKind, scheduledID string
row := st.DB().QueryRowContext(context.Background(),
`SELECT actor_kind, COALESCE(scheduled_id,'') FROM jobs WHERE id = ?`,
cmd.JobID)
if err := row.Scan(&actorKind, &scheduledID); err == nil {
if actorKind != "schedule" {
t.Fatalf("job actor_kind: %q", actorKind)
}
if scheduledID != created.ID {
t.Fatalf("job scheduled_id: %q want %q", scheduledID, created.ID)
}
return
}
time.Sleep(20 * time.Millisecond)
}
t.Fatalf("job row %s never landed", cmd.JobID)
}
func TestSchedulePushOnCRUD(t *testing.T) {
t.Parallel()
srv, url, st := newTestServerWithHub(t)
_ = srv
cookie := loginAndCookie(t, url)
hostID, agentToken := makePushHost(t, st)
// Connect first so the CRUD push has somewhere to land.
wsURL := "ws" + strings.TrimPrefix(url, "http") + "/ws/agent"
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
c, _, err := websocket.Dial(ctx, wsURL, &websocket.DialOptions{
HTTPHeader: stdhttp.Header{"Authorization": []string{"Bearer " + agentToken}},
})
if err != nil {
t.Fatalf("dial: %v", err)
}
defer c.CloseNow()
helloEnv, _ := api.Marshal(api.MsgHello, "", api.HelloPayload{
ProtocolVersion: api.CurrentProtocolVersion,
AgentVersion: "test", ResticVersion: "test",
Hostname: "ph", OS: api.OSLinux, Arch: api.ArchAmd64,
})
raw, _ := json.Marshal(helloEnv)
_ = c.Write(ctx, websocket.MessageText, raw)
// Drain the on-hello schedule.set (will be version 0, empty list).
first := readUntilType(ctx, t, c, api.MsgScheduleSet)
var initial api.ScheduleSetPayload
_ = first.UnmarshalPayload(&initial)
if initial.Version != 0 || len(initial.Schedules) != 0 {
t.Fatalf("initial push: %+v", initial)
}
// Now create a schedule via REST. The handler should fire a
// schedule.set push asynchronously.
body, _ := json.Marshal(scheduleAPI{
Kind: "backup", CronExpr: "*/30 * * * *",
Paths: []string{"/var/lib"}, Enabled: true,
})
req, _ := stdhttp.NewRequest("POST",
url+"/api/hosts/"+hostID+"/schedules", bytes.NewReader(body))
req.AddCookie(cookie)
req.Header.Set("Content-Type", "application/json")
res, err := stdhttp.DefaultClient.Do(req)
if err != nil {
t.Fatalf("create: %v", err)
}
res.Body.Close()
if res.StatusCode != stdhttp.StatusCreated {
t.Fatalf("create: %d", res.StatusCode)
}
// Wait for the pushed schedule.set with version 1.
pushed := readUntilType(ctx, t, c, api.MsgScheduleSet)
var pl api.ScheduleSetPayload
_ = pushed.UnmarshalPayload(&pl)
if pl.Version != 1 || len(pl.Schedules) != 1 {
t.Fatalf("push after create: %+v", pl)
}
}
+19 -292
View File
@@ -1,310 +1,37 @@
package http package http
import ( import (
"encoding/json"
"errors"
stdhttp "net/http" stdhttp "net/http"
"strings"
"github.com/go-chi/chi/v5"
"github.com/oklog/ulid/v2"
"github.com/robfig/cron/v3"
"gitea.dcglab.co.uk/steve/restic-manager/internal/api"
"gitea.dcglab.co.uk/steve/restic-manager/internal/store"
) )
// scheduleAPI is the JSON shape for /api/hosts/{id}/schedules. We // schedules.go — REST API for /api/hosts/{id}/schedules.
// avoid leaking host_id in the body since it's already in the URL, //
// and we render booleans + retention as typed JSON rather than // Stubbed during the P2 redesign data-model rewrite (commit chain).
// strings so the UI can edit fields directly. // Phase 2 dropped the fat Schedule shape (paths/excludes/tags/
type scheduleAPI struct { // retention/manual/kind/options/hooks) — the slim Schedule + source
ID string `json:"id,omitempty"` // groups model lives in store/. Phase 3 of the redesign will fill in
Kind api.JobKind `json:"kind"` // these handlers against the new shape.
CronExpr string `json:"cron_expr"` //
Paths []string `json:"paths"` // Returning 501 here keeps the routes addressable; UI calls will
Excludes []string `json:"excludes"` // surface the unimplemented state via the toast component until the
Tags []string `json:"tags"` // new handlers land.
RetentionPolicy store.RetentionPolicy `json:"retention_policy"`
Options store.ScheduleOptions `json:"options"`
PreHook string `json:"pre_hook,omitempty"`
PostHook string `json:"post_hook,omitempty"`
Enabled bool `json:"enabled"`
// Manual = no cron, fires only when the operator triggers a
// run-now. Cron expr is ignored when this is true.
Manual bool `json:"manual"`
CreatedAt string `json:"created_at,omitempty"`
UpdatedAt string `json:"updated_at,omitempty"`
}
// listSchedulesResp wraps the array so the response is forward-
// compatible (we may want to add a top-level `version` later).
type listSchedulesResp struct {
Version int64 `json:"version"`
Schedules []scheduleAPI `json:"schedules"`
}
// cron parser used for input validation. Standard 5-field syntax
// with descriptors (@hourly etc.) — same parser the agent will
// run against, so a schedule that validates here will fire there.
var cronParser = cron.NewParser(
cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor,
)
func (s *Server) handleListSchedules(w stdhttp.ResponseWriter, r *stdhttp.Request) { func (s *Server) handleListSchedules(w stdhttp.ResponseWriter, r *stdhttp.Request) {
if _, ok := s.requireUser(r); !ok { writeJSONError(w, stdhttp.StatusNotImplemented, "redesign_in_progress",
writeJSONError(w, stdhttp.StatusUnauthorized, "unauthorized", "") "schedule REST API is being rebuilt — see P2 redesign Phase 3")
return
}
hostID := chi.URLParam(r, "id")
if hostID == "" {
writeJSONError(w, stdhttp.StatusBadRequest, "missing_host_id", "")
return
}
if _, err := s.deps.Store.GetHost(r.Context(), hostID); err != nil {
if errors.Is(err, store.ErrNotFound) {
writeJSONError(w, stdhttp.StatusNotFound, "host_not_found", "")
return
}
writeJSONError(w, stdhttp.StatusInternalServerError, "internal", "")
return
}
rows, err := s.deps.Store.ListSchedulesByHost(r.Context(), hostID)
if err != nil {
writeJSONError(w, stdhttp.StatusInternalServerError, "internal", "")
return
}
version, err := s.deps.Store.GetHostScheduleVersion(r.Context(), hostID)
if err != nil {
writeJSONError(w, stdhttp.StatusInternalServerError, "internal", "")
return
}
out := listSchedulesResp{Version: version, Schedules: make([]scheduleAPI, len(rows))}
for i, row := range rows {
out.Schedules[i] = toScheduleAPI(row)
}
writeJSON(w, stdhttp.StatusOK, out)
} }
func (s *Server) handleCreateSchedule(w stdhttp.ResponseWriter, r *stdhttp.Request) { func (s *Server) handleCreateSchedule(w stdhttp.ResponseWriter, r *stdhttp.Request) {
user, ok := s.requireUser(r) writeJSONError(w, stdhttp.StatusNotImplemented, "redesign_in_progress",
if !ok { "schedule REST API is being rebuilt — see P2 redesign Phase 3")
writeJSONError(w, stdhttp.StatusUnauthorized, "unauthorized", "")
return
}
hostID := chi.URLParam(r, "id")
if hostID == "" {
writeJSONError(w, stdhttp.StatusBadRequest, "missing_host_id", "")
return
}
if _, err := s.deps.Store.GetHost(r.Context(), hostID); err != nil {
if errors.Is(err, store.ErrNotFound) {
writeJSONError(w, stdhttp.StatusNotFound, "host_not_found", "")
return
}
writeJSONError(w, stdhttp.StatusInternalServerError, "internal", "")
return
}
var req scheduleAPI
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
writeJSONError(w, stdhttp.StatusBadRequest, "invalid_json", err.Error())
return
}
if code, msg := validateSchedule(&req); code != "" {
writeJSONError(w, stdhttp.StatusBadRequest, code, msg)
return
}
row := store.Schedule{
ID: ulid.Make().String(),
HostID: hostID,
Kind: string(req.Kind),
CronExpr: req.CronExpr,
Paths: req.Paths,
Excludes: req.Excludes,
Tags: req.Tags,
RetentionPolicy: req.RetentionPolicy,
Options: req.Options,
PreHook: req.PreHook,
PostHook: req.PostHook,
Enabled: req.Enabled,
}
if err := s.deps.Store.CreateSchedule(r.Context(), &row); err != nil {
writeJSONError(w, stdhttp.StatusInternalServerError, "internal", err.Error())
return
}
_ = s.deps.Store.AppendAudit(r.Context(), store.AuditEntry{
ID: ulid.Make().String(),
UserID: &user.ID,
Actor: "user",
Action: "schedule.created",
TargetKind: ptr("schedule"),
TargetID: &row.ID,
TS: nowUTC(),
})
s.pushScheduleSetAsync(hostID)
writeJSON(w, stdhttp.StatusCreated, toScheduleAPI(row))
} }
func (s *Server) handleUpdateSchedule(w stdhttp.ResponseWriter, r *stdhttp.Request) { func (s *Server) handleUpdateSchedule(w stdhttp.ResponseWriter, r *stdhttp.Request) {
user, ok := s.requireUser(r) writeJSONError(w, stdhttp.StatusNotImplemented, "redesign_in_progress",
if !ok { "schedule REST API is being rebuilt — see P2 redesign Phase 3")
writeJSONError(w, stdhttp.StatusUnauthorized, "unauthorized", "")
return
}
hostID := chi.URLParam(r, "id")
scheduleID := chi.URLParam(r, "sid")
if hostID == "" || scheduleID == "" {
writeJSONError(w, stdhttp.StatusBadRequest, "missing_id", "")
return
}
existing, err := s.deps.Store.GetSchedule(r.Context(), hostID, scheduleID)
if err != nil {
if errors.Is(err, store.ErrNotFound) {
writeJSONError(w, stdhttp.StatusNotFound, "schedule_not_found", "")
return
}
writeJSONError(w, stdhttp.StatusInternalServerError, "internal", "")
return
}
var req scheduleAPI
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
writeJSONError(w, stdhttp.StatusBadRequest, "invalid_json", err.Error())
return
}
// Kind is immutable; ignore whatever was sent and fix up before
// validation so validateSchedule sees the existing kind.
req.Kind = api.JobKind(existing.Kind)
if code, msg := validateSchedule(&req); code != "" {
writeJSONError(w, stdhttp.StatusBadRequest, code, msg)
return
}
existing.CronExpr = req.CronExpr
existing.Paths = req.Paths
existing.Excludes = req.Excludes
existing.Tags = req.Tags
existing.RetentionPolicy = req.RetentionPolicy
existing.Options = req.Options
existing.PreHook = req.PreHook
existing.PostHook = req.PostHook
existing.Enabled = req.Enabled
if err := s.deps.Store.UpdateSchedule(r.Context(), existing); err != nil {
if errors.Is(err, store.ErrNotFound) {
writeJSONError(w, stdhttp.StatusNotFound, "schedule_not_found", "")
return
}
writeJSONError(w, stdhttp.StatusInternalServerError, "internal", err.Error())
return
}
_ = s.deps.Store.AppendAudit(r.Context(), store.AuditEntry{
ID: ulid.Make().String(),
UserID: &user.ID,
Actor: "user",
Action: "schedule.updated",
TargetKind: ptr("schedule"),
TargetID: &existing.ID,
TS: nowUTC(),
})
s.pushScheduleSetAsync(hostID)
writeJSON(w, stdhttp.StatusOK, toScheduleAPI(*existing))
} }
func (s *Server) handleDeleteSchedule(w stdhttp.ResponseWriter, r *stdhttp.Request) { func (s *Server) handleDeleteSchedule(w stdhttp.ResponseWriter, r *stdhttp.Request) {
user, ok := s.requireUser(r) writeJSONError(w, stdhttp.StatusNotImplemented, "redesign_in_progress",
if !ok { "schedule REST API is being rebuilt — see P2 redesign Phase 3")
writeJSONError(w, stdhttp.StatusUnauthorized, "unauthorized", "")
return
}
hostID := chi.URLParam(r, "id")
scheduleID := chi.URLParam(r, "sid")
if hostID == "" || scheduleID == "" {
writeJSONError(w, stdhttp.StatusBadRequest, "missing_id", "")
return
}
if err := s.deps.Store.DeleteSchedule(r.Context(), hostID, scheduleID); err != nil {
if errors.Is(err, store.ErrNotFound) {
writeJSONError(w, stdhttp.StatusNotFound, "schedule_not_found", "")
return
}
writeJSONError(w, stdhttp.StatusInternalServerError, "internal", err.Error())
return
}
_ = s.deps.Store.AppendAudit(r.Context(), store.AuditEntry{
ID: ulid.Make().String(),
UserID: &user.ID,
Actor: "user",
Action: "schedule.deleted",
TargetKind: ptr("schedule"),
TargetID: &scheduleID,
TS: nowUTC(),
})
s.pushScheduleSetAsync(hostID)
w.WriteHeader(stdhttp.StatusNoContent)
}
// validateSchedule rejects malformed inputs with a stable error code.
// Returns ("", "") on success.
func validateSchedule(s *scheduleAPI) (code, msg string) {
switch s.Kind {
case api.JobBackup, api.JobForget, api.JobPrune, api.JobCheck:
// ok — valid schedule kinds (init/unlock are operator-driven only).
default:
return "invalid_kind", "kind must be one of backup|forget|prune|check"
}
if !s.Manual {
if strings.TrimSpace(s.CronExpr) == "" {
return "missing_cron_expr", "cron_expr is required (or set manual=true)"
}
if _, err := cronParser.Parse(s.CronExpr); err != nil {
return "invalid_cron_expr", err.Error()
}
}
if s.Kind == api.JobBackup && len(s.Paths) == 0 {
return "missing_paths", "backup schedules require at least one path"
}
// forget needs at least one keep-* dimension; otherwise restic
// would happily delete every snapshot.
if s.Kind == api.JobForget && (s.RetentionPolicy == store.RetentionPolicy{}) {
return "missing_retention", "forget schedules require at least one Keep-* value"
}
// Hooks are only meaningful on backup schedules (spec §14.3).
if s.Kind != api.JobBackup && (s.PreHook != "" || s.PostHook != "") {
return "hooks_not_allowed", "pre_hook / post_hook only apply to backup schedules"
}
return "", ""
}
func toScheduleAPI(s store.Schedule) scheduleAPI {
out := scheduleAPI{
ID: s.ID,
Kind: api.JobKind(s.Kind),
CronExpr: s.CronExpr,
Paths: s.Paths,
Excludes: s.Excludes,
Tags: s.Tags,
RetentionPolicy: s.RetentionPolicy,
Options: s.Options,
PreHook: s.PreHook,
PostHook: s.PostHook,
Enabled: s.Enabled,
Manual: s.Manual,
CreatedAt: s.CreatedAt.Format("2006-01-02T15:04:05.999999999Z07:00"),
UpdatedAt: s.UpdatedAt.Format("2006-01-02T15:04:05.999999999Z07:00"),
}
if out.Paths == nil {
out.Paths = []string{}
}
if out.Excludes == nil {
out.Excludes = []string{}
}
if out.Tags == nil {
out.Tags = []string{}
}
return out
} }
-190
View File
@@ -1,190 +0,0 @@
package http
import (
"bytes"
"context"
"encoding/json"
"io"
stdhttp "net/http"
"testing"
"time"
"gitea.dcglab.co.uk/steve/restic-manager/internal/store"
)
// loginAndCookie bootstraps an admin, logs in, and returns the
// session cookie ready to attach to subsequent requests.
func loginAndCookie(t *testing.T, url string) *stdhttp.Cookie {
t.Helper()
bs, _ := json.Marshal(bootstrapRequest{
Token: "test-token", Username: "alice", Password: "averylongpassword",
})
res, err := stdhttp.Post(url+"/api/bootstrap", "application/json", bytes.NewReader(bs))
if err != nil {
t.Fatalf("bootstrap: %v", err)
}
res.Body.Close()
body, _ := json.Marshal(loginRequest{Username: "alice", Password: "averylongpassword"})
res, err = stdhttp.Post(url+"/api/auth/login", "application/json", bytes.NewReader(body))
if err != nil {
t.Fatalf("login: %v", err)
}
defer res.Body.Close()
if res.StatusCode != stdhttp.StatusOK {
got, _ := io.ReadAll(res.Body)
t.Fatalf("login: %d %s", res.StatusCode, got)
}
for _, c := range res.Cookies() {
if c.Name == sessionCookieName {
return c
}
}
t.Fatal("no session cookie")
return nil
}
// makeHTTPHost inserts a host directly via the store so we can hit
// the schedule endpoints without dragging in the enrollment flow.
func makeHTTPHost(t *testing.T, st *store.Store) string {
t.Helper()
const id = "01HSCHEDHTTP000000000000Z"
if err := st.CreateHost(context.Background(), store.Host{
ID: id, Name: "h", OS: "linux", Arch: "amd64",
AgentVersion: "dev", ResticVersion: "0.16.0", ProtocolVersion: 1,
EnrolledAt: time.Now().UTC(),
}, "tokenhash", ""); err != nil {
t.Fatalf("create host: %v", err)
}
return id
}
func TestSchedulesAPIHappyPath(t *testing.T) {
t.Parallel()
_, url, st := newTestServerWithHub(t)
cookie := loginAndCookie(t, url)
hostID := makeHTTPHost(t, st)
doReq := func(method, path string, body any, want int) []byte {
t.Helper()
var b []byte
if body != nil {
b, _ = json.Marshal(body)
}
req, _ := stdhttp.NewRequest(method, url+path, bytes.NewReader(b))
req.AddCookie(cookie)
req.Header.Set("Content-Type", "application/json")
res, err := stdhttp.DefaultClient.Do(req)
if err != nil {
t.Fatalf("%s %s: %v", method, path, err)
}
defer res.Body.Close()
got, _ := io.ReadAll(res.Body)
if res.StatusCode != want {
t.Fatalf("%s %s: status %d (want %d) body=%s", method, path, res.StatusCode, want, got)
}
return got
}
// Empty list returns version 0.
body := doReq("GET", "/api/hosts/"+hostID+"/schedules", nil, stdhttp.StatusOK)
var listed listSchedulesResp
if err := json.Unmarshal(body, &listed); err != nil {
t.Fatal(err)
}
if listed.Version != 0 || len(listed.Schedules) != 0 {
t.Fatalf("initial list: %+v", listed)
}
// Create.
keepLast := 3
create := scheduleAPI{
Kind: "backup", CronExpr: "0 */6 * * *",
Paths: []string{"/etc"},
Tags: []string{"nightly"},
RetentionPolicy: store.RetentionPolicy{KeepLast: &keepLast},
Enabled: true,
}
body = doReq("POST", "/api/hosts/"+hostID+"/schedules", create, stdhttp.StatusCreated)
var created scheduleAPI
if err := json.Unmarshal(body, &created); err != nil {
t.Fatal(err)
}
if created.ID == "" || created.CronExpr != create.CronExpr {
t.Fatalf("create returned: %+v", created)
}
// Version bumped.
body = doReq("GET", "/api/hosts/"+hostID+"/schedules", nil, stdhttp.StatusOK)
_ = json.Unmarshal(body, &listed)
if listed.Version != 1 {
t.Fatalf("version after create: %d", listed.Version)
}
// Update changes the cron expr; kind silently preserved even if request tries otherwise.
created.CronExpr = "*/15 * * * *"
created.Kind = "prune" // should be ignored
body = doReq("PUT", "/api/hosts/"+hostID+"/schedules/"+created.ID, created, stdhttp.StatusOK)
var updated scheduleAPI
_ = json.Unmarshal(body, &updated)
if updated.Kind != "backup" || updated.CronExpr != "*/15 * * * *" {
t.Fatalf("update: %+v", updated)
}
// Delete.
doReq("DELETE", "/api/hosts/"+hostID+"/schedules/"+created.ID, nil, stdhttp.StatusNoContent)
body = doReq("GET", "/api/hosts/"+hostID+"/schedules", nil, stdhttp.StatusOK)
_ = json.Unmarshal(body, &listed)
if listed.Version != 3 || len(listed.Schedules) != 0 {
t.Fatalf("after delete: %+v", listed)
}
}
func TestSchedulesAPIValidation(t *testing.T) {
t.Parallel()
_, url, st := newTestServerWithHub(t)
cookie := loginAndCookie(t, url)
hostID := makeHTTPHost(t, st)
post := func(s scheduleAPI) (int, []byte) {
b, _ := json.Marshal(s)
req, _ := stdhttp.NewRequest("POST",
url+"/api/hosts/"+hostID+"/schedules", bytes.NewReader(b))
req.AddCookie(cookie)
req.Header.Set("Content-Type", "application/json")
res, err := stdhttp.DefaultClient.Do(req)
if err != nil {
t.Fatalf("post: %v", err)
}
defer res.Body.Close()
body, _ := io.ReadAll(res.Body)
return res.StatusCode, body
}
cases := []struct {
name string
in scheduleAPI
want string // expected error code
}{
{"bad kind", scheduleAPI{Kind: "init", CronExpr: "@hourly", Paths: []string{"/etc"}}, "invalid_kind"},
{"missing cron", scheduleAPI{Kind: "backup", Paths: []string{"/etc"}}, "missing_cron_expr"},
{"bad cron", scheduleAPI{Kind: "backup", CronExpr: "not a cron", Paths: []string{"/etc"}}, "invalid_cron_expr"},
{"backup without paths", scheduleAPI{Kind: "backup", CronExpr: "@hourly"}, "missing_paths"},
{"hooks on non-backup", scheduleAPI{Kind: "prune", CronExpr: "@daily", PreHook: "echo hi"}, "hooks_not_allowed"},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
status, body := post(c.in)
if status != stdhttp.StatusBadRequest {
t.Fatalf("status %d body=%s", status, body)
}
var env struct {
Code string `json:"code"`
}
_ = json.Unmarshal(body, &env)
if env.Code != c.want {
t.Fatalf("error code: got %q want %q (body=%s)", env.Code, c.want, body)
}
})
}
}
+12 -136
View File
@@ -1,7 +1,6 @@
package http package http
import ( import (
"context"
"crypto/rand" "crypto/rand"
"encoding/base64" "encoding/base64"
"encoding/json" "encoding/json"
@@ -143,146 +142,23 @@ func (s *Server) handleUIDashboard(w stdhttp.ResponseWriter, r *stdhttp.Request)
} }
} }
// handleUIRunBackup is the form-submit twin of POST /api/hosts/{id}/jobs // handleUIRunBackup and handleUIInitRepo are stubbed during the P2
// that the dashboard / host-detail "Run now" buttons call via // redesign data-model rewrite. The dashboard per-host Run-now button
// hx-post. On success it sets HX-Redirect → /jobs/{job_id} so the // is going away (operator clicks into host detail then a per-source-
// operator lands on the live log viewer for the job they just // group Run-now), and Init-repo becomes implicit at host enrolment
// kicked off. // (auto-init dispatched server-side). Phase 4 of the redesign wires
// the new per-source-group Run-now via /hosts/{id}/source-groups/{gid}/run.
func (s *Server) handleUIRunBackup(w stdhttp.ResponseWriter, r *stdhttp.Request) { func (s *Server) handleUIRunBackup(w stdhttp.ResponseWriter, r *stdhttp.Request) {
u := s.requireUIUser(w, r)
if u == nil {
return
}
hostID := chi.URLParam(r, "id")
if hostID == "" {
stdhttp.Error(w, "missing host id", stdhttp.StatusBadRequest)
return
}
storeUser, _, err := s.userByID(r, u.ID)
if err != nil {
stdhttp.Error(w, "internal", stdhttp.StatusInternalServerError)
return
}
host, err := s.deps.Store.GetHost(r.Context(), hostID)
if err != nil {
if errors.Is(err, store.ErrNotFound) {
stdhttp.NotFound(w, r)
return
}
stdhttp.Error(w, "internal", stdhttp.StatusInternalServerError)
return
}
if host.RepoInitialisedAt == nil {
stdhttp.Error(w, stdhttp.Error(w,
"this host's repo hasn't been initialised yet — click Initialise repo first", "per-host Run-now is being replaced by per-source-group Run-now — see P2 redesign Phase 4",
stdhttp.StatusBadRequest) stdhttp.StatusNotImplemented)
return
}
pick, err := s.pickRunNowSchedule(r.Context(), hostID)
if err != nil {
stdhttp.Error(w, err.Error(), stdhttp.StatusBadRequest)
return
}
res, status, code, msg := s.dispatchJob(r.Context(), storeUser, hostID, api.JobBackup, pick.Paths)
if code != "" {
stdhttp.Error(w, msg, status)
return
}
// HTMX (with hx-post + hx-swap=none) doesn't honour HX-Redirect
// when the response itself is a 3xx — fetch follows the redirect
// first and the header is lost. Branch on the HX-Request marker
// so HTMX gets a 200 + HX-Redirect (client-side window.location
// hop), while plain form-post / curl callers get the 303.
target := "/jobs/" + res.JobID
if r.Header.Get("HX-Request") == "true" {
w.Header().Set("HX-Redirect", target)
w.WriteHeader(stdhttp.StatusOK)
return
}
stdhttp.Redirect(w, r, target, stdhttp.StatusSeeOther)
} }
// pickRunNowSchedule chooses which schedule a generic per-host
// "Run now" button should dispatch when the operator hasn't picked
// one explicitly. Picks in priority order: the host's only enabled
// manual schedule, then its only enabled schedule of any kind.
// Returns a friendly error if there's nothing to run, or if the
// operator needs to disambiguate.
func (s *Server) pickRunNowSchedule(ctx context.Context, hostID string) (*store.Schedule, error) {
rows, err := s.deps.Store.ListSchedulesByHost(ctx, hostID)
if err != nil {
return nil, errFmt("internal: %s", err)
}
enabled := make([]store.Schedule, 0, len(rows))
for _, r := range rows {
if r.Enabled {
enabled = append(enabled, r)
}
}
if len(enabled) == 0 {
return nil, errFmt("this host has no enabled schedules — add one in the Schedules tab")
}
manuals := []store.Schedule{}
for _, r := range enabled {
if r.Manual {
manuals = append(manuals, r)
}
}
switch {
case len(manuals) == 1:
s := manuals[0]
return &s, nil
case len(enabled) == 1:
s := enabled[0]
return &s, nil
default:
return nil, errFmt("this host has %d schedules — pick one from the Schedules tab", len(enabled))
}
}
func errFmt(format string, args ...any) error {
return errFmtf(format, args...)
}
// handleUIInitRepo dispatches a one-shot `restic init` job for a
// host. Surfaced in the run-now panel as a red "Initialise repo"
// button when host.repo_initialised_at IS NULL. On success it
// redirects to the live log page just like Run-now.
func (s *Server) handleUIInitRepo(w stdhttp.ResponseWriter, r *stdhttp.Request) { func (s *Server) handleUIInitRepo(w stdhttp.ResponseWriter, r *stdhttp.Request) {
u := s.requireUIUser(w, r) stdhttp.Error(w,
if u == nil { "manual Init-repo is being replaced by auto-init at host enrolment — see P2 redesign Phase 6",
return stdhttp.StatusNotImplemented)
}
hostID := chi.URLParam(r, "id")
if hostID == "" {
stdhttp.Error(w, "missing host id", stdhttp.StatusBadRequest)
return
}
storeUser, _, err := s.userByID(r, u.ID)
if err != nil {
stdhttp.Error(w, "internal", stdhttp.StatusInternalServerError)
return
}
if _, err := s.deps.Store.GetHost(r.Context(), hostID); err != nil {
if errors.Is(err, store.ErrNotFound) {
stdhttp.NotFound(w, r)
return
}
stdhttp.Error(w, "internal", stdhttp.StatusInternalServerError)
return
}
res, status, code, msg := s.dispatchJob(r.Context(), storeUser, hostID, api.JobInit, nil)
if code != "" {
stdhttp.Error(w, msg, status)
return
}
target := "/jobs/" + res.JobID
if r.Header.Get("HX-Request") == "true" {
w.Header().Set("HX-Redirect", target)
w.WriteHeader(stdhttp.StatusOK)
return
}
stdhttp.Redirect(w, r, target, stdhttp.StatusSeeOther)
} }
// addHostPage carries the Add-host form state. The result-state // addHostPage carries the Add-host form state. The result-state
+15 -511
View File
@@ -1,534 +1,38 @@
package http package http
import ( import (
"errors"
"fmt"
"log/slog"
stdhttp "net/http" stdhttp "net/http"
"strconv"
"strings"
"github.com/go-chi/chi/v5"
"github.com/oklog/ulid/v2"
"gitea.dcglab.co.uk/steve/restic-manager/internal/api"
"gitea.dcglab.co.uk/steve/restic-manager/internal/server/ui"
"gitea.dcglab.co.uk/steve/restic-manager/internal/store"
) )
// schedulesListPage carries everything the Schedules tab needs. // ui_schedules.go — HTML form-driven schedule CRUD.
type schedulesListPage struct { //
Host store.Host // Stubbed during the P2 redesign template rewrite. Phase 4 of the
Schedules []store.Schedule // redesign rebuilds the schedule editor against the new slim shape
Version int64 // (cron + source-group multi-select + enabled), the source-group
AppliedVersion int64 // list/edit pages, and the repo-maintenance tab. Until then these
} // routes return 501; the dashboard's host-row "View →" link is the
// only operator entry point that still works.
// scheduleEditPage drives both the Create form (Schedule.ID empty)
// and the Edit form (Schedule populated). Errors come back via Error
// to be rendered as a banner; the rest of the fields hold the just-
// submitted raw values so a failed POST can re-render with the
// operator's typed input still in place.
type scheduleEditPage struct {
Host store.Host
IsNew bool
ScheduleID string
Error string
// Kind is settable on create, immutable on edit. The form's
// kind picker is hidden when !IsNew.
Kind string
// Form values — strings so partial input survives validation
// errors (e.g. operator typed "abc" into keep_last).
CronExpr string
PathsRaw string
ExcludesRaw string
TagsRaw string
KeepLast string
KeepHourly string
KeepDaily string
KeepWeekly string
KeepMonthly string
KeepYearly string
LimitUpKBps string
LimitDownKBps string
Enabled bool
Manual bool
}
// handleUISchedulesList renders the Schedules sub-tab on a host.
func (s *Server) handleUISchedulesList(w stdhttp.ResponseWriter, r *stdhttp.Request) { func (s *Server) handleUISchedulesList(w stdhttp.ResponseWriter, r *stdhttp.Request) {
u := s.requireUIUser(w, r) stdhttp.Error(w, "schedules UI is being rebuilt — see P2 redesign Phase 4", stdhttp.StatusNotImplemented)
if u == nil {
return
}
hostID := chi.URLParam(r, "id")
host, err := s.deps.Store.GetHost(r.Context(), hostID)
if err != nil {
if errors.Is(err, store.ErrNotFound) {
stdhttp.NotFound(w, r)
return
}
stdhttp.Error(w, "internal", stdhttp.StatusInternalServerError)
return
}
rows, err := s.deps.Store.ListSchedulesByHost(r.Context(), hostID)
if err != nil {
stdhttp.Error(w, "internal", stdhttp.StatusInternalServerError)
return
}
version, _ := s.deps.Store.GetHostScheduleVersion(r.Context(), hostID)
view := s.baseView(u, "dashboard")
view.Title = host.Name + " · schedules · restic-manager"
view.Page = schedulesListPage{
Host: *host,
Schedules: rows,
Version: version,
AppliedVersion: host.AppliedScheduleVersion,
}
if err := s.deps.UI.Render(w, "schedules_list", view); err != nil {
slog.Error("ui: render schedules_list", "err", err)
stdhttp.Error(w, "internal", stdhttp.StatusInternalServerError)
}
} }
// handleUIScheduleNewGet renders the empty Create form.
func (s *Server) handleUIScheduleNewGet(w stdhttp.ResponseWriter, r *stdhttp.Request) { func (s *Server) handleUIScheduleNewGet(w stdhttp.ResponseWriter, r *stdhttp.Request) {
u := s.requireUIUser(w, r) stdhttp.Error(w, "schedules UI is being rebuilt — see P2 redesign Phase 4", stdhttp.StatusNotImplemented)
if u == nil {
return
}
hostID := chi.URLParam(r, "id")
host, err := s.deps.Store.GetHost(r.Context(), hostID)
if err != nil {
if errors.Is(err, store.ErrNotFound) {
stdhttp.NotFound(w, r)
return
}
stdhttp.Error(w, "internal", stdhttp.StatusInternalServerError)
return
}
view := s.baseView(u, "dashboard")
view.Title = "New schedule · " + host.Name
view.Page = scheduleEditPage{
Host: *host,
IsNew: true,
Kind: string(api.JobBackup),
CronExpr: "0 3 * * *",
Enabled: true,
}
s.renderScheduleEdit(w, view)
} }
// handleUIScheduleEditGet renders the Edit form pre-filled from the
// existing schedule row.
func (s *Server) handleUIScheduleEditGet(w stdhttp.ResponseWriter, r *stdhttp.Request) { func (s *Server) handleUIScheduleEditGet(w stdhttp.ResponseWriter, r *stdhttp.Request) {
u := s.requireUIUser(w, r) stdhttp.Error(w, "schedules UI is being rebuilt — see P2 redesign Phase 4", stdhttp.StatusNotImplemented)
if u == nil {
return
}
hostID := chi.URLParam(r, "id")
scheduleID := chi.URLParam(r, "sid")
host, err := s.deps.Store.GetHost(r.Context(), hostID)
if err != nil {
if errors.Is(err, store.ErrNotFound) {
stdhttp.NotFound(w, r)
return
}
stdhttp.Error(w, "internal", stdhttp.StatusInternalServerError)
return
}
sched, err := s.deps.Store.GetSchedule(r.Context(), hostID, scheduleID)
if err != nil {
if errors.Is(err, store.ErrNotFound) {
stdhttp.NotFound(w, r)
return
}
stdhttp.Error(w, "internal", stdhttp.StatusInternalServerError)
return
}
page := scheduleEditPage{
Host: *host,
IsNew: false,
ScheduleID: sched.ID,
Kind: sched.Kind,
CronExpr: sched.CronExpr,
PathsRaw: strings.Join(sched.Paths, "\n"),
ExcludesRaw: strings.Join(sched.Excludes, "\n"),
TagsRaw: strings.Join(sched.Tags, ", "),
Enabled: sched.Enabled,
Manual: sched.Manual,
}
page.KeepLast = intStringPtr(sched.RetentionPolicy.KeepLast)
page.KeepHourly = intStringPtr(sched.RetentionPolicy.KeepHourly)
page.KeepDaily = intStringPtr(sched.RetentionPolicy.KeepDaily)
page.KeepWeekly = intStringPtr(sched.RetentionPolicy.KeepWeekly)
page.KeepMonthly = intStringPtr(sched.RetentionPolicy.KeepMonthly)
page.KeepYearly = intStringPtr(sched.RetentionPolicy.KeepYearly)
page.LimitUpKBps = intStringPtr(sched.Options.LimitUploadKBps)
page.LimitDownKBps = intStringPtr(sched.Options.LimitDownloadKBps)
view := s.baseView(u, "dashboard")
view.Title = "Edit schedule · " + host.Name
view.Page = page
s.renderScheduleEdit(w, view)
} }
// handleUIScheduleSave handles POST for both create and update. The
// edit form posts to /hosts/{id}/schedules/new (for create) or
// /hosts/{id}/schedules/{sid}/edit (for update); we branch on whether
// {sid} is present in the route params.
func (s *Server) handleUIScheduleSave(w stdhttp.ResponseWriter, r *stdhttp.Request) { func (s *Server) handleUIScheduleSave(w stdhttp.ResponseWriter, r *stdhttp.Request) {
u := s.requireUIUser(w, r) stdhttp.Error(w, "schedules UI is being rebuilt — see P2 redesign Phase 4", stdhttp.StatusNotImplemented)
if u == nil {
return
}
hostID := chi.URLParam(r, "id")
scheduleID := chi.URLParam(r, "sid")
storeUser, _, err := s.userByID(r, u.ID)
if err != nil || storeUser == nil {
stdhttp.Error(w, "internal", stdhttp.StatusInternalServerError)
return
}
host, err := s.deps.Store.GetHost(r.Context(), hostID)
if err != nil {
if errors.Is(err, store.ErrNotFound) {
stdhttp.NotFound(w, r)
return
}
stdhttp.Error(w, "internal", stdhttp.StatusInternalServerError)
return
}
if err := r.ParseForm(); err != nil {
stdhttp.Error(w, "bad request", stdhttp.StatusBadRequest)
return
}
page := scheduleEditPage{
Host: *host,
IsNew: scheduleID == "",
ScheduleID: scheduleID,
Kind: strings.TrimSpace(r.PostForm.Get("kind")),
CronExpr: strings.TrimSpace(r.PostForm.Get("cron_expr")),
PathsRaw: r.PostForm.Get("paths"),
ExcludesRaw: r.PostForm.Get("excludes"),
TagsRaw: strings.TrimSpace(r.PostForm.Get("tags")),
KeepLast: strings.TrimSpace(r.PostForm.Get("keep_last")),
KeepHourly: strings.TrimSpace(r.PostForm.Get("keep_hourly")),
KeepDaily: strings.TrimSpace(r.PostForm.Get("keep_daily")),
KeepWeekly: strings.TrimSpace(r.PostForm.Get("keep_weekly")),
KeepMonthly: strings.TrimSpace(r.PostForm.Get("keep_monthly")),
KeepYearly: strings.TrimSpace(r.PostForm.Get("keep_yearly")),
LimitUpKBps: strings.TrimSpace(r.PostForm.Get("limit_up_kbps")),
LimitDownKBps: strings.TrimSpace(r.PostForm.Get("limit_down_kbps")),
Enabled: r.PostForm.Get("enabled") == "on",
Manual: r.PostForm.Get("manual") == "on",
}
// Kind is immutable on edit — use the existing schedule's kind
// regardless of what the form submitted.
if !page.IsNew {
if existing, err := s.deps.Store.GetSchedule(r.Context(), hostID, scheduleID); err == nil {
page.Kind = existing.Kind
}
}
if page.Kind == "" {
page.Kind = string(api.JobBackup)
} }
// Convert the raw form values into store-shape data, surfacing
// the first parse error as a banner.
paths := splitPaths(page.PathsRaw)
excludes := splitPaths(page.ExcludesRaw)
tags := splitCSV(page.TagsRaw)
retention, err := parseRetention(page)
if err != nil {
page.Error = err.Error()
s.renderEditPage(w, u, page)
return
}
options, err := parseOptions(page)
if err != nil {
page.Error = err.Error()
s.renderEditPage(w, u, page)
return
}
// Validate against the same rules the JSON API uses. Manual
// schedules skip the cron-expr requirement; forget schedules
// require a non-empty retention policy. Other validation
// (kind in allowed set, paths required for backup, hooks
// rejected on non-backup) lives in validateSchedule.
apiShape := scheduleAPI{
Kind: api.JobKind(page.Kind),
CronExpr: page.CronExpr,
Paths: paths,
Manual: page.Manual,
RetentionPolicy: retention,
}
if code, msg := validateSchedule(&apiShape); code != "" {
page.Error = uiErrorMessage(code, msg)
s.renderEditPage(w, u, page)
return
}
if page.IsNew {
row := store.Schedule{
ID: ulid.Make().String(),
HostID: hostID,
Kind: page.Kind,
CronExpr: page.CronExpr,
Paths: paths,
Excludes: excludes,
Tags: tags,
RetentionPolicy: retention,
Options: options,
Enabled: page.Enabled,
Manual: page.Manual,
}
if err := s.deps.Store.CreateSchedule(r.Context(), &row); err != nil {
page.Error = "Couldn't save schedule — see server log."
slog.Error("ui schedule create", "err", err)
s.renderEditPage(w, u, page)
return
}
_ = s.deps.Store.AppendAudit(r.Context(), store.AuditEntry{
ID: ulid.Make().String(),
UserID: &storeUser.ID,
Actor: "user",
Action: "schedule.created",
TargetKind: ptr("schedule"),
TargetID: &row.ID,
TS: nowUTC(),
})
s.pushScheduleSetAsync(hostID)
} else {
existing, err := s.deps.Store.GetSchedule(r.Context(), hostID, scheduleID)
if err != nil {
if errors.Is(err, store.ErrNotFound) {
stdhttp.NotFound(w, r)
return
}
stdhttp.Error(w, "internal", stdhttp.StatusInternalServerError)
return
}
existing.CronExpr = page.CronExpr
existing.Paths = paths
existing.Excludes = excludes
existing.Tags = tags
existing.RetentionPolicy = retention
existing.Options = options
existing.Enabled = page.Enabled
existing.Manual = page.Manual
if err := s.deps.Store.UpdateSchedule(r.Context(), existing); err != nil {
page.Error = "Couldn't save schedule — see server log."
slog.Error("ui schedule update", "err", err)
s.renderEditPage(w, u, page)
return
}
_ = s.deps.Store.AppendAudit(r.Context(), store.AuditEntry{
ID: ulid.Make().String(),
UserID: &storeUser.ID,
Actor: "user",
Action: "schedule.updated",
TargetKind: ptr("schedule"),
TargetID: &scheduleID,
TS: nowUTC(),
})
s.pushScheduleSetAsync(hostID)
}
stdhttp.Redirect(w, r, "/hosts/"+hostID+"/schedules", stdhttp.StatusSeeOther)
}
// handleUIScheduleRun is the POST target of per-schedule Run-now
// buttons. Reuses dispatchScheduledJob (the same code path used by
// the agent's local cron firing) so manual + automated runs flow
// through identical job lifecycle. Sets HX-Redirect to the live
// log on success.
func (s *Server) handleUIScheduleRun(w stdhttp.ResponseWriter, r *stdhttp.Request) {
u := s.requireUIUser(w, r)
if u == nil {
return
}
hostID := chi.URLParam(r, "id")
scheduleID := chi.URLParam(r, "sid")
host, err := s.deps.Store.GetHost(r.Context(), hostID)
if err != nil {
if errors.Is(err, store.ErrNotFound) {
stdhttp.NotFound(w, r)
return
}
stdhttp.Error(w, "internal", stdhttp.StatusInternalServerError)
return
}
if !s.deps.Hub.Connected(hostID) {
stdhttp.Error(w, "agent is offline", stdhttp.StatusBadRequest)
return
}
_ = host
jobID, err := s.dispatchScheduleNow(r.Context(), hostID, scheduleID, nil)
if err != nil {
stdhttp.Error(w, err.Error(), stdhttp.StatusBadRequest)
return
}
target := "/jobs/" + jobID
if r.Header.Get("HX-Request") == "true" {
w.Header().Set("HX-Redirect", target)
w.WriteHeader(stdhttp.StatusOK)
return
}
stdhttp.Redirect(w, r, target, stdhttp.StatusSeeOther)
}
// handleUIScheduleDelete is the POST target of the Delete buttons on
// the list view. Confirm-then-redirect; no AJAX.
func (s *Server) handleUIScheduleDelete(w stdhttp.ResponseWriter, r *stdhttp.Request) { func (s *Server) handleUIScheduleDelete(w stdhttp.ResponseWriter, r *stdhttp.Request) {
u := s.requireUIUser(w, r) stdhttp.Error(w, "schedules UI is being rebuilt — see P2 redesign Phase 4", stdhttp.StatusNotImplemented)
if u == nil {
return
}
hostID := chi.URLParam(r, "id")
scheduleID := chi.URLParam(r, "sid")
storeUser, _, err := s.userByID(r, u.ID)
if err != nil || storeUser == nil {
stdhttp.Error(w, "internal", stdhttp.StatusInternalServerError)
return
}
if err := s.deps.Store.DeleteSchedule(r.Context(), hostID, scheduleID); err != nil {
if errors.Is(err, store.ErrNotFound) {
stdhttp.NotFound(w, r)
return
}
stdhttp.Error(w, "internal", stdhttp.StatusInternalServerError)
return
}
_ = s.deps.Store.AppendAudit(r.Context(), store.AuditEntry{
ID: ulid.Make().String(),
UserID: &storeUser.ID,
Actor: "user",
Action: "schedule.deleted",
TargetKind: ptr("schedule"),
TargetID: &scheduleID,
TS: nowUTC(),
})
s.pushScheduleSetAsync(hostID)
stdhttp.Redirect(w, r, "/hosts/"+hostID+"/schedules", stdhttp.StatusSeeOther)
} }
func (s *Server) renderScheduleEdit(w stdhttp.ResponseWriter, view ui.ViewData) { func (s *Server) handleUIScheduleRun(w stdhttp.ResponseWriter, r *stdhttp.Request) {
if err := s.deps.UI.Render(w, "schedule_edit", view); err != nil { stdhttp.Error(w, "schedules UI is being rebuilt — see P2 redesign Phase 4", stdhttp.StatusNotImplemented)
slog.Error("ui: render schedule_edit", "err", err)
stdhttp.Error(w, "internal", stdhttp.StatusInternalServerError)
}
}
func (s *Server) renderEditPage(w stdhttp.ResponseWriter, u *ui.User, page scheduleEditPage) {
view := s.baseView(u, "dashboard")
if page.IsNew {
view.Title = "New schedule · " + page.Host.Name
} else {
view.Title = "Edit schedule · " + page.Host.Name
}
view.Page = page
w.WriteHeader(stdhttp.StatusUnprocessableEntity)
s.renderScheduleEdit(w, view)
}
// ----- helpers --------------------------------------------------------
// splitCSV parses comma-separated values into a clean []string —
// leading/trailing whitespace trimmed, blanks dropped.
func splitCSV(s string) []string {
out := []string{}
for _, p := range strings.Split(s, ",") {
if t := strings.TrimSpace(p); t != "" {
out = append(out, t)
}
}
return out
}
func parseRetention(p scheduleEditPage) (store.RetentionPolicy, error) {
var r store.RetentionPolicy
for _, f := range []struct {
raw string
dest **int
name string
}{
{p.KeepLast, &r.KeepLast, "keep last"},
{p.KeepHourly, &r.KeepHourly, "keep hourly"},
{p.KeepDaily, &r.KeepDaily, "keep daily"},
{p.KeepWeekly, &r.KeepWeekly, "keep weekly"},
{p.KeepMonthly, &r.KeepMonthly, "keep monthly"},
{p.KeepYearly, &r.KeepYearly, "keep yearly"},
} {
v, err := parsePosInt(f.raw)
if err != nil {
return r, errFmtf("%s: %s", f.name, err)
}
*f.dest = v
}
return r, nil
}
func parseOptions(p scheduleEditPage) (store.ScheduleOptions, error) {
var o store.ScheduleOptions
up, err := parsePosInt(p.LimitUpKBps)
if err != nil {
return o, errFmtf("limit upload: %s", err)
}
o.LimitUploadKBps = up
down, err := parsePosInt(p.LimitDownKBps)
if err != nil {
return o, errFmtf("limit download: %s", err)
}
o.LimitDownloadKBps = down
return o, nil
}
// parsePosInt turns a possibly-empty string into *int. Empty → nil
// (no value). Non-empty must parse as a positive int.
func parsePosInt(raw string) (*int, error) {
if raw == "" {
return nil, nil
}
v, err := strconv.Atoi(raw)
if err != nil {
return nil, errFmtf("must be a whole number")
}
if v < 0 {
return nil, errFmtf("must be non-negative")
}
return &v, nil
}
func intStringPtr(p *int) string {
if p == nil {
return ""
}
return strconv.Itoa(*p)
}
// uiErrorMessage maps the JSON-API validation codes to operator-
// friendly banner text.
func uiErrorMessage(code, msg string) string {
switch code {
case "missing_cron_expr":
return "Cron expression is required."
case "invalid_cron_expr":
return "Cron expression doesn't parse: " + msg
case "missing_paths":
return "At least one backup path is required (one per line)."
case "missing_retention":
return "Forget schedules need at least one Keep-* value, otherwise restic would delete every snapshot."
case "invalid_kind":
return "Unsupported schedule kind."
default:
return msg
}
}
// errFmtf wraps fmt.Errorf so the validators read consistently.
func errFmtf(format string, args ...any) error {
return fmt.Errorf(format, args...)
} }
+3 -19
View File
@@ -204,16 +204,9 @@ func dispatchAgentMessage(ctx context.Context, c *Conn, hostID string, env api.E
string(p.Status), p.ExitCode, p.Stats, errMsg, p.FinishedAt); err != nil { string(p.Status), p.ExitCode, p.Stats, errMsg, p.FinishedAt); err != nil {
slog.Warn("ws: mark job finished", "job_id", p.JobID, "err", err) slog.Warn("ws: mark job finished", "job_id", p.JobID, "err", err)
} }
// A successful backup or init proves the repo exists; flip // repo_initialised_at projection has been removed — auto-init
// repo_initialised_at on the host (idempotent — set-if-null). // at host enrolment makes "is the repo init'd" derivable from
if p.Status == api.JobSucceeded { // the latest init job's status, no separate column needed.
if job, err := deps.Store.GetJob(ctx, p.JobID); err == nil &&
(job.Kind == string(api.JobBackup) || job.Kind == string(api.JobInit)) {
if _, err := deps.Store.MarkHostRepoInitialised(ctx, hostID, p.FinishedAt); err != nil {
slog.Warn("ws: mark repo initialised", "host_id", hostID, "err", err)
}
}
}
if deps.JobHub != nil { if deps.JobHub != nil {
deps.JobHub.Broadcast(p.JobID, env) deps.JobHub.Broadcast(p.JobID, env)
} }
@@ -253,15 +246,6 @@ func dispatchAgentMessage(ctx context.Context, c *Conn, hostID string, env api.E
} else { } else {
slog.Info("ws: snapshots refreshed", "host_id", hostID, "count", len(snaps)) slog.Info("ws: snapshots refreshed", "host_id", hostID, "count", len(snaps))
} }
// A non-empty snapshot list also proves the repo is initialised
// (catches the case where an external job — `restic init` from
// the CLI, or a backup ran outside this control plane —
// initialised it before our first job dispatched).
if len(snaps) > 0 {
if _, err := deps.Store.MarkHostRepoInitialised(ctx, hostID, time.Now().UTC()); err != nil {
slog.Warn("ws: mark repo initialised (snapshots)", "host_id", hostID, "err", err)
}
}
case api.MsgScheduleAck: case api.MsgScheduleAck:
var p api.ScheduleAckPayload var p api.ScheduleAckPayload
+27 -22
View File
@@ -42,7 +42,7 @@ func (s *Store) LookupHostByAgentToken(ctx context.Context, tokenHash string) (*
enrolled_at, last_seen_at, status, repo_id, tags, enrolled_at, last_seen_at, status, repo_id, tags,
current_job_id, last_backup_at, last_backup_status, current_job_id, last_backup_at, last_backup_status,
repo_size_bytes, snapshot_count, open_alert_count, repo_size_bytes, snapshot_count, open_alert_count,
applied_schedule_version, repo_initialised_at applied_schedule_version, bandwidth_up_kbps, bandwidth_down_kbps
FROM hosts WHERE agent_token_hash = ?`, FROM hosts WHERE agent_token_hash = ?`,
tokenHash) tokenHash)
return scanHost(row) return scanHost(row)
@@ -55,7 +55,7 @@ func (s *Store) GetHost(ctx context.Context, id string) (*Host, error) {
enrolled_at, last_seen_at, status, repo_id, tags, enrolled_at, last_seen_at, status, repo_id, tags,
current_job_id, last_backup_at, last_backup_status, current_job_id, last_backup_at, last_backup_status,
repo_size_bytes, snapshot_count, open_alert_count, repo_size_bytes, snapshot_count, open_alert_count,
applied_schedule_version, repo_initialised_at applied_schedule_version, bandwidth_up_kbps, bandwidth_down_kbps
FROM hosts WHERE id = ?`, id) FROM hosts WHERE id = ?`, id)
return scanHost(row) return scanHost(row)
} }
@@ -116,7 +116,7 @@ func (s *Store) ListHosts(ctx context.Context) ([]Host, error) {
enrolled_at, last_seen_at, status, repo_id, tags, enrolled_at, last_seen_at, status, repo_id, tags,
current_job_id, last_backup_at, last_backup_status, current_job_id, last_backup_at, last_backup_status,
repo_size_bytes, snapshot_count, open_alert_count, repo_size_bytes, snapshot_count, open_alert_count,
applied_schedule_version, repo_initialised_at applied_schedule_version, bandwidth_up_kbps, bandwidth_down_kbps
FROM hosts ORDER BY name`) FROM hosts ORDER BY name`)
if err != nil { if err != nil {
return nil, fmt.Errorf("store: list hosts: %w", err) return nil, fmt.Errorf("store: list hosts: %w", err)
@@ -154,14 +154,14 @@ func scanHostRow(s hostScanner) (*Host, error) {
repoID, currentJob, lastBkSt sql.NullString repoID, currentJob, lastBkSt sql.NullString
enrolled string enrolled string
tags string tags string
repoInitAt sql.NullString bwUp, bwDown sql.NullInt64
) )
err := s.Scan(&h.ID, &h.Name, &h.OS, &h.Arch, err := s.Scan(&h.ID, &h.Name, &h.OS, &h.Arch,
&h.AgentVersion, &h.ResticVersion, &h.ProtocolVersion, &h.AgentVersion, &h.ResticVersion, &h.ProtocolVersion,
&enrolled, &lastSeen, &h.Status, &repoID, &tags, &enrolled, &lastSeen, &h.Status, &repoID, &tags,
&currentJob, &lastBackupAt, &lastBkSt, &currentJob, &lastBackupAt, &lastBkSt,
&h.RepoSizeBytes, &h.SnapshotCount, &h.OpenAlertCount, &h.RepoSizeBytes, &h.SnapshotCount, &h.OpenAlertCount,
&h.AppliedScheduleVersion, &repoInitAt) &h.AppliedScheduleVersion, &bwUp, &bwDown)
if err != nil { if err != nil {
if errors.Is(err, sql.ErrNoRows) { if errors.Is(err, sql.ErrNoRows) {
return nil, ErrNotFound return nil, ErrNotFound
@@ -202,28 +202,33 @@ func scanHostRow(s hostScanner) (*Host, error) {
if tags != "" { if tags != "" {
_ = json.Unmarshal([]byte(tags), &h.Tags) _ = json.Unmarshal([]byte(tags), &h.Tags)
} }
if repoInitAt.Valid { if bwUp.Valid {
t, err := time.Parse(time.RFC3339Nano, repoInitAt.String) v := int(bwUp.Int64)
if err != nil { h.BandwidthUpKBps = &v
return nil, fmt.Errorf("store: parse repo_initialised_at: %w", err)
} }
h.RepoInitialisedAt = &t if bwDown.Valid {
v := int(bwDown.Int64)
h.BandwidthDownKBps = &v
} }
return &h, nil return &h, nil
} }
// MarkHostRepoInitialised sets repo_initialised_at to `when` if it is // SetHostBandwidth replaces the host's upload/download caps. Pass nil
// currently NULL. Idempotent: re-firing for an already-initialised // to clear a cap. Caller decides validation; non-positive caps are
// host is a no-op (we never want to clobber the original timestamp). // treated as "no cap" by the agent regardless.
// Returns true if the row was updated, false if it was already set. func (s *Store) SetHostBandwidth(ctx context.Context, hostID string, upKBps, downKBps *int) error {
func (s *Store) MarkHostRepoInitialised(ctx context.Context, hostID string, when time.Time) (bool, error) { _, err := s.db.ExecContext(ctx,
res, err := s.db.ExecContext(ctx, `UPDATE hosts SET bandwidth_up_kbps = ?, bandwidth_down_kbps = ? WHERE id = ?`,
`UPDATE hosts SET repo_initialised_at = ? nullableInt(upKBps), nullableInt(downKBps), hostID)
WHERE id = ? AND repo_initialised_at IS NULL`,
when.UTC().Format(time.RFC3339Nano), hostID)
if err != nil { if err != nil {
return false, fmt.Errorf("store: mark repo initialised: %w", err) return fmt.Errorf("store: set host bandwidth: %w", err)
} }
n, _ := res.RowsAffected() return nil
return n > 0, nil }
func nullableInt(p *int) any {
if p == nil {
return nil
}
return *p
} }
+74
View File
@@ -0,0 +1,74 @@
package store
import (
"context"
"database/sql"
"errors"
"fmt"
)
// CreateDefaultRepoMaintenance inserts the default cadences for a
// host. Called once at host enrolment alongside CreateHost. Safe to
// re-call (uses INSERT OR IGNORE so a manual repair doesn't blow up
// an already-seeded host).
func (st *Store) CreateDefaultRepoMaintenance(ctx context.Context, hostID string) error {
_, err := st.db.ExecContext(ctx,
`INSERT OR IGNORE INTO host_repo_maintenance (host_id) VALUES (?)`,
hostID)
if err != nil {
return fmt.Errorf("store: seed repo maintenance: %w", err)
}
return nil
}
// GetRepoMaintenance returns the cadence row for a host. Returns
// ErrNotFound if absent — caller should usually treat that as
// "needs CreateDefaultRepoMaintenance" rather than a hard error.
func (st *Store) GetRepoMaintenance(ctx context.Context, hostID string) (*HostRepoMaintenance, error) {
row := st.db.QueryRowContext(ctx,
`SELECT host_id, forget_cron, forget_enabled,
prune_cron, prune_enabled,
check_cron, check_enabled, check_subset_pct
FROM host_repo_maintenance WHERE host_id = ?`, hostID)
var (
m HostRepoMaintenance
forgetEnabled, pruneEnabled, checkEnabled int
)
err := row.Scan(&m.HostID,
&m.ForgetCron, &forgetEnabled,
&m.PruneCron, &pruneEnabled,
&m.CheckCron, &checkEnabled, &m.CheckSubsetPct)
if errors.Is(err, sql.ErrNoRows) {
return nil, ErrNotFound
}
if err != nil {
return nil, fmt.Errorf("store: get repo maintenance: %w", err)
}
m.ForgetEnabled = forgetEnabled != 0
m.PruneEnabled = pruneEnabled != 0
m.CheckEnabled = checkEnabled != 0
return &m, nil
}
// UpdateRepoMaintenance replaces every editable field. Doesn't bump
// the schedule version — these run on the server's own ticker, not
// the agent's local cron, so the agent doesn't need to know.
func (st *Store) UpdateRepoMaintenance(ctx context.Context, m *HostRepoMaintenance) error {
if m.HostID == "" {
return errors.New("store: repo maintenance host_id required")
}
_, err := st.db.ExecContext(ctx,
`UPDATE host_repo_maintenance SET
forget_cron = ?, forget_enabled = ?,
prune_cron = ?, prune_enabled = ?,
check_cron = ?, check_enabled = ?, check_subset_pct = ?
WHERE host_id = ?`,
m.ForgetCron, boolToInt(m.ForgetEnabled),
m.PruneCron, boolToInt(m.PruneEnabled),
m.CheckCron, boolToInt(m.CheckEnabled), m.CheckSubsetPct,
m.HostID)
if err != nil {
return fmt.Errorf("store: update repo maintenance: %w", err)
}
return nil
}
+103
View File
@@ -0,0 +1,103 @@
package store
import (
"context"
"errors"
"fmt"
"time"
)
// EnqueuePendingRun queues a missed cron tick for the offline-retry
// ticker to dispatch later. Caller (the schedule firing path) sets
// next_attempt_at = now + group.retry_backoff_seconds × 2^(attempt-1).
func (st *Store) EnqueuePendingRun(ctx context.Context, p *PendingRun) error {
if p.ID == "" || p.ScheduleID == "" || p.SourceGroupID == "" || p.HostID == "" {
return errors.New("store: pending run id, schedule_id, source_group_id, host_id required")
}
if p.Attempt == 0 {
p.Attempt = 1
}
if p.NextAttemptAt.IsZero() {
p.NextAttemptAt = time.Now().UTC()
}
if p.ScheduledAt.IsZero() {
p.ScheduledAt = time.Now().UTC()
}
_, err := st.db.ExecContext(ctx,
`INSERT INTO pending_runs (id, schedule_id, source_group_id, host_id,
attempt, next_attempt_at, scheduled_at, last_error)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)`,
p.ID, p.ScheduleID, p.SourceGroupID, p.HostID,
p.Attempt,
p.NextAttemptAt.UTC().Format(time.RFC3339Nano),
p.ScheduledAt.UTC().Format(time.RFC3339Nano),
nullableString(p.LastError))
if err != nil {
return fmt.Errorf("store: enqueue pending run: %w", err)
}
return nil
}
// DuePendingRuns returns rows whose next_attempt_at <= now, ordered
// oldest first. Server-side ticker calls this every ~30s.
func (st *Store) DuePendingRuns(ctx context.Context, now time.Time, limit int) ([]PendingRun, error) {
rows, err := st.db.QueryContext(ctx,
`SELECT id, schedule_id, source_group_id, host_id, attempt,
next_attempt_at, scheduled_at, COALESCE(last_error, '')
FROM pending_runs
WHERE next_attempt_at <= ?
ORDER BY next_attempt_at
LIMIT ?`,
now.UTC().Format(time.RFC3339Nano), limit)
if err != nil {
return nil, fmt.Errorf("store: due pending runs: %w", err)
}
defer rows.Close()
out := []PendingRun{}
for rows.Next() {
var p PendingRun
var nextAt, scheduledAt string
if err := rows.Scan(&p.ID, &p.ScheduleID, &p.SourceGroupID, &p.HostID,
&p.Attempt, &nextAt, &scheduledAt, &p.LastError); err != nil {
return nil, err
}
if t, err := time.Parse(time.RFC3339Nano, nextAt); err == nil {
p.NextAttemptAt = t
}
if t, err := time.Parse(time.RFC3339Nano, scheduledAt); err == nil {
p.ScheduledAt = t
}
out = append(out, p)
}
return out, rows.Err()
}
// DeletePendingRun removes a row by id. Called after successful
// dispatch or after exceeding retry_max.
func (st *Store) DeletePendingRun(ctx context.Context, id string) error {
_, err := st.db.ExecContext(ctx,
`DELETE FROM pending_runs WHERE id = ?`, id)
if err != nil {
return fmt.Errorf("store: delete pending run: %w", err)
}
return nil
}
// BumpPendingRunAttempt increments the attempt counter and updates
// next_attempt_at + last_error. Used after a failed retry — caller
// has decided to try again.
func (st *Store) BumpPendingRunAttempt(ctx context.Context, id string, nextAttemptAt time.Time, lastError string) error {
_, err := st.db.ExecContext(ctx,
`UPDATE pending_runs SET
attempt = attempt + 1,
next_attempt_at = ?,
last_error = ?
WHERE id = ?`,
nextAttemptAt.UTC().Format(time.RFC3339Nano),
nullableString(lastError),
id)
if err != nil {
return fmt.Errorf("store: bump pending run: %w", err)
}
return nil
}
+131 -93
View File
@@ -3,15 +3,14 @@ package store
import ( import (
"context" "context"
"database/sql" "database/sql"
"encoding/json"
"errors" "errors"
"fmt" "fmt"
"time" "time"
) )
// CreateSchedule inserts a new schedule and bumps the host's // CreateSchedule inserts a new slim schedule row + the schedule_source_groups
// schedule_version atomically. Returns the inserted row's // junction entries for s.SourceGroupIDs, atomic in one tx. Bumps
// CreatedAt / UpdatedAt timestamps written into s. // host_schedule_version. Caller mints s.ID.
func (st *Store) CreateSchedule(ctx context.Context, s *Schedule) error { func (st *Store) CreateSchedule(ctx context.Context, s *Schedule) error {
if s.ID == "" || s.HostID == "" { if s.ID == "" || s.HostID == "" {
return errors.New("store: schedule id and host_id required") return errors.New("store: schedule id and host_id required")
@@ -19,20 +18,6 @@ func (st *Store) CreateSchedule(ctx context.Context, s *Schedule) error {
now := time.Now().UTC() now := time.Now().UTC()
s.CreatedAt = now s.CreatedAt = now
s.UpdatedAt = now s.UpdatedAt = now
if s.Paths == nil {
s.Paths = []string{}
}
if s.Excludes == nil {
s.Excludes = []string{}
}
if s.Tags == nil {
s.Tags = []string{}
}
pathsJSON, _ := json.Marshal(s.Paths)
excludesJSON, _ := json.Marshal(s.Excludes)
tagsJSON, _ := json.Marshal(s.Tags)
retentionJSON, _ := json.Marshal(s.RetentionPolicy)
optionsJSON, _ := json.Marshal(s.Options)
tx, err := st.db.BeginTx(ctx, nil) tx, err := st.db.BeginTx(ctx, nil)
if err != nil { if err != nil {
@@ -41,19 +26,16 @@ func (st *Store) CreateSchedule(ctx context.Context, s *Schedule) error {
defer func() { _ = tx.Rollback() }() defer func() { _ = tx.Rollback() }()
if _, err := tx.ExecContext(ctx, if _, err := tx.ExecContext(ctx,
`INSERT INTO schedules ( `INSERT INTO schedules (id, host_id, cron_expr, enabled, created_at, updated_at)
id, host_id, kind, cron_expr, paths, excludes, tags, VALUES (?, ?, ?, ?, ?, ?)`,
retention_policy, options, pre_hook, post_hook, enabled, manual, s.ID, s.HostID, s.CronExpr, boolToInt(s.Enabled),
created_at, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
s.ID, s.HostID, s.Kind, s.CronExpr,
string(pathsJSON), string(excludesJSON), string(tagsJSON),
string(retentionJSON), string(optionsJSON),
s.PreHook, s.PostHook, boolToInt(s.Enabled), boolToInt(s.Manual),
now.Format(time.RFC3339Nano), now.Format(time.RFC3339Nano), now.Format(time.RFC3339Nano), now.Format(time.RFC3339Nano),
); err != nil { ); err != nil {
return fmt.Errorf("store: create schedule: %w", err) return fmt.Errorf("store: create schedule: %w", err)
} }
if err := writeScheduleGroupsTx(ctx, tx, s.ID, s.SourceGroupIDs); err != nil {
return err
}
if err := bumpHostScheduleVersionTx(ctx, tx, s.HostID); err != nil { if err := bumpHostScheduleVersionTx(ctx, tx, s.HostID); err != nil {
return err return err
} }
@@ -61,27 +43,11 @@ func (st *Store) CreateSchedule(ctx context.Context, s *Schedule) error {
} }
// UpdateSchedule replaces every editable field on an existing row // UpdateSchedule replaces every editable field on an existing row
// and bumps host_schedule_version. ID and HostID must match an // and rewrites the junction. Bumps host_schedule_version.
// existing row; kind is immutable (creating a new schedule is
// cheaper than re-keying retention/hooks).
func (st *Store) UpdateSchedule(ctx context.Context, s *Schedule) error { func (st *Store) UpdateSchedule(ctx context.Context, s *Schedule) error {
if s.ID == "" || s.HostID == "" { if s.ID == "" || s.HostID == "" {
return errors.New("store: schedule id and host_id required") return errors.New("store: schedule id and host_id required")
} }
if s.Paths == nil {
s.Paths = []string{}
}
if s.Excludes == nil {
s.Excludes = []string{}
}
if s.Tags == nil {
s.Tags = []string{}
}
pathsJSON, _ := json.Marshal(s.Paths)
excludesJSON, _ := json.Marshal(s.Excludes)
tagsJSON, _ := json.Marshal(s.Tags)
retentionJSON, _ := json.Marshal(s.RetentionPolicy)
optionsJSON, _ := json.Marshal(s.Options)
now := time.Now().UTC() now := time.Now().UTC()
tx, err := st.db.BeginTx(ctx, nil) tx, err := st.db.BeginTx(ctx, nil)
@@ -91,16 +57,10 @@ func (st *Store) UpdateSchedule(ctx context.Context, s *Schedule) error {
defer func() { _ = tx.Rollback() }() defer func() { _ = tx.Rollback() }()
res, err := tx.ExecContext(ctx, res, err := tx.ExecContext(ctx,
`UPDATE schedules SET `UPDATE schedules
cron_expr = ?, paths = ?, excludes = ?, tags = ?, SET cron_expr = ?, enabled = ?, updated_at = ?
retention_policy = ?, options = ?,
pre_hook = ?, post_hook = ?, enabled = ?, manual = ?,
updated_at = ?
WHERE id = ? AND host_id = ?`, WHERE id = ? AND host_id = ?`,
s.CronExpr, s.CronExpr, boolToInt(s.Enabled),
string(pathsJSON), string(excludesJSON), string(tagsJSON),
string(retentionJSON), string(optionsJSON),
s.PreHook, s.PostHook, boolToInt(s.Enabled), boolToInt(s.Manual),
now.Format(time.RFC3339Nano), now.Format(time.RFC3339Nano),
s.ID, s.HostID, s.ID, s.HostID,
) )
@@ -112,14 +72,23 @@ func (st *Store) UpdateSchedule(ctx context.Context, s *Schedule) error {
return ErrNotFound return ErrNotFound
} }
s.UpdatedAt = now s.UpdatedAt = now
// Rewrite junction wholesale — simpler than diffing.
if _, err := tx.ExecContext(ctx,
`DELETE FROM schedule_source_groups WHERE schedule_id = ?`, s.ID,
); err != nil {
return fmt.Errorf("store: clear schedule junction: %w", err)
}
if err := writeScheduleGroupsTx(ctx, tx, s.ID, s.SourceGroupIDs); err != nil {
return err
}
if err := bumpHostScheduleVersionTx(ctx, tx, s.HostID); err != nil { if err := bumpHostScheduleVersionTx(ctx, tx, s.HostID); err != nil {
return err return err
} }
return tx.Commit() return tx.Commit()
} }
// DeleteSchedule removes a schedule and bumps host_schedule_version. // DeleteSchedule removes a schedule and its junction rows; bumps
// Returns ErrNotFound if no row matched. // host_schedule_version. Returns ErrNotFound if no row matched.
func (st *Store) DeleteSchedule(ctx context.Context, hostID, scheduleID string) error { func (st *Store) DeleteSchedule(ctx context.Context, hostID, scheduleID string) error {
tx, err := st.db.BeginTx(ctx, nil) tx, err := st.db.BeginTx(ctx, nil)
if err != nil { if err != nil {
@@ -137,35 +106,39 @@ func (st *Store) DeleteSchedule(ctx context.Context, hostID, scheduleID string)
if n == 0 { if n == 0 {
return ErrNotFound return ErrNotFound
} }
// Junction rows go via ON DELETE CASCADE; nothing to do here.
if err := bumpHostScheduleVersionTx(ctx, tx, hostID); err != nil { if err := bumpHostScheduleVersionTx(ctx, tx, hostID); err != nil {
return err return err
} }
return tx.Commit() return tx.Commit()
} }
// GetSchedule returns one schedule by (host_id, id). Returns // GetSchedule returns one schedule (with its junction-resolved
// ErrNotFound on miss. // SourceGroupIDs populated) by (host_id, id). ErrNotFound on miss.
func (st *Store) GetSchedule(ctx context.Context, hostID, scheduleID string) (*Schedule, error) { func (st *Store) GetSchedule(ctx context.Context, hostID, scheduleID string) (*Schedule, error) {
row := st.db.QueryRowContext(ctx, row := st.db.QueryRowContext(ctx,
`SELECT id, host_id, kind, cron_expr, paths, excludes, tags, `SELECT id, host_id, cron_expr, enabled, created_at, updated_at
retention_policy, options, pre_hook, post_hook, enabled, manual,
created_at, updated_at
FROM schedules WHERE id = ? AND host_id = ?`, FROM schedules WHERE id = ? AND host_id = ?`,
scheduleID, hostID) scheduleID, hostID)
s, err := scanSchedule(row) s, err := scanSchedule(row)
if errors.Is(err, sql.ErrNoRows) { if errors.Is(err, sql.ErrNoRows) {
return nil, ErrNotFound return nil, ErrNotFound
} }
return s, err if err != nil {
return nil, err
}
s.SourceGroupIDs, err = st.scheduleGroupIDs(ctx, scheduleID)
if err != nil {
return nil, err
}
return s, nil
} }
// ListSchedulesByHost returns every schedule for a host, ordered // ListSchedulesByHost returns every schedule for a host, with
// by created_at. Empty slice on miss (not an error). // SourceGroupIDs resolved.
func (st *Store) ListSchedulesByHost(ctx context.Context, hostID string) ([]Schedule, error) { func (st *Store) ListSchedulesByHost(ctx context.Context, hostID string) ([]Schedule, error) {
rows, err := st.db.QueryContext(ctx, rows, err := st.db.QueryContext(ctx,
`SELECT id, host_id, kind, cron_expr, paths, excludes, tags, `SELECT id, host_id, cron_expr, enabled, created_at, updated_at
retention_policy, options, pre_hook, post_hook, enabled, manual,
created_at, updated_at
FROM schedules WHERE host_id = ? ORDER BY created_at`, FROM schedules WHERE host_id = ? ORDER BY created_at`,
hostID) hostID)
if err != nil { if err != nil {
@@ -180,11 +153,22 @@ func (st *Store) ListSchedulesByHost(ctx context.Context, hostID string) ([]Sche
} }
out = append(out, *s) out = append(out, *s)
} }
return out, rows.Err() if err := rows.Err(); err != nil {
return nil, err
}
// Second pass to resolve junctions — small fleet, cheap.
for i := range out {
ids, err := st.scheduleGroupIDs(ctx, out[i].ID)
if err != nil {
return nil, err
}
out[i].SourceGroupIDs = ids
}
return out, nil
} }
// GetHostScheduleVersion returns the current version for a host, // GetHostScheduleVersion returns the current version for a host, or
// or 0 if no row exists yet. // 0 if no row exists yet.
func (st *Store) GetHostScheduleVersion(ctx context.Context, hostID string) (int64, error) { func (st *Store) GetHostScheduleVersion(ctx context.Context, hostID string) (int64, error) {
var v int64 var v int64
err := st.db.QueryRowContext(ctx, err := st.db.QueryRowContext(ctx,
@@ -210,12 +194,26 @@ func (st *Store) SetHostAppliedScheduleVersion(ctx context.Context, hostID strin
return nil return nil
} }
// BumpHostScheduleVersion is the public wrapper for non-schedule
// CRUD that needs to push to the agent — e.g. source-group edits
// (paths/retention change), retry-policy edits.
func (st *Store) BumpHostScheduleVersion(ctx context.Context, hostID string) error {
tx, err := st.db.BeginTx(ctx, nil)
if err != nil {
return err
}
defer func() { _ = tx.Rollback() }()
if err := bumpHostScheduleVersionTx(ctx, tx, hostID); err != nil {
return err
}
return tx.Commit()
}
// bumpHostScheduleVersionTx upserts host_schedule_version, +1 each // bumpHostScheduleVersionTx upserts host_schedule_version, +1 each
// call. Caller owns the tx. // call. Caller owns the tx.
func bumpHostScheduleVersionTx(ctx context.Context, tx *sql.Tx, hostID string) error { func bumpHostScheduleVersionTx(ctx context.Context, tx *sql.Tx, hostID string) error {
if _, err := tx.ExecContext(ctx, if _, err := tx.ExecContext(ctx,
`INSERT INTO host_schedule_version (host_id, version) `INSERT INTO host_schedule_version (host_id, version) VALUES (?, 1)
VALUES (?, 1)
ON CONFLICT(host_id) DO UPDATE SET version = version + 1`, ON CONFLICT(host_id) DO UPDATE SET version = version + 1`,
hostID); err != nil { hostID); err != nil {
return fmt.Errorf("store: bump schedule version: %w", err) return fmt.Errorf("store: bump schedule version: %w", err)
@@ -223,6 +221,66 @@ func bumpHostScheduleVersionTx(ctx context.Context, tx *sql.Tx, hostID string) e
return nil return nil
} }
// writeScheduleGroupsTx inserts the junction rows for one schedule.
// Caller owns the tx; assumes the table is already empty for this id
// (Update wipes before calling; Create starts empty).
func writeScheduleGroupsTx(ctx context.Context, tx *sql.Tx, scheduleID string, groupIDs []string) error {
for _, gid := range groupIDs {
if gid == "" {
continue
}
if _, err := tx.ExecContext(ctx,
`INSERT INTO schedule_source_groups (schedule_id, source_group_id) VALUES (?, ?)`,
scheduleID, gid,
); err != nil {
return fmt.Errorf("store: link schedule %s to group %s: %w", scheduleID, gid, err)
}
}
return nil
}
// scheduleGroupIDs reads the junction for one schedule.
func (st *Store) scheduleGroupIDs(ctx context.Context, scheduleID string) ([]string, error) {
rows, err := st.db.QueryContext(ctx,
`SELECT source_group_id FROM schedule_source_groups WHERE schedule_id = ?`,
scheduleID)
if err != nil {
return nil, fmt.Errorf("store: read schedule junction: %w", err)
}
defer rows.Close()
out := []string{}
for rows.Next() {
var id string
if err := rows.Scan(&id); err != nil {
return nil, err
}
out = append(out, id)
}
return out, rows.Err()
}
// SchedulesUsingGroup is the inverse — list schedule IDs that
// reference a given source group. Used for retention-conflict
// detection and "X schedules use this group" UI labels.
func (st *Store) SchedulesUsingGroup(ctx context.Context, groupID string) ([]string, error) {
rows, err := st.db.QueryContext(ctx,
`SELECT schedule_id FROM schedule_source_groups WHERE source_group_id = ?`,
groupID)
if err != nil {
return nil, fmt.Errorf("store: schedules using group: %w", err)
}
defer rows.Close()
out := []string{}
for rows.Next() {
var id string
if err := rows.Scan(&id); err != nil {
return nil, err
}
out = append(out, id)
}
return out, rows.Err()
}
// ----- scan helpers -------------------------------------------------- // ----- scan helpers --------------------------------------------------
func scanSchedule(row *sql.Row) (*Schedule, error) { func scanSchedule(row *sql.Row) (*Schedule, error) {
@@ -236,34 +294,14 @@ type scheduleScanner interface {
func scanScheduleRow(s scheduleScanner) (*Schedule, error) { func scanScheduleRow(s scheduleScanner) (*Schedule, error) {
var ( var (
out Schedule out Schedule
paths, excludes, tags, retention, options string
createdAt, updatedAt string createdAt, updatedAt string
enabled, manual int enabled int
) )
err := s.Scan(&out.ID, &out.HostID, &out.Kind, &out.CronExpr, err := s.Scan(&out.ID, &out.HostID, &out.CronExpr, &enabled, &createdAt, &updatedAt)
&paths, &excludes, &tags, &retention, &options,
&out.PreHook, &out.PostHook, &enabled, &manual,
&createdAt, &updatedAt)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if paths != "" {
_ = json.Unmarshal([]byte(paths), &out.Paths)
}
if excludes != "" {
_ = json.Unmarshal([]byte(excludes), &out.Excludes)
}
if tags != "" {
_ = json.Unmarshal([]byte(tags), &out.Tags)
}
if retention != "" {
_ = json.Unmarshal([]byte(retention), &out.RetentionPolicy)
}
if options != "" {
_ = json.Unmarshal([]byte(options), &out.Options)
}
out.Enabled = enabled != 0 out.Enabled = enabled != 0
out.Manual = manual != 0
if t, err := time.Parse(time.RFC3339Nano, createdAt); err == nil { if t, err := time.Parse(time.RFC3339Nano, createdAt); err == nil {
out.CreatedAt = t out.CreatedAt = t
} }
+70 -26
View File
@@ -21,29 +21,37 @@ func makeSchedHost(t *testing.T, s *Store) string {
return id return id
} }
// makeGroup is a minimal source-group helper for schedule tests
// (schedules need at least one group to point at).
func makeGroup(t *testing.T, s *Store, hostID, name, id string) string {
t.Helper()
if err := s.CreateSourceGroup(context.Background(), &SourceGroup{
ID: id, HostID: hostID, Name: name,
Includes: []string{"/etc"},
}); err != nil {
t.Fatalf("create group %s: %v", name, err)
}
return id
}
func TestSchedulesCRUDAndVersionBump(t *testing.T) { func TestSchedulesCRUDAndVersionBump(t *testing.T) {
t.Parallel() t.Parallel()
s := openTestStore(t) s := openTestStore(t)
ctx := context.Background() ctx := context.Background()
hostID := makeSchedHost(t, s) hostID := makeSchedHost(t, s)
gid := makeGroup(t, s, hostID, "default", "01HSCHEDGRP00000000000001")
// Initial version is 0 (no row). // Group creation already bumped version to 1.
v, err := s.GetHostScheduleVersion(ctx, hostID) v, _ := s.GetHostScheduleVersion(ctx, hostID)
if err != nil { if v != 1 {
t.Fatal(err) t.Fatalf("version after group create: got %d, want 1", v)
}
if v != 0 {
t.Fatalf("initial version: got %d, want 0", v)
} }
keepLast := 7
sched := Schedule{ sched := Schedule{
ID: "01SCHED000000000000000001", HostID: hostID, ID: "01SCHED000000000000000001", HostID: hostID,
Kind: "backup", CronExpr: "0 3 * * *", CronExpr: "0 3 * * *",
Paths: []string{"/etc", "/home"},
Tags: []string{"nightly"},
RetentionPolicy: RetentionPolicy{KeepLast: &keepLast},
Enabled: true, Enabled: true,
SourceGroupIDs: []string{gid},
} }
if err := s.CreateSchedule(ctx, &sched); err != nil { if err := s.CreateSchedule(ctx, &sched); err != nil {
t.Fatalf("create: %v", err) t.Fatalf("create: %v", err)
@@ -53,37 +61,33 @@ func TestSchedulesCRUDAndVersionBump(t *testing.T) {
} }
v, _ = s.GetHostScheduleVersion(ctx, hostID) v, _ = s.GetHostScheduleVersion(ctx, hostID)
if v != 1 { if v != 2 {
t.Fatalf("version after create: got %d, want 1", v) t.Fatalf("version after schedule create: got %d, want 2", v)
} }
// Round-trip read. // Round-trip read — junction populated.
got, err := s.GetSchedule(ctx, hostID, sched.ID) got, err := s.GetSchedule(ctx, hostID, sched.ID)
if err != nil { if err != nil {
t.Fatalf("get: %v", err) t.Fatalf("get: %v", err)
} }
if got.CronExpr != "0 3 * * *" || len(got.Paths) != 2 { if got.CronExpr != "0 3 * * *" || len(got.SourceGroupIDs) != 1 || got.SourceGroupIDs[0] != gid {
t.Fatalf("round-trip lost data: %+v", got) t.Fatalf("round-trip: %+v", got)
}
if got.RetentionPolicy.KeepLast == nil || *got.RetentionPolicy.KeepLast != 7 {
t.Fatalf("retention round-trip: %+v", got.RetentionPolicy)
} }
// List sees it.
list, err := s.ListSchedulesByHost(ctx, hostID) list, err := s.ListSchedulesByHost(ctx, hostID)
if err != nil || len(list) != 1 || list[0].ID != sched.ID { if err != nil || len(list) != 1 || list[0].ID != sched.ID {
t.Fatalf("list: err=%v rows=%v", err, list) t.Fatalf("list: err=%v rows=%v", err, list)
} }
// Update bumps version. // Update — flip enabled, swap junction (re-add same gid). Version bumps.
sched.CronExpr = "*/30 * * * *" sched.CronExpr = "*/30 * * * *"
sched.Enabled = false sched.Enabled = false
if err := s.UpdateSchedule(ctx, &sched); err != nil { if err := s.UpdateSchedule(ctx, &sched); err != nil {
t.Fatalf("update: %v", err) t.Fatalf("update: %v", err)
} }
v, _ = s.GetHostScheduleVersion(ctx, hostID) v, _ = s.GetHostScheduleVersion(ctx, hostID)
if v != 2 { if v != 3 {
t.Fatalf("version after update: got %d, want 2", v) t.Fatalf("version after update: got %d, want 3", v)
} }
got, _ = s.GetSchedule(ctx, hostID, sched.ID) got, _ = s.GetSchedule(ctx, hostID, sched.ID)
if got.CronExpr != "*/30 * * * *" || got.Enabled { if got.CronExpr != "*/30 * * * *" || got.Enabled {
@@ -95,14 +99,54 @@ func TestSchedulesCRUDAndVersionBump(t *testing.T) {
t.Fatalf("delete: %v", err) t.Fatalf("delete: %v", err)
} }
v, _ = s.GetHostScheduleVersion(ctx, hostID) v, _ = s.GetHostScheduleVersion(ctx, hostID)
if v != 3 { if v != 4 {
t.Fatalf("version after delete: got %d, want 3", v) t.Fatalf("version after delete: got %d, want 4", v)
} }
if err := s.DeleteSchedule(ctx, hostID, sched.ID); !errors.Is(err, ErrNotFound) { if err := s.DeleteSchedule(ctx, hostID, sched.ID); !errors.Is(err, ErrNotFound) {
t.Fatalf("delete after delete: want ErrNotFound, got %v", err) t.Fatalf("delete after delete: want ErrNotFound, got %v", err)
} }
} }
func TestSchedulesUsingGroup(t *testing.T) {
t.Parallel()
s := openTestStore(t)
ctx := context.Background()
hostID := makeSchedHost(t, s)
g1 := makeGroup(t, s, hostID, "default", "01HUSEGRPGRP000000000001")
g2 := makeGroup(t, s, hostID, "databases", "01HUSEGRPGRP000000000002")
// Schedule A points at g1 only; Schedule B points at both.
if err := s.CreateSchedule(ctx, &Schedule{
ID: "01HUSEGRPSCHED0000000001", HostID: hostID,
CronExpr: "@hourly", Enabled: true,
SourceGroupIDs: []string{g1},
}); err != nil {
t.Fatal(err)
}
if err := s.CreateSchedule(ctx, &Schedule{
ID: "01HUSEGRPSCHED0000000002", HostID: hostID,
CronExpr: "0 3 * * *", Enabled: true,
SourceGroupIDs: []string{g1, g2},
}); err != nil {
t.Fatal(err)
}
g1Sched, err := s.SchedulesUsingGroup(ctx, g1)
if err != nil {
t.Fatal(err)
}
if len(g1Sched) != 2 {
t.Fatalf("g1 should be in 2 schedules, got %d", len(g1Sched))
}
g2Sched, err := s.SchedulesUsingGroup(ctx, g2)
if err != nil {
t.Fatal(err)
}
if len(g2Sched) != 1 {
t.Fatalf("g2 should be in 1 schedule, got %d", len(g2Sched))
}
}
func TestSetHostAppliedScheduleVersion(t *testing.T) { func TestSetHostAppliedScheduleVersion(t *testing.T) {
t.Parallel() t.Parallel()
s := openTestStore(t) s := openTestStore(t)
+261
View File
@@ -0,0 +1,261 @@
package store
import (
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"
"time"
)
// CreateSourceGroup inserts a new group + bumps host_schedule_version
// in one tx. Group name doubles as the snapshot tag on backups; the
// (host_id, name) UNIQUE constraint enforces tag unambiguity.
func (st *Store) CreateSourceGroup(ctx context.Context, g *SourceGroup) error {
if g.ID == "" || g.HostID == "" || g.Name == "" {
return errors.New("store: source group id, host_id, name required")
}
now := time.Now().UTC()
g.CreatedAt = now
g.UpdatedAt = now
if g.Includes == nil {
g.Includes = []string{}
}
if g.Excludes == nil {
g.Excludes = []string{}
}
if g.RetryMax == 0 {
g.RetryMax = 3
}
if g.RetryBackoffSeconds == 0 {
g.RetryBackoffSeconds = 60
}
includesJSON, _ := json.Marshal(g.Includes)
excludesJSON, _ := json.Marshal(g.Excludes)
retentionJSON, _ := json.Marshal(g.RetentionPolicy)
tx, err := st.db.BeginTx(ctx, nil)
if err != nil {
return fmt.Errorf("store: begin tx: %w", err)
}
defer func() { _ = tx.Rollback() }()
if _, err := tx.ExecContext(ctx,
`INSERT INTO source_groups (
id, host_id, name, includes, excludes, retention_policy,
retry_max, retry_backoff_seconds, conflict_dimension,
created_at, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
g.ID, g.HostID, g.Name,
string(includesJSON), string(excludesJSON), string(retentionJSON),
g.RetryMax, g.RetryBackoffSeconds,
nullableString(g.ConflictDimension),
now.Format(time.RFC3339Nano), now.Format(time.RFC3339Nano),
); err != nil {
return fmt.Errorf("store: create source group: %w", err)
}
if err := bumpHostScheduleVersionTx(ctx, tx, g.HostID); err != nil {
return err
}
return tx.Commit()
}
// UpdateSourceGroup replaces every editable field on an existing row
// and bumps host_schedule_version. Returns ErrNotFound if no row matched.
func (st *Store) UpdateSourceGroup(ctx context.Context, g *SourceGroup) error {
if g.ID == "" || g.HostID == "" || g.Name == "" {
return errors.New("store: source group id, host_id, name required")
}
if g.Includes == nil {
g.Includes = []string{}
}
if g.Excludes == nil {
g.Excludes = []string{}
}
includesJSON, _ := json.Marshal(g.Includes)
excludesJSON, _ := json.Marshal(g.Excludes)
retentionJSON, _ := json.Marshal(g.RetentionPolicy)
now := time.Now().UTC()
tx, err := st.db.BeginTx(ctx, nil)
if err != nil {
return fmt.Errorf("store: begin tx: %w", err)
}
defer func() { _ = tx.Rollback() }()
res, err := tx.ExecContext(ctx,
`UPDATE source_groups SET
name = ?, includes = ?, excludes = ?, retention_policy = ?,
retry_max = ?, retry_backoff_seconds = ?, conflict_dimension = ?,
updated_at = ?
WHERE id = ? AND host_id = ?`,
g.Name,
string(includesJSON), string(excludesJSON), string(retentionJSON),
g.RetryMax, g.RetryBackoffSeconds,
nullableString(g.ConflictDimension),
now.Format(time.RFC3339Nano),
g.ID, g.HostID,
)
if err != nil {
return fmt.Errorf("store: update source group: %w", err)
}
n, _ := res.RowsAffected()
if n == 0 {
return ErrNotFound
}
g.UpdatedAt = now
if err := bumpHostScheduleVersionTx(ctx, tx, g.HostID); err != nil {
return err
}
return tx.Commit()
}
// DeleteSourceGroup removes a group and bumps host_schedule_version.
// Junction rows in schedule_source_groups go via ON DELETE CASCADE.
// Caller is expected to have already enforced the "default group
// can't be the only one" UI rule; this layer just deletes.
func (st *Store) DeleteSourceGroup(ctx context.Context, hostID, groupID string) error {
tx, err := st.db.BeginTx(ctx, nil)
if err != nil {
return fmt.Errorf("store: begin tx: %w", err)
}
defer func() { _ = tx.Rollback() }()
res, err := tx.ExecContext(ctx,
`DELETE FROM source_groups WHERE id = ? AND host_id = ?`,
groupID, hostID)
if err != nil {
return fmt.Errorf("store: delete source group: %w", err)
}
n, _ := res.RowsAffected()
if n == 0 {
return ErrNotFound
}
if err := bumpHostScheduleVersionTx(ctx, tx, hostID); err != nil {
return err
}
return tx.Commit()
}
// GetSourceGroup returns one group by (host_id, id). ErrNotFound on miss.
func (st *Store) GetSourceGroup(ctx context.Context, hostID, groupID string) (*SourceGroup, error) {
row := st.db.QueryRowContext(ctx,
`SELECT id, host_id, name, includes, excludes, retention_policy,
retry_max, retry_backoff_seconds, conflict_dimension,
created_at, updated_at
FROM source_groups WHERE id = ? AND host_id = ?`,
groupID, hostID)
g, err := scanSourceGroup(row)
if errors.Is(err, sql.ErrNoRows) {
return nil, ErrNotFound
}
return g, err
}
// GetSourceGroupByName resolves a group by its (host-unique) name.
// Used by retention-conflict detection and the auto-init flow.
func (st *Store) GetSourceGroupByName(ctx context.Context, hostID, name string) (*SourceGroup, error) {
row := st.db.QueryRowContext(ctx,
`SELECT id, host_id, name, includes, excludes, retention_policy,
retry_max, retry_backoff_seconds, conflict_dimension,
created_at, updated_at
FROM source_groups WHERE host_id = ? AND name = ?`,
hostID, name)
g, err := scanSourceGroup(row)
if errors.Is(err, sql.ErrNoRows) {
return nil, ErrNotFound
}
return g, err
}
// ListSourceGroupsByHost returns every group for a host, ordered
// by name (so 'default' isn't always at the bottom alphabetically —
// well, it usually IS the only 'd' name on a fresh host so this
// works out fine). Empty slice on miss.
func (st *Store) ListSourceGroupsByHost(ctx context.Context, hostID string) ([]SourceGroup, error) {
rows, err := st.db.QueryContext(ctx,
`SELECT id, host_id, name, includes, excludes, retention_policy,
retry_max, retry_backoff_seconds, conflict_dimension,
created_at, updated_at
FROM source_groups WHERE host_id = ? ORDER BY name`,
hostID)
if err != nil {
return nil, fmt.Errorf("store: list source groups: %w", err)
}
defer rows.Close()
out := []SourceGroup{}
for rows.Next() {
g, err := scanSourceGroupRow(rows)
if err != nil {
return nil, err
}
out = append(out, *g)
}
return out, rows.Err()
}
// SetSourceGroupConflict updates only the cached conflict_dimension.
// Doesn't bump schedule version (the cache is server-internal, agent
// doesn't see it). Empty string clears the conflict.
func (st *Store) SetSourceGroupConflict(ctx context.Context, groupID, dimension string) error {
_, err := st.db.ExecContext(ctx,
`UPDATE source_groups SET conflict_dimension = ? WHERE id = ?`,
nullableString(dimension), groupID)
if err != nil {
return fmt.Errorf("store: set source group conflict: %w", err)
}
return nil
}
// ----- scan helpers --------------------------------------------------
func scanSourceGroup(row *sql.Row) (*SourceGroup, error) {
return scanSourceGroupRow(row)
}
type sourceGroupScanner interface {
Scan(dest ...any) error
}
func scanSourceGroupRow(s sourceGroupScanner) (*SourceGroup, error) {
var (
out SourceGroup
includes, excludes, retention string
conflict sql.NullString
createdAt, updatedAt string
)
err := s.Scan(&out.ID, &out.HostID, &out.Name,
&includes, &excludes, &retention,
&out.RetryMax, &out.RetryBackoffSeconds, &conflict,
&createdAt, &updatedAt)
if err != nil {
return nil, err
}
if includes != "" {
_ = json.Unmarshal([]byte(includes), &out.Includes)
}
if excludes != "" {
_ = json.Unmarshal([]byte(excludes), &out.Excludes)
}
if retention != "" {
_ = json.Unmarshal([]byte(retention), &out.RetentionPolicy)
}
if conflict.Valid {
out.ConflictDimension = conflict.String
}
if t, err := time.Parse(time.RFC3339Nano, createdAt); err == nil {
out.CreatedAt = t
}
if t, err := time.Parse(time.RFC3339Nano, updatedAt); err == nil {
out.UpdatedAt = t
}
return &out, nil
}
func nullableString(s string) any {
if s == "" {
return nil
}
return s
}
+221
View File
@@ -0,0 +1,221 @@
package store
import (
"context"
"errors"
"testing"
"time"
)
func TestSourceGroupCRUDAndVersionBump(t *testing.T) {
t.Parallel()
s := openTestStore(t)
ctx := context.Background()
hostID := makeSchedHost(t, s)
keepLast := 7
g := SourceGroup{
ID: "01HSGRP00000000000000001", HostID: hostID, Name: "default",
Includes: []string{"/etc", "/home"},
Excludes: []string{"*.tmp"},
RetentionPolicy: RetentionPolicy{KeepLast: &keepLast},
}
if err := s.CreateSourceGroup(ctx, &g); err != nil {
t.Fatalf("create: %v", err)
}
if g.RetryMax != 3 || g.RetryBackoffSeconds != 60 {
t.Fatalf("retry defaults not applied: %+v", g)
}
v, _ := s.GetHostScheduleVersion(ctx, hostID)
if v != 1 {
t.Fatalf("version after create: got %d, want 1", v)
}
// Round-trip.
got, err := s.GetSourceGroup(ctx, hostID, g.ID)
if err != nil {
t.Fatalf("get: %v", err)
}
if got.Name != "default" || len(got.Includes) != 2 || len(got.Excludes) != 1 {
t.Fatalf("round-trip: %+v", got)
}
if got.RetentionPolicy.KeepLast == nil || *got.RetentionPolicy.KeepLast != 7 {
t.Fatalf("retention round-trip: %+v", got.RetentionPolicy)
}
// By name.
byName, err := s.GetSourceGroupByName(ctx, hostID, "default")
if err != nil || byName.ID != g.ID {
t.Fatalf("get by name: err=%v got=%v", err, byName)
}
// Update — rename + new retention. Version bumps.
keepDaily := 14
g.Name = "system"
g.RetentionPolicy = RetentionPolicy{KeepDaily: &keepDaily}
if err := s.UpdateSourceGroup(ctx, &g); err != nil {
t.Fatal(err)
}
v, _ = s.GetHostScheduleVersion(ctx, hostID)
if v != 2 {
t.Fatalf("version after update: got %d, want 2", v)
}
got, _ = s.GetSourceGroup(ctx, hostID, g.ID)
if got.Name != "system" || got.RetentionPolicy.KeepLast != nil || got.RetentionPolicy.KeepDaily == nil {
t.Fatalf("update did not persist: %+v", got)
}
// Conflict cache (no version bump).
if err := s.SetSourceGroupConflict(ctx, g.ID, "hourly"); err != nil {
t.Fatal(err)
}
got, _ = s.GetSourceGroup(ctx, hostID, g.ID)
if got.ConflictDimension != "hourly" {
t.Fatalf("conflict not cached: %q", got.ConflictDimension)
}
v2, _ := s.GetHostScheduleVersion(ctx, hostID)
if v2 != v {
t.Fatalf("conflict cache should not bump version: %d → %d", v, v2)
}
// List.
list, _ := s.ListSourceGroupsByHost(ctx, hostID)
if len(list) != 1 || list[0].ID != g.ID {
t.Fatalf("list: %v", list)
}
// Delete bumps version.
if err := s.DeleteSourceGroup(ctx, hostID, g.ID); err != nil {
t.Fatal(err)
}
v3, _ := s.GetHostScheduleVersion(ctx, hostID)
if v3 != 3 {
t.Fatalf("version after delete: got %d, want 3", v3)
}
if err := s.DeleteSourceGroup(ctx, hostID, g.ID); !errors.Is(err, ErrNotFound) {
t.Fatalf("delete after delete: want ErrNotFound, got %v", err)
}
}
func TestSourceGroupNameUniquePerHost(t *testing.T) {
t.Parallel()
s := openTestStore(t)
ctx := context.Background()
hostID := makeSchedHost(t, s)
if err := s.CreateSourceGroup(ctx, &SourceGroup{
ID: "01HUNIQGRP00000000000001", HostID: hostID, Name: "shared",
}); err != nil {
t.Fatal(err)
}
err := s.CreateSourceGroup(ctx, &SourceGroup{
ID: "01HUNIQGRP00000000000002", HostID: hostID, Name: "shared",
})
if err == nil {
t.Fatal("expected unique-constraint error on duplicate name within host")
}
}
func TestRepoMaintenanceDefaultsAndUpdate(t *testing.T) {
t.Parallel()
s := openTestStore(t)
ctx := context.Background()
hostID := makeSchedHost(t, s)
if _, err := s.GetRepoMaintenance(ctx, hostID); !errors.Is(err, ErrNotFound) {
t.Fatalf("expected ErrNotFound before seed, got %v", err)
}
if err := s.CreateDefaultRepoMaintenance(ctx, hostID); err != nil {
t.Fatal(err)
}
m, err := s.GetRepoMaintenance(ctx, hostID)
if err != nil {
t.Fatal(err)
}
if m.ForgetCron != "0 3 * * *" || !m.ForgetEnabled {
t.Fatalf("forget defaults: %+v", m)
}
if m.PruneCron != "0 4 * * 0" || m.CheckSubsetPct != 5 {
t.Fatalf("other defaults: %+v", m)
}
m.ForgetCron = "0 4 * * *"
m.PruneEnabled = false
m.CheckSubsetPct = 10
if err := s.UpdateRepoMaintenance(ctx, m); err != nil {
t.Fatal(err)
}
m2, _ := s.GetRepoMaintenance(ctx, hostID)
if m2.ForgetCron != "0 4 * * *" || m2.PruneEnabled || m2.CheckSubsetPct != 10 {
t.Fatalf("update did not persist: %+v", m2)
}
// CreateDefaultRepoMaintenance is idempotent (INSERT OR IGNORE).
if err := s.CreateDefaultRepoMaintenance(ctx, hostID); err != nil {
t.Fatal(err)
}
m3, _ := s.GetRepoMaintenance(ctx, hostID)
if m3.ForgetCron != "0 4 * * *" {
t.Fatalf("INSERT OR IGNORE clobbered existing row: %+v", m3)
}
}
func TestPendingRunQueue(t *testing.T) {
t.Parallel()
s := openTestStore(t)
ctx := context.Background()
hostID := makeSchedHost(t, s)
gid := makeGroup(t, s, hostID, "default", "01HPENDGRP00000000000001")
schedID := "01HPENDSCHED0000000000001"
if err := s.CreateSchedule(ctx, &Schedule{
ID: schedID, HostID: hostID, CronExpr: "@hourly", Enabled: true,
SourceGroupIDs: []string{gid},
}); err != nil {
t.Fatal(err)
}
now := time.Now().UTC()
if err := s.EnqueuePendingRun(ctx, &PendingRun{
ID: "01HPEND00000000000000001",
ScheduleID: schedID, SourceGroupID: gid, HostID: hostID,
NextAttemptAt: now.Add(-time.Second), // already due
ScheduledAt: now.Add(-time.Minute),
}); err != nil {
t.Fatal(err)
}
due, err := s.DuePendingRuns(ctx, now, 10)
if err != nil {
t.Fatal(err)
}
if len(due) != 1 {
t.Fatalf("due: got %d, want 1", len(due))
}
if due[0].Attempt != 1 {
t.Fatalf("attempt: %d", due[0].Attempt)
}
// Bump.
next := now.Add(2 * time.Minute)
if err := s.BumpPendingRunAttempt(ctx, due[0].ID, next, "agent offline"); err != nil {
t.Fatal(err)
}
// No longer due at `now`.
due, _ = s.DuePendingRuns(ctx, now, 10)
if len(due) != 0 {
t.Fatalf("should not be due yet: %v", due)
}
// Due at `next`.
due, _ = s.DuePendingRuns(ctx, next, 10)
if len(due) != 1 || due[0].Attempt != 2 || due[0].LastError != "agent offline" {
t.Fatalf("after bump: %+v", due)
}
if err := s.DeletePendingRun(ctx, due[0].ID); err != nil {
t.Fatal(err)
}
due, _ = s.DuePendingRuns(ctx, next, 10)
if len(due) != 0 {
t.Fatalf("after delete: %v", due)
}
}
+70 -41
View File
@@ -30,7 +30,7 @@ const (
// token; the DB stores its hash. Callers that hold a *Session have // token; the DB stores its hash. Callers that hold a *Session have
// already authenticated. // already authenticated.
type Session struct { type Session struct {
ID string // session token (raw); never persisted as-is ID string
UserID string UserID string
CreatedAt time.Time CreatedAt time.Time
ExpiresAt time.Time ExpiresAt time.Time
@@ -38,8 +38,9 @@ type Session struct {
UA string UA string
} }
// Host mirrors the denormalised hosts table. JSON columns (tags) are // Host mirrors the hosts table. The P2 redesign moved repo-related
// returned decoded into Go slices for ergonomics. // flags out (auto-init replaces RepoInitialisedAt; bandwidth lives
// here as a host-wide cap; "what to back up" lives on source_groups).
type Host struct { type Host struct {
ID string ID string
Name string Name string
@@ -60,44 +61,54 @@ type Host struct {
SnapshotCount int SnapshotCount int
OpenAlertCount int OpenAlertCount int
AppliedScheduleVersion int64 AppliedScheduleVersion int64
// RepoInitialisedAt is non-nil once we've confirmed the host's // Host-wide bandwidth caps applied to every restic invocation
// repo has been initialised — either the operator clicked the // (backup, restore, prune). nil = no cap.
// init button, or a backup succeeded, or snapshots.report came BandwidthUpKBps *int
// back non-empty. The host detail run-now panel shows a red BandwidthDownKBps *int
// "Initialise repo" affordance while this is nil.
RepoInitialisedAt *time.Time
} }
// Schedule mirrors one row of the schedules table. JSON columns // Schedule is now intentionally slim: cron + which groups + enabled.
// (paths, excludes, tags, retention_policy, options) are decoded // The "what" lives on SourceGroup (paths, excludes, retention, retry).
// into Go-native shapes for ergonomics; the wire form on the agent // The "kind" of operation a schedule can drive is implicit — backup
// side keeps retention_policy / options as raw JSON since the agent // only. forget/prune/check are repo-level cadences on
// just forwards them to restic. // HostRepoMaintenance, not schedule kinds.
type Schedule struct { type Schedule struct {
ID string ID string
HostID string HostID string
Kind string
CronExpr string CronExpr string
Paths []string
Excludes []string
Tags []string
RetentionPolicy RetentionPolicy
Options ScheduleOptions
PreHook string
PostHook string
Enabled bool Enabled bool
// Manual schedules carry paths/excludes/tags/retention like any CreatedAt time.Time
// other but have no cron — they only fire when the operator UpdatedAt time.Time
// clicks Run-now. Lets us keep one data shape for "what gets // SourceGroupIDs is populated by ListSchedulesByHost (joins
// backed up" without forcing every host to have an automated // schedule_source_groups) and accepted on Create / Update so the
// schedule. Created by Add-host with the typed paths. // caller passes the desired junction state in one shape.
Manual bool SourceGroupIDs []string
}
// SourceGroup is the new home for "what gets backed up." A named
// bundle of include + exclude paths plus a retention policy. Group
// name doubles as the snapshot tag (restic --tag <name>) so retention
// can target it via `restic forget --tag`.
type SourceGroup struct {
ID string
HostID string
Name string
Includes []string
Excludes []string
RetentionPolicy RetentionPolicy
RetryMax int
RetryBackoffSeconds int
// ConflictDimension is the cached name of the failing keep-* on
// a granularity↔cadence mismatch (e.g. "hourly" when keep-hourly
// is set but no schedule pointing at this group fires sub-daily).
// Empty means no conflict. Refreshed on every schedule + group CRUD.
ConflictDimension string
CreatedAt time.Time CreatedAt time.Time
UpdatedAt time.Time UpdatedAt time.Time
} }
// RetentionPolicy is the typed view of `restic forget --keep-*`. // RetentionPolicy is the typed view of `restic forget --keep-*`.
// All fields nullable so empty == "no policy / keep everything". // All fields nullable; empty struct = "keep everything for this group."
type RetentionPolicy struct { type RetentionPolicy struct {
KeepLast *int `json:"keep_last,omitempty"` KeepLast *int `json:"keep_last,omitempty"`
KeepHourly *int `json:"keep_hourly,omitempty"` KeepHourly *int `json:"keep_hourly,omitempty"`
@@ -107,8 +118,8 @@ type RetentionPolicy struct {
KeepYearly *int `json:"keep_yearly,omitempty"` KeepYearly *int `json:"keep_yearly,omitempty"`
} }
// Summary renders a compact human view of the policy for templates // Summary renders a compact human view — "last=7, d=14, w=4" or
// and logs — "last=7, d=14, w=4" or "—" when nothing is set. // "—" when nothing is set. Used by templates and logs.
func (p RetentionPolicy) Summary() string { func (p RetentionPolicy) Summary() string {
parts := []string{} parts := []string{}
for _, kv := range []struct { for _, kv := range []struct {
@@ -132,18 +143,36 @@ func (p RetentionPolicy) Summary() string {
return strings.Join(parts, ", ") return strings.Join(parts, ", ")
} }
// ScheduleOptions covers per-schedule knobs that aren't core to the // HostRepoMaintenance carries the host-level cron cadences for the
// command itself — currently bandwidth caps. Stored as JSON so // three repo-wide verbs (forget / prune / check). 1:1 with hosts;
// future fields don't churn the schema. // row is auto-created at host enrolment with sensible defaults.
type ScheduleOptions struct { type HostRepoMaintenance struct {
LimitUploadKBps *int `json:"limit_upload_kbps,omitempty"` HostID string
LimitDownloadKBps *int `json:"limit_download_kbps,omitempty"` ForgetCron string
ForgetEnabled bool
PruneCron string
PruneEnabled bool
CheckCron string
CheckEnabled bool
CheckSubsetPct int
} }
// EnrollmentToken is the issuer's view of a one-time token. The // PendingRun queues a missed cron tick (agent was offline) for the
// raw token is returned only at create time; the DB stores its hash. // server-side retry ticker to dispatch later.
type PendingRun struct {
ID string
ScheduleID string
SourceGroupID string
HostID string
Attempt int
NextAttemptAt time.Time
ScheduledAt time.Time // original cron tick — forensic / audit
LastError string
}
// EnrollmentToken is the issuer's view of a one-time token.
type EnrollmentToken struct { type EnrollmentToken struct {
Raw string // populated on create only Raw string
TokenHash string TokenHash string
CreatedAt time.Time CreatedAt time.Time
ExpiresAt time.Time ExpiresAt time.Time
@@ -153,7 +182,7 @@ type EnrollmentToken struct {
type AuditEntry struct { type AuditEntry struct {
ID string ID string
UserID *string UserID *string
Actor string // user|agent|system Actor string
Action string Action string
TargetKind *string TargetKind *string
TargetID *string TargetID *string