ddc07609cb
P2R-13a. restic.Env gains LimitUploadKBps/LimitDownloadKBps which are
emitted as global --limit-upload/--limit-download flags before the
subcommand on every invocation. Agent dispatcher tracks host-wide
caps received via config.update; server pushes them on hello and
after PUT /api/hosts/{id}/bandwidth.
Also extends api.CommandRunPayload with optional per-job overrides
(BandwidthUpKBps/Down + PreHook/PostHook); the override consumers
land in T2/T6.
79 lines
2.8 KiB
Go
79 lines
2.8 KiB
Go
// host_bandwidth_push.go — server → agent fan-out of host-wide
// bandwidth caps via config.update.
//
// Two entry points: pushBandwidthOnHello (called from onAgentHello,
// always pushes the current state so the agent picks up edits made
// while it was offline) and pushBandwidthToAgent (called after the
// PUT bandwidth handler succeeds, so an online agent re-arms within
// seconds).
//
// Both entry points always send non-nil pointer fields (pointing at 0
// when uncapped) so the agent can distinguish "no change" (nil → field
// absent on the wire) from "explicitly cleared" (non-nil pointer to
// zero). See the api.ConfigUpdatePayload doc for the wire semantics.
|
|
package http
|
|
|
|
import (
|
|
"context"
|
|
"log/slog"
|
|
"time"
|
|
|
|
"gitea.dcglab.co.uk/steve/restic-manager/internal/api"
|
|
"gitea.dcglab.co.uk/steve/restic-manager/internal/server/ws"
|
|
)
|
|
|
|
// pushBandwidthOnHello ships the host's current bandwidth caps as a
|
|
// config.update on the supplied conn. Silent no-op on lookup error.
|
|
func (s *Server) pushBandwidthOnHello(ctx context.Context, hostID string, conn *ws.Conn) {
|
|
host, err := s.deps.Store.GetHost(ctx, hostID)
|
|
if err != nil {
|
|
slog.Warn("on-hello: load host for bandwidth", "host_id", hostID, "err", err)
|
|
return
|
|
}
|
|
payload := bandwidthPayload(host.BandwidthUpKBps, host.BandwidthDownKBps)
|
|
env, err := api.Marshal(api.MsgConfigUpdate, "", payload)
|
|
if err != nil {
|
|
slog.Error("on-hello: marshal bandwidth config.update", "host_id", hostID, "err", err)
|
|
return
|
|
}
|
|
sendCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
|
defer cancel()
|
|
if err := conn.Send(sendCtx, env); err != nil {
|
|
slog.Warn("on-hello: send bandwidth config.update", "host_id", hostID, "err", err)
|
|
}
|
|
}
|
|
|
|
// pushBandwidthToAgent ships the supplied caps via the hub. Caller is
|
|
// expected to check Hub.Connected first when it matters.
|
|
func (s *Server) pushBandwidthToAgent(ctx context.Context, hostID string, up, down *int) error {
|
|
env, err := api.Marshal(api.MsgConfigUpdate, "", bandwidthPayload(up, down))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
sendCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
|
defer cancel()
|
|
return s.deps.Hub.Send(sendCtx, hostID, env)
|
|
}
|
|
|
|
// bandwidthPayload builds a ConfigUpdatePayload with only the
|
|
// bandwidth fields populated. Pointers are passed through verbatim;
|
|
// callers wanting to clear a cap should pass a non-nil pointer to 0.
|
|
// On the on-hello path we materialize zero-valued pointers when the
|
|
// host record has no cap set, so the agent's stored state is always
|
|
// in sync (rather than retaining whatever value it last received).
|
|
func bandwidthPayload(up, down *int) api.ConfigUpdatePayload {
|
|
zero := 0
|
|
upPtr := up
|
|
if upPtr == nil {
|
|
upPtr = &zero
|
|
}
|
|
downPtr := down
|
|
if downPtr == nil {
|
|
downPtr = &zero
|
|
}
|
|
return api.ConfigUpdatePayload{
|
|
BandwidthUpKBps: upPtr,
|
|
BandwidthDownKBps: downPtr,
|
|
}
|
|
}
|