agent+server: apply host bandwidth caps to restic invocations
P2R-13a. restic.Env gains LimitUploadKBps/LimitDownloadKBps which are
emitted as global --limit-upload/--limit-download flags before the
subcommand on every invocation. Agent dispatcher tracks host-wide
caps received via config.update; server pushes them on hello and
after PUT /api/hosts/{id}/bandwidth.
Also extends api.CommandRunPayload with optional per-job overrides
(BandwidthUpKBps/Down + PreHook/PostHook); the override consumers
land in T2/T6.
This commit is contained in:
+51
-8
@@ -9,6 +9,7 @@ import (
|
|||||||
"os"
|
"os"
|
||||||
"os/signal"
|
"os/signal"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
"sync"
|
||||||
"syscall"
|
"syscall"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@@ -170,6 +171,14 @@ type dispatcher struct {
|
|||||||
resticBin string
|
resticBin string
|
||||||
secrets *secrets.Store
|
secrets *secrets.Store
|
||||||
scheduler *scheduler.Scheduler
|
scheduler *scheduler.Scheduler
|
||||||
|
|
||||||
|
// Bandwidth caps in KB/s pushed via config.update. Mutated under
|
||||||
|
// bwMu by the config.update handler; read by runJob when building
|
||||||
|
// the runner. <=0 means "no cap" (do not pass --limit-* to restic).
|
||||||
|
// Per-job overrides on CommandRunPayload take precedence.
|
||||||
|
bwMu sync.Mutex
|
||||||
|
bwUpKBps int
|
||||||
|
bwDownKBps int
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *dispatcher) handle(ctx context.Context, env api.Envelope, tx wsclient.Sender) error {
|
func (d *dispatcher) handle(ctx context.Context, env api.Envelope, tx wsclient.Sender) error {
|
||||||
@@ -263,6 +272,24 @@ func (d *dispatcher) handle(ctx context.Context, env api.Envelope, tx wsclient.S
|
|||||||
slog.Warn("ws agent: unknown config.update slot, ignoring", "slot", p.Slot)
|
slog.Warn("ws agent: unknown config.update slot, ignoring", "slot", p.Slot)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Bandwidth caps ride independently of the slot — they're host-
|
||||||
|
// wide and apply to every restic invocation regardless of which
|
||||||
|
// credentials slot the job uses. nil pointer = no change in this
|
||||||
|
// push; non-nil = set to that value (≤0 clears the cap).
|
||||||
|
if p.BandwidthUpKBps != nil || p.BandwidthDownKBps != nil {
|
||||||
|
d.bwMu.Lock()
|
||||||
|
if p.BandwidthUpKBps != nil {
|
||||||
|
d.bwUpKBps = *p.BandwidthUpKBps
|
||||||
|
}
|
||||||
|
if p.BandwidthDownKBps != nil {
|
||||||
|
d.bwDownKBps = *p.BandwidthDownKBps
|
||||||
|
}
|
||||||
|
up, down := d.bwUpKBps, d.bwDownKBps
|
||||||
|
d.bwMu.Unlock()
|
||||||
|
slog.Info("ws agent: bandwidth caps updated",
|
||||||
|
"up_kbps", up, "down_kbps", down)
|
||||||
|
}
|
||||||
|
|
||||||
case api.MsgAgentUpdateAvail:
|
case api.MsgAgentUpdateAvail:
|
||||||
var p api.AgentUpdateAvailablePayload
|
var p api.AgentUpdateAvailablePayload
|
||||||
_ = env.UnmarshalPayload(&p)
|
_ = env.UnmarshalPayload(&p)
|
||||||
@@ -295,11 +322,25 @@ func (d *dispatcher) runJob(ctx context.Context, p api.CommandRunPayload, tx wsc
|
|||||||
// not on r). If you find yourself adding a new JobKind that
|
// not on r). If you find yourself adding a new JobKind that
|
||||||
// needs delete authority, mirror the JobPrune pattern below
|
// needs delete authority, mirror the JobPrune pattern below
|
||||||
// — don't try to overload r.
|
// — don't try to overload r.
|
||||||
|
// Resolve bandwidth caps: per-job override (if set) wins over the
|
||||||
|
// host-wide caps last pushed via config.update. <=0 means no cap.
|
||||||
|
d.bwMu.Lock()
|
||||||
|
upKBps, downKBps := d.bwUpKBps, d.bwDownKBps
|
||||||
|
d.bwMu.Unlock()
|
||||||
|
if p.BandwidthUpKBps != nil {
|
||||||
|
upKBps = *p.BandwidthUpKBps
|
||||||
|
}
|
||||||
|
if p.BandwidthDownKBps != nil {
|
||||||
|
downKBps = *p.BandwidthDownKBps
|
||||||
|
}
|
||||||
|
|
||||||
r := runner.New(runner.Config{
|
r := runner.New(runner.Config{
|
||||||
ResticBin: d.resticBin,
|
ResticBin: d.resticBin,
|
||||||
RepoURL: creds.URL,
|
RepoURL: creds.URL,
|
||||||
RepoUsername: creds.Username,
|
RepoUsername: creds.Username,
|
||||||
RepoPassword: creds.Password,
|
RepoPassword: creds.Password,
|
||||||
|
LimitUploadKBps: upKBps,
|
||||||
|
LimitDownloadKBps: downKBps,
|
||||||
}, tx, time.Second)
|
}, tx, time.Second)
|
||||||
|
|
||||||
switch p.Kind {
|
switch p.Kind {
|
||||||
@@ -381,10 +422,12 @@ func (d *dispatcher) runJob(ctx context.Context, p api.CommandRunPayload, tx wsc
|
|||||||
runCreds = ac
|
runCreds = ac
|
||||||
}
|
}
|
||||||
prr := runner.New(runner.Config{
|
prr := runner.New(runner.Config{
|
||||||
ResticBin: d.resticBin,
|
ResticBin: d.resticBin,
|
||||||
RepoURL: runCreds.URL,
|
RepoURL: runCreds.URL,
|
||||||
RepoUsername: runCreds.Username,
|
RepoUsername: runCreds.Username,
|
||||||
RepoPassword: runCreds.Password,
|
RepoPassword: runCreds.Password,
|
||||||
|
LimitUploadKBps: upKBps,
|
||||||
|
LimitDownloadKBps: downKBps,
|
||||||
}, tx, time.Second)
|
}, tx, time.Second)
|
||||||
slog.Info("agent: accepting prune job", "job_id", p.JobID, "admin_creds", p.RequiresAdminCreds)
|
slog.Info("agent: accepting prune job", "job_id", p.JobID, "admin_creds", p.RequiresAdminCreds)
|
||||||
go func() {
|
go func() {
|
||||||
|
|||||||
@@ -30,6 +30,12 @@ type Config struct {
|
|||||||
RepoURL string
|
RepoURL string
|
||||||
RepoUsername string
|
RepoUsername string
|
||||||
RepoPassword string
|
RepoPassword string
|
||||||
|
|
||||||
|
// Bandwidth caps in KB/s applied to every restic invocation.
|
||||||
|
// <=0 means "no cap". Per-job override: callers that build a
|
||||||
|
// runner per-dispatch can pass the override value here directly.
|
||||||
|
LimitUploadKBps int
|
||||||
|
LimitDownloadKBps int
|
||||||
}
|
}
|
||||||
|
|
||||||
// Runner owns the restic invocations.
|
// Runner owns the restic invocations.
|
||||||
@@ -54,10 +60,12 @@ func New(cfg Config, tx Sender, progressMinPeriod time.Duration) *Runner {
|
|||||||
// resticEnv builds the shared restic.Env from r.cfg.
|
// resticEnv builds the shared restic.Env from r.cfg.
|
||||||
func (r *Runner) resticEnv() restic.Env {
|
func (r *Runner) resticEnv() restic.Env {
|
||||||
return restic.Env{
|
return restic.Env{
|
||||||
Bin: r.cfg.ResticBin,
|
Bin: r.cfg.ResticBin,
|
||||||
RepoURL: r.cfg.RepoURL,
|
RepoURL: r.cfg.RepoURL,
|
||||||
RepoUsername: r.cfg.RepoUsername,
|
RepoUsername: r.cfg.RepoUsername,
|
||||||
RepoPassword: r.cfg.RepoPassword,
|
RepoPassword: r.cfg.RepoPassword,
|
||||||
|
LimitUploadKBps: r.cfg.LimitUploadKBps,
|
||||||
|
LimitDownloadKBps: r.cfg.LimitDownloadKBps,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -130,6 +130,19 @@ type CommandRunPayload struct {
|
|||||||
Tag string `json:"tag,omitempty"`
|
Tag string `json:"tag,omitempty"`
|
||||||
ForgetGroups []ForgetGroup `json:"forget_groups,omitempty"`
|
ForgetGroups []ForgetGroup `json:"forget_groups,omitempty"`
|
||||||
RequiresAdminCreds bool `json:"requires_admin_creds,omitempty"`
|
RequiresAdminCreds bool `json:"requires_admin_creds,omitempty"`
|
||||||
|
|
||||||
|
// Per-job bandwidth caps in KB/s. When nil, the agent uses the
|
||||||
|
// host-wide caps it received via config.update. When non-nil,
|
||||||
|
// the override wins for this job only — even a non-nil zero
|
||||||
|
// pointer means "no cap for this job" (caller's explicit choice).
|
||||||
|
BandwidthUpKBps *int `json:"bandwidth_up_kbps,omitempty"`
|
||||||
|
BandwidthDownKBps *int `json:"bandwidth_down_kbps,omitempty"`
|
||||||
|
|
||||||
|
// Hooks run only for kind=backup. Server resolves source-group
|
||||||
|
// hook → host default → empty before dispatching, so the agent
|
||||||
|
// just executes whatever is here.
|
||||||
|
PreHook string `json:"pre_hook,omitempty"`
|
||||||
|
PostHook string `json:"post_hook,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// CommandCancelPayload is the server → agent cancel signal.
|
// CommandCancelPayload is the server → agent cancel signal.
|
||||||
@@ -306,6 +319,14 @@ type ConfigUpdatePayload struct {
|
|||||||
RepoCredential string `json:"repo_credential,omitempty"` // sensitive (for rest server basic auth)
|
RepoCredential string `json:"repo_credential,omitempty"` // sensitive (for rest server basic auth)
|
||||||
HookShell string `json:"hook_shell,omitempty"`
|
HookShell string `json:"hook_shell,omitempty"`
|
||||||
Slot string `json:"slot,omitempty"`
|
Slot string `json:"slot,omitempty"`
|
||||||
|
|
||||||
|
// Bandwidth caps in KB/s. Pointer semantics so the server can
|
||||||
|
// disambiguate "no change in this push" (nil → omitted on the
|
||||||
|
// wire) from "explicitly clear the cap" (zero or negative value).
|
||||||
|
// Applied to every restic invocation as --limit-upload /
|
||||||
|
// --limit-download. Per-job overrides ride on CommandRunPayload.
|
||||||
|
BandwidthUpKBps *int `json:"bandwidth_up_kbps,omitempty"`
|
||||||
|
BandwidthDownKBps *int `json:"bandwidth_down_kbps,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// AgentUpdateAvailablePayload — informational only; the agent does
|
// AgentUpdateAvailablePayload — informational only; the agent does
|
||||||
|
|||||||
+38
-23
@@ -47,6 +47,37 @@ type Env struct {
|
|||||||
RepoPassword string // doubles as RESTIC_PASSWORD and (for rest:) HTTP basic-auth password
|
RepoPassword string // doubles as RESTIC_PASSWORD and (for rest:) HTTP basic-auth password
|
||||||
ExtraEnv map[string]string // any other RESTIC_* / passthrough
|
ExtraEnv map[string]string // any other RESTIC_* / passthrough
|
||||||
WorkDir string // CWD; default = current
|
WorkDir string // CWD; default = current
|
||||||
|
|
||||||
|
// Bandwidth caps in KB/s. <=0 means "no cap" (omit the flag).
|
||||||
|
// Emitted as restic global flags --limit-upload / --limit-download
|
||||||
|
// before the subcommand on every invocation.
|
||||||
|
LimitUploadKBps int
|
||||||
|
LimitDownloadKBps int
|
||||||
|
}
|
||||||
|
|
||||||
|
// globalArgs returns restic's pre-subcommand global flags derived
|
||||||
|
// from the Env. Currently just bandwidth caps.
|
||||||
|
func (e Env) globalArgs() []string {
|
||||||
|
var out []string
|
||||||
|
if e.LimitUploadKBps > 0 {
|
||||||
|
out = append(out, "--limit-upload", fmt.Sprintf("%d", e.LimitUploadKBps))
|
||||||
|
}
|
||||||
|
if e.LimitDownloadKBps > 0 {
|
||||||
|
out = append(out, "--limit-download", fmt.Sprintf("%d", e.LimitDownloadKBps))
|
||||||
|
}
|
||||||
|
return out
|
||||||
|
}
|
||||||
|
|
||||||
|
// resticCmd builds an exec.Cmd with bandwidth-limit globals prefixed
|
||||||
|
// before the supplied subcommand args. Centralizing this so every
|
||||||
|
// command (backup/forget/prune/check/unlock/init/stats) honors
|
||||||
|
// the caps without each call site having to remember.
|
||||||
|
func (e Env) resticCmd(ctx context.Context, sub ...string) *exec.Cmd {
|
||||||
|
args := append(e.globalArgs(), sub...)
|
||||||
|
cmd := exec.CommandContext(ctx, e.Bin, args...)
|
||||||
|
cmd.Env = e.envSlice()
|
||||||
|
cmd.Dir = e.WorkDir
|
||||||
|
return cmd
|
||||||
}
|
}
|
||||||
|
|
||||||
// EventKind enumerates what we care about in restic's --json output
|
// EventKind enumerates what we care about in restic's --json output
|
||||||
@@ -110,9 +141,7 @@ func (e Env) RunBackup(ctx context.Context, paths, excludes, tags []string, hand
|
|||||||
}
|
}
|
||||||
args = append(args, paths...)
|
args = append(args, paths...)
|
||||||
|
|
||||||
cmd := exec.CommandContext(ctx, e.Bin, args...)
|
cmd := e.resticCmd(ctx, args...)
|
||||||
cmd.Env = e.envSlice()
|
|
||||||
cmd.Dir = e.WorkDir
|
|
||||||
|
|
||||||
stdout, err := cmd.StdoutPipe()
|
stdout, err := cmd.StdoutPipe()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -215,9 +244,7 @@ func (e Env) RunForget(ctx context.Context, groups []ForgetGroup, handle LineHan
|
|||||||
}
|
}
|
||||||
args := []string{"forget", "--json", "--tag", g.Tag}
|
args := []string{"forget", "--json", "--tag", g.Tag}
|
||||||
args = append(args, g.Policy.args()...)
|
args = append(args, g.Policy.args()...)
|
||||||
cmd := exec.CommandContext(ctx, e.Bin, args...)
|
cmd := e.resticCmd(ctx, args...)
|
||||||
cmd.Env = e.envSlice()
|
|
||||||
cmd.Dir = e.WorkDir
|
|
||||||
if err := runWithPump(cmd, handle); err != nil {
|
if err := runWithPump(cmd, handle); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@@ -232,9 +259,7 @@ func (e Env) RunForget(ctx context.Context, groups []ForgetGroup, handle LineHan
|
|||||||
// <id> at <url>" on success, "config file already exists" on a
|
// <id> at <url>" on success, "config file already exists" on a
|
||||||
// re-init attempt, etc.).
|
// re-init attempt, etc.).
|
||||||
func (e Env) RunInit(ctx context.Context, handle LineHandler) error {
|
func (e Env) RunInit(ctx context.Context, handle LineHandler) error {
|
||||||
cmd := exec.CommandContext(ctx, e.Bin, "init")
|
cmd := e.resticCmd(ctx, "init")
|
||||||
cmd.Env = e.envSlice()
|
|
||||||
cmd.Dir = e.WorkDir
|
|
||||||
|
|
||||||
// Sniff for "config file already exists" on stderr; if we see it
|
// Sniff for "config file already exists" on stderr; if we see it
|
||||||
// we'll treat the non-zero exit as a soft success — running init
|
// we'll treat the non-zero exit as a soft success — running init
|
||||||
@@ -272,10 +297,7 @@ func (e Env) RunInit(ctx context.Context, handle LineHandler) error {
|
|||||||
// support that's useful for our purposes). We tee everything to the
|
// support that's useful for our purposes). We tee everything to the
|
||||||
// handler so the live log is the operator's progress bar.
|
// handler so the live log is the operator's progress bar.
|
||||||
func (e Env) RunPrune(ctx context.Context, handle LineHandler) error {
|
func (e Env) RunPrune(ctx context.Context, handle LineHandler) error {
|
||||||
cmd := exec.CommandContext(ctx, e.Bin, "prune")
|
return runWithPump(e.resticCmd(ctx, "prune"), handle)
|
||||||
cmd.Env = e.envSlice()
|
|
||||||
cmd.Dir = e.WorkDir
|
|
||||||
return runWithPump(cmd, handle)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// runWithPump starts the configured cmd, fans stdout+stderr into
|
// runWithPump starts the configured cmd, fans stdout+stderr into
|
||||||
@@ -313,10 +335,7 @@ func runWithPump(cmd *exec.Cmd, handle LineHandler) error {
|
|||||||
|
|
||||||
// RunUnlock executes `restic unlock`. Returns nil on a clean exit.
|
// RunUnlock executes `restic unlock`. Returns nil on a clean exit.
|
||||||
func (e Env) RunUnlock(ctx context.Context, handle LineHandler) error {
|
func (e Env) RunUnlock(ctx context.Context, handle LineHandler) error {
|
||||||
cmd := exec.CommandContext(ctx, e.Bin, "unlock")
|
return runWithPump(e.resticCmd(ctx, "unlock"), handle)
|
||||||
cmd.Env = e.envSlice()
|
|
||||||
cmd.Dir = e.WorkDir
|
|
||||||
return runWithPump(cmd, handle)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// RepoStats mirrors `restic stats --json --mode raw-data` output.
|
// RepoStats mirrors `restic stats --json --mode raw-data` output.
|
||||||
@@ -333,9 +352,7 @@ type RepoStats struct {
|
|||||||
// caller can still log it. Returns an error if no JSON-shaped line
|
// caller can still log it. Returns an error if no JSON-shaped line
|
||||||
// arrived on stdout.
|
// arrived on stdout.
|
||||||
func (e Env) RunStats(ctx context.Context, handle LineHandler) (*RepoStats, error) {
|
func (e Env) RunStats(ctx context.Context, handle LineHandler) (*RepoStats, error) {
|
||||||
cmd := exec.CommandContext(ctx, e.Bin, "stats", "--json", "--mode", "raw-data")
|
cmd := e.resticCmd(ctx, "stats", "--json", "--mode", "raw-data")
|
||||||
cmd.Env = e.envSlice()
|
|
||||||
cmd.Dir = e.WorkDir
|
|
||||||
var out *RepoStats
|
var out *RepoStats
|
||||||
capture := func(stream, line string, ev any) {
|
capture := func(stream, line string, ev any) {
|
||||||
if stream == "stdout" && strings.HasPrefix(line, "{") {
|
if stream == "stdout" && strings.HasPrefix(line, "{") {
|
||||||
@@ -378,9 +395,7 @@ func (e Env) RunCheck(ctx context.Context, subsetPct int, handle LineHandler) (C
|
|||||||
if subsetPct > 0 {
|
if subsetPct > 0 {
|
||||||
args = append(args, "--read-data-subset", fmt.Sprintf("%d%%", subsetPct))
|
args = append(args, "--read-data-subset", fmt.Sprintf("%d%%", subsetPct))
|
||||||
}
|
}
|
||||||
cmd := exec.CommandContext(ctx, e.Bin, args...)
|
cmd := e.resticCmd(ctx, args...)
|
||||||
cmd.Env = e.envSlice()
|
|
||||||
cmd.Dir = e.WorkDir
|
|
||||||
|
|
||||||
var res CheckResult
|
var res CheckResult
|
||||||
sniff := func(stream, line string, ev any) {
|
sniff := func(stream, line string, ev any) {
|
||||||
|
|||||||
@@ -174,6 +174,43 @@ func TestRunStatsErrorsWithoutJSON(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestBandwidthLimitFlagsInjected(t *testing.T) {
|
||||||
|
// Script echoes its argv to stdout. Each variant should produce
|
||||||
|
// the right --limit-* flags before the subcommand.
|
||||||
|
cases := []struct {
|
||||||
|
name string
|
||||||
|
env Env
|
||||||
|
want []string
|
||||||
|
}{
|
||||||
|
{"both caps", Env{LimitUploadKBps: 1024, LimitDownloadKBps: 512}, []string{"--limit-upload 1024", "--limit-download 512"}},
|
||||||
|
{"only upload", Env{LimitUploadKBps: 256}, []string{"--limit-upload 256"}},
|
||||||
|
{"zero means omit", Env{LimitUploadKBps: 0, LimitDownloadKBps: 0}, nil},
|
||||||
|
{"negative means omit", Env{LimitUploadKBps: -1}, nil},
|
||||||
|
}
|
||||||
|
for _, c := range cases {
|
||||||
|
t.Run(c.name, func(t *testing.T) {
|
||||||
|
bin := setupScriptBin(t, `echo "$@"`)
|
||||||
|
env := c.env
|
||||||
|
env.Bin = bin
|
||||||
|
lines, h := captureLines()
|
||||||
|
if err := env.RunUnlock(context.Background(), h); err != nil {
|
||||||
|
t.Fatalf("RunUnlock: %v", err)
|
||||||
|
}
|
||||||
|
joined := strings.Join(*lines, "\n")
|
||||||
|
for _, want := range c.want {
|
||||||
|
if !strings.Contains(joined, want) {
|
||||||
|
t.Fatalf("want %q in argv; got: %s", want, joined)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if len(c.want) == 0 {
|
||||||
|
if strings.Contains(joined, "--limit-upload") || strings.Contains(joined, "--limit-download") {
|
||||||
|
t.Fatalf("expected no limit flags; got: %s", joined)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestRunStatsZeroSnapshots(t *testing.T) {
|
func TestRunStatsZeroSnapshots(t *testing.T) {
|
||||||
// Confirms RunStats succeeds and returns a valid *RepoStats when the
|
// Confirms RunStats succeeds and returns a valid *RepoStats when the
|
||||||
// repo has no snapshots (snapshots_count=0). A regression that
|
// repo has no snapshots (snapshots_count=0). A regression that
|
||||||
|
|||||||
@@ -58,5 +58,10 @@ func (s *Server) handleUpdateHostBandwidth(w stdhttp.ResponseWriter, r *stdhttp.
|
|||||||
writeJSONError(w, stdhttp.StatusInternalServerError, "internal", err.Error())
|
writeJSONError(w, stdhttp.StatusInternalServerError, "internal", err.Error())
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
// Fan out to the agent if connected. Errors are non-fatal — the
|
||||||
|
// next reconnect's onAgentHello will resync.
|
||||||
|
if s.deps.Hub != nil && s.deps.Hub.Connected(hostID) {
|
||||||
|
_ = s.pushBandwidthToAgent(r.Context(), hostID, req.BandwidthUpKBps, req.BandwidthDownKBps)
|
||||||
|
}
|
||||||
writeJSON(w, stdhttp.StatusOK, hostBandwidthView(req))
|
writeJSON(w, stdhttp.StatusOK, hostBandwidthView(req))
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,78 @@
|
|||||||
|
// host_bandwidth_push.go — server → agent fan-out of host-wide
|
||||||
|
// bandwidth caps via config.update.
|
||||||
|
//
|
||||||
|
// Two entry points: pushBandwidthOnHello (called from onAgentHello,
|
||||||
|
// always pushes the current state so the agent picks up edits made
|
||||||
|
// while it was offline) and pushBandwidthToAgent (called after the
|
||||||
|
// PUT bandwidth handler succeeds, so an online agent re-arms within
|
||||||
|
// seconds).
|
||||||
|
//
|
||||||
|
// We always send pointer fields (zero-valued when uncapped) so the
|
||||||
|
// agent can distinguish "no change" (nil → field absent on the wire)
|
||||||
|
// from "explicitly cleared" (non-nil zero pointer). See
|
||||||
|
// api.ConfigUpdatePayload doc for the wire semantics.
|
||||||
|
package http
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"log/slog"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"gitea.dcglab.co.uk/steve/restic-manager/internal/api"
|
||||||
|
"gitea.dcglab.co.uk/steve/restic-manager/internal/server/ws"
|
||||||
|
)
|
||||||
|
|
||||||
|
// pushBandwidthOnHello ships the host's current bandwidth caps as a
|
||||||
|
// config.update on the supplied conn. Silent no-op on lookup error.
|
||||||
|
func (s *Server) pushBandwidthOnHello(ctx context.Context, hostID string, conn *ws.Conn) {
|
||||||
|
host, err := s.deps.Store.GetHost(ctx, hostID)
|
||||||
|
if err != nil {
|
||||||
|
slog.Warn("on-hello: load host for bandwidth", "host_id", hostID, "err", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
payload := bandwidthPayload(host.BandwidthUpKBps, host.BandwidthDownKBps)
|
||||||
|
env, err := api.Marshal(api.MsgConfigUpdate, "", payload)
|
||||||
|
if err != nil {
|
||||||
|
slog.Error("on-hello: marshal bandwidth config.update", "host_id", hostID, "err", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
sendCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
if err := conn.Send(sendCtx, env); err != nil {
|
||||||
|
slog.Warn("on-hello: send bandwidth config.update", "host_id", hostID, "err", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// pushBandwidthToAgent ships the supplied caps via the hub. Caller is
|
||||||
|
// expected to check Hub.Connected first when it matters.
|
||||||
|
func (s *Server) pushBandwidthToAgent(ctx context.Context, hostID string, up, down *int) error {
|
||||||
|
env, err := api.Marshal(api.MsgConfigUpdate, "", bandwidthPayload(up, down))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
sendCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
return s.deps.Hub.Send(sendCtx, hostID, env)
|
||||||
|
}
|
||||||
|
|
||||||
|
// bandwidthPayload builds a ConfigUpdatePayload with only the
|
||||||
|
// bandwidth fields populated. Pointers are passed through verbatim;
|
||||||
|
// callers wanting to clear a cap should pass a non-nil pointer to 0.
|
||||||
|
// On the on-hello path we materialize zero-valued pointers when the
|
||||||
|
// host record has no cap set, so the agent's stored state is always
|
||||||
|
// in sync (rather than retaining whatever value it last received).
|
||||||
|
func bandwidthPayload(up, down *int) api.ConfigUpdatePayload {
|
||||||
|
zero := 0
|
||||||
|
upPtr := up
|
||||||
|
if upPtr == nil {
|
||||||
|
upPtr = &zero
|
||||||
|
}
|
||||||
|
downPtr := down
|
||||||
|
if downPtr == nil {
|
||||||
|
downPtr = &zero
|
||||||
|
}
|
||||||
|
return api.ConfigUpdatePayload{
|
||||||
|
BandwidthUpKBps: upPtr,
|
||||||
|
BandwidthDownKBps: downPtr,
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -399,6 +399,10 @@ func (s *Server) pushAdminCredsToAgent(ctx context.Context, hostID string) error
|
|||||||
// don't race a brand-new register against an old still-closing conn.
|
// don't race a brand-new register against an old still-closing conn.
|
||||||
func (s *Server) onAgentHello(ctx context.Context, hostID string, conn *ws.Conn) {
|
func (s *Server) onAgentHello(ctx context.Context, hostID string, conn *ws.Conn) {
|
||||||
s.pushRepoCredsOnHello(ctx, hostID, conn)
|
s.pushRepoCredsOnHello(ctx, hostID, conn)
|
||||||
|
// Bandwidth caps are sent unconditionally so an agent that
|
||||||
|
// reconnects after a cap edit picks up the new state without
|
||||||
|
// waiting for the next bandwidth PUT.
|
||||||
|
s.pushBandwidthOnHello(ctx, hostID, conn)
|
||||||
// Push the current schedule set in the same on-hello window so
|
// Push the current schedule set in the same on-hello window so
|
||||||
// the agent's local cron is in sync before any command.run lands.
|
// the agent's local cron is in sync before any command.run lands.
|
||||||
// An empty schedule list is a valid push: it tells the agent to
|
// An empty schedule list is a valid push: it tells the agent to
|
||||||
|
|||||||
Reference in New Issue
Block a user