diff --git a/cmd/agent/main.go b/cmd/agent/main.go index e5d0f38..6f8a229 100644 --- a/cmd/agent/main.go +++ b/cmd/agent/main.go @@ -2,7 +2,6 @@ package main import ( "context" - "encoding/json" "errors" "flag" "fmt" @@ -336,28 +335,26 @@ func (d *dispatcher) runJob(ctx context.Context, p api.CommandRunPayload, tx wsc slog.Info("agent: init job complete", "job_id", p.JobID) }() case api.JobForget: - var policy restic.ForgetPolicy - if len(p.RetentionPolicy) > 0 { - var raw struct { - KeepLast *int `json:"keep_last,omitempty"` - KeepHourly *int `json:"keep_hourly,omitempty"` - KeepDaily *int `json:"keep_daily,omitempty"` - KeepWeekly *int `json:"keep_weekly,omitempty"` - KeepMonthly *int `json:"keep_monthly,omitempty"` - KeepYearly *int `json:"keep_yearly,omitempty"` - } - if err := json.Unmarshal(p.RetentionPolicy, &raw); err != nil { - return fmt.Errorf("forget: decode retention_policy: %w", err) - } - policy = restic.ForgetPolicy{ - KeepLast: raw.KeepLast, KeepHourly: raw.KeepHourly, - KeepDaily: raw.KeepDaily, KeepWeekly: raw.KeepWeekly, - KeepMonthly: raw.KeepMonthly, KeepYearly: raw.KeepYearly, - } + if len(p.ForgetGroups) == 0 { + return fmt.Errorf("forget: command.run carried no forget_groups (server didn't populate them)") } - slog.Info("agent: accepting forget job", "job_id", p.JobID, "policy", p.RetentionPolicy) + groups := make([]restic.ForgetGroup, 0, len(p.ForgetGroups)) + for _, g := range p.ForgetGroups { + groups = append(groups, restic.ForgetGroup{ + Tag: g.Tag, + Policy: restic.ForgetPolicy{ + KeepLast: g.Policy.KeepLast, + KeepHourly: g.Policy.KeepHourly, + KeepDaily: g.Policy.KeepDaily, + KeepWeekly: g.Policy.KeepWeekly, + KeepMonthly: g.Policy.KeepMonthly, + KeepYearly: g.Policy.KeepYearly, + }, + }) + } + slog.Info("agent: accepting forget job", "job_id", p.JobID, "groups", len(groups)) go func() { - if err := r.RunForget(ctx, p.JobID, policy); err != nil { + if err := r.RunForget(ctx, p.JobID, groups); err != nil { slog.Warn("agent: forget 
job failed", "job_id", p.JobID, "err", err) return } diff --git a/cmd/server/main.go b/cmd/server/main.go index 4059bb2..e325f4c 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -16,6 +16,7 @@ import ( "gitea.dcglab.co.uk/steve/restic-manager/internal/crypto" "gitea.dcglab.co.uk/steve/restic-manager/internal/server/config" rmhttp "gitea.dcglab.co.uk/steve/restic-manager/internal/server/http" + "gitea.dcglab.co.uk/steve/restic-manager/internal/server/maintenance" "gitea.dcglab.co.uk/steve/restic-manager/internal/server/ui" "gitea.dcglab.co.uk/steve/restic-manager/internal/server/ws" "gitea.dcglab.co.uk/steve/restic-manager/internal/store" @@ -139,6 +140,14 @@ func run() error { defer purgeTick.Stop() offlineTick := time.NewTicker(30 * time.Second) defer offlineTick.Stop() + // Maintenance ticker: drives forget/prune/check on the cadences + // operators set per-host. Independent of the agent's local cron + // (which only handles backup schedules). 60s cadence — the cron + // expressions are minute-grained, so anything finer is wasted + // work. 
+ maintenanceTick := time.NewTicker(60 * time.Second) + defer maintenanceTick.Stop() + mt := maintenance.New(st) go func() { for { select { @@ -156,6 +165,16 @@ func run() error { if n, err := st.MarkHostsOfflineStale(ctx, cutoff); err == nil && n > 0 { slog.Info("marked hosts offline (stale heartbeat)", "n", n) } + case <-maintenanceTick.C: + decisions, err := mt.Decide(ctx, time.Now().UTC()) + if err != nil { + slog.Warn("maintenance ticker: decide", "err", err) + continue + } + if len(decisions) > 0 { + slog.Info("maintenance ticker: dispatching", "n", len(decisions)) + srv.DispatchMaintenance(ctx, decisions) + } } } }() diff --git a/internal/agent/runner/runner.go b/internal/agent/runner/runner.go index 4d02d9e..985380e 100644 --- a/internal/agent/runner/runner.go +++ b/internal/agent/runner/runner.go @@ -206,18 +206,20 @@ func (r *Runner) RunInit(ctx context.Context, jobID string) error { return nil } -// RunForget executes a forget job against the configured repo with -// the given retention policy. Same envelope shape as RunBackup so -// the live log viewer + job lifecycle work without special-casing. -// On success refreshes the snapshot projection (forget rewrites the -// snapshot index — the host's snapshot list shrinks). -func (r *Runner) RunForget(ctx context.Context, jobID string, policy restic.ForgetPolicy) error { +// RunForget executes a forget job against the configured repo by +// invoking `restic forget --tag --keep-* …` once per group. +// Same envelope shape as RunBackup so the live log viewer + job +// lifecycle work without special-casing. On success refreshes the +// snapshot projection (forget rewrites the snapshot index — the +// host's snapshot list shrinks). Snapshot refresh runs once after +// every group completes, not per-group. 
+func (r *Runner) RunForget(ctx context.Context, jobID string, groups []restic.ForgetGroup) error { startedAt := time.Now().UTC() r.sendStarted(jobID, api.JobForget, startedAt) env := r.resticEnv() var seq atomic.Int64 - err := env.RunForget(ctx, policy, r.streamHandler(jobID, &seq)) + err := env.RunForget(ctx, groups, r.streamHandler(jobID, &seq)) finishedAt := time.Now().UTC() r.sendFinished(jobID, finishedAt, err, nil) diff --git a/internal/agent/runner/runner_test.go b/internal/agent/runner/runner_test.go index 51ba294..a5a817f 100644 --- a/internal/agent/runner/runner_test.go +++ b/internal/agent/runner/runner_test.go @@ -333,8 +333,11 @@ esac tx := &fakeSender{} r := New(Config{ResticBin: bin}, tx, 0) keepLast := 1 - policy := restic.ForgetPolicy{KeepLast: &keepLast} - if err := r.RunForget(context.Background(), "job-forget", policy); err != nil { + groups := []restic.ForgetGroup{{ + Tag: "documents", + Policy: restic.ForgetPolicy{KeepLast: &keepLast}, + }} + if err := r.RunForget(context.Background(), "job-forget", groups); err != nil { t.Fatalf("RunForget: %v", err) } _ = firstEnvOfType(t, tx.envs, api.MsgJobStarted) diff --git a/internal/api/messages.go b/internal/api/messages.go index dd443da..816d203 100644 --- a/internal/api/messages.go +++ b/internal/api/messages.go @@ -77,6 +77,30 @@ const ( JobCancelled JobStatus = "cancelled" //nolint:misspell // wire format ) +// ForgetPolicyJSON is the wire shape of a per-group retention policy +// shipped with a forget command.run. Mirrors store.RetentionPolicy +// JSON tags exactly so a future caller could json-roundtrip between +// the two without reshaping. All fields nullable; an empty struct is +// rejected by the agent (restic refuses to forget without --keep-*). 
+type ForgetPolicyJSON struct {
+	KeepLast    *int `json:"keep_last,omitempty"`
+	KeepHourly  *int `json:"keep_hourly,omitempty"`
+	KeepDaily   *int `json:"keep_daily,omitempty"`
+	KeepWeekly  *int `json:"keep_weekly,omitempty"`
+	KeepMonthly *int `json:"keep_monthly,omitempty"`
+	KeepYearly  *int `json:"keep_yearly,omitempty"`
+}
+
+// ForgetGroup is one (tag, retention) pair shipped to the agent in a
+// forget command.run. The agent invokes
+// `restic forget --tag <tag> --keep-* …` once per group, with each
+// group's own policy. The Tag is the source-group name (which is
+// also the snapshot tag carried at backup time).
+type ForgetGroup struct {
+	Tag    string           `json:"tag"`
+	Policy ForgetPolicyJSON `json:"policy"`
+}
+
 // CommandRunPayload is the server → agent dispatch for a run-now job.
 //
 // For kind=backup, Includes/Excludes/Tag are populated from the source
@@ -85,25 +109,27 @@ const (
 // the source group's name) so retention can target it later via
 // `restic forget --tag`.
 //
-// For kind=forget, RetentionPolicy is the typed keep-* set as raw JSON
-// (the agent doesn't share the store package's typed struct).
+// For kind=forget, ForgetGroups carries one entry per source-group on
+// the host that has a non-empty retention policy. The agent walks the
+// list and runs `restic forget --tag <tag> --keep-* …` per group.
 //
 // Args is preserved as a generic free-form slice for kinds that don't
-// fit the structured fields (e.g. unlock takes none; init takes none).
+// fit the structured fields (e.g. unlock takes none; init takes none;
+// check carries the subset% as Args[0]).
 //
 // RequiresAdminCreds tells the agent to load the admin slot of its
 // secrets store rather than the everyday repo slot. Set by the server
-// only for prune and operator-triggered unlock (kinds that need delete
-// authority on a rest-server repo).
+// only for prune (the only kind that needs delete authority on a
+// rest-server repo today).
 type CommandRunPayload struct {
-	JobID              string          `json:"job_id"`
-	Kind               JobKind         `json:"kind"`
-	Args               []string        `json:"args,omitempty"`
-	Includes           []string        `json:"includes,omitempty"`
-	Excludes           []string        `json:"excludes,omitempty"`
-	Tag                string          `json:"tag,omitempty"`
-	RetentionPolicy    json.RawMessage `json:"retention_policy,omitempty"`
-	RequiresAdminCreds bool            `json:"requires_admin_creds,omitempty"`
+	JobID              string        `json:"job_id"`
+	Kind               JobKind       `json:"kind"`
+	Args               []string      `json:"args,omitempty"`
+	Includes           []string      `json:"includes,omitempty"`
+	Excludes           []string      `json:"excludes,omitempty"`
+	Tag                string        `json:"tag,omitempty"`
+	ForgetGroups       []ForgetGroup `json:"forget_groups,omitempty"`
+	RequiresAdminCreds bool          `json:"requires_admin_creds,omitempty"`
 }
 
 // CommandCancelPayload is the server → agent cancel signal.
diff --git a/internal/restic/runner.go b/internal/restic/runner.go
index b54ac71..51675ff 100644
--- a/internal/restic/runner.go
+++ b/internal/restic/runner.go
@@ -151,8 +151,7 @@ func (e Env) RunBackup(ctx context.Context, paths, excludes, tags []string, hand
 }
 
 // ForgetPolicy mirrors restic forget's --keep-* flags. All optional;
-// nil/zero means "don't pass that flag." Caller passes whatever the
-// schedule's RetentionPolicy carries.
+// nil/zero means "don't pass that flag."
 type ForgetPolicy struct {
 	KeepLast   *int
 	KeepHourly *int
@@ -181,30 +180,63 @@ func (p ForgetPolicy) args() []string {
 	return out
 }
 
-// Empty reports whether no retention dimensions are set. restic
-// forget refuses to run without at least one keep-* flag (it would
-// delete every snapshot), so the agent rejects empty policies before
-// invoking restic.
+// Empty reports whether no retention dimensions are set.
 func (p ForgetPolicy) Empty() bool {
 	return p.KeepLast == nil && p.KeepHourly == nil &&
 		p.KeepDaily == nil && p.KeepWeekly == nil &&
 		p.KeepMonthly == nil && p.KeepYearly == nil
 }
 
-// RunForget executes `restic forget --keep-* … --json` against the
-// configured repo. Does NOT pass --prune — pruning lives behind a
-// separate, admin-only credential (see spec §4.3 / P2-06). Restic
-// just rewrites the snapshot index; the actual data deletion waits
-// for the next prune. Returns nil on a clean exit.
-func (e Env) RunForget(ctx context.Context, policy ForgetPolicy, handle LineHandler) error {
-	if policy.Empty() {
-		return fmt.Errorf("restic forget: refusing to run with empty retention policy (would delete every snapshot)")
+// ForgetGroup is one (tag, retention-policy) pair fed to RunForget.
+// The wrapper invokes `restic forget --tag <tag> --keep-* …` per
+// group so retention can be targeted at a single source-group's
+// snapshots without disturbing snapshots tagged for other groups.
+type ForgetGroup struct {
+	Tag    string
+	Policy ForgetPolicy
+}
+
+// RunForget executes one `restic forget --json --tag <tag> --keep-* …`
+// invocation per group, in slice order. Does NOT pass --prune —
+// pruning lives behind a separate admin-only credential (see spec
+// §4.3 / P2-06). Restic rewrites the snapshot index; the actual data
+// deletion waits for the next prune.
+//
+// Every group is validated before the first invocation, so a bad
+// entry cannot leave earlier groups already forgotten. Rejected up
+// front: an empty groups slice (would be a no-op), a group with an
+// empty Tag (it cannot name a source group, so the run would not be
+// scoped to one), and a group with an empty policy (no --keep-* flag
+// would be passed).
+//
+// Returns the first error encountered, wrapped with the failing
+// group's tag, or nil when every group runs to a clean exit. Groups
+// after a failed one are not attempted.
+func (e Env) RunForget(ctx context.Context, groups []ForgetGroup, handle LineHandler) error {
+	if len(groups) == 0 {
+		return fmt.Errorf("restic forget: refusing to run with no groups (would be a no-op)")
 	}
-	args := append([]string{"forget", "--json"}, policy.args()...)
-	cmd := exec.CommandContext(ctx, e.Bin, args...)
-	cmd.Env = e.envSlice()
-	cmd.Dir = e.WorkDir
-	return runWithPump(cmd, handle)
+	// Validate everything first: failing midway would leave earlier
+	// groups already forgotten.
+	for _, g := range groups {
+		if g.Tag == "" {
+			return fmt.Errorf("restic forget: refusing to run a group with an empty tag")
+		}
+		if g.Policy.Empty() {
+			return fmt.Errorf("restic forget: group %q has empty retention policy (would delete every snapshot)", g.Tag)
+		}
+	}
+	for _, g := range groups {
+		args := []string{"forget", "--json", "--tag", g.Tag}
+		args = append(args, g.Policy.args()...)
+		cmd := exec.CommandContext(ctx, e.Bin, args...)
+		cmd.Env = e.envSlice()
+		cmd.Dir = e.WorkDir
+		if err := runWithPump(cmd, handle); err != nil {
+			return fmt.Errorf("restic forget: group %q: %w", g.Tag, err)
+		}
+	}
+	return nil
 }
 
 // RunInit executes `restic init` against the configured repo. Returns
diff --git a/internal/server/http/maintenance_dispatch.go b/internal/server/http/maintenance_dispatch.go
new file mode 100644
index 0000000..598533f
--- /dev/null
+++ b/internal/server/http/maintenance_dispatch.go
@@ -0,0 +1,132 @@
+// maintenance_dispatch.go bridges the pure-logic maintenance.Ticker
+// (internal/server/maintenance) to the side-effecting world: checks
+// online state, builds the per-kind command.run payload, and calls
+// dispatchJobWithPayload — the same path operator-triggered Run-now
+// uses. Cadence-driven jobs are persisted with actor_kind="system"
+// (dispatchJobWithPayload tags it that way when user==nil).
+//
+// Maintenance fires deliberately do NOT queue to pending_runs when
+// the host is offline — five missed prunes on a laptop returning
+// from a week away is not what the operator wants. Skip + log; the
+// next 60s tick will re-evaluate.
+package http
+
+import (
+	"context"
+	"errors"
+	"log/slog"
+	"strconv"
+
+	"gitea.dcglab.co.uk/steve/restic-manager/internal/api"
+	"gitea.dcglab.co.uk/steve/restic-manager/internal/server/maintenance"
+	"gitea.dcglab.co.uk/steve/restic-manager/internal/store"
+)
+
+// DispatchMaintenance acts on each Decision from the ticker.
Offline
+// hosts are skipped (logged). Prune decisions for hosts without admin
+// creds are skipped too, logged at info level only — the operator has
+// not completed the admin-creds setup yet. (Operator-triggered prune via
+// the run-now endpoint returns a clear error instead — different path.)
+func (s *Server) DispatchMaintenance(ctx context.Context, decisions []maintenance.Decision) {
+	for _, d := range decisions {
+		if !s.deps.Hub.Connected(d.HostID) {
+			slog.Info("maintenance: host offline, skipping",
+				"host_id", d.HostID, "kind", d.Kind)
+			continue
+		}
+		switch d.Kind {
+		case "forget":
+			payload, ok := s.buildForgetPayloadForHost(ctx, d.HostID)
+			if !ok {
+				slog.Info("maintenance: forget skipped — no source groups with retention",
+					"host_id", d.HostID)
+				continue
+			}
+			_, _, code, msg := s.dispatchJobWithPayload(ctx, nil, d.HostID, api.JobForget, payload)
+			if code != "" {
+				slog.Warn("maintenance: forget dispatch failed",
+					"host_id", d.HostID, "code", code, "msg", msg)
+			}
+		case "prune":
+			if _, err := s.deps.Store.GetHostCredentials(ctx, d.HostID, store.CredKindAdmin); err != nil {
+				if errors.Is(err, store.ErrNotFound) {
+					slog.Info("maintenance: prune skipped — no admin creds",
+						"host_id", d.HostID)
+					continue
+				}
+				slog.Warn("maintenance: prune skipped — admin creds error",
+					"host_id", d.HostID, "err", err)
+				continue
+			}
+			if err := s.pushAdminCredsToAgent(ctx, d.HostID); err != nil {
+				slog.Warn("maintenance: prune push admin creds failed",
+					"host_id", d.HostID, "err", err)
+				continue
+			}
+			payload := api.CommandRunPayload{RequiresAdminCreds: true}
+			_, _, code, msg := s.dispatchJobWithPayload(ctx, nil, d.HostID, api.JobPrune, payload)
+			if code != "" {
+				slog.Warn("maintenance: prune dispatch failed",
+					"host_id", d.HostID, "code", code, "msg", msg)
+			}
+		case "check":
+			payload := api.CommandRunPayload{Args: []string{strconv.Itoa(d.SubsetPct)}}
+			_, _, code, msg := s.dispatchJobWithPayload(ctx, nil, d.HostID, api.JobCheck, payload)
+			if code != "" {
+				slog.Warn("maintenance: check dispatch failed",
+					"host_id", d.HostID, "code", code, "msg", msg)
+			}
+		default:
+			slog.Warn("maintenance: unknown decision kind",
+				"host_id", d.HostID, "kind", d.Kind)
+		}
+	}
+}
+
+// buildForgetPayloadForHost collects every source group on the host
+// that has a non-empty retention policy and builds a CommandRunPayload
+// with ForgetGroups populated. Returns ok=false if the store lookup
+// fails or no group qualifies (the dispatcher then skips this kind).
+func (s *Server) buildForgetPayloadForHost(ctx context.Context, hostID string) (api.CommandRunPayload, bool) {
+	groups, err := s.deps.Store.ListSourceGroupsByHost(ctx, hostID)
+	if err != nil {
+		slog.Warn("maintenance: list source groups failed", "host_id", hostID, "err", err)
+		return api.CommandRunPayload{}, false
+	}
+	fg := make([]api.ForgetGroup, 0, len(groups))
+	for _, g := range groups {
+		if isEmptyRetention(g.RetentionPolicy) {
+			continue
+		}
+		fg = append(fg, api.ForgetGroup{
+			Tag:    g.Name,
+			Policy: forgetPolicyJSONFromStore(g.RetentionPolicy),
+		})
+	}
+	if len(fg) == 0 {
+		return api.CommandRunPayload{}, false
+	}
+	return api.CommandRunPayload{ForgetGroups: fg}, true
+}
+
+// isEmptyRetention reports whether p sets no keep-* dimension at all.
+func isEmptyRetention(p store.RetentionPolicy) bool {
+	return p.KeepLast == nil && p.KeepHourly == nil &&
+		p.KeepDaily == nil && p.KeepWeekly == nil &&
+		p.KeepMonthly == nil && p.KeepYearly == nil
+}
+
+// forgetPolicyJSONFromStore copies retention pointers from the store
+// view to the wire view. Both shapes are field-for-field identical;
+// this avoids importing store from internal/api (which would invert
+// the dependency direction).
+func forgetPolicyJSONFromStore(p store.RetentionPolicy) api.ForgetPolicyJSON {
+	// Field-by-field pointer copy: each wire field ends up holding
+	// the same pointer as the matching store field, exactly as a
+	// struct literal would produce.
+	var wire api.ForgetPolicyJSON
+	wire.KeepLast, wire.KeepHourly = p.KeepLast, p.KeepHourly
+	wire.KeepDaily, wire.KeepWeekly = p.KeepDaily, p.KeepWeekly
+	wire.KeepMonthly, wire.KeepYearly = p.KeepMonthly, p.KeepYearly
+	return wire
+}
diff --git a/internal/server/http/maintenance_dispatch_test.go b/internal/server/http/maintenance_dispatch_test.go
new file mode 100644
index 0000000..33a1f4a
--- /dev/null
+++ b/internal/server/http/maintenance_dispatch_test.go
@@ -0,0 +1,300 @@
+// maintenance_dispatch_test.go — exercises Server.DispatchMaintenance
+// directly (one Decision at a time). Reuses the same fake-agent
+// harness as p2r01_ws_test / repo_ops_test: a real Server with a
+// real Hub, plus a websocket connected as the host. We then push
+// Decisions through DispatchMaintenance and assert the envelopes
+// the agent receives + the job rows that land.
+package http
+
+import (
+	"context"
+	"encoding/json"
+	"testing"
+	"time"
+
+	"github.com/coder/websocket"
+	"github.com/oklog/ulid/v2"
+
+	"gitea.dcglab.co.uk/steve/restic-manager/internal/api"
+	"gitea.dcglab.co.uk/steve/restic-manager/internal/server/maintenance"
+	"gitea.dcglab.co.uk/steve/restic-manager/internal/store"
+)
+
+// readNextCommandRun pulls envelopes until a command.run lands or the
+// deadline passes. Returns nil on deadline or on any read error.
+func readNextCommandRun(t *testing.T, c *websocket.Conn, deadline time.Time) *api.CommandRunPayload {
+	t.Helper()
+	// One context bounds the whole wait: any Read error ends it, so deadline must be the only clock.
+	ctx, cancel := context.WithDeadline(context.Background(), deadline)
+	defer cancel()
+	for {
+		mt, raw, err := c.Read(ctx)
+		if err != nil {
+			return nil
+		}
+		if mt != websocket.MessageText {
+			continue
+		}
+		var env api.Envelope
+		if err := json.Unmarshal(raw, &env); err != nil {
+			continue
+		}
+		if env.Type != api.MsgCommandRun {
+			continue
+		}
+		var p api.CommandRunPayload
+		if err := env.UnmarshalPayload(&p); err != nil {
+			continue
+		}
+		return &p
+	}
+}
+
+// TestDispatchMaintenanceSkipsOfflineHosts: host not connected → no
+// envelope, no job row.
+func TestDispatchMaintenanceSkipsOfflineHosts(t *testing.T) {
+	t.Parallel()
+	srv, _, st := rawTestServer(t)
+	hostID, _ := enrolHostForWS(t, srv, st, "offline-host")
+
+	srv.DispatchMaintenance(context.Background(), []maintenance.Decision{
+		{HostID: hostID, Kind: "check", SubsetPct: 10},
+	})
+
+	var n int
+	if err := st.DB().QueryRow(
+		`SELECT COUNT(*) FROM jobs WHERE host_id = ?`, hostID).Scan(&n); err != nil {
+		t.Fatalf("count: %v", err)
+	}
+	if n != 0 {
+		t.Errorf("offline host produced %d job rows; want 0", n)
+	}
+}
+
+// TestDispatchMaintenanceForgetShipsForgetGroups: connected host with
+// two source groups (one with retention, one without). Decision of
+// kind=forget → command.run with ForgetGroups containing only the
+// group that had retention.
+func TestDispatchMaintenanceForgetShipsForgetGroups(t *testing.T) {
+	t.Parallel()
+	srv, ts, st := rawTestServer(t)
+	hostID, token := enrolHostForWS(t, srv, st, "forget-host")
+	seedInitJob(t, st, hostID)
+
+	keep := 7
+	if err := st.CreateSourceGroup(context.Background(), &store.SourceGroup{
+		ID: ulid.Make().String(), HostID: hostID, Name: "documents",
+		Includes:        []string{"/home/documents"},
+		RetentionPolicy: store.RetentionPolicy{KeepLast: &keep},
+	}); err != nil {
+		t.Fatalf("group docs: %v", err)
+	}
+	if err := st.CreateSourceGroup(context.Background(), &store.SourceGroup{
+		ID: ulid.Make().String(), HostID: hostID, Name: "ephemeral",
+		Includes: []string{"/tmp"},
+	}); err != nil {
+		t.Fatalf("group eph: %v", err)
+	}
+
+	c := agentDial(t, srv, ts, hostID, token)
+	sendHello(t, c, "forget-host")
+	_ = drainUntil(t, c, api.MsgScheduleSet)
+
+	srv.DispatchMaintenance(context.Background(), []maintenance.Decision{
+		{HostID: hostID, Kind: "forget"},
+	})
+
+	got := readNextCommandRun(t, c, time.Now().Add(2*time.Second))
+	if got == nil {
+		t.Fatal("no command.run received")
+	}
+	if got.Kind != api.JobForget {
+		t.Errorf("kind: got %q, want %q", got.Kind, api.JobForget)
+	}
+	if len(got.ForgetGroups) != 1 {
+		t.Fatalf("ForgetGroups: got %d entries (%+v), want 1", len(got.ForgetGroups), got.ForgetGroups)
+	}
+	if got.ForgetGroups[0].Tag != "documents" {
+		t.Errorf("forget group tag: got %q, want %q", got.ForgetGroups[0].Tag, "documents")
+	}
+	if got.ForgetGroups[0].Policy.KeepLast == nil || *got.ForgetGroups[0].Policy.KeepLast != 7 {
+		t.Errorf("forget group policy: got %+v", got.ForgetGroups[0].Policy)
+	}
+
+	// Job row must be persisted with actor_kind=system.
+	var actor string
+	if err := st.DB().QueryRow(
+		`SELECT actor_kind FROM jobs WHERE host_id = ? AND kind = 'forget'`, hostID).Scan(&actor); err != nil {
+		t.Fatalf("query actor_kind: %v", err)
+	}
+	if actor != "system" {
+		t.Errorf("actor_kind: got %q, want system", actor)
+	}
+}
+
+// TestDispatchMaintenanceForgetSkipsHostWithNoRetention: connected host,
+// every source group has empty retention → no envelope, no forget job row.
+func TestDispatchMaintenanceForgetSkipsHostWithNoRetention(t *testing.T) {
+	t.Parallel()
+	srv, ts, st := rawTestServer(t)
+	hostID, token := enrolHostForWS(t, srv, st, "no-ret-host")
+	seedInitJob(t, st, hostID)
+	if err := st.CreateSourceGroup(context.Background(), &store.SourceGroup{
+		ID: ulid.Make().String(), HostID: hostID, Name: "ephemeral",
+		Includes: []string{"/tmp"},
+	}); err != nil {
+		t.Fatalf("group: %v", err)
+	}
+
+	c := agentDial(t, srv, ts, hostID, token)
+	sendHello(t, c, "no-ret-host")
+	_ = drainUntil(t, c, api.MsgScheduleSet)
+
+	srv.DispatchMaintenance(context.Background(), []maintenance.Decision{
+		{HostID: hostID, Kind: "forget"},
+	})
+
+	if got := readNextCommandRun(t, c, time.Now().Add(800*time.Millisecond)); got != nil {
+		t.Errorf("unexpected command.run: %+v", got)
+	}
+	var n int
+	err := st.DB().QueryRow(`SELECT COUNT(*) FROM jobs WHERE host_id = ? AND kind = 'forget'`, hostID).Scan(&n)
+	if err != nil || n != 0 {
+		t.Errorf("forget job rows: got %d (err=%v), want 0", n, err)
+	}
+}
+
+// TestDispatchMaintenancePruneSkipsWithoutAdminCreds: no admin creds
+// row → no envelope, no job row, silent skip.
+func TestDispatchMaintenancePruneSkipsWithoutAdminCreds(t *testing.T) {
+	t.Parallel()
+	srv, ts, st := rawTestServer(t)
+	hostID, token := enrolHostForWS(t, srv, st, "no-admin-host")
+	seedInitJob(t, st, hostID)
+
+	c := agentDial(t, srv, ts, hostID, token)
+	sendHello(t, c, "no-admin-host")
+	_ = drainUntil(t, c, api.MsgScheduleSet)
+
+	srv.DispatchMaintenance(context.Background(), []maintenance.Decision{
+		{HostID: hostID, Kind: "prune"},
+	})
+
+	if got := readNextCommandRun(t, c, time.Now().Add(800*time.Millisecond)); got != nil {
+		t.Errorf("unexpected command.run: %+v", got)
+	}
+	var n int
+	err := st.DB().QueryRow(`SELECT COUNT(*) FROM jobs WHERE host_id = ? AND kind = 'prune'`, hostID).Scan(&n)
+	if err != nil || n != 0 {
+		t.Errorf("prune job rows: got %d (err=%v), want 0", n, err)
+	}
+}
+
+// TestDispatchMaintenancePruneShipsConfigUpdateThenCommandRun: with
+// admin creds set, prune dispatch must push admin config.update first
+// then command.run(prune, RequiresAdminCreds=true).
+func TestDispatchMaintenancePruneShipsConfigUpdateThenCommandRun(t *testing.T) {
+	t.Parallel()
+	srv, ts, st := rawTestServer(t)
+	hostID, token := enrolHostForWS(t, srv, st, "prune-mt-host")
+	setAdminCreds(t, srv, st, hostID)
+	seedInitJob(t, st, hostID)
+
+	c := agentDial(t, srv, ts, hostID, token)
+	sendHello(t, c, "prune-mt-host")
+	_ = drainUntil(t, c, api.MsgScheduleSet)
+
+	srv.DispatchMaintenance(context.Background(), []maintenance.Decision{
+		{HostID: hostID, Kind: "prune"},
+	})
+
+	// Read until we've seen both config.update(slot=admin) and the
+	// prune command.run.
+	deadline := time.Now().Add(3 * time.Second)
+	var sawAdminPush bool
+	var prunePayload *api.CommandRunPayload
+	ctx, cancel := context.WithDeadline(context.Background(), deadline)
+	defer cancel()
+	for prunePayload == nil {
+		mt, raw, err := c.Read(ctx)
+		if err != nil {
+			break
+		}
+		if mt != websocket.MessageText {
+			continue
+		}
+		var env api.Envelope
+		if err := json.Unmarshal(raw, &env); err != nil {
+			continue
+		}
+		switch env.Type {
+		case api.MsgConfigUpdate:
+			var p api.ConfigUpdatePayload
+			if err := env.UnmarshalPayload(&p); err == nil && p.Slot == "admin" {
+				sawAdminPush = true
+			}
+		case api.MsgCommandRun:
+			var p api.CommandRunPayload
+			if err := env.UnmarshalPayload(&p); err == nil && p.Kind == api.JobPrune {
+				cp := p
+				prunePayload = &cp
+			}
+		}
+	}
+	if !sawAdminPush {
+		t.Error("expected config.update(slot=admin) before prune dispatch")
+	}
+	if prunePayload == nil {
+		t.Fatal("timed out waiting for command.run(prune)")
+	}
+	if !prunePayload.RequiresAdminCreds {
+		t.Error("prune command.run must have RequiresAdminCreds=true")
+	}
+
+	// Persisted job must be system actor.
+	var actor string
+	if err := st.DB().QueryRow(
+		`SELECT actor_kind FROM jobs WHERE host_id = ? AND kind = 'prune'`, hostID).Scan(&actor); err != nil {
+		t.Fatalf("query actor_kind: %v", err)
+	}
+	if actor != "system" {
+		t.Errorf("actor_kind: got %q, want system", actor)
+	}
+}
+
+// TestDispatchMaintenanceCheckCarriesSubset: Decision SubsetPct=15 →
+// command.run.Args == ["15"]. Job row actor_kind=system.
+func TestDispatchMaintenanceCheckCarriesSubset(t *testing.T) {
+	t.Parallel()
+	srv, ts, st := rawTestServer(t)
+	hostID, token := enrolHostForWS(t, srv, st, "check-mt-host")
+	seedInitJob(t, st, hostID)
+	// Connect as the agent; drainUntil is asked for the schedule-set message.
+	conn := agentDial(t, srv, ts, hostID, token)
+	sendHello(t, conn, "check-mt-host")
+	_ = drainUntil(t, conn, api.MsgScheduleSet)
+	// Push a single check decision carrying SubsetPct=15.
+	checkDecision := maintenance.Decision{HostID: hostID, Kind: "check", SubsetPct: 15}
+	srv.DispatchMaintenance(context.Background(), []maintenance.Decision{checkDecision})
+	// The command.run the agent sees must be a check with Args == ["15"].
+	run := readNextCommandRun(t, conn, time.Now().Add(2*time.Second))
+	if run == nil {
+		t.Fatal("no command.run received")
+	}
+	if run.Kind != api.JobCheck {
+		t.Errorf("kind: got %q, want %q", run.Kind, api.JobCheck)
+	}
+	if !(len(run.Args) == 1 && run.Args[0] == "15") {
+		t.Errorf("Args: got %+v, want [15]", run.Args)
+	}
+	// The persisted job row must carry actor_kind=system.
+	var actorKind string
+	row := st.DB().QueryRow(
+		`SELECT actor_kind FROM jobs WHERE host_id = ? AND kind = 'check'`, hostID)
+	if err := row.Scan(&actorKind); err != nil {
+		t.Fatalf("query actor_kind: %v", err)
+	}
+	if actorKind != "system" {
+		t.Errorf("actor_kind: got %q, want system", actorKind)
+	}
+}