diff --git a/cmd/agent/main.go b/cmd/agent/main.go index d401640..d6c3d8b 100644 --- a/cmd/agent/main.go +++ b/cmd/agent/main.go @@ -9,6 +9,7 @@ import ( "os" "os/signal" "strconv" + "sync" "syscall" "time" @@ -170,6 +171,14 @@ type dispatcher struct { resticBin string secrets *secrets.Store scheduler *scheduler.Scheduler + + // Bandwidth caps in KB/s pushed via config.update. Mutated under + // bwMu by the config.update handler; read by runJob when building + // the runner. <=0 means "no cap" (do not pass --limit-* to restic). + // Per-job overrides on CommandRunPayload take precedence. + bwMu sync.Mutex + bwUpKBps int + bwDownKBps int } func (d *dispatcher) handle(ctx context.Context, env api.Envelope, tx wsclient.Sender) error { @@ -263,6 +272,24 @@ func (d *dispatcher) handle(ctx context.Context, env api.Envelope, tx wsclient.S slog.Warn("ws agent: unknown config.update slot, ignoring", "slot", p.Slot) } + // Bandwidth caps ride independently of the slot — they're host- + // wide and apply to every restic invocation regardless of which + // credentials slot the job uses. nil pointer = no change in this + // push; non-nil = set to that value (≤0 clears the cap). + if p.BandwidthUpKBps != nil || p.BandwidthDownKBps != nil { + d.bwMu.Lock() + if p.BandwidthUpKBps != nil { + d.bwUpKBps = *p.BandwidthUpKBps + } + if p.BandwidthDownKBps != nil { + d.bwDownKBps = *p.BandwidthDownKBps + } + up, down := d.bwUpKBps, d.bwDownKBps + d.bwMu.Unlock() + slog.Info("ws agent: bandwidth caps updated", + "up_kbps", up, "down_kbps", down) + } + case api.MsgAgentUpdateAvail: var p api.AgentUpdateAvailablePayload _ = env.UnmarshalPayload(&p) @@ -295,11 +322,25 @@ func (d *dispatcher) runJob(ctx context.Context, p api.CommandRunPayload, tx wsc // not on r). If you find yourself adding a new JobKind that // needs delete authority, mirror the JobPrune pattern below // — don't try to overload r. + // Resolve bandwidth caps: per-job override (if set) wins over the + // host-wide caps last pushed via config.update. <=0 means no cap. + d.bwMu.Lock() + upKBps, downKBps := d.bwUpKBps, d.bwDownKBps + d.bwMu.Unlock() + if p.BandwidthUpKBps != nil { + upKBps = *p.BandwidthUpKBps + } + if p.BandwidthDownKBps != nil { + downKBps = *p.BandwidthDownKBps + } + r := runner.New(runner.Config{ - ResticBin: d.resticBin, - RepoURL: creds.URL, - RepoUsername: creds.Username, - RepoPassword: creds.Password, + ResticBin: d.resticBin, + RepoURL: creds.URL, + RepoUsername: creds.Username, + RepoPassword: creds.Password, + LimitUploadKBps: upKBps, + LimitDownloadKBps: downKBps, }, tx, time.Second) switch p.Kind { @@ -381,10 +422,12 @@ func (d *dispatcher) runJob(ctx context.Context, p api.CommandRunPayload, tx wsc runCreds = ac } prr := runner.New(runner.Config{ - ResticBin: d.resticBin, - RepoURL: runCreds.URL, - RepoUsername: runCreds.Username, - RepoPassword: runCreds.Password, + ResticBin: d.resticBin, + RepoURL: runCreds.URL, + RepoUsername: runCreds.Username, + RepoPassword: runCreds.Password, + LimitUploadKBps: upKBps, + LimitDownloadKBps: downKBps, }, tx, time.Second) slog.Info("agent: accepting prune job", "job_id", p.JobID, "admin_creds", p.RequiresAdminCreds) go func() { diff --git a/internal/agent/runner/runner.go b/internal/agent/runner/runner.go index 985380e..617c8a6 100644 --- a/internal/agent/runner/runner.go +++ b/internal/agent/runner/runner.go @@ -30,6 +30,12 @@ type Config struct { RepoURL string RepoUsername string RepoPassword string + + // Bandwidth caps in KB/s applied to every restic invocation. + // <=0 means "no cap". Per-job override: callers that build a + // runner per-dispatch can pass the override value here directly. + LimitUploadKBps int + LimitDownloadKBps int } // Runner owns the restic invocations. @@ -54,10 +60,12 @@ func New(cfg Config, tx Sender, progressMinPeriod time.Duration) *Runner { // resticEnv builds the shared restic.Env from r.cfg. func (r *Runner) resticEnv() restic.Env { return restic.Env{ - Bin: r.cfg.ResticBin, - RepoURL: r.cfg.RepoURL, - RepoUsername: r.cfg.RepoUsername, - RepoPassword: r.cfg.RepoPassword, + Bin: r.cfg.ResticBin, + RepoURL: r.cfg.RepoURL, + RepoUsername: r.cfg.RepoUsername, + RepoPassword: r.cfg.RepoPassword, + LimitUploadKBps: r.cfg.LimitUploadKBps, + LimitDownloadKBps: r.cfg.LimitDownloadKBps, } } diff --git a/internal/api/messages.go b/internal/api/messages.go index 816d203..ce43bc3 100644 --- a/internal/api/messages.go +++ b/internal/api/messages.go @@ -130,6 +130,19 @@ type CommandRunPayload struct { Tag string `json:"tag,omitempty"` ForgetGroups []ForgetGroup `json:"forget_groups,omitempty"` RequiresAdminCreds bool `json:"requires_admin_creds,omitempty"` + + // Per-job bandwidth caps in KB/s. When nil, the agent uses the + // host-wide caps it received via config.update. When non-nil, + // the override wins for this job only — even a non-nil zero + // pointer means "no cap for this job" (caller's explicit choice). + BandwidthUpKBps *int `json:"bandwidth_up_kbps,omitempty"` + BandwidthDownKBps *int `json:"bandwidth_down_kbps,omitempty"` + + // Hooks run only for kind=backup. Server resolves source-group + // hook → host default → empty before dispatching, so the agent + // just executes whatever is here. + PreHook string `json:"pre_hook,omitempty"` + PostHook string `json:"post_hook,omitempty"` } // CommandCancelPayload is the server → agent cancel signal. @@ -306,6 +319,14 @@ type ConfigUpdatePayload struct { RepoCredential string `json:"repo_credential,omitempty"` // sensitive (for rest server basic auth) HookShell string `json:"hook_shell,omitempty"` Slot string `json:"slot,omitempty"` + + // Bandwidth caps in KB/s. Pointer semantics so the server can + // disambiguate "no change in this push" (nil → omitted on the + // wire) from "explicitly clear the cap" (zero or negative value). + // Applied to every restic invocation as --limit-upload / + // --limit-download. Per-job overrides ride on CommandRunPayload. + BandwidthUpKBps *int `json:"bandwidth_up_kbps,omitempty"` + BandwidthDownKBps *int `json:"bandwidth_down_kbps,omitempty"` } // AgentUpdateAvailablePayload — informational only; the agent does diff --git a/internal/restic/runner.go b/internal/restic/runner.go index 51675ff..6104e7a 100644 --- a/internal/restic/runner.go +++ b/internal/restic/runner.go @@ -47,6 +47,37 @@ type Env struct { RepoPassword string // doubles as RESTIC_PASSWORD and (for rest:) HTTP basic-auth password ExtraEnv map[string]string // any other RESTIC_* / passthrough WorkDir string // CWD; default = current + + // Bandwidth caps in KB/s. <=0 means "no cap" (omit the flag). + // Emitted as restic global flags --limit-upload / --limit-download + // before the subcommand on every invocation. + LimitUploadKBps int + LimitDownloadKBps int +} + +// globalArgs returns restic's pre-subcommand global flags derived +// from the Env. Currently just bandwidth caps. +func (e Env) globalArgs() []string { + var out []string + if e.LimitUploadKBps > 0 { + out = append(out, "--limit-upload", fmt.Sprintf("%d", e.LimitUploadKBps)) + } + if e.LimitDownloadKBps > 0 { + out = append(out, "--limit-download", fmt.Sprintf("%d", e.LimitDownloadKBps)) + } + return out +} + +// resticCmd builds an exec.Cmd with bandwidth-limit globals prefixed +// before the supplied subcommand args. Centralizing this so every +// command (backup/forget/prune/check/unlock/init/stats) honors +// the caps without each call site having to remember. +func (e Env) resticCmd(ctx context.Context, sub ...string) *exec.Cmd { + args := append(e.globalArgs(), sub...) + cmd := exec.CommandContext(ctx, e.Bin, args...) + cmd.Env = e.envSlice() + cmd.Dir = e.WorkDir + return cmd } // EventKind enumerates what we care about in restic's --json output @@ -110,9 +141,7 @@ func (e Env) RunBackup(ctx context.Context, paths, excludes, tags []string, hand } args = append(args, paths...) - cmd := exec.CommandContext(ctx, e.Bin, args...) - cmd.Env = e.envSlice() - cmd.Dir = e.WorkDir + cmd := e.resticCmd(ctx, args...) stdout, err := cmd.StdoutPipe() if err != nil { @@ -215,9 +244,7 @@ func (e Env) RunForget(ctx context.Context, groups []ForgetGroup, handle LineHan } args := []string{"forget", "--json", "--tag", g.Tag} args = append(args, g.Policy.args()...) - cmd := exec.CommandContext(ctx, e.Bin, args...) - cmd.Env = e.envSlice() - cmd.Dir = e.WorkDir + cmd := e.resticCmd(ctx, args...) if err := runWithPump(cmd, handle); err != nil { return err } @@ -232,9 +259,7 @@ func (e Env) RunForget(ctx context.Context, groups []ForgetGroup, handle LineHan // at " on success, "config file already exists" on a // re-init attempt, etc.). func (e Env) RunInit(ctx context.Context, handle LineHandler) error { - cmd := exec.CommandContext(ctx, e.Bin, "init") - cmd.Env = e.envSlice() - cmd.Dir = e.WorkDir + cmd := e.resticCmd(ctx, "init") // Sniff for "config file already exists" on stderr; if we see it // we'll treat the non-zero exit as a soft success — running init @@ -272,10 +297,7 @@ func (e Env) RunInit(ctx context.Context, handle LineHandler) error { // support that's useful for our purposes). We tee everything to the // handler so the live log is the operator's progress bar. func (e Env) RunPrune(ctx context.Context, handle LineHandler) error { - cmd := exec.CommandContext(ctx, e.Bin, "prune") - cmd.Env = e.envSlice() - cmd.Dir = e.WorkDir - return runWithPump(cmd, handle) + return runWithPump(e.resticCmd(ctx, "prune"), handle) } // runWithPump starts the configured cmd, fans stdout+stderr into @@ -313,10 +335,7 @@ func runWithPump(cmd *exec.Cmd, handle LineHandler) error { // RunUnlock executes `restic unlock`. Returns nil on a clean exit. func (e Env) RunUnlock(ctx context.Context, handle LineHandler) error { - cmd := exec.CommandContext(ctx, e.Bin, "unlock") - cmd.Env = e.envSlice() - cmd.Dir = e.WorkDir - return runWithPump(cmd, handle) + return runWithPump(e.resticCmd(ctx, "unlock"), handle) } // RepoStats mirrors `restic stats --json --mode raw-data` output. @@ -333,9 +352,7 @@ type RepoStats struct { // caller can still log it. Returns an error if no JSON-shaped line // arrived on stdout. func (e Env) RunStats(ctx context.Context, handle LineHandler) (*RepoStats, error) { - cmd := exec.CommandContext(ctx, e.Bin, "stats", "--json", "--mode", "raw-data") - cmd.Env = e.envSlice() - cmd.Dir = e.WorkDir + cmd := e.resticCmd(ctx, "stats", "--json", "--mode", "raw-data") var out *RepoStats capture := func(stream, line string, ev any) { if stream == "stdout" && strings.HasPrefix(line, "{") { @@ -378,9 +395,7 @@ func (e Env) RunCheck(ctx context.Context, subsetPct int, handle LineHandler) (C if subsetPct > 0 { args = append(args, "--read-data-subset", fmt.Sprintf("%d%%", subsetPct)) } - cmd := exec.CommandContext(ctx, e.Bin, args...) - cmd.Env = e.envSlice() - cmd.Dir = e.WorkDir + cmd := e.resticCmd(ctx, args...) var res CheckResult sniff := func(stream, line string, ev any) { diff --git a/internal/restic/runner_test.go b/internal/restic/runner_test.go index a2d6708..698a122 100644 --- a/internal/restic/runner_test.go +++ b/internal/restic/runner_test.go @@ -174,6 +174,43 @@ func TestRunStatsErrorsWithoutJSON(t *testing.T) { } } +func TestBandwidthLimitFlagsInjected(t *testing.T) { + // Script echoes its argv to stdout. Each variant should produce + // the right --limit-* flags before the subcommand. + cases := []struct { + name string + env Env + want []string + }{ + {"both caps", Env{LimitUploadKBps: 1024, LimitDownloadKBps: 512}, []string{"--limit-upload 1024", "--limit-download 512"}}, + {"only upload", Env{LimitUploadKBps: 256}, []string{"--limit-upload 256"}}, + {"zero means omit", Env{LimitUploadKBps: 0, LimitDownloadKBps: 0}, nil}, + {"negative means omit", Env{LimitUploadKBps: -1}, nil}, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + bin := setupScriptBin(t, `echo "$@"`) + env := c.env + env.Bin = bin + lines, h := captureLines() + if err := env.RunUnlock(context.Background(), h); err != nil { + t.Fatalf("RunUnlock: %v", err) + } + joined := strings.Join(*lines, "\n") + for _, want := range c.want { + if !strings.Contains(joined, want) { + t.Fatalf("want %q in argv; got: %s", want, joined) + } + } + if len(c.want) == 0 { + if strings.Contains(joined, "--limit-upload") || strings.Contains(joined, "--limit-download") { + t.Fatalf("expected no limit flags; got: %s", joined) + } + } + }) + } +} + func TestRunStatsZeroSnapshots(t *testing.T) { // Confirms RunStats succeeds and returns a valid *RepoStats when the // repo has no snapshots (snapshots_count=0). A regression that diff --git a/internal/server/http/host_bandwidth.go b/internal/server/http/host_bandwidth.go index e42996b..8165a09 100644 --- a/internal/server/http/host_bandwidth.go +++ b/internal/server/http/host_bandwidth.go @@ -58,5 +58,10 @@ func (s *Server) handleUpdateHostBandwidth(w stdhttp.ResponseWriter, r *stdhttp. writeJSONError(w, stdhttp.StatusInternalServerError, "internal", err.Error()) return } + // Fan out to the agent if connected. Errors are non-fatal — the + // next reconnect's onAgentHello will resync. + if s.deps.Hub != nil && s.deps.Hub.Connected(hostID) { + _ = s.pushBandwidthToAgent(r.Context(), hostID, req.BandwidthUpKBps, req.BandwidthDownKBps) + } writeJSON(w, stdhttp.StatusOK, hostBandwidthView(req)) } diff --git a/internal/server/http/host_bandwidth_push.go b/internal/server/http/host_bandwidth_push.go new file mode 100644 index 0000000..2cfa7fd --- /dev/null +++ b/internal/server/http/host_bandwidth_push.go @@ -0,0 +1,78 @@ +// host_bandwidth_push.go — server → agent fan-out of host-wide +// bandwidth caps via config.update. +// +// Two entry points: pushBandwidthOnHello (called from onAgentHello, +// always pushes the current state so the agent picks up edits made +// while it was offline) and pushBandwidthToAgent (called after the +// PUT bandwidth handler succeeds, so an online agent re-arms within +// seconds). +// +// We always send pointer fields (zero-valued when uncapped) so the +// agent can distinguish "no change" (nil → field absent on the wire) +// from "explicitly cleared" (non-nil zero pointer). See +// api.ConfigUpdatePayload doc for the wire semantics. +package http + +import ( + "context" + "log/slog" + "time" + + "gitea.dcglab.co.uk/steve/restic-manager/internal/api" + "gitea.dcglab.co.uk/steve/restic-manager/internal/server/ws" +) + +// pushBandwidthOnHello ships the host's current bandwidth caps as a +// config.update on the supplied conn. Silent no-op on lookup error. +func (s *Server) pushBandwidthOnHello(ctx context.Context, hostID string, conn *ws.Conn) { + host, err := s.deps.Store.GetHost(ctx, hostID) + if err != nil { + slog.Warn("on-hello: load host for bandwidth", "host_id", hostID, "err", err) + return + } + payload := bandwidthPayload(host.BandwidthUpKBps, host.BandwidthDownKBps) + env, err := api.Marshal(api.MsgConfigUpdate, "", payload) + if err != nil { + slog.Error("on-hello: marshal bandwidth config.update", "host_id", hostID, "err", err) + return + } + sendCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + if err := conn.Send(sendCtx, env); err != nil { + slog.Warn("on-hello: send bandwidth config.update", "host_id", hostID, "err", err) + } +} + +// pushBandwidthToAgent ships the supplied caps via the hub. Caller is +// expected to check Hub.Connected first when it matters. +func (s *Server) pushBandwidthToAgent(ctx context.Context, hostID string, up, down *int) error { + env, err := api.Marshal(api.MsgConfigUpdate, "", bandwidthPayload(up, down)) + if err != nil { + return err + } + sendCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + return s.deps.Hub.Send(sendCtx, hostID, env) +} + +// bandwidthPayload builds a ConfigUpdatePayload with only the +// bandwidth fields populated. Pointers are passed through verbatim; +// callers wanting to clear a cap should pass a non-nil pointer to 0. +// On the on-hello path we materialize zero-valued pointers when the +// host record has no cap set, so the agent's stored state is always +// in sync (rather than retaining whatever value it last received). +func bandwidthPayload(up, down *int) api.ConfigUpdatePayload { + zero := 0 + upPtr := up + if upPtr == nil { + upPtr = &zero + } + downPtr := down + if downPtr == nil { + downPtr = &zero + } + return api.ConfigUpdatePayload{ + BandwidthUpKBps: upPtr, + BandwidthDownKBps: downPtr, + } +} diff --git a/internal/server/http/host_credentials.go b/internal/server/http/host_credentials.go index 0060de3..c414eba 100644 --- a/internal/server/http/host_credentials.go +++ b/internal/server/http/host_credentials.go @@ -399,6 +399,10 @@ func (s *Server) pushAdminCredsToAgent(ctx context.Context, hostID string) error // don't race a brand-new register against an old still-closing conn. func (s *Server) onAgentHello(ctx context.Context, hostID string, conn *ws.Conn) { s.pushRepoCredsOnHello(ctx, hostID, conn) + // Bandwidth caps are sent unconditionally so an agent that + // reconnects after a cap edit picks up the new state without + // waiting for the next bandwidth PUT. + s.pushBandwidthOnHello(ctx, hostID, conn) // Push the current schedule set in the same on-hello window so // the agent's local cron is in sync before any command.run lands. // An empty schedule list is a valid push: it tells the agent to